net/adapter/net/dataforts/blob/overflow.rs
1//! Active-overflow controller (v0.3 P2).
2//!
3//! Push-side complement of [`super::migration::BlobMigrationController`].
4//! Migration is *pull* (the local node decides to take an
5//! advertised hot blob); overflow is *push* (the local node
6//! decides to shed a cold blob and a remote node decides
7//! whether to accept). The two surfaces parallel each other —
8//! every reject reason on either side maps to a Prometheus
9//! counter label so operators can dashboard both directions.
10//!
11//! See [`DATAFORTS_BLOB_OVERFLOW_PLAN.md`] for the full design.
12//!
13//! # P2 scope
14//!
15//! Pure-logic controller + tick driver + hysteresis state
16//! machine. The actual wire push (`OverflowPush` RPC) lands in
17//! P3; this module abstracts that away behind the
18//! [`OverflowPushSink`] trait so the tick can be unit-tested
19//! against a recorder without spinning up a real mesh.
20//!
21//! [`DATAFORTS_BLOB_OVERFLOW_PLAN.md`]: ../../../../../docs/plans/DATAFORTS_BLOB_OVERFLOW_PLAN.md
22
23use std::sync::atomic::{AtomicBool, Ordering};
24use std::sync::Arc;
25use std::time::Instant;
26
27use async_trait::async_trait;
28use serde::{Deserialize, Serialize};
29
30use super::admission::OverflowReject;
31use super::error::BlobError;
32use super::mesh::OverflowConfig;
33use super::refcount::BlobRefcountTable;
34use crate::adapter::net::behavior::fold::{capability_bridge, CapabilityFold, Fold};
35use crate::adapter::net::behavior::{
36 is_blob_storage_unhealthy, BlobCapability, CapabilitySet, GravityCapability, TopologyScope,
37};
38use crate::adapter::net::dataforts::gravity::BlobHeatRegistry;
39
/// Service-name token for the overflow-push nRPC channel.
///
/// The sender constructs a request on
/// `"{OVERFLOW_PUSH_SERVICE}.requests"` and listens on
/// `"{OVERFLOW_PUSH_SERVICE}.replies.<origin>"`; the receiver
/// registers a handler under the same service name via
/// [`crate::adapter::net::MeshNode::serve_overflow_push`].
///
/// Held as a single shared const so sender and receiver both
/// reference one definition — a misspelled identifier on either
/// side fails to compile, and the string value itself cannot
/// drift between the two. The wire form is the literal string —
/// no version suffix (per-tag versioning lives inside the wire
/// payload, not the channel name).
pub const OVERFLOW_PUSH_SERVICE: &str = "dataforts.blob.overflow_push";
52
/// Wire request body for an overflow push. The sender encodes
/// this via postcard and drops it into the nRPC payload; the
/// receiver decodes, runs [`super::admission::should_accept_overflow_from`],
/// and on Admit opens the chunk channel against the local
/// adapter so the existing replication runtime can pull the
/// bytes.
///
/// The chunk bytes themselves do NOT ride this request — the
/// nRPC envelope carries the *nudge*, not the chunk payload.
/// `size_bytes` is the resolved chunk size so the receive-side
/// disk-gate can fire without round-tripping a `stat` call.
///
/// Wire layout: postcard's default encoding of the derived
/// `Serialize` impl — fields are written positionally, in
/// declaration order, with no field names or tags. The field
/// order below is therefore part of the wire format and is
/// locked: reordering, removing, or adding fields breaks
/// decoding between mismatched versions. A future v2 would
/// land as a separate type registered under a new service-name
/// token, with v1 receivers ignoring the new channel.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct OverflowPush {
    /// 32-byte BLAKE3 hash of the chunk to push.
    pub blob_hash: [u8; 32],
    /// Wire size of the chunk in bytes. Drives the receive-
    /// side disk-gate.
    pub size_bytes: u64,
    /// Sender's canonical `node_id`. The receiver synthesizes
    /// the sender's `CapabilitySet` from its local capability
    /// fold keyed on this id; the admission check reads
    /// `overflow_enabled` + scope tags from the synthesized
    /// snapshot, not from the request body. This defends against
    /// a sender forging its *caps* via the request — the only
    /// authority for caps is the local fold state.
    ///
    /// NOTE(review): the id itself is self-reported in the request
    /// body. Unless the transport layer cross-checks it against the
    /// authenticated caller, a sender could name a different
    /// overflow-enabled node's id and inherit that node's caps at
    /// admission time — confirm where (if anywhere) this is verified.
    pub sender_node_id: u64,
}
88
/// Wire response body. The sender-side observes the result and
/// either records the admission outcome (`Accepted`) or
/// dispatches the typed reject reason to the per-reason
/// counter family.
///
/// The receiver opens the chunk channel *before* replying: it
/// only answers `Accepted` after that open has returned `Ok`, and
/// answers `OpenChunkFailed` if the open errored. So by the time
/// the sender observes `Accepted`, the channel is already open.
///
/// Like [`OverflowPush`], this is postcard-encoded, so the
/// variant order below is part of the wire format; do not
/// reorder variants.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum OverflowPushAck {
    /// Receiver ran admission, returned Admit, and the
    /// chunk channel open returned Ok. The bytes are now in
    /// flight via the existing replication runtime; the
    /// durability watermark observation (post-tick) is the
    /// sender's signal to drop the local copy.
    Accepted,
    /// Receiver ran admission, returned Reject. Carries the
    /// typed reason so the sender can break out per-reason
    /// counters + decide whether to retry against the same
    /// peer (e.g. `InsufficientDisk` won't change quickly; a
    /// different target is the right move) or pick a new one.
    Rejected(OverflowReject),
    /// Receiver ran admission, returned Admit, but the
    /// chunk channel open itself failed (the replication
    /// runtime couldn't spawn, a transient disk error, etc.).
    /// Wire-distinct from `Rejected` because the failure
    /// mode is "we wanted to take it, our local plumbing
    /// broke" rather than "we won't take it." Operators
    /// alarm on `OpenChunkFailed` more aggressively.
    OpenChunkFailed,
}
119
120/// Output of [`BlobOverflowController::candidate_batch`]: the
121/// list of candidates to push this tick, plus the precise
122/// count of hashes that were attempted for target selection
123/// but found no eligible peer. The pair lets the tick driver
124/// report `rejected_no_target` accurately — distinguishing
125/// "we tried and no peer qualified" from "we hit the per-tick
126/// push cap and never tried the rest."
127#[derive(Clone, Debug, Default)]
128pub struct OverflowCandidateBatch {
129 /// Candidates with a selected target peer, truncated to
130 /// `config.max_pushes_per_tick`.
131 pub candidates: Vec<BlobOverflowCandidate>,
132 /// Number of hashes the controller attempted target
133 /// selection for and got `None` back. Bounded above by
134 /// `config.max_pushes_per_tick` (the loop breaks once
135 /// `candidates.len()` reaches the cap, so further
136 /// hashes are never tried). The tick driver routes
137 /// this directly to `rejected_no_target` without
138 /// double-counting truncated hashes.
139 pub no_target_count: usize,
140}
141
/// One overflow-push candidate the controller is considering
/// for this tick. The push controller already selected a
/// target peer for `hash`; the tick driver routes the actual
/// push call through [`OverflowPushSink::push`].
///
/// Equivalent shape to [`super::migration::BlobMigrationCandidate`]
/// with the direction reversed — `target_node_id` is the
/// receive-side, not the publisher.
#[derive(Clone, Debug)]
pub struct BlobOverflowCandidate {
    /// 32-byte chunk hash to push.
    pub hash: [u8; 32],
    /// Wire size of the chunk in bytes. Drives the receiver's
    /// `disk_free_gb` admission gate; the sender supplies it
    /// here so the receiver doesn't have to round-trip a
    /// `stat` call first.
    pub size_bytes: u64,
    /// node_id of the selected receive-side peer.
    pub target_node_id: u64,
    /// Snapshot of the target's capability set at selection
    /// time. The receive side re-derives caps fresh when it runs
    /// admission (its own local caps plus its capability fold's
    /// view of the sender); this snapshot exists only for the
    /// sender's dashboards / debug logs.
    pub target_caps: CapabilitySet,
    /// Decayed heat rate of `hash` at controller-tick time.
    /// Coldest candidates come first when the tick truncates
    /// to `max_pushes_per_tick`. `0.0` for hashes whose rate
    /// has decayed past the cutoff (or was never bumped) —
    /// these are the prime overflow targets.
    pub cold_rate: f64,
}
173
/// Per-tick report. Each field maps to a Prometheus counter
/// label (`dataforts_blob_overflow_*`) so operators can
/// dashboard the loop without hand-coding per-reason metrics.
/// The hysteresis state itself lives in a caller-owned
/// `AtomicBool` that [`step_overflow_hysteresis`] reads and
/// updates; this report captures *only* the actions this tick
/// took, plus the before/after hysteresis values.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct BlobOverflowTickReport {
    /// Candidates that passed the controller's filters AND
    /// the sink's push call returned `Ok`. Counts pushes
    /// the wire-side accepted; durability watermark observation
    /// is the next layer (P3).
    pub admitted: u64,
    /// Candidates the controller computed but no overflow-
    /// enabled peer was reachable for. Bumps once per cold
    /// hash that found no target — the same hash on the next
    /// tick may find one as caps propagate.
    pub rejected_no_target: u64,
    /// Sink returned an error. Includes the receive-side
    /// admission rejections that P3 maps to typed
    /// `OverflowReject` variants, plus RPC transport errors,
    /// plus the receive-side chunk-open errors. Operators
    /// disambiguate via the underlying wire counters; this
    /// counter is the aggregated send-side view.
    pub push_errors: u64,
    /// The hysteresis state at the start of the tick. `true`
    /// = the controller was already firing on the prior tick.
    /// Useful for telemetry: the difference between "ticks
    /// where overflow was firing the whole time" and "ticks
    /// where the state just transitioned high" is operator-
    /// meaningful (the latter signals a workload spike).
    pub was_active_at_start: bool,
    /// The hysteresis state at the end of the tick. The pair
    /// `(was_active_at_start, is_active_at_end)` documents
    /// the state machine transition this tick took.
    pub is_active_at_end: bool,
    /// Local disk-usage ratio at the start of the tick.
    /// `disk_used_bytes / disk_total_bytes`. Surfaced for
    /// operator dashboards.
    pub disk_ratio_at_start: f64,
    /// Local disk-usage ratio at the end of the tick. Equal
    /// to `disk_ratio_at_start` in P2 (the actual freed-bytes
    /// accounting needs the durability watermark observation
    /// from P3 + a re-poll of disk stats). Reserved for P3.
    pub disk_ratio_at_end: f64,
    /// Total bytes pushed this tick. Sum of `size_bytes` of
    /// every admitted candidate. P2 reports the *pushed*
    /// volume; *reclaimed* volume waits for P3's durability
    /// observation.
    pub pushed_bytes: u64,
}
224
/// Advance the overflow hysteresis state machine by one tick.
///
/// Reads the prior state from `active`, classifies `disk_ratio`
/// against the two thresholds, writes the new state back (only
/// when it changed), and returns it. Mirrors the discipline of
/// [`super::metrics::evaluate_health_gate`], which uses the same
/// 95 % / 85 % shape for the health-gate tag.
///
/// Transition table (evaluated top to bottom):
///
/// | condition                              | new state   |
/// |----------------------------------------|-------------|
/// | `disk_ratio >= high_water`             | `true`      |
/// | `disk_ratio <= low_water`              | `false`     |
/// | otherwise (inside the band, or NaN)    | unchanged   |
///
/// Because the high-water check wins, a misconfigured
/// `low_water >= high_water` simply loses the hysteresis band
/// (fire at/above high, clear at/below low) — not unsafe.
///
/// Both the load and the store use `Ordering::Relaxed`: there is a
/// single tick driver, so no cross-thread ordering is required. The
/// atomic only provides visibility across adapter clones (e.g. a
/// dashboard reading the flag on one clone while the tick runs on
/// another).
pub fn step_overflow_hysteresis(
    active: &AtomicBool,
    disk_ratio: f64,
    high_water: f64,
    low_water: f64,
) -> bool {
    let prior = active.load(Ordering::Relaxed);

    let at_or_above_high = disk_ratio >= high_water;
    let at_or_below_low = disk_ratio <= low_water;
    let next = match (at_or_above_high, at_or_below_low) {
        (true, _) => true,
        (false, true) => false,
        (false, false) => prior,
    };

    // Skip the write when nothing changed so steady-state ticks
    // don't touch the cache line.
    if next != prior {
        active.store(next, Ordering::Relaxed);
    }
    next
}
270
/// Receive-side handler for the overflow nRPC. Implements
/// [`cortex::RpcHandler`] so it slots into [`MeshNode::serve_rpc`]
/// (under the [`OVERFLOW_PUSH_SERVICE`] service name). On each
/// incoming request:
///
/// 1. Decode the postcard-encoded [`OverflowPush`].
/// 2. Synthesize `sender_caps` from the mesh's capability fold,
///    keyed on `request.sender_node_id` (an unknown sender yields
///    the empty default caps).
/// 3. Run [`super::admission::should_accept_overflow_from`]
///    against the live `local_caps` snapshot + the sender's
///    caps + the chunk size.
/// 4. On Admit: build a [`super::blob_ref::BlobRef::small`]
///    from `(blob_hash, size_bytes)` and call
///    [`super::adapter::BlobAdapter::prefetch`] — this opens
///    the chunk channel with replication armed and the
///    existing per-chunk replication runtime pulls the
///    bytes from whoever advertises `causal:<hash>`
///    (typically the sender). Returns
///    [`OverflowPushAck::Accepted`] on success,
///    [`OverflowPushAck::OpenChunkFailed`] on local-plumbing
///    error.
/// 5. On Reject: bump the adapter's per-reason reject counter,
///    wrap the typed [`OverflowReject`] in
///    [`OverflowPushAck::Rejected`] and return.
///
/// The handler holds `Arc<MeshNode>` so it reads live local
/// caps + capability-fold state on each call rather than a
/// build-time snapshot. Toggling `overflow_enabled` on the
/// adapter is observable immediately on the next inbound
/// push.
///
/// NOTE(review): step 2 trusts the `sender_node_id` carried in
/// the request body; confirm whether the transport authenticates
/// it against the actual caller.
///
/// [`cortex::RpcHandler`]: crate::adapter::net::cortex::RpcHandler
/// [`MeshNode::serve_rpc`]: crate::adapter::net::MeshNode::serve_rpc
#[cfg(feature = "cortex")]
pub struct OverflowPushHandler {
    /// Reference to the local mesh node. Used for the
    /// capability-fold lookup + the local-caps snapshot.
    /// Holds an `Arc` rather than a borrow because the
    /// handler is registered into the nRPC layer, which owns
    /// it via `Arc<dyn RpcHandler>` — the handler outlives
    /// any single tick.
    pub mesh: Arc<crate::adapter::net::MeshNode>,
    /// The local blob adapter. The handler calls
    /// `adapter.prefetch(BlobRef)` on Admit to open the
    /// chunk channel. Held by `Arc` for the same reason as
    /// `mesh`; cheap to clone (the adapter is `Arc`-internal
    /// throughout).
    pub adapter: Arc<super::mesh::MeshBlobAdapter>,
}
319
#[cfg(feature = "cortex")]
impl OverflowPushHandler {
    /// Construct a handler. Operators wire this into the
    /// receiver-side via
    /// [`crate::adapter::net::MeshNode::serve_overflow_push`].
    pub fn new(
        mesh: Arc<crate::adapter::net::MeshNode>,
        adapter: Arc<super::mesh::MeshBlobAdapter>,
    ) -> Self {
        Self { mesh, adapter }
    }

    /// Pure typed handler logic. Decoded request goes in,
    /// typed ack comes out. Separate from the
    /// [`crate::adapter::net::cortex::RpcHandler`] impl so
    /// tests can drive the admission path without
    /// constructing an [`crate::adapter::net::cortex::RpcContext`].
    ///
    /// Reads live `user_caps_snapshot` + capability-fold
    /// state on each call, so an operator toggling
    /// `overflow_enabled` on the local node is observed by
    /// the next inbound push.
    ///
    /// Returns:
    /// - [`OverflowPushAck::Rejected`] when admission rejects
    ///   (the per-reason receive-side counter is bumped first);
    /// - [`OverflowPushAck::Accepted`] when admission admits and
    ///   `prefetch` succeeds;
    /// - [`OverflowPushAck::OpenChunkFailed`] when admission
    ///   admits but `prefetch` errors (logged at `warn`).
    pub async fn handle(&self, request: OverflowPush) -> OverflowPushAck {
        use super::adapter::BlobAdapter;
        use super::admission::{should_accept_overflow_from, OverflowVerdict};
        use super::blob_ref::BlobRef;

        // Synthesize sender caps from the capability fold.
        // Absent → the empty default (which has
        // `overflow_enabled = false`); the admission gate
        // will then return `SenderNotOverflowing`.
        //
        // NOTE(review): `sender_node_id` is self-reported in the
        // request body. If the RPC context exposes an
        // authenticated caller id, it should be cross-checked
        // here so a peer cannot borrow another node's caps.
        let sender_caps = capability_bridge::synthesize_capability_set(
            self.mesh.capability_fold(),
            request.sender_node_id,
        );

        // Snapshot local caps fresh per request so a
        // concurrent `set_overflow_enabled(false)` is
        // observed immediately.
        let local_caps = self.mesh.user_caps_snapshot();

        match should_accept_overflow_from(&local_caps, &sender_caps, request.size_bytes) {
            OverflowVerdict::Reject(reason) => {
                // Bump the per-reason rejection counter on
                // the receive side. The sender's controller
                // bumps `push_errors_total` separately;
                // dashboards aggregate both surfaces.
                self.adapter.record_overflow_reject(reason);
                OverflowPushAck::Rejected(reason)
            }
            OverflowVerdict::Admit => {
                // Build the BlobRef::Small the prefetch path
                // wants. The URI is `mesh://<hex>` — opaque
                // to the adapter (content-hash is the
                // authoritative address) but the convention
                // matches existing migration code.
                let hex = Self::blob_hash_hex(&request.blob_hash);
                let blob_ref = BlobRef::small(
                    format!("mesh://{hex}"),
                    request.blob_hash,
                    request.size_bytes,
                );
                match self.adapter.prefetch(&blob_ref).await {
                    Ok(()) => OverflowPushAck::Accepted,
                    Err(e) => {
                        tracing::warn!(
                            error = %e,
                            hash = %hex,
                            sender = request.sender_node_id,
                            "overflow push: prefetch failed after admit",
                        );
                        OverflowPushAck::OpenChunkFailed
                    }
                }
            }
        }
    }

    /// Lowercase hex encoding of a 32-byte chunk hash (always 64
    /// characters). Used for the `mesh://<hex>` URI and log fields.
    fn blob_hash_hex(hash: &[u8; 32]) -> String {
        use std::fmt::Write;

        let mut hex = String::with_capacity(64);
        for b in hash {
            // Writing into a `String` cannot fail.
            let _ = write!(hex, "{b:02x}");
        }
        hex
    }
}
404
#[cfg(feature = "cortex")]
#[async_trait]
impl crate::adapter::net::cortex::RpcHandler for OverflowPushHandler {
    /// nRPC entry point: postcard-decode the body into an
    /// [`OverflowPush`], run [`OverflowPushHandler::handle`], and
    /// postcard-encode the resulting [`OverflowPushAck`] into an
    /// `RpcStatus::Ok` response.
    ///
    /// Typed admission rejections travel *inside* the ack body,
    /// so a rejected push is still an `Ok` response. Only a
    /// malformed request body (or an impossible encode failure)
    /// surfaces as `RpcHandlerError::Internal`.
    async fn call(
        &self,
        ctx: crate::adapter::net::cortex::RpcContext,
    ) -> Result<
        crate::adapter::net::cortex::RpcResponsePayload,
        crate::adapter::net::cortex::RpcHandlerError,
    > {
        use crate::adapter::net::cortex::{RpcHandlerError, RpcResponsePayload, RpcStatus};

        // Undecodable bytes are reported as Internal with a short
        // diagnostic, keeping them distinct from typed rejections.
        let request: OverflowPush = match postcard::from_bytes(&ctx.payload.body) {
            Ok(decoded) => decoded,
            Err(e) => {
                return Err(RpcHandlerError::Internal(format!(
                    "overflow push: decode failed: {e}"
                )));
            }
        };

        let ack = self.handle(request).await;

        // postcard encoding of our own enum should never fail; if it
        // somehow does, treat it as an internal bug.
        match postcard::to_allocvec(&ack) {
            Ok(encoded) => Ok(RpcResponsePayload {
                status: RpcStatus::Ok,
                headers: Vec::new(),
                body: bytes::Bytes::from(encoded),
            }),
            Err(e) => Err(RpcHandlerError::Internal(format!(
                "overflow push: encode ack failed: {e}"
            ))),
        }
    }
}
440
/// Concrete [`OverflowPushSink`] implementation backed by a
/// [`MeshNode`]. Wraps the sender-side nRPC call: each
/// `push` invocation encodes the request, dispatches via
/// [`MeshNode::call`] under the [`OVERFLOW_PUSH_SERVICE`]
/// service name, decodes the typed [`OverflowPushAck`], and
/// maps the outcome to the [`OverflowPushSink::push`]
/// `Result` shape the controller expects.
///
/// Construct once per operator scheduler. The sink is cheap
/// to clone — `Clone` only bumps the `Arc<MeshNode>` refcount.
/// Pass to [`drive_blob_overflow_tick`] as `&dyn OverflowPushSink`.
///
/// [`MeshNode`]: crate::adapter::net::MeshNode
/// [`MeshNode::call`]: crate::adapter::net::MeshNode::call
#[cfg(feature = "cortex")]
#[derive(Clone)]
pub struct MeshNodeOverflowPushSink {
    /// Reference to the local mesh. `Arc<MeshNode>` because
    /// `MeshNode::call` is defined on `&Arc<Self>` — the
    /// nRPC path needs the Arc to register the per-call
    /// reply-channel subscription.
    pub mesh: Arc<crate::adapter::net::MeshNode>,
}
463
#[cfg(feature = "cortex")]
impl MeshNodeOverflowPushSink {
    /// Build a sink that sends overflow pushes over `mesh`.
    ///
    /// Takes one `Arc` handle by value; callers typically pass
    /// `Arc::clone(&mesh)`, which is cheap. One sink per operator
    /// scheduler is the usual arrangement.
    pub fn new(mesh: Arc<crate::adapter::net::MeshNode>) -> Self {
        MeshNodeOverflowPushSink { mesh }
    }
}
473
#[cfg(feature = "cortex")]
#[async_trait]
impl OverflowPushSink for MeshNodeOverflowPushSink {
    /// Send one overflow push and translate the receiver's ack.
    ///
    /// Transport errors from `send_overflow_push` propagate as-is.
    /// Both non-`Accepted` acks (`Rejected(reason)` and
    /// `OpenChunkFailed`) become `BlobError::Backend` with a
    /// descriptive message, so the controller's `push_errors`
    /// counter is bumped uniformly for every failure mode.
    async fn push(
        &self,
        hash: [u8; 32],
        size_bytes: u64,
        target_node_id: u64,
    ) -> Result<(), BlobError> {
        let failure = match self
            .mesh
            .send_overflow_push(target_node_id, hash, size_bytes)
            .await?
        {
            OverflowPushAck::Accepted => return Ok(()),
            OverflowPushAck::Rejected(reason) => {
                format!("overflow push to {target_node_id:#x} rejected: {reason:?}")
            }
            OverflowPushAck::OpenChunkFailed => {
                format!("overflow push to {target_node_id:#x} admitted but chunk open failed")
            }
        };
        Err(BlobError::Backend(failure))
    }
}
502
/// Sink trait for the actual push action. Abstracting the push
/// behind a trait lets the tick driver be unit-tested against a
/// recorder mock without a real mesh. Production wiring (feature
/// `cortex`) is [`MeshNodeOverflowPushSink`], which sends an
/// `OverflowPush` RPC to the target [`MeshNode`] and resolves
/// once the receiver's [`OverflowPushAck`] arrives — it does not
/// wait for the durability watermark, which is observed
/// separately after the tick.
///
/// `push` is fire-once-per-tick per `(hash, target_node_id)`
/// pair — the controller dedups by hash before calling the
/// sink. Idempotent on the receive side anyway (an
/// already-stored chunk is a no-op store).
///
/// [`MeshNode`]: crate::adapter::net::MeshNode
#[async_trait]
pub trait OverflowPushSink: Send + Sync {
    /// Push `hash` (`size_bytes`) to the receive-side peer
    /// identified by `target_node_id`. Returns `Ok(())` when
    /// the wire-side acknowledgement landed; `Err(BlobError)`
    /// when the send failed for any reason (RPC transport
    /// error, receive-side admission rejection, chunk-open
    /// failure). The tick driver aggregates errors into the
    /// `push_errors` counter without disambiguating — wire-
    /// level counters break out per reason.
    async fn push(
        &self,
        hash: [u8; 32],
        size_bytes: u64,
        target_node_id: u64,
    ) -> Result<(), BlobError>;
}
532
/// Active-overflow controller. Borrows the inputs it needs
/// (local caps, capability fold, heat registry, refcount
/// table, config); the controller itself owns no state. The
/// hysteresis state lives in the caller as an `AtomicBool`
/// passed into [`drive_blob_overflow_tick`].
///
/// Lifetimes:
/// - `'a` — the controller's borrows. Typically the operator
///   constructs the controller per tick inside the scheduler
///   loop; the borrows are valid for the lifetime of the
///   tick await.
pub struct BlobOverflowController<'a> {
    /// Local node's capability set. Read for the local
    /// gravity scope (target-selection scope filter) and
    /// for the overflow-enabled self-check inside
    /// [`drive_blob_overflow_tick`] (skip the tick until the
    /// local `dataforts.blob.overflow` tag is visible on the
    /// snapshot — otherwise every push would round-trip an
    /// RPC and come back `Rejected(SenderNotOverflowing)`
    /// while the announce propagates).
    pub local_caps: &'a CapabilitySet,
    /// Fold of peer capability sets. Target selection walks
    /// every node in the fold and keeps only overflow-enabled,
    /// healthy, in-scope peers with enough free disk.
    /// Migrated off the legacy CapabilityIndex per Phase 3b.
    pub capability_fold: &'a Fold<CapabilityFold>,
    /// Per-chunk heat registry. The controller walks every
    /// tracked hash, computes each rate decayed to `now`
    /// (without mutating the counters), and ranks candidates
    /// coldest-first.
    pub heat_registry: &'a Arc<parking_lot::Mutex<BlobHeatRegistry>>,
    /// Per-hash refcount + pin table. Candidates are
    /// filtered against this: only hashes that have an entry
    /// here with `refcount == 0 && !pinned` are eligible for
    /// push in P2. The richer "all-references-are-cache" rule
    /// (which would allow shedding a chunk still held by a
    /// greedy cache entry) lands when per-source refcount
    /// inspection is added to [`BlobRefcountTable`].
    pub refcount: &'a BlobRefcountTable,
    /// Operator-tunable knobs. Read for `scope`,
    /// `max_pushes_per_tick`, and the high/low water
    /// thresholds (consumed by the hysteresis state machine).
    pub config: &'a OverflowConfig,
}
575
576impl<'a> BlobOverflowController<'a> {
577 /// Construct a controller from borrows. `new` is sugar
578 /// for the struct literal — operators that prefer the
579 /// builder shape can call this; tests usually use the
580 /// literal for clarity.
581 pub fn new(
582 local_caps: &'a CapabilitySet,
583 capability_fold: &'a Fold<CapabilityFold>,
584 heat_registry: &'a Arc<parking_lot::Mutex<BlobHeatRegistry>>,
585 refcount: &'a BlobRefcountTable,
586 config: &'a OverflowConfig,
587 ) -> Self {
588 Self {
589 local_caps,
590 capability_fold,
591 heat_registry,
592 refcount,
593 config,
594 }
595 }
596
597 /// Compute every candidate for this tick — coldest first,
598 /// truncated to `config.max_pushes_per_tick`. Convenience
599 /// wrapper around [`Self::candidate_batch`] that drops the
600 /// `no_target_count` companion when the caller only wants
601 /// the push list.
602 pub fn candidates(
603 &self,
604 now: Instant,
605 size_for_hash: impl Fn([u8; 32]) -> Option<u64>,
606 ) -> Vec<BlobOverflowCandidate> {
607 self.candidate_batch(now, size_for_hash).candidates
608 }
609
610 /// Compute candidates + the precise `no_target` accounting
611 /// for this tick. `size_for_hash` is an operator-supplied
612 /// resolver (the controller doesn't know chunk sizes
613 /// directly; `MeshBlobAdapter::stat_chunk` or an equivalent
614 /// answers this).
615 ///
616 /// The function:
617 ///
618 /// 1. Snapshots `(hash, decayed_rate)` from the heat
619 /// registry under a brief read lock.
620 /// 2. Filters out pinned hashes + hashes with nonzero
621 /// refcount + hashes whose `size_for_hash` returns
622 /// `None` (controller can't run the disk-gate without
623 /// a size; abstain rather than guess).
624 /// 3. Sorts ascending by `(decayed_rate, hash)` — ties
625 /// broken by hash bytes for determinism.
626 /// 4. For each candidate hash (in cold-first order),
627 /// walks the capability index for an overflow-
628 /// enabled peer with `disk_free_gb >= ceil(size /
629 /// 1 GiB)` matching the local gravity scope; picks
630 /// the peer with the highest `disk_free_gb` (ties
631 /// broken by lowest `node_id`).
632 /// 5. Counts a hash as `no_target` only when target
633 /// selection was actually *attempted* and failed —
634 /// hashes past the `max_pushes_per_tick` truncation
635 /// point were never tried and are NOT no-target.
636 /// 6. Stops walking once
637 /// `candidates.len() >= config.max_pushes_per_tick`.
638 pub fn candidate_batch(
639 &self,
640 now: Instant,
641 size_for_hash: impl Fn([u8; 32]) -> Option<u64>,
642 ) -> OverflowCandidateBatch {
643 // Step 1: snapshot heat-registry entries.
644 let snap: Vec<([u8; 32], f64)> = {
645 let guard = self.heat_registry.lock();
646 guard
647 .iter()
648 .map(|(h, c)| {
649 // Compute decayed rate without mutating
650 // the counter — keeps the iteration
651 // read-only so a concurrent fetch path
652 // bumping a different hash isn't
653 // blocked.
654 let elapsed = now.saturating_duration_since(c.last_update());
655 let half_life_s = c.half_life().as_secs_f64();
656 let rate = if half_life_s == 0.0 || c.rate() == 0.0 {
657 c.rate()
658 } else {
659 let half_lives = elapsed.as_secs_f64() / half_life_s;
660 if half_lives > 64.0 {
661 0.0
662 } else {
663 c.rate() * 0.5_f64.powf(half_lives)
664 }
665 };
666 (*h, rate)
667 })
668 .collect()
669 };
670
671 // Step 2: filter on pin / refcount / size. A hash
672 // with `refcount > 0` is not pushable in P2 — the
673 // per-source refcount split (cache vs fold) is a
674 // future refinement.
675 let mut filtered: Vec<([u8; 32], f64, u64)> = snap
676 .into_iter()
677 .filter_map(|(h, rate)| {
678 let entry = self.refcount.get(&h)?;
679 if entry.pinned {
680 return None;
681 }
682 if entry.refcount > 0 {
683 return None;
684 }
685 let size = size_for_hash(h)?;
686 Some((h, rate, size))
687 })
688 .collect();
689
690 // Step 3: stable sort coldest-first. Ties broken by
691 // hash bytes for determinism. NaN is impossible
692 // here (decayed rate is always finite + non-negative
693 // when input is finite, and the heat-counter ensures
694 // finite input), so `partial_cmp` is safe.
695 filtered.sort_by(|a, b| {
696 a.1.partial_cmp(&b.1)
697 .unwrap_or(std::cmp::Ordering::Equal)
698 .then_with(|| a.0.cmp(&b.0))
699 });
700
701 // Step 4-5: target selection per hash. `no_target`
702 // counts only hashes we ACTUALLY tried — the loop
703 // breaks at `max_pushes_per_tick` so the tail of
704 // `filtered` is never attempted and must not bump
705 // the counter.
706 let local_gravity = GravityCapability::from_capability_set(self.local_caps);
707 let mut candidates: Vec<BlobOverflowCandidate> = Vec::new();
708 let mut no_target_count: usize = 0;
709 for (hash, cold_rate, size_bytes) in filtered {
710 match self.pick_target(size_bytes, local_gravity.scope) {
711 Some((target_node_id, target_caps)) => {
712 candidates.push(BlobOverflowCandidate {
713 hash,
714 size_bytes,
715 target_node_id,
716 target_caps,
717 cold_rate,
718 });
719 }
720 None => {
721 no_target_count += 1;
722 }
723 }
724 if candidates.len() >= self.config.max_pushes_per_tick {
725 break;
726 }
727 }
728 OverflowCandidateBatch {
729 candidates,
730 no_target_count,
731 }
732 }
733
734 /// Find the best overflow-receiver peer for a chunk of
735 /// `size_bytes`. Filter rule:
736 ///
737 /// - `cap.blob.storage = true`
738 /// - `cap.blob.overflow_enabled = true`
739 /// - `cap.blob.disk_free_gb >= ceil(size / 1 GiB)`
740 /// - peer's `cap.gravity.scope` covers `local_scope`
741 /// (the local node won't push outside its own scope
742 /// bound)
743 /// - peer is not `dataforts:blob-storage-unhealthy`
744 ///
745 /// Ranking: highest `disk_free_gb` wins (greedy spread
746 /// across peers); ties broken by lowest `node_id` for
747 /// determinism.
748 fn pick_target(
749 &self,
750 size_bytes: u64,
751 local_scope: TopologyScope,
752 ) -> Option<(u64, CapabilitySet)> {
753 let required_gb = size_bytes.div_ceil(1 << 30);
754 let mut best: Option<(u64, u64, CapabilitySet)> = None; // (disk_free_gb, node_id, caps)
755 let publishers: Vec<u64> = self
756 .capability_fold
757 .with_state(|state| state.by_node.keys().copied().collect());
758 for node_id in publishers {
759 // Synthesize a CapabilitySet from the fold's tag set
760 // for `node_id`. The downstream BlobCapability /
761 // GravityCapability projections read tags via
762 // `Tag::AxisPresent` / `Tag::AxisValue` patterns that
763 // round-trip through CapabilitySet::add_tag, so
764 // tag-based reads (storage / overflow / scope /
765 // disk_total_gb / disk_free_gb) work identically.
766 let caps = capability_bridge::synthesize_capability_set(self.capability_fold, node_id);
767 let peer_blob = BlobCapability::from_capability_set(&caps);
768 if !peer_blob.storage || !peer_blob.overflow_enabled {
769 continue;
770 }
771 if peer_blob.disk_free_gb < required_gb {
772 continue;
773 }
774 if is_blob_storage_unhealthy(&caps) {
775 continue;
776 }
777 // Local-scope-covers-peer-scope check. We're
778 // pushing OUT, so the local node's scope bound
779 // is the gate — peers outside our scope can't
780 // receive our overflow.
781 let peer_gravity = GravityCapability::from_capability_set(&caps);
782 if !scope_covers(local_scope, peer_gravity.scope) {
783 continue;
784 }
785 // Update best by disk_free_gb desc, then
786 // node_id asc.
787 match &best {
788 None => best = Some((peer_blob.disk_free_gb, node_id, caps)),
789 Some((d, n, _)) => {
790 let is_better = peer_blob.disk_free_gb > *d
791 || (peer_blob.disk_free_gb == *d && node_id < *n);
792 if is_better {
793 best = Some((peer_blob.disk_free_gb, node_id, caps));
794 }
795 }
796 }
797 }
798 best.map(|(_, node_id, caps)| (node_id, caps))
799 }
800}
801
/// Per-tick environment that the
/// [`super::mesh::MeshBlobAdapter::drive_overflow_tick`]
/// convenience method threads through.
///
/// It keeps the per-tick wiring (capability fold, heat registry,
/// sink, local caps, disk stats) out of the adapter, so the adapter
/// stays a stateless slot-in.
///
/// The reference fields are borrows, and `disk_used_bytes` /
/// `disk_total_bytes` are plain `u64` copies. The lifetime `'a` ties
/// the context to a single tick's await. Operators rebuild the
/// context from live state on every tick.
pub struct OverflowTickContext<'a> {
    /// The mesh's capability fold, read during target-peer
    /// selection (overflow tag, scope, `disk_free`, health gate).
    /// It replaced the legacy CapabilityIndex in Phase 3b.
    pub capability_fold: &'a Fold<CapabilityFold>,
    /// Per-chunk heat registry, shared behind an
    /// `Arc<parking_lot::Mutex<_>>`. The controller walks every
    /// tracked hash, decays each rate to `now`, and ranks
    /// candidates coldest-first.
    pub heat_registry: &'a Arc<parking_lot::Mutex<BlobHeatRegistry>>,
    /// Sink that performs the actual push. Production wiring
    /// uses [`MeshNodeOverflowPushSink`]; tests use a
    /// recorder.
    pub sink: &'a dyn OverflowPushSink,
    /// Snapshot of the local caps. The local gravity scope is read
    /// from it for the target-selection scope filter.
    /// Presumably it also backs the sender-side
    /// `cap.blob.overflow` self-check that the tick driver runs
    /// through the controller (TODO confirm wiring in
    /// `drive_overflow_tick`).
    pub local_caps: &'a CapabilitySet,
    /// Local disk usage in bytes: the numerator of the
    /// `disk_ratio` hysteresis input.
    pub disk_used_bytes: u64,
    /// Local disk total in bytes: the denominator. `0` means
    /// "no disk cap configured", and [`drive_blob_overflow_tick`]
    /// maps it to a ratio of `0.0`, so no pushes fire. This assumes
    /// the adapter forwards the value unchanged (TODO confirm).
    pub disk_total_bytes: u64,
}
836
/// Per-tick observables passed to [`drive_blob_overflow_tick`].
///
/// Bundles the inputs that change on every tick: the disk stats,
/// the hysteresis handle, and the clock value. This keeps the tick
/// driver at a 4-argument signature as the inputs grow.
///
/// Nothing here is owned; the struct only borrows. The hysteresis
/// atomic is shared with the adapter's `overflow_active` field (P4).
/// Operator-driven tests can wire a fresh `AtomicBool` for
/// isolation. `now` is captured at the tick call site, so
/// deterministic-simulation harnesses can inject a fixed `Instant`
/// without mocking the system clock.
pub struct OverflowTickObservation<'a> {
    /// Local disk usage in bytes: the numerator of the
    /// `disk_ratio` hysteresis input. A value above
    /// `disk_total_bytes` indicates misconfiguration; see
    /// [`drive_blob_overflow_tick`] for how the driver treats it.
    pub disk_used_bytes: u64,
    /// Local disk total in bytes: the denominator. `0` makes
    /// the driver use a ratio of `0.0`, so it never fires. An
    /// unconfigured disk cap shouldn't trigger pushes the
    /// moment any chunk lands. With a non-negative low-water
    /// ratio, a `0.0` ratio also clears a previously latched
    /// active state.
    pub disk_total_bytes: u64,
    /// Shared hysteresis state. It is read at tick start and
    /// [`step_overflow_hysteresis`] updates it to the post-tick
    /// state. In production it is wired to the `overflow_active`
    /// field of [`super::mesh::MeshBlobAdapter`].
    pub hysteresis_active: &'a AtomicBool,
    /// Clock value used to decay heat-registry rates. Pass
    /// `Instant::now()` in production; tests can fix it
    /// for reproducibility.
    pub now: Instant,
}
870
871/// Drive one overflow tick.
872///
873/// Composes the hysteresis state machine + the controller's
874/// candidate computation + the sink's push action into a
875/// single async entry point. Operators call this from a
876/// periodic task at `config.tick_interval_ms` cadence; the
877/// function is idempotent against repeated calls (the
878/// hysteresis state filters out spurious ticks).
879///
880/// Returns a [`BlobOverflowTickReport`] with per-reason
881/// counters. Operators aggregate the report into Prometheus
882/// metrics via
883/// [`super::metrics::BlobMetrics::record_overflow_tick`].
884pub async fn drive_blob_overflow_tick(
885 controller: &BlobOverflowController<'_>,
886 sink: &dyn OverflowPushSink,
887 observation: OverflowTickObservation<'_>,
888 size_for_hash: impl Fn([u8; 32]) -> Option<u64>,
889) -> BlobOverflowTickReport {
890 let OverflowTickObservation {
891 disk_used_bytes,
892 disk_total_bytes,
893 hysteresis_active,
894 now,
895 } = observation;
896 let mut report = BlobOverflowTickReport::default();
897 let disk_ratio = if disk_total_bytes == 0 {
898 // Adapter without a configured disk-cap reports
899 // `disk_total = 0`; the safe default is "never
900 // fire" rather than "always fire" (the latter
901 // would push the moment any chunk lands on a
902 // mis-configured node).
903 0.0
904 } else {
905 disk_used_bytes as f64 / disk_total_bytes as f64
906 };
907 report.disk_ratio_at_start = disk_ratio;
908 report.was_active_at_start = hysteresis_active.load(Ordering::Relaxed);
909
910 let fire = step_overflow_hysteresis(
911 hysteresis_active,
912 disk_ratio,
913 controller.config.high_water_ratio,
914 controller.config.low_water_ratio,
915 );
916 report.is_active_at_end = fire;
917
918 // Master switch gate. Even if disk crossed the high
919 // water mark, a disabled overflow config means we
920 // never push. Pin this so toggling `enabled = false`
921 // is a hard stop.
922 if !controller.config.enabled || !fire {
923 report.disk_ratio_at_end = disk_ratio;
924 return report;
925 }
926
927 // Sender-side self-check (plan § "Open design questions"
928 // #5): the controller's `config.enabled` says the operator
929 // wants overflow on, but the wire-level contract is
930 // gated on `cap.blob.overflow` propagating through the
931 // capability index. If the local node hasn't yet
932 // advertised the tag (announce hasn't fired since
933 // `set_overflow_enabled(true)`, or `local_caps` was
934 // never rebuilt), every push would round-trip an RPC
935 // just to get rejected `SenderNotOverflowing` by every
936 // peer. Skip the tick cleanly until the tag is visible
937 // on the sender's own caps snapshot; the next
938 // `announce_capabilities` rebroadcast resolves the race.
939 let local_blob = BlobCapability::from_capability_set(controller.local_caps);
940 if !local_blob.overflow_enabled {
941 tracing::debug!(
942 "blob overflow: master switch on but local cap.blob.overflow not yet advertised; \
943 skipping tick until announce_capabilities propagates the tag"
944 );
945 report.disk_ratio_at_end = disk_ratio;
946 return report;
947 }
948
949 // Compute candidates in one pass. `candidate_batch`
950 // tracks the no-target count inside its target-selection
951 // loop, so truncated-by-`max_pushes_per_tick` hashes never
952 // bump the counter (they were never tried).
953 let batch = controller.candidate_batch(now, &size_for_hash);
954 report.rejected_no_target = batch.no_target_count as u64;
955
956 // Fire pushes. `max_pushes_per_tick = 0` is a valid
957 // "trigger only, no real pushes" mode — the candidates
958 // list will be empty so we drop straight through.
959 for candidate in batch.candidates {
960 match sink
961 .push(
962 candidate.hash,
963 candidate.size_bytes,
964 candidate.target_node_id,
965 )
966 .await
967 {
968 Ok(()) => {
969 report.admitted += 1;
970 report.pushed_bytes = report.pushed_bytes.saturating_add(candidate.size_bytes);
971 }
972 Err(e) => {
973 tracing::trace!(
974 error = ?e,
975 hash = ?candidate.hash,
976 target = candidate.target_node_id,
977 "blob overflow: push failed; counted"
978 );
979 report.push_errors += 1;
980 }
981 }
982 }
983
984 // disk_ratio_at_end stays equal to start in P2; P3
985 // wires the durability watermark + a fresh disk-stat
986 // poll to surface the post-tick reclaim.
987 report.disk_ratio_at_end = disk_ratio;
988 report
989}
990
991/// `local` scope covers `peer` iff a push from a node with
992/// scope `local` can land on a node with scope `peer`. The
993/// rule mirrors the migration controller's
994/// `scope_at_least_as_narrow`: a Zone-scoped local node can
995/// push to Zone / Region / Mesh peers (the peer's scope
996/// covers the local one), but a Mesh-scoped local node
997/// can't push to a Node-scoped peer (the peer's scope is
998/// narrower and won't accept the cross-scope artifact).
999///
1000/// `local == Mesh` covers any peer scope (mesh is the widest
1001/// scope — any peer is reachable). `local == Node` is the
1002/// degenerate case: only same-node receivers qualify; in
1003/// practice this is the "never push" config.
1004fn scope_covers(local: TopologyScope, peer: TopologyScope) -> bool {
1005 use TopologyScope::*;
1006 matches!(
1007 (local, peer),
1008 (Mesh, _) | (Region, Region | Mesh) | (Zone, Zone | Region | Mesh) | (Node, Node)
1009 )
1010}
1011
1012#[cfg(test)]
1013mod tests {
1014 use super::*;
1015 use crate::adapter::net::behavior::capability::CapabilityAnnouncement;
1016 use crate::adapter::net::dataforts::gravity::BlobHeatRegistry;
1017 use crate::adapter::net::identity::EntityId;
1018 use std::sync::atomic::AtomicU64;
1019 use std::time::Duration;
1020
1021 fn hex64(byte: u8) -> ([u8; 32], String) {
1022 let mut h = [0u8; 32];
1023 h.fill(byte);
1024 let hex: String = h.iter().map(|b| format!("{:02x}", b)).collect();
1025 (h, hex)
1026 }
1027
1028 /// Build a `CapabilitySet` for an overflow-enabled peer
1029 /// with `disk_free_gb` headroom and mesh-wide gravity.
1030 fn overflow_peer_caps(disk_free_gb: u64) -> CapabilitySet {
1031 CapabilitySet::new()
1032 .add_tag("dataforts.blob.storage")
1033 .add_tag("dataforts.blob.disk_total_gb=100")
1034 .add_tag(format!("dataforts.blob.disk_free_gb={}", disk_free_gb))
1035 .add_tag("dataforts.blob.overflow")
1036 .add_tag("dataforts.gravity.enabled")
1037 .add_tag("dataforts.gravity.scope=mesh")
1038 .add_tag("dataforts.gravity.proximity=128")
1039 }
1040
1041 /// Local node config: overflow-enabled, mesh scope.
1042 fn overflow_enabled_local_caps() -> CapabilitySet {
1043 CapabilitySet::new()
1044 .add_tag("dataforts.blob.storage")
1045 .add_tag("dataforts.blob.overflow")
1046 .add_tag("dataforts.gravity.enabled")
1047 .add_tag("dataforts.gravity.scope=mesh")
1048 .add_tag("dataforts.gravity.proximity=128")
1049 }
1050
    /// One push recorded by the mock sink, as
    /// `(hash, size_bytes, target_node_id)`. The alias keeps the
    /// `OverflowPushRecorder` field type and the `calls()`
    /// snapshot return type readable.
    type RecordedPushCall = ([u8; 32], u64, u64);

    /// Shared call log that the recorder appends to from inside its
    /// `&self` `push` method. The `Arc<Mutex<Vec<_>>>` wrapper lets
    /// clones of the handle observe the same log, if a test needs
    /// to share it.
    type RecordedCallLog = Arc<parking_lot::Mutex<Vec<RecordedPushCall>>>;
1062
    /// Recorder sink. Logs the `(hash, size, target)` tuple of
    /// every push that succeeds. Setting `fail_count` to N makes the
    /// next N `push` calls return an error without logging anything,
    /// which lets tests exercise the `push_errors` counter.
    struct OverflowPushRecorder {
        /// Successful pushes, in call order.
        calls: RecordedCallLog,
        /// Number of upcoming pushes still to fail.
        fail_count: Arc<AtomicU64>,
    }
1071
1072 impl OverflowPushRecorder {
1073 fn new() -> Self {
1074 Self {
1075 calls: Arc::new(parking_lot::Mutex::new(Vec::new())),
1076 fail_count: Arc::new(AtomicU64::new(0)),
1077 }
1078 }
1079
1080 fn calls(&self) -> Vec<RecordedPushCall> {
1081 self.calls.lock().clone()
1082 }
1083 }
1084
1085 #[async_trait]
1086 impl OverflowPushSink for OverflowPushRecorder {
1087 async fn push(
1088 &self,
1089 hash: [u8; 32],
1090 size_bytes: u64,
1091 target_node_id: u64,
1092 ) -> Result<(), BlobError> {
1093 if self.fail_count.load(Ordering::Relaxed) > 0 {
1094 self.fail_count.fetch_sub(1, Ordering::Relaxed);
1095 return Err(BlobError::NotFound("simulated push failure".to_string()));
1096 }
1097 self.calls.lock().push((hash, size_bytes, target_node_id));
1098 Ok(())
1099 }
1100 }
1101
1102 /// Build a `BlobHeatRegistry` with a list of `(hash, rate)`
1103 /// pairs at the supplied `now` instant. Each entry is
1104 /// freshly seeded and bumped to its target rate via direct
1105 /// counter access.
1106 fn heat_registry_with(
1107 now: Instant,
1108 entries: &[([u8; 32], f64)],
1109 ) -> Arc<parking_lot::Mutex<BlobHeatRegistry>> {
1110 let mut reg = BlobHeatRegistry::new();
1111 for (hash, rate) in entries {
1112 let counter = reg.entry_mut(*hash, Duration::from_secs(60), now);
1113 // Bump `*rate` times to reach the target rate;
1114 // each bump adds 1.0 after decay. `now` is the
1115 // same for every bump so no decay happens.
1116 for _ in 0..(*rate as usize) {
1117 counter.bump(now);
1118 }
1119 }
1120 Arc::new(parking_lot::Mutex::new(reg))
1121 }
1122
1123 /// Refcount table where every supplied hash is
1124 /// `refcount = 0, !pinned` — eligible for overflow.
1125 /// `store_observed` is the cheapest way to land an
1126 /// entry at refcount 0 + a recorded `first_seen` time.
1127 fn refcount_with_zero(hashes: &[[u8; 32]], now_ms: u64) -> BlobRefcountTable {
1128 let rc = BlobRefcountTable::new();
1129 for h in hashes {
1130 rc.store_observed(*h, 0, now_ms);
1131 }
1132 rc
1133 }
1134
1135 fn cap_index_with(peers: &[(u64, [u8; 32], CapabilitySet)]) -> Fold<CapabilityFold> {
1136 let fold = Fold::<CapabilityFold>::with_sweep_interval(std::time::Duration::ZERO);
1137 for (idx, (node_id, entity_bytes, caps)) in peers.iter().enumerate() {
1138 let entity = EntityId::from_bytes(*entity_bytes);
1139 capability_bridge::apply_legacy_announcement(
1140 &fold,
1141 CapabilityAnnouncement::new(*node_id, entity, 1 + idx as u64, caps.clone()),
1142 )
1143 .expect("apply legacy announcement in fixture");
1144 }
1145 fold
1146 }
1147
1148 // ========================================================================
1149 // step_overflow_hysteresis (pure-logic state machine)
1150 // ========================================================================
1151
1152 #[test]
1153 fn hysteresis_fires_above_high_water() {
1154 let active = AtomicBool::new(false);
1155 assert!(step_overflow_hysteresis(&active, 0.90, 0.85, 0.70));
1156 assert!(active.load(Ordering::Relaxed));
1157 }
1158
1159 #[test]
1160 fn hysteresis_clears_below_low_water() {
1161 let active = AtomicBool::new(true);
1162 assert!(!step_overflow_hysteresis(&active, 0.65, 0.85, 0.70));
1163 assert!(!active.load(Ordering::Relaxed));
1164 }
1165
1166 #[test]
1167 fn hysteresis_holds_state_in_band() {
1168 // Between low (0.70) and high (0.85): hold prior
1169 // state regardless of which boundary was last
1170 // crossed.
1171 let active = AtomicBool::new(true);
1172 assert!(step_overflow_hysteresis(&active, 0.80, 0.85, 0.70));
1173 assert!(active.load(Ordering::Relaxed));
1174
1175 let inactive = AtomicBool::new(false);
1176 assert!(!step_overflow_hysteresis(&inactive, 0.80, 0.85, 0.70));
1177 assert!(!inactive.load(Ordering::Relaxed));
1178 }
1179
1180 #[test]
1181 fn hysteresis_boundary_inclusive() {
1182 // disk_ratio == high_water fires (>=);
1183 // disk_ratio == low_water clears (<=).
1184 let active = AtomicBool::new(false);
1185 assert!(step_overflow_hysteresis(&active, 0.85, 0.85, 0.70));
1186 let active2 = AtomicBool::new(true);
1187 assert!(!step_overflow_hysteresis(&active2, 0.70, 0.85, 0.70));
1188 }
1189
1190 // ========================================================================
1191 // BlobOverflowController::candidates
1192 // ========================================================================
1193
1194 #[test]
1195 fn controller_candidates_returns_coldest_first() {
1196 // Three hashes with rates 0.0 / 1.0 / 5.0 →
1197 // ordering A (cold) / B (warm) / C (hot).
1198 let now = Instant::now();
1199 let (a, _) = hex64(0xAA);
1200 let (b, _) = hex64(0xBB);
1201 let (c, _) = hex64(0xCC);
1202 let heat = heat_registry_with(now, &[(a, 0.0), (b, 1.0), (c, 5.0)]);
1203 let refcount = refcount_with_zero(&[a, b, c], 1_000_000);
1204 let peer = (99u64, [0x11; 32], overflow_peer_caps(50));
1205 let fold = cap_index_with(&[peer]);
1206 let local = overflow_enabled_local_caps();
1207 let cfg = OverflowConfig {
1208 enabled: true,
1209 max_pushes_per_tick: 16,
1210 ..Default::default()
1211 };
1212 let controller = BlobOverflowController::new(&local, &fold, &heat, &refcount, &cfg);
1213
1214 let cands = controller.candidates(now, |_| Some(1024));
1215 assert_eq!(cands.len(), 3);
1216 // a (rate 0.0) first; c (rate 5.0) last.
1217 assert_eq!(cands[0].hash, a);
1218 assert_eq!(cands[2].hash, c);
1219 }
1220
1221 #[test]
1222 fn controller_skips_pinned_hashes() {
1223 let now = Instant::now();
1224 let (a, _) = hex64(0xAA);
1225 let (b, _) = hex64(0xBB);
1226 let heat = heat_registry_with(now, &[(a, 0.0), (b, 0.0)]);
1227 let refcount = BlobRefcountTable::new();
1228 refcount.store_observed(a, 0, 1_000_000);
1229 refcount.pin(a, 1_000_000);
1230 refcount.store_observed(b, 0, 1_000_000);
1231 let peer = (99u64, [0x11; 32], overflow_peer_caps(50));
1232 let fold = cap_index_with(&[peer]);
1233 let local = overflow_enabled_local_caps();
1234 let cfg = OverflowConfig {
1235 enabled: true,
1236 max_pushes_per_tick: 16,
1237 ..Default::default()
1238 };
1239 let controller = BlobOverflowController::new(&local, &fold, &heat, &refcount, &cfg);
1240
1241 let cands = controller.candidates(now, |_| Some(1024));
1242 // Pinned `a` skipped; only unpinned `b` surfaces.
1243 assert_eq!(cands.len(), 1);
1244 assert_eq!(cands[0].hash, b);
1245 }
1246
1247 #[test]
1248 fn controller_skips_hashes_with_nonzero_refcount() {
1249 let now = Instant::now();
1250 let (a, _) = hex64(0xAA);
1251 let heat = heat_registry_with(now, &[(a, 0.0)]);
1252 let refcount = BlobRefcountTable::new();
1253 refcount.incr(a, 1_000_000); // refcount = 1, not droppable
1254 let peer = (99u64, [0x11; 32], overflow_peer_caps(50));
1255 let fold = cap_index_with(&[peer]);
1256 let local = overflow_enabled_local_caps();
1257 let cfg = OverflowConfig {
1258 enabled: true,
1259 max_pushes_per_tick: 16,
1260 ..Default::default()
1261 };
1262 let controller = BlobOverflowController::new(&local, &fold, &heat, &refcount, &cfg);
1263 assert!(controller.candidates(now, |_| Some(1024)).is_empty());
1264 }
1265
1266 #[test]
1267 fn controller_picks_highest_disk_free_target() {
1268 // Two peers, both overflow-enabled. Peer 99 has 40
1269 // GiB free; peer 88 has 80 GiB free. Greedy spread
1270 // → peer 88 wins.
1271 let now = Instant::now();
1272 let (a, _) = hex64(0xAA);
1273 let heat = heat_registry_with(now, &[(a, 0.0)]);
1274 let refcount = refcount_with_zero(&[a], 1_000_000);
1275 let peer_low = (99u64, [0x11; 32], overflow_peer_caps(40));
1276 let peer_high = (88u64, [0x22; 32], overflow_peer_caps(80));
1277 let fold = cap_index_with(&[peer_low, peer_high]);
1278 let local = overflow_enabled_local_caps();
1279 let cfg = OverflowConfig {
1280 enabled: true,
1281 max_pushes_per_tick: 16,
1282 ..Default::default()
1283 };
1284 let controller = BlobOverflowController::new(&local, &fold, &heat, &refcount, &cfg);
1285
1286 let cands = controller.candidates(now, |_| Some(1024));
1287 assert_eq!(cands.len(), 1);
1288 assert_eq!(cands[0].target_node_id, 88);
1289 }
1290
1291 #[test]
1292 fn controller_skips_peers_without_overflow_tag() {
1293 // Peer has storage + disk + scope BUT no overflow
1294 // tag → not a valid target.
1295 let now = Instant::now();
1296 let (a, _) = hex64(0xAA);
1297 let heat = heat_registry_with(now, &[(a, 0.0)]);
1298 let refcount = refcount_with_zero(&[a], 1_000_000);
1299 let no_overflow_peer_caps = CapabilitySet::new()
1300 .add_tag("dataforts.blob.storage")
1301 .add_tag("dataforts.blob.disk_total_gb=100")
1302 .add_tag("dataforts.blob.disk_free_gb=80")
1303 .add_tag("dataforts.gravity.enabled")
1304 .add_tag("dataforts.gravity.scope=mesh")
1305 .add_tag("dataforts.gravity.proximity=128");
1306 let peer = (99u64, [0x11; 32], no_overflow_peer_caps);
1307 let fold = cap_index_with(&[peer]);
1308 let local = overflow_enabled_local_caps();
1309 let cfg = OverflowConfig {
1310 enabled: true,
1311 max_pushes_per_tick: 16,
1312 ..Default::default()
1313 };
1314 let controller = BlobOverflowController::new(&local, &fold, &heat, &refcount, &cfg);
1315 assert!(controller.candidates(now, |_| Some(1024)).is_empty());
1316 }
1317
1318 #[test]
1319 fn controller_skips_peers_with_insufficient_disk() {
1320 // Peer has 1 GiB free; we're pushing a 4 GiB blob →
1321 // no target.
1322 let now = Instant::now();
1323 let (a, _) = hex64(0xAA);
1324 let heat = heat_registry_with(now, &[(a, 0.0)]);
1325 let refcount = refcount_with_zero(&[a], 1_000_000);
1326 let peer = (99u64, [0x11; 32], overflow_peer_caps(1));
1327 let fold = cap_index_with(&[peer]);
1328 let local = overflow_enabled_local_caps();
1329 let cfg = OverflowConfig {
1330 enabled: true,
1331 max_pushes_per_tick: 16,
1332 ..Default::default()
1333 };
1334 let controller = BlobOverflowController::new(&local, &fold, &heat, &refcount, &cfg);
1335 let four_gib: u64 = 4 * (1 << 30);
1336 assert!(controller.candidates(now, |_| Some(four_gib)).is_empty());
1337 }
1338
1339 #[test]
1340 fn controller_truncates_to_max_pushes_per_tick() {
1341 let now = Instant::now();
1342 let hashes: Vec<[u8; 32]> = (0..5).map(|i| hex64(i as u8).0).collect();
1343 let entries: Vec<([u8; 32], f64)> = hashes.iter().map(|h| (*h, 0.0)).collect();
1344 let heat = heat_registry_with(now, &entries);
1345 let refcount = refcount_with_zero(&hashes, 1_000_000);
1346 let peer = (99u64, [0x11; 32], overflow_peer_caps(50));
1347 let fold = cap_index_with(&[peer]);
1348 let local = overflow_enabled_local_caps();
1349 let cfg = OverflowConfig {
1350 enabled: true,
1351 max_pushes_per_tick: 2,
1352 ..Default::default()
1353 };
1354 let controller = BlobOverflowController::new(&local, &fold, &heat, &refcount, &cfg);
1355 let cands = controller.candidates(now, |_| Some(1024));
1356 assert_eq!(
1357 cands.len(),
1358 2,
1359 "max_pushes_per_tick caps the candidate list"
1360 );
1361 }
1362
1363 // ========================================================================
1364 // drive_blob_overflow_tick — end-to-end against the recorder sink
1365 // ========================================================================
1366
1367 #[tokio::test]
1368 async fn tick_no_op_when_below_low_water() {
1369 let now = Instant::now();
1370 let (a, _) = hex64(0xAA);
1371 let heat = heat_registry_with(now, &[(a, 0.0)]);
1372 let refcount = refcount_with_zero(&[a], 1_000_000);
1373 let peer = (99u64, [0x11; 32], overflow_peer_caps(50));
1374 let fold = cap_index_with(&[peer]);
1375 let local = overflow_enabled_local_caps();
1376 let cfg = OverflowConfig {
1377 enabled: true,
1378 ..Default::default()
1379 };
1380 let controller = BlobOverflowController::new(&local, &fold, &heat, &refcount, &cfg);
1381 let active = AtomicBool::new(false);
1382 let sink = OverflowPushRecorder::new();
1383
1384 // disk_ratio = 0.50 — below low_water (0.70).
1385 let report = drive_blob_overflow_tick(
1386 &controller,
1387 &sink,
1388 OverflowTickObservation {
1389 disk_used_bytes: 500,
1390 disk_total_bytes: 1000,
1391 hysteresis_active: &active,
1392 now,
1393 },
1394 |_| Some(1024),
1395 )
1396 .await;
1397 assert_eq!(report.admitted, 0);
1398 assert!(!report.is_active_at_end);
1399 assert_eq!(sink.calls().len(), 0);
1400 }
1401
1402 #[tokio::test]
1403 async fn tick_fires_above_high_water_and_pushes_to_recorder() {
1404 let now = Instant::now();
1405 let (a, _) = hex64(0xAA);
1406 let heat = heat_registry_with(now, &[(a, 0.0)]);
1407 let refcount = refcount_with_zero(&[a], 1_000_000);
1408 let peer = (99u64, [0x11; 32], overflow_peer_caps(50));
1409 let fold = cap_index_with(&[peer]);
1410 let local = overflow_enabled_local_caps();
1411 let cfg = OverflowConfig {
1412 enabled: true,
1413 ..Default::default()
1414 };
1415 let controller = BlobOverflowController::new(&local, &fold, &heat, &refcount, &cfg);
1416 let active = AtomicBool::new(false);
1417 let sink = OverflowPushRecorder::new();
1418
1419 // disk_ratio = 0.90 — above high_water (0.85).
1420 let report = drive_blob_overflow_tick(
1421 &controller,
1422 &sink,
1423 OverflowTickObservation {
1424 disk_used_bytes: 900,
1425 disk_total_bytes: 1000,
1426 hysteresis_active: &active,
1427 now,
1428 },
1429 |_| Some(1024),
1430 )
1431 .await;
1432 assert_eq!(report.admitted, 1);
1433 assert!(report.is_active_at_end);
1434 assert_eq!(report.pushed_bytes, 1024);
1435 let calls = sink.calls();
1436 assert_eq!(calls.len(), 1);
1437 assert_eq!(calls[0].0, a);
1438 assert_eq!(calls[0].2, 99);
1439 }
1440
1441 #[tokio::test]
1442 async fn tick_master_switch_off_skips_pushes_even_above_high_water() {
1443 let now = Instant::now();
1444 let (a, _) = hex64(0xAA);
1445 let heat = heat_registry_with(now, &[(a, 0.0)]);
1446 let refcount = refcount_with_zero(&[a], 1_000_000);
1447 let peer = (99u64, [0x11; 32], overflow_peer_caps(50));
1448 let fold = cap_index_with(&[peer]);
1449 let local = overflow_enabled_local_caps();
1450 let cfg = OverflowConfig {
1451 enabled: false, // master switch off
1452 ..Default::default()
1453 };
1454 let controller = BlobOverflowController::new(&local, &fold, &heat, &refcount, &cfg);
1455 let active = AtomicBool::new(false);
1456 let sink = OverflowPushRecorder::new();
1457
1458 let report = drive_blob_overflow_tick(
1459 &controller,
1460 &sink,
1461 OverflowTickObservation {
1462 disk_used_bytes: 900,
1463 disk_total_bytes: 1000,
1464 hysteresis_active: &active,
1465 now,
1466 },
1467 |_| Some(1024),
1468 )
1469 .await;
1470 // Hysteresis still transitions (the disk state machine
1471 // is independent of the master switch — operators
1472 // dashboarding the active gauge should see it climb
1473 // before they enable). Pushes don't fire.
1474 assert!(report.is_active_at_end);
1475 assert_eq!(report.admitted, 0);
1476 assert_eq!(sink.calls().len(), 0);
1477 }
1478
1479 #[tokio::test]
1480 async fn tick_records_push_errors_when_sink_fails() {
1481 let now = Instant::now();
1482 let (a, _) = hex64(0xAA);
1483 let heat = heat_registry_with(now, &[(a, 0.0)]);
1484 let refcount = refcount_with_zero(&[a], 1_000_000);
1485 let peer = (99u64, [0x11; 32], overflow_peer_caps(50));
1486 let fold = cap_index_with(&[peer]);
1487 let local = overflow_enabled_local_caps();
1488 let cfg = OverflowConfig {
1489 enabled: true,
1490 ..Default::default()
1491 };
1492 let controller = BlobOverflowController::new(&local, &fold, &heat, &refcount, &cfg);
1493 let active = AtomicBool::new(false);
1494 let sink = OverflowPushRecorder::new();
1495 sink.fail_count.store(1, Ordering::Relaxed);
1496
1497 let report = drive_blob_overflow_tick(
1498 &controller,
1499 &sink,
1500 OverflowTickObservation {
1501 disk_used_bytes: 900,
1502 disk_total_bytes: 1000,
1503 hysteresis_active: &active,
1504 now,
1505 },
1506 |_| Some(1024),
1507 )
1508 .await;
1509 assert_eq!(report.admitted, 0);
1510 assert_eq!(report.push_errors, 1);
1511 assert_eq!(report.pushed_bytes, 0);
1512 }
1513
1514 #[tokio::test]
1515 async fn tick_records_no_target_when_no_overflow_enabled_peer() {
1516 let now = Instant::now();
1517 let (a, _) = hex64(0xAA);
1518 let heat = heat_registry_with(now, &[(a, 0.0)]);
1519 let refcount = refcount_with_zero(&[a], 1_000_000);
1520 // Peer has no overflow tag.
1521 let no_overflow_peer_caps = CapabilitySet::new()
1522 .add_tag("dataforts.blob.storage")
1523 .add_tag("dataforts.blob.disk_total_gb=100")
1524 .add_tag("dataforts.blob.disk_free_gb=80")
1525 .add_tag("dataforts.gravity.enabled")
1526 .add_tag("dataforts.gravity.scope=mesh");
1527 let peer = (99u64, [0x11; 32], no_overflow_peer_caps);
1528 let fold = cap_index_with(&[peer]);
1529 let local = overflow_enabled_local_caps();
1530 let cfg = OverflowConfig {
1531 enabled: true,
1532 ..Default::default()
1533 };
1534 let controller = BlobOverflowController::new(&local, &fold, &heat, &refcount, &cfg);
1535 let active = AtomicBool::new(false);
1536 let sink = OverflowPushRecorder::new();
1537
1538 let report = drive_blob_overflow_tick(
1539 &controller,
1540 &sink,
1541 OverflowTickObservation {
1542 disk_used_bytes: 900,
1543 disk_total_bytes: 1000,
1544 hysteresis_active: &active,
1545 now,
1546 },
1547 |_| Some(1024),
1548 )
1549 .await;
1550 assert_eq!(report.admitted, 0);
1551 assert_eq!(report.rejected_no_target, 1);
1552 assert_eq!(sink.calls().len(), 0);
1553 }
1554
1555 #[tokio::test]
1556 async fn tick_skips_when_local_overflow_tag_not_advertised() {
1557 // `OverflowConfig.enabled = true` but `local_caps`
1558 // doesn't carry `dataforts.blob.overflow` — the
1559 // operator flipped the master switch on the adapter
1560 // but `announce_capabilities` hasn't rebuilt the
1561 // local caps snapshot yet. Sender-side self-check
1562 // (plan § Open design Q #5) must skip the tick:
1563 // every push would round-trip an RPC and come back
1564 // `Rejected(SenderNotOverflowing)`, wasting wire
1565 // and bumping `push_errors` without making progress.
1566 let now = Instant::now();
1567 let (a, _) = hex64(0xAA);
1568 let heat = heat_registry_with(now, &[(a, 0.0)]);
1569 let refcount = refcount_with_zero(&[a], 1_000_000);
1570 let peer = (99u64, [0x11; 32], overflow_peer_caps(50));
1571 let fold = cap_index_with(&[peer]);
1572 // Local caps WITHOUT `dataforts.blob.overflow` tag.
1573 let local = CapabilitySet::new()
1574 .add_tag("dataforts.blob.storage")
1575 .add_tag("dataforts.gravity.enabled")
1576 .add_tag("dataforts.gravity.scope=mesh")
1577 .add_tag("dataforts.gravity.proximity=128");
1578 let cfg = OverflowConfig {
1579 enabled: true,
1580 ..Default::default()
1581 };
1582 let controller = BlobOverflowController::new(&local, &fold, &heat, &refcount, &cfg);
1583 let active = AtomicBool::new(false);
1584 let sink = OverflowPushRecorder::new();
1585
1586 let report = drive_blob_overflow_tick(
1587 &controller,
1588 &sink,
1589 OverflowTickObservation {
1590 disk_used_bytes: 900,
1591 disk_total_bytes: 1000,
1592 hysteresis_active: &active,
1593 now,
1594 },
1595 |_| Some(1024),
1596 )
1597 .await;
1598 // Hysteresis still flips (disk genuinely crossed
1599 // the high-water mark — the gauge should reflect
1600 // that), but no pushes fire and no rejections
1601 // count.
1602 assert!(report.is_active_at_end);
1603 assert_eq!(report.admitted, 0);
1604 assert_eq!(report.push_errors, 0);
1605 assert_eq!(report.rejected_no_target, 0);
1606 assert_eq!(sink.calls().len(), 0);
1607 }
1608
1609 #[tokio::test]
1610 async fn tick_no_target_excludes_truncated_hashes() {
1611 // Pre-pick count exceeds `max_pushes_per_tick`: every
1612 // attempted hash finds a target (so `rejected_no_target`
1613 // must stay 0), the truncated tail is never tried, and
1614 // exactly `max_pushes_per_tick` pushes fire. Regression
1615 // against the prior `pre_pick - candidates.len()` math
1616 // that conflated truncation with no-target.
1617 let now = Instant::now();
1618 let hashes: Vec<[u8; 32]> = (0..5).map(|i| hex64(i as u8).0).collect();
1619 let entries: Vec<([u8; 32], f64)> = hashes.iter().map(|h| (*h, 0.0)).collect();
1620 let heat = heat_registry_with(now, &entries);
1621 let refcount = refcount_with_zero(&hashes, 1_000_000);
1622 let peer = (99u64, [0x11; 32], overflow_peer_caps(80));
1623 let fold = cap_index_with(&[peer]);
1624 let local = overflow_enabled_local_caps();
1625 let cfg = OverflowConfig {
1626 enabled: true,
1627 max_pushes_per_tick: 2,
1628 ..Default::default()
1629 };
1630 let controller = BlobOverflowController::new(&local, &fold, &heat, &refcount, &cfg);
1631 let active = AtomicBool::new(false);
1632 let sink = OverflowPushRecorder::new();
1633
1634 let report = drive_blob_overflow_tick(
1635 &controller,
1636 &sink,
1637 OverflowTickObservation {
1638 disk_used_bytes: 900,
1639 disk_total_bytes: 1000,
1640 hysteresis_active: &active,
1641 now,
1642 },
1643 |_| Some(1024),
1644 )
1645 .await;
1646 assert_eq!(report.admitted, 2);
1647 assert_eq!(
1648 report.rejected_no_target, 0,
1649 "truncated hashes (never attempted) must NOT bump rejected_no_target"
1650 );
1651 assert_eq!(sink.calls().len(), 2);
1652 }
1653
1654 #[tokio::test]
1655 async fn tick_no_target_counts_only_attempted_failures() {
1656 // Mix: two hashes need 4 GiB; peer offers 80 GiB so
1657 // both find targets. Two more hashes need 100 GiB; no
1658 // peer can take them → both bump `rejected_no_target`.
1659 // With max_pushes_per_tick=3 the loop stops after the
1660 // 3rd successful push attempt, so we should see
1661 // admitted=2 and no_target=2 (both attempted) — NOT a
1662 // capped diff that would mis-attribute the truncation.
1663 let now = Instant::now();
1664 // Order by hash bytes (sort is coldest-first, ties by
1665 // hash). Use distinct first bytes so order is
1666 // predictable.
1667 let (small1, _) = hex64(0x01);
1668 let (big1, _) = hex64(0x02);
1669 let (small2, _) = hex64(0x03);
1670 let (big2, _) = hex64(0x04);
1671 let heat = heat_registry_with(
1672 now,
1673 &[(small1, 0.0), (big1, 0.0), (small2, 0.0), (big2, 0.0)],
1674 );
1675 let refcount = refcount_with_zero(&[small1, big1, small2, big2], 1_000_000);
1676 let peer = (99u64, [0x11; 32], overflow_peer_caps(80));
1677 let fold = cap_index_with(&[peer]);
1678 let local = overflow_enabled_local_caps();
1679 let cfg = OverflowConfig {
1680 enabled: true,
1681 max_pushes_per_tick: 3,
1682 ..Default::default()
1683 };
1684 let controller = BlobOverflowController::new(&local, &fold, &heat, &refcount, &cfg);
1685 let active = AtomicBool::new(false);
1686 let sink = OverflowPushRecorder::new();
1687
1688 let size_for_hash = move |h: [u8; 32]| -> Option<u64> {
1689 if h == big1 || h == big2 {
1690 Some(100 * (1 << 30)) // 100 GiB — over peer's free
1691 } else {
1692 Some(1024) // tiny
1693 }
1694 };
1695 let report = drive_blob_overflow_tick(
1696 &controller,
1697 &sink,
1698 OverflowTickObservation {
1699 disk_used_bytes: 900,
1700 disk_total_bytes: 1000,
1701 hysteresis_active: &active,
1702 now,
1703 },
1704 size_for_hash,
1705 )
1706 .await;
1707 // 2 small hashes fit (admitted), 2 big hashes have no
1708 // target. The loop never hits the `max=3` cap because
1709 // there are only 4 candidates total.
1710 assert_eq!(report.admitted, 2);
1711 assert_eq!(report.rejected_no_target, 2);
1712 }
1713
1714 #[tokio::test]
1715 async fn tick_zero_disk_total_never_fires() {
1716 // disk_total = 0 → ratio = 0.0 → always below high
1717 // water. Defends against misconfigured nodes that
1718 // would push the moment any chunk lands.
1719 let now = Instant::now();
1720 let (a, _) = hex64(0xAA);
1721 let heat = heat_registry_with(now, &[(a, 0.0)]);
1722 let refcount = refcount_with_zero(&[a], 1_000_000);
1723 let peer = (99u64, [0x11; 32], overflow_peer_caps(50));
1724 let fold = cap_index_with(&[peer]);
1725 let local = overflow_enabled_local_caps();
1726 let cfg = OverflowConfig {
1727 enabled: true,
1728 ..Default::default()
1729 };
1730 let controller = BlobOverflowController::new(&local, &fold, &heat, &refcount, &cfg);
1731 let active = AtomicBool::new(false);
1732 let sink = OverflowPushRecorder::new();
1733
1734 let report = drive_blob_overflow_tick(
1735 &controller,
1736 &sink,
1737 OverflowTickObservation {
1738 disk_used_bytes: 500,
1739 disk_total_bytes: 0, // disk_total = 0
1740 hysteresis_active: &active,
1741 now,
1742 },
1743 |_| Some(1024),
1744 )
1745 .await;
1746 assert_eq!(report.disk_ratio_at_start, 0.0);
1747 assert!(!report.is_active_at_end);
1748 assert_eq!(sink.calls().len(), 0);
1749 }
1750
1751 // ========================================================================
1752 // scope_covers
1753 // ========================================================================
1754
#[test]
fn scope_covers_mesh_covers_everything() {
    // With a Mesh first argument, `scope_covers` must return
    // true for every peer scope, from Node up to Mesh.
    //
    // NOTE(review): `scope_covers_zone_does_not_cover_node`
    // expects (Zone, Node) to be false. Mesh → Node passing here
    // therefore implies Mesh is special-cased rather than
    // compared by plain scope width — confirm this is intended.
    let peer_scopes = [
        TopologyScope::Node,
        TopologyScope::Zone,
        TopologyScope::Region,
        TopologyScope::Mesh,
    ];
    for scope in peer_scopes {
        assert!(scope_covers(TopologyScope::Mesh, scope));
    }
}
1762
#[test]
fn scope_covers_zone_does_not_cover_node() {
    // For a Zone-scoped sender, only peers whose scope is at
    // least as wide as Zone qualify. A Node-scoped peer is
    // narrower and won't accept the cross-scope push.
    let expectations = [
        (TopologyScope::Node, false),
        (TopologyScope::Zone, true),
        (TopologyScope::Region, true),
        (TopologyScope::Mesh, true),
    ];
    for (peer_scope, covered) in expectations {
        assert_eq!(scope_covers(TopologyScope::Zone, peer_scope), covered);
    }
}
1774
1775 // ========================================================================
// Wire types (P3) — postcard round-trip
//
// The receive side decodes `OverflowPush` from the nRPC payload and
// encodes `OverflowPushAck` back; the sender's
// `MeshNodeOverflowPushSink` does the inverse. These tests verify that
// encode and decode are exact inverses for every typed variant. Note
// that a same-build round trip does not by itself pin the byte layout
// across builds: postcard encodes fields by position and enum variants
// by index, so reordering either would still round-trip here while
// breaking a mixed-version sender/receiver pair.
1783 // ========================================================================
1784
#[test]
fn overflow_push_request_round_trips_postcard() {
    // Encoding a request with postcard and decoding it back
    // must reproduce an equal `OverflowPush`.
    let original = OverflowPush {
        blob_hash: [0xAA; 32],
        size_bytes: 4 << 20, // 4 MiB
        sender_node_id: 0xDEAD_BEEF_u64,
    };
    let wire = postcard::to_allocvec(&original).expect("encode");
    let round_tripped = postcard::from_bytes::<OverflowPush>(&wire).expect("decode");
    assert_eq!(round_tripped, original);
}
1796
#[test]
fn overflow_push_ack_accepted_round_trips() {
    // The unit `Accepted` ack survives a postcard encode/decode cycle.
    let wire = postcard::to_allocvec(&OverflowPushAck::Accepted).expect("encode");
    let round_tripped = postcard::from_bytes::<OverflowPushAck>(&wire).expect("decode");
    assert_eq!(round_tripped, OverflowPushAck::Accepted);
}
1804
#[test]
fn overflow_push_ack_rejected_carries_typed_reason() {
    // Operators dashboard the typed reject reason on the
    // sender side. Each listed `OverflowReject` variant must
    // therefore survive a postcard round trip when wrapped in
    // `OverflowPushAck::Rejected`.
    //
    // NOTE(review): this list is maintained by hand. A newly
    // added `OverflowReject` variant will not be exercised here
    // until it is appended.
    let reasons = [
        OverflowReject::NoStorageCap,
        OverflowReject::NotParticipating,
        OverflowReject::SenderNotOverflowing,
        OverflowReject::Unhealthy,
        OverflowReject::ScopeMismatch,
        OverflowReject::InsufficientDisk,
    ];
    for reason in reasons {
        let expected = OverflowPushAck::Rejected(reason);
        let wire = postcard::to_allocvec(&expected).expect("encode");
        let round_tripped: OverflowPushAck = postcard::from_bytes(&wire).expect("decode");
        assert_eq!(round_tripped, expected, "ack with {:?} must round-trip", reason);
    }
}
1824
#[test]
fn overflow_push_ack_open_chunk_failed_round_trips() {
    // The unit `OpenChunkFailed` ack survives a postcard encode/decode cycle.
    let wire = postcard::to_allocvec(&OverflowPushAck::OpenChunkFailed).expect("encode");
    let round_tripped = postcard::from_bytes::<OverflowPushAck>(&wire).expect("decode");
    assert_eq!(round_tripped, OverflowPushAck::OpenChunkFailed);
}
1832
#[test]
fn overflow_push_service_name_is_stable() {
    // The service-name token is part of the wire contract.
    // Senders and receivers from different builds must agree
    // on it, so changing this value silently breaks
    // compatibility even though both halves live in the same
    // crate behind a feature flag.
    const PINNED: &str = "dataforts.blob.overflow_push";
    assert_eq!(OVERFLOW_PUSH_SERVICE, PINNED);
}
1841}