Skip to main content

net/adapter/net/dataforts/blob/
overflow.rs

//! Active-overflow controller (v0.3 P2).
//!
//! Push-side complement of [`super::migration::BlobMigrationController`].
//! Migration is *pull* (the local node decides to take an
//! advertised hot blob); overflow is *push* (the local node
//! decides to shed a cold blob and a remote node decides
//! whether to accept). The two surfaces parallel each other —
//! every reject reason on either side maps to a Prometheus
//! counter label so operators can dashboard both directions.
//!
//! See [`DATAFORTS_BLOB_OVERFLOW_PLAN.md`] for the full design.
//!
//! # P2 scope
//!
//! Pure-logic controller + tick driver + hysteresis state
//! machine. The actual wire push (`OverflowPush` RPC) lands in
//! P3; this module abstracts that away behind the
//! [`OverflowPushSink`] trait so the tick can be unit-tested
//! against a recorder without spinning up a real mesh.
//!
//! [`DATAFORTS_BLOB_OVERFLOW_PLAN.md`]: ../../../../../docs/plans/DATAFORTS_BLOB_OVERFLOW_PLAN.md
22
23use std::sync::atomic::{AtomicBool, Ordering};
24use std::sync::Arc;
25use std::time::Instant;
26
27use async_trait::async_trait;
28use serde::{Deserialize, Serialize};
29
30use super::admission::OverflowReject;
31use super::error::BlobError;
32use super::mesh::OverflowConfig;
33use super::refcount::BlobRefcountTable;
34use crate::adapter::net::behavior::fold::{capability_bridge, CapabilityFold, Fold};
35use crate::adapter::net::behavior::{
36    is_blob_storage_unhealthy, BlobCapability, CapabilitySet, GravityCapability, TopologyScope,
37};
38use crate::adapter::net::dataforts::gravity::BlobHeatRegistry;
39
/// Service-name token for the overflow-push nRPC channel.
///
/// The sender constructs a request on
/// `"{OVERFLOW_PUSH_SERVICE}.requests"` and listens on
/// `"{OVERFLOW_PUSH_SERVICE}.replies.<origin>"`; the receiver
/// registers a handler under the same service name via
/// [`crate::adapter::net::MeshNode::serve_overflow_push`].
///
/// Held as a const so both sides reference one symbol: a
/// misspelled identifier fails to compile instead of silently
/// putting sender and receiver on different channels. The
/// wire form is the literal string — no version suffix
/// (per-tag versioning lives inside the wire payload, not the
/// channel name).
pub const OVERFLOW_PUSH_SERVICE: &str = "dataforts.blob.overflow_push";
52
/// Wire request body for an overflow push. The sender encodes
/// this via postcard + drops it into the nRPC payload; the
/// receiver decodes, runs [`super::admission::should_accept_overflow_from`],
/// and on Admit opens the chunk channel against the local
/// adapter so the existing replication runtime can pull the
/// bytes.
///
/// The chunk bytes themselves do NOT ride this request — the
/// nRPC envelope carries the *nudge*, not the chunk payload.
/// `size_bytes` is the resolved chunk size so the receive-side
/// disk-gate can fire without round-tripping a `stat` call.
///
/// Wire layout: postcard's default encoding, which writes the
/// fields positionally in declaration order. The field order
/// is therefore locked here: reordering, removing, or
/// inserting a field changes the bytes on the wire, so adding
/// new fields requires a versioned variant. A future v2 would
/// land as a separate type registered under a new
/// service-name token, with v1 receivers ignoring the new
/// channel.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct OverflowPush {
    /// 32-byte BLAKE3 hash of the chunk to push.
    pub blob_hash: [u8; 32],
    /// Wire size of the chunk in bytes. Drives the receive-
    /// side disk-gate.
    pub size_bytes: u64,
    /// Sender's canonical `node_id`. The receiver synthesizes
    /// the sender's `CapabilitySet` from its local capability
    /// fold keyed on this id; the admission check reads
    /// `overflow_enabled` + scope tags from the synthesized
    /// snapshot, not from the request body. Defends against a
    /// sender forging its caps via the request — the only
    /// authority is the verified fold state.
    ///
    /// NOTE(review): the id itself still arrives in the
    /// request body. Confirm the RPC layer ties it to the
    /// authenticated transport origin; otherwise a sender
    /// could name another (overflow-enabled) node here.
    pub sender_node_id: u64,
}
88
/// Wire response body. Sender-side observes the result and
/// either records the admission outcome (`Accepted`) or
/// dispatches the typed reject reason to the per-reason
/// counter family.
///
/// The chunk-channel open on the receive side happens before
/// the ack is produced: `Accepted` is only returned after the
/// open succeeded, and a failed open is reported as
/// `OpenChunkFailed` instead.
///
/// Encoded with postcard alongside [`OverflowPush`]. Because
/// the enum variant is identified by its declaration index on
/// the wire, new variants should be appended rather than
/// inserted (NOTE(review): confirm against the postcard
/// version pinned by this crate).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum OverflowPushAck {
    /// Receiver ran admission, returned Admit, and the
    /// chunk channel open returned Ok. The bytes are now in
    /// flight via the existing replication runtime; the
    /// durability watermark observation (post-tick) is the
    /// sender's signal to drop the local copy.
    Accepted,
    /// Receiver ran admission, returned Reject. Carries the
    /// typed reason so the sender can break out per-reason
    /// counters + decide whether to retry against the same
    /// peer or pick a new one (e.g. `InsufficientDisk` won't
    /// change quickly, so a different target is the right
    /// move).
    Rejected(OverflowReject),
    /// Receiver ran admission, returned Admit, but the
    /// chunk channel open itself failed (the replication
    /// runtime couldn't spawn, a transient disk error, etc.).
    /// Wire-distinct from `Rejected` because the failure
    /// mode is "we wanted to take it, our local plumbing
    /// broke" rather than "we won't take it." Operators
    /// alarm on `OpenChunkFailed` more aggressively.
    OpenChunkFailed,
}
119
/// Output of [`BlobOverflowController::candidate_batch`]: the
/// list of candidates to push this tick, plus the precise
/// count of hashes that were attempted for target selection
/// but found no eligible peer. The pair lets the tick driver
/// report `rejected_no_target` accurately — distinguishing
/// "we tried and no peer qualified" from "we hit the per-tick
/// push cap and never tried the rest."
#[derive(Clone, Debug, Default)]
pub struct OverflowCandidateBatch {
    /// Candidates with a selected target peer, truncated to
    /// `config.max_pushes_per_tick`.
    pub candidates: Vec<BlobOverflowCandidate>,
    /// Number of hashes the controller attempted target
    /// selection for and got `None` back.
    ///
    /// This is NOT bounded by `config.max_pushes_per_tick`:
    /// the selection loop only stops once `candidates.len()`
    /// reaches the cap, so when few or no peers qualify every
    /// eligible hash is tried and the count can grow up to
    /// the number of eligible hashes. Hashes that come after
    /// the point where the cap was reached are never tried
    /// and are never counted here, so the tick driver can
    /// route this value directly to `rejected_no_target`
    /// without double-counting truncated hashes.
    pub no_target_count: usize,
}
141
/// One overflow-push candidate the controller is considering
/// for this tick. The push controller already selected a
/// target peer for `hash`; the tick driver routes the actual
/// push call through [`OverflowPushSink::push`].
///
/// Equivalent shape to [`super::migration::BlobMigrationCandidate`]
/// with the direction reversed — `target_node_id` is the
/// receive-side, not the publisher.
#[derive(Clone, Debug)]
pub struct BlobOverflowCandidate {
    /// 32-byte chunk hash to push.
    pub hash: [u8; 32],
    /// Wire size of the chunk in bytes. Drives the receiver's
    /// `disk_free_gb` admission gate; the sender supplies it
    /// here so the receiver doesn't have to round-trip a
    /// `stat` call first.
    pub size_bytes: u64,
    /// node_id of the selected receive-side peer.
    pub target_node_id: u64,
    /// Snapshot of the target's capability set at selection
    /// time. The receive-side admission decision will re-read
    /// its own state fresh; this snapshot is for the sender's
    /// dashboards / debug logs.
    pub target_caps: CapabilitySet,
    /// Decayed heat rate of `hash` at controller-tick time.
    /// Candidates are ordered coldest-first, so the lowest
    /// rates survive truncation to `max_pushes_per_tick`.
    /// The controller clamps the value to `0.0` once more
    /// than 64 half-lives have elapsed since the counter's
    /// last update — these long-idle hashes are the prime
    /// overflow targets.
    pub cold_rate: f64,
}
173
/// Per-tick report. Each field maps to a Prometheus counter
/// label (`dataforts_blob_overflow_*`) so operators can
/// dashboard the loop without hand-coding per-reason metrics.
/// The hysteresis state itself lives in the caller-owned
/// `AtomicBool` advanced by [`step_overflow_hysteresis`];
/// this report captures *only* the actions this tick took,
/// plus a before/after view of that state.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct BlobOverflowTickReport {
    /// Candidates that passed the controller's filters AND
    /// the sink's push call returned `Ok`. Counts pushes
    /// the wire-side accepted; durability watermark observation
    /// is the next layer (P3).
    pub admitted: u64,
    /// Candidates the controller computed but no overflow-
    /// enabled peer was reachable for. Bumps once per cold
    /// hash that found no target — the same hash on the next
    /// tick may find one as caps propagate.
    pub rejected_no_target: u64,
    /// Sink returned an error. Includes the receive-side
    /// admission rejections that P3 maps to typed
    /// `OverflowReject` variants, plus RPC transport errors,
    /// plus the chunk-open errors reported by the receiver.
    /// Operators disambiguate via the underlying wire
    /// counters; this counter is the aggregated send-side
    /// view.
    pub push_errors: u64,
    /// The hysteresis state at the start of the tick. `true`
    /// = the controller was already firing on the prior tick.
    /// Useful for telemetry: the difference between "ticks
    /// where overflow was firing the whole time" and "ticks
    /// where the state just transitioned high" is operator-
    /// meaningful (the latter signals a workload spike).
    pub was_active_at_start: bool,
    /// The hysteresis state at the end of the tick. The pair
    /// `(was_active_at_start, is_active_at_end)` documents
    /// the state machine transition this tick took.
    pub is_active_at_end: bool,
    /// Local disk-usage ratio at the start of the tick.
    /// `disk_used_bytes / disk_total_bytes`. Surfaced for
    /// operator dashboards.
    pub disk_ratio_at_start: f64,
    /// Local disk-usage ratio at the end of the tick. Equal
    /// to `disk_ratio_at_start` in P2 (the actual freed-bytes
    /// accounting needs the durability watermark observation
    /// from P3 + a re-poll of disk stats). Reserved for P3.
    pub disk_ratio_at_end: f64,
    /// Total bytes pushed this tick. Sum of `size_bytes` of
    /// every admitted candidate. P2 reports the *pushed*
    /// volume; *reclaimed* volume waits for P3's durability
    /// observation.
    pub pushed_bytes: u64,
}
224
/// Advance the overflow hysteresis state machine by one tick.
///
/// Reads the prior state from `active`, decides the new state
/// from `disk_ratio` and the two thresholds, writes the new
/// state back when it changed, and returns it. Follows the
/// same high/low-water discipline as
/// [`super::metrics::evaluate_health_gate`].
///
/// Transition table (both thresholds are inclusive):
///
/// | condition                              | new state   |
/// |----------------------------------------|-------------|
/// | `disk_ratio >= high_water`             | `true`      |
/// | otherwise, `disk_ratio <= low_water`   | `false`     |
/// | otherwise (inside the band)            | prior state |
///
/// The high-water test is evaluated first, so a misconfigured
/// `low_water >= high_water` still activates whenever
/// `disk_ratio >= high_water`; the controller merely loses
/// the benefit of a hysteresis band. A NaN `disk_ratio` fails
/// both comparisons and therefore leaves the state untouched.
///
/// Both the load and the store use `Relaxed` ordering: the
/// caller is the single tick driver, so no cross-thread
/// ordering is required — the atomic only provides visibility
/// to other holders of the flag (e.g. a dashboard reader).
pub fn step_overflow_hysteresis(
    active: &AtomicBool,
    disk_ratio: f64,
    high_water: f64,
    low_water: f64,
) -> bool {
    let previous = active.load(Ordering::Relaxed);
    let at_or_above_high = disk_ratio >= high_water;
    let at_or_below_low = disk_ratio <= low_water;

    let next = match (at_or_above_high, at_or_below_low) {
        // High water wins even if the low-water test also holds.
        (true, _) => true,
        (false, true) => false,
        // Inside the band (or NaN): keep whatever we had.
        (false, false) => previous,
    };

    // Skip the store when nothing changed.
    if next != previous {
        active.store(next, Ordering::Relaxed);
    }
    next
}
270
/// Receive-side handler for the overflow nRPC. Implements
/// [`cortex::RpcHandler`] so it slots into [`MeshNode::serve_rpc`]
/// (under the [`OVERFLOW_PUSH_SERVICE`] service name). On each
/// incoming request:
///
/// 1. Decode the postcard-encoded [`OverflowPush`].
/// 2. Synthesize `sender_caps` from the mesh's capability
///    fold, keyed on `request.sender_node_id`.
/// 3. Run [`super::admission::should_accept_overflow_from`]
///    against the live `local_caps` snapshot + the sender's
///    caps + the chunk size.
/// 4. On Admit: build a [`super::blob_ref::BlobRef::small`]
///    from `(blob_hash, size_bytes)` and call
///    [`super::adapter::BlobAdapter::prefetch`] — this opens
///    the chunk channel with replication armed and the
///    existing per-chunk replication runtime pulls the
///    bytes from whoever advertises `causal:<hash>`
///    (typically the sender). Returns
///    [`OverflowPushAck::Accepted`] on success,
///    [`OverflowPushAck::OpenChunkFailed`] on local-plumbing
///    error.
/// 5. On Reject: record the reason on the adapter, wrap the
///    typed [`OverflowReject`] in
///    [`OverflowPushAck::Rejected`] and return.
///
/// The handler holds `Arc<MeshNode>` so it reads live local
/// caps + the capability fold at each call rather than a
/// build-time snapshot. Toggling `overflow_enabled` on the
/// adapter is observable immediately on the next inbound
/// push.
///
/// [`cortex::RpcHandler`]: crate::adapter::net::cortex::RpcHandler
/// [`MeshNode::serve_rpc`]: crate::adapter::net::MeshNode::serve_rpc
#[cfg(feature = "cortex")]
pub struct OverflowPushHandler {
    /// Reference to the local mesh node. Used for the
    /// capability-fold lookup + the local-caps snapshot.
    /// Holds an `Arc` rather than a borrow because the
    /// handler is registered into the nRPC fold which owns
    /// it via `Arc<dyn RpcHandler>` — the handler outlives
    /// any single tick.
    pub mesh: Arc<crate::adapter::net::MeshNode>,
    /// The local blob adapter. The handler calls
    /// `adapter.prefetch(BlobRef)` on Admit to open the
    /// chunk channel and `adapter.record_overflow_reject`
    /// on Reject. Held by `Arc` for the same reason as
    /// `mesh`; cheap to clone (the adapter is `Arc`-internal
    /// throughout).
    pub adapter: Arc<super::mesh::MeshBlobAdapter>,
}
319
#[cfg(feature = "cortex")]
impl OverflowPushHandler {
    /// Construct a handler from the local mesh node and the
    /// local blob adapter. Operators wire the result into the
    /// receiver side via
    /// [`crate::adapter::net::MeshNode::serve_overflow_push`].
    pub fn new(
        mesh: Arc<crate::adapter::net::MeshNode>,
        adapter: Arc<super::mesh::MeshBlobAdapter>,
    ) -> Self {
        Self { mesh, adapter }
    }

    /// Pure typed handler logic. Decoded request goes in,
    /// typed ack comes out. Separate from the
    /// [`crate::adapter::net::cortex::RpcHandler`] impl so
    /// tests can drive the admission path without
    /// constructing an [`crate::adapter::net::cortex::RpcContext`].
    ///
    /// Reads live `user_caps_snapshot` + capability-fold
    /// state on each call, so an operator toggling
    /// `overflow_enabled` on the local node is observed by
    /// the next inbound push.
    ///
    /// Returns:
    /// - [`OverflowPushAck::Rejected`] when admission rejects
    ///   (the reason is also recorded on the adapter),
    /// - [`OverflowPushAck::Accepted`] when admission admits
    ///   and `prefetch` succeeds,
    /// - [`OverflowPushAck::OpenChunkFailed`] when admission
    ///   admits but `prefetch` returns an error (logged at
    ///   `warn`).
    pub async fn handle(&self, request: OverflowPush) -> OverflowPushAck {
        use super::adapter::BlobAdapter;
        use super::admission::{should_accept_overflow_from, OverflowVerdict};
        use super::blob_ref::BlobRef;
        use std::fmt::Write;

        // Synthesize sender caps from the capability fold via
        // the `capability_bridge` module this file already
        // imports (no hand-counted `super::super::super` path
        // that breaks if the module moves). Per the original
        // author's note, an absent sender yields the empty
        // default (`overflow_enabled = false`), which the
        // admission gate is expected to reject as
        // `SenderNotOverflowing`.
        //
        // NOTE(review): `sender_node_id` comes from the
        // request body. Confirm the RPC layer binds it to the
        // authenticated origin before it reaches this point.
        let sender_caps = capability_bridge::synthesize_capability_set(
            self.mesh.capability_fold(),
            request.sender_node_id,
        );

        // Snapshot local caps fresh per request so a
        // concurrent `set_overflow_enabled(false)` is
        // observed immediately.
        let local_caps = self.mesh.user_caps_snapshot();

        let verdict = should_accept_overflow_from(&local_caps, &sender_caps, request.size_bytes);
        match verdict {
            OverflowVerdict::Reject(reason) => {
                // Bump the per-reason rejection counter on
                // the receive side. The sender's controller
                // bumps `push_errors_total` separately;
                // dashboards aggregate both surfaces.
                self.adapter.record_overflow_reject(reason);
                OverflowPushAck::Rejected(reason)
            }
            OverflowVerdict::Admit => {
                // Build the BlobRef::Small the prefetch path
                // wants. The URI is `mesh://<hex>` — opaque
                // to the adapter (content-hash is the
                // authoritative address) but the convention
                // matches existing migration code.
                let mut hex = String::with_capacity(64);
                for b in request.blob_hash {
                    // Writing into a `String` cannot fail, so
                    // the `fmt::Result` is deliberately dropped.
                    let _ = write!(&mut hex, "{:02x}", b);
                }
                let blob_ref = BlobRef::small(
                    format!("mesh://{}", hex),
                    request.blob_hash,
                    request.size_bytes,
                );
                match self.adapter.prefetch(&blob_ref).await {
                    Ok(()) => OverflowPushAck::Accepted,
                    Err(e) => {
                        tracing::warn!(
                            error = %e,
                            hash = %hex,
                            sender = request.sender_node_id,
                            "overflow push: prefetch failed after admit",
                        );
                        OverflowPushAck::OpenChunkFailed
                    }
                }
            }
        }
    }
}
404
#[cfg(feature = "cortex")]
#[async_trait]
impl crate::adapter::net::cortex::RpcHandler for OverflowPushHandler {
    /// nRPC entry point: decode the postcard request body,
    /// run [`OverflowPushHandler::handle`], and return the
    /// postcard-encoded [`OverflowPushAck`] as the response
    /// body.
    ///
    /// Every admission outcome — including rejections and
    /// chunk-open failures — travels in-band inside the ack
    /// with `RpcStatus::Ok`. `RpcHandlerError::Internal` is
    /// returned only when the request cannot be decoded or
    /// the ack cannot be encoded.
    async fn call(
        &self,
        ctx: crate::adapter::net::cortex::RpcContext,
    ) -> Result<
        crate::adapter::net::cortex::RpcResponsePayload,
        crate::adapter::net::cortex::RpcHandlerError,
    > {
        use crate::adapter::net::cortex::{RpcHandlerError, RpcResponsePayload, RpcStatus};

        // Malformed request bytes surface as a typed Internal
        // error carrying a short diagnostic.
        let request: OverflowPush = match postcard::from_bytes(&ctx.payload.body) {
            Ok(decoded) => decoded,
            Err(e) => {
                return Err(RpcHandlerError::Internal(format!(
                    "overflow push: decode failed: {e}"
                )));
            }
        };

        let ack = self.handle(request).await;

        // Failing to encode our own ack enum would be an
        // internal bug; report it as Internal rather than
        // panicking.
        let encoded = match postcard::to_allocvec(&ack) {
            Ok(buf) => buf,
            Err(e) => {
                return Err(RpcHandlerError::Internal(format!(
                    "overflow push: encode ack failed: {e}"
                )));
            }
        };

        let response = RpcResponsePayload {
            status: RpcStatus::Ok,
            headers: Vec::new(),
            body: bytes::Bytes::from(encoded),
        };
        Ok(response)
    }
}
440
/// Concrete [`OverflowPushSink`] implementation backed by a
/// [`MeshNode`]. Wraps the sender-side nRPC call: each
/// `push` invocation hands `(target, hash, size)` to
/// `MeshNode::send_overflow_push`, receives the typed
/// [`OverflowPushAck`], and maps the outcome to the
/// [`OverflowPushSink::push`] `Result` shape the controller
/// expects. (Presumably `send_overflow_push` dispatches via
/// [`MeshNode::call`] under the [`OVERFLOW_PUSH_SERVICE`]
/// service name — confirm against the mesh implementation.)
///
/// Construct once per operator scheduler — construction is
/// cheap (the sink only holds an `Arc<MeshNode>`). The type
/// does not implement `Clone`; build another sink from an
/// `Arc::clone` of the mesh if a second one is needed. Pass
/// to [`drive_blob_overflow_tick`] as `&dyn OverflowPushSink`.
///
/// [`MeshNode`]: crate::adapter::net::MeshNode
/// [`MeshNode::call`]: crate::adapter::net::MeshNode::call
#[cfg(feature = "cortex")]
pub struct MeshNodeOverflowPushSink {
    /// Reference to the local mesh. `Arc<MeshNode>` because
    /// `MeshNode::call` is defined on `&Arc<Self>` — the
    /// nRPC path needs the Arc to register the per-call
    /// reply-channel subscription.
    pub mesh: Arc<crate::adapter::net::MeshNode>,
}
463
#[cfg(feature = "cortex")]
impl MeshNodeOverflowPushSink {
    /// Build an overflow-push sink around an existing mesh
    /// node handle. The sink stores the `Arc` as-is; callers
    /// typically create one sink per operator scheduler.
    pub fn new(mesh: Arc<crate::adapter::net::MeshNode>) -> Self {
        MeshNodeOverflowPushSink { mesh }
    }
}
473
#[cfg(feature = "cortex")]
#[async_trait]
impl OverflowPushSink for MeshNodeOverflowPushSink {
    /// Send one overflow push to `target_node_id` and collapse
    /// the typed ack into the `Result` the tick driver expects.
    ///
    /// - Transport-level failures from `send_overflow_push`
    ///   propagate through `?`.
    /// - `Accepted` becomes `Ok(())`.
    /// - `Rejected(reason)` and `OpenChunkFailed` both become
    ///   `BlobError::Backend` with a descriptive message, so
    ///   the controller's `push_errors` counter is bumped
    ///   uniformly for every non-accepted outcome.
    async fn push(
        &self,
        hash: [u8; 32],
        size_bytes: u64,
        target_node_id: u64,
    ) -> Result<(), BlobError> {
        let ack = self
            .mesh
            .send_overflow_push(target_node_id, hash, size_bytes)
            .await?;

        // Only `Accepted` is a success; every other ack is
        // turned into a human-readable backend error.
        let failure = match ack {
            OverflowPushAck::Accepted => return Ok(()),
            OverflowPushAck::Rejected(reason) => {
                format!("overflow push to {target_node_id:#x} rejected: {reason:?}")
            }
            OverflowPushAck::OpenChunkFailed => {
                format!("overflow push to {target_node_id:#x} admitted but chunk open failed")
            }
        };
        Err(BlobError::Backend(failure))
    }
}
502
/// Sink trait for the actual push action. It is the seam
/// between the pure-logic controller and the wire: a
/// [`MeshNode`]-backed implementation sends an
/// `OverflowPush` RPC, while unit tests can substitute a
/// recorder mock.
///
/// `push` is fire-once-per-tick per `(hash, target_node_id)`
/// pair — the controller dedups by hash before calling the
/// sink. (NOTE(review): no explicit dedup step is visible in
/// the candidate computation; presumably the heat registry
/// yields each hash once — confirm.) Idempotent on the
/// receive side anyway (an already-stored chunk is a no-op
/// store).
///
/// Implementations must be `Send + Sync`; the tick driver
/// takes the sink as `&dyn OverflowPushSink`.
///
/// [`MeshNode`]: crate::adapter::net::MeshNode
#[async_trait]
pub trait OverflowPushSink: Send + Sync {
    /// Push `hash` (`size_bytes`) to the receive-side peer
    /// identified by `target_node_id`. Returns `Ok(())` when
    /// the wire-side acknowledgement landed; `Err(BlobError)`
    /// when the send failed for any reason (RPC transport
    /// error, receive-side admission rejection, chunk-open
    /// failure). The tick driver aggregates errors into the
    /// `push_errors` counter without disambiguating — wire-
    /// level counters break out per reason.
    async fn push(
        &self,
        hash: [u8; 32],
        size_bytes: u64,
        target_node_id: u64,
    ) -> Result<(), BlobError>;
}
532
/// Active-overflow controller. Borrows the inputs it needs
/// (local caps, capability fold, heat registry, refcount
/// table, config); the controller itself is stateless. The
/// hysteresis state lives in the caller as an `AtomicBool`
/// passed into [`drive_blob_overflow_tick`].
///
/// Lifetimes:
/// - `'a` — the controller's borrows. Typically the operator
///   constructs the controller per tick inside the scheduler
///   loop; the borrows are valid for the lifetime of the
///   tick await.
pub struct BlobOverflowController<'a> {
    /// Local node's capability set. Read for the local
    /// gravity scope (target-selection scope filter) and
    /// for the overflow-enabled self-check inside
    /// [`drive_blob_overflow_tick`] (skip the tick until the
    /// local `dataforts.blob.overflow` tag is visible on the
    /// snapshot — otherwise every push would round-trip an
    /// RPC and come back `Rejected(SenderNotOverflowing)`
    /// while the announce propagates).
    pub local_caps: &'a CapabilitySet,
    /// Fold of peer capability sets. The controller walks
    /// every overflow-enabled peer to score target selection.
    /// Migrated off the legacy CapabilityIndex per Phase 3b.
    pub capability_fold: &'a Fold<CapabilityFold>,
    /// Per-chunk heat registry, shared behind a mutex. The
    /// controller locks it, walks every tracked hash,
    /// computes each rate decayed to `now` (without mutating
    /// the counters), and ranks candidates coldest-first.
    pub heat_registry: &'a Arc<parking_lot::Mutex<BlobHeatRegistry>>,
    /// Per-hash refcount + pin table. Candidates are
    /// filtered against this: only hashes that have an entry
    /// with `refcount == 0 && !pinned` are eligible for push
    /// in P2 (hashes with no entry at all are skipped). The
    /// richer "all-references-are-cache" rule (which would
    /// allow shedding a chunk still held by a greedy cache
    /// entry) lands when per-source refcount inspection is
    /// added to [`BlobRefcountTable`].
    pub refcount: &'a BlobRefcountTable,
    /// Operator-tunable knobs. Read for `scope`,
    /// `max_pushes_per_tick`, and the high/low water
    /// thresholds (consumed by the hysteresis state machine).
    pub config: &'a OverflowConfig,
}
575
576impl<'a> BlobOverflowController<'a> {
577    /// Construct a controller from borrows. `new` is sugar
578    /// for the struct literal — operators that prefer the
579    /// builder shape can call this; tests usually use the
580    /// literal for clarity.
581    pub fn new(
582        local_caps: &'a CapabilitySet,
583        capability_fold: &'a Fold<CapabilityFold>,
584        heat_registry: &'a Arc<parking_lot::Mutex<BlobHeatRegistry>>,
585        refcount: &'a BlobRefcountTable,
586        config: &'a OverflowConfig,
587    ) -> Self {
588        Self {
589            local_caps,
590            capability_fold,
591            heat_registry,
592            refcount,
593            config,
594        }
595    }
596
597    /// Compute every candidate for this tick — coldest first,
598    /// truncated to `config.max_pushes_per_tick`. Convenience
599    /// wrapper around [`Self::candidate_batch`] that drops the
600    /// `no_target_count` companion when the caller only wants
601    /// the push list.
602    pub fn candidates(
603        &self,
604        now: Instant,
605        size_for_hash: impl Fn([u8; 32]) -> Option<u64>,
606    ) -> Vec<BlobOverflowCandidate> {
607        self.candidate_batch(now, size_for_hash).candidates
608    }
609
610    /// Compute candidates + the precise `no_target` accounting
611    /// for this tick. `size_for_hash` is an operator-supplied
612    /// resolver (the controller doesn't know chunk sizes
613    /// directly; `MeshBlobAdapter::stat_chunk` or an equivalent
614    /// answers this).
615    ///
616    /// The function:
617    ///
618    /// 1. Snapshots `(hash, decayed_rate)` from the heat
619    ///    registry under a brief read lock.
620    /// 2. Filters out pinned hashes + hashes with nonzero
621    ///    refcount + hashes whose `size_for_hash` returns
622    ///    `None` (controller can't run the disk-gate without
623    ///    a size; abstain rather than guess).
624    /// 3. Sorts ascending by `(decayed_rate, hash)` — ties
625    ///    broken by hash bytes for determinism.
626    /// 4. For each candidate hash (in cold-first order),
627    ///    walks the capability index for an overflow-
628    ///    enabled peer with `disk_free_gb >= ceil(size /
629    ///    1 GiB)` matching the local gravity scope; picks
630    ///    the peer with the highest `disk_free_gb` (ties
631    ///    broken by lowest `node_id`).
632    /// 5. Counts a hash as `no_target` only when target
633    ///    selection was actually *attempted* and failed —
634    ///    hashes past the `max_pushes_per_tick` truncation
635    ///    point were never tried and are NOT no-target.
636    /// 6. Stops walking once
637    ///    `candidates.len() >= config.max_pushes_per_tick`.
638    pub fn candidate_batch(
639        &self,
640        now: Instant,
641        size_for_hash: impl Fn([u8; 32]) -> Option<u64>,
642    ) -> OverflowCandidateBatch {
643        // Step 1: snapshot heat-registry entries.
644        let snap: Vec<([u8; 32], f64)> = {
645            let guard = self.heat_registry.lock();
646            guard
647                .iter()
648                .map(|(h, c)| {
649                    // Compute decayed rate without mutating
650                    // the counter — keeps the iteration
651                    // read-only so a concurrent fetch path
652                    // bumping a different hash isn't
653                    // blocked.
654                    let elapsed = now.saturating_duration_since(c.last_update());
655                    let half_life_s = c.half_life().as_secs_f64();
656                    let rate = if half_life_s == 0.0 || c.rate() == 0.0 {
657                        c.rate()
658                    } else {
659                        let half_lives = elapsed.as_secs_f64() / half_life_s;
660                        if half_lives > 64.0 {
661                            0.0
662                        } else {
663                            c.rate() * 0.5_f64.powf(half_lives)
664                        }
665                    };
666                    (*h, rate)
667                })
668                .collect()
669        };
670
671        // Step 2: filter on pin / refcount / size. A hash
672        // with `refcount > 0` is not pushable in P2 — the
673        // per-source refcount split (cache vs fold) is a
674        // future refinement.
675        let mut filtered: Vec<([u8; 32], f64, u64)> = snap
676            .into_iter()
677            .filter_map(|(h, rate)| {
678                let entry = self.refcount.get(&h)?;
679                if entry.pinned {
680                    return None;
681                }
682                if entry.refcount > 0 {
683                    return None;
684                }
685                let size = size_for_hash(h)?;
686                Some((h, rate, size))
687            })
688            .collect();
689
690        // Step 3: stable sort coldest-first. Ties broken by
691        // hash bytes for determinism. NaN is impossible
692        // here (decayed rate is always finite + non-negative
693        // when input is finite, and the heat-counter ensures
694        // finite input), so `partial_cmp` is safe.
695        filtered.sort_by(|a, b| {
696            a.1.partial_cmp(&b.1)
697                .unwrap_or(std::cmp::Ordering::Equal)
698                .then_with(|| a.0.cmp(&b.0))
699        });
700
701        // Step 4-5: target selection per hash. `no_target`
702        // counts only hashes we ACTUALLY tried — the loop
703        // breaks at `max_pushes_per_tick` so the tail of
704        // `filtered` is never attempted and must not bump
705        // the counter.
706        let local_gravity = GravityCapability::from_capability_set(self.local_caps);
707        let mut candidates: Vec<BlobOverflowCandidate> = Vec::new();
708        let mut no_target_count: usize = 0;
709        for (hash, cold_rate, size_bytes) in filtered {
710            match self.pick_target(size_bytes, local_gravity.scope) {
711                Some((target_node_id, target_caps)) => {
712                    candidates.push(BlobOverflowCandidate {
713                        hash,
714                        size_bytes,
715                        target_node_id,
716                        target_caps,
717                        cold_rate,
718                    });
719                }
720                None => {
721                    no_target_count += 1;
722                }
723            }
724            if candidates.len() >= self.config.max_pushes_per_tick {
725                break;
726            }
727        }
728        OverflowCandidateBatch {
729            candidates,
730            no_target_count,
731        }
732    }
733
734    /// Find the best overflow-receiver peer for a chunk of
735    /// `size_bytes`. Filter rule:
736    ///
737    /// - `cap.blob.storage = true`
738    /// - `cap.blob.overflow_enabled = true`
739    /// - `cap.blob.disk_free_gb >= ceil(size / 1 GiB)`
740    /// - peer's `cap.gravity.scope` covers `local_scope`
741    ///   (the local node won't push outside its own scope
742    ///   bound)
743    /// - peer is not `dataforts:blob-storage-unhealthy`
744    ///
745    /// Ranking: highest `disk_free_gb` wins (greedy spread
746    /// across peers); ties broken by lowest `node_id` for
747    /// determinism.
748    fn pick_target(
749        &self,
750        size_bytes: u64,
751        local_scope: TopologyScope,
752    ) -> Option<(u64, CapabilitySet)> {
753        let required_gb = size_bytes.div_ceil(1 << 30);
754        let mut best: Option<(u64, u64, CapabilitySet)> = None; // (disk_free_gb, node_id, caps)
755        let publishers: Vec<u64> = self
756            .capability_fold
757            .with_state(|state| state.by_node.keys().copied().collect());
758        for node_id in publishers {
759            // Synthesize a CapabilitySet from the fold's tag set
760            // for `node_id`. The downstream BlobCapability /
761            // GravityCapability projections read tags via
762            // `Tag::AxisPresent` / `Tag::AxisValue` patterns that
763            // round-trip through CapabilitySet::add_tag, so
764            // tag-based reads (storage / overflow / scope /
765            // disk_total_gb / disk_free_gb) work identically.
766            let caps = capability_bridge::synthesize_capability_set(self.capability_fold, node_id);
767            let peer_blob = BlobCapability::from_capability_set(&caps);
768            if !peer_blob.storage || !peer_blob.overflow_enabled {
769                continue;
770            }
771            if peer_blob.disk_free_gb < required_gb {
772                continue;
773            }
774            if is_blob_storage_unhealthy(&caps) {
775                continue;
776            }
777            // Local-scope-covers-peer-scope check. We're
778            // pushing OUT, so the local node's scope bound
779            // is the gate — peers outside our scope can't
780            // receive our overflow.
781            let peer_gravity = GravityCapability::from_capability_set(&caps);
782            if !scope_covers(local_scope, peer_gravity.scope) {
783                continue;
784            }
785            // Update best by disk_free_gb desc, then
786            // node_id asc.
787            match &best {
788                None => best = Some((peer_blob.disk_free_gb, node_id, caps)),
789                Some((d, n, _)) => {
790                    let is_better = peer_blob.disk_free_gb > *d
791                        || (peer_blob.disk_free_gb == *d && node_id < *n);
792                    if is_better {
793                        best = Some((peer_blob.disk_free_gb, node_id, caps));
794                    }
795                }
796            }
797        }
798        best.map(|(_, node_id, caps)| (node_id, caps))
799    }
800}
801
/// Operator-supplied environmental borrows the
/// [`super::mesh::MeshBlobAdapter::drive_overflow_tick`]
/// convenience method threads through. Decouples the
/// per-tick wiring (capability fold, heat registry, sink,
/// local caps, disk stats) from the adapter so the adapter
/// stays a stateless slot-in.
///
/// The reference fields share the lifetime parameter `'a`,
/// which ties the context to a single tick's await; the two
/// disk counters are plain `u64` values. Operators
/// reconstruct the context each tick from the live state.
pub struct OverflowTickContext<'a> {
    /// The mesh's capability fold — read for target peer
    /// selection (overflow tag + scope + disk_free + health
    /// gate). Migrated off the legacy CapabilityIndex per
    /// Phase 3b.
    pub capability_fold: &'a Fold<CapabilityFold>,
    /// Per-chunk heat registry, shared behind an
    /// `Arc<parking_lot::Mutex<_>>`. The controller walks
    /// every tracked hash, decays each rate to `now`, and
    /// ranks candidates coldest-first.
    pub heat_registry: &'a Arc<parking_lot::Mutex<BlobHeatRegistry>>,
    /// Sink for the actual push action. Production wiring
    /// uses [`MeshNodeOverflowPushSink`]; tests use a
    /// recorder.
    pub sink: &'a dyn OverflowPushSink,
    /// Local caps snapshot — read for the local gravity
    /// scope (target-selection scope filter).
    pub local_caps: &'a CapabilitySet,
    /// Local disk usage in bytes. Numerator of the
    /// `disk_ratio` hysteresis input.
    pub disk_used_bytes: u64,
    /// Local disk total in bytes. Denominator of the
    /// `disk_ratio` hysteresis input. `0` short-circuits
    /// the tick.
    pub disk_total_bytes: u64,
}
836
/// Per-tick observables threaded through
/// [`drive_blob_overflow_tick`]. Bundles the inputs that
/// change every tick (disk stats + hysteresis handle + the
/// clock value) so the tick driver stays a 4-arg signature
/// even as the inputs grow.
///
/// Only the hysteresis flag is borrowed; the disk counters
/// and `now` are plain values. The hysteresis atomic is
/// shared with the adapter's `overflow_active` field (P4);
/// operator-driven tests can wire a fresh `AtomicBool` for
/// isolation. `now` is captured at the tick call site so
/// deterministic-simulation harnesses can inject a fixed
/// `Instant` without mocking the system clock.
pub struct OverflowTickObservation<'a> {
    /// Local disk usage in bytes — the numerator of the
    /// `disk_ratio` hysteresis input. `disk_used > disk_total`
    /// is meant to be clamped inside the driver (defense
    /// against misconfiguration).
    /// NOTE(review): confirm the driver actually applies this
    /// clamp before relying on `disk_ratio <= 1.0`.
    pub disk_used_bytes: u64,
    /// Local disk total in bytes — the denominator. `0`
    /// short-circuits the tick to "never fire" (an
    /// unconfigured disk cap shouldn't trigger pushes the
    /// moment any chunk lands).
    pub disk_total_bytes: u64,
    /// Shared hysteresis state. Read at tick start, updated
    /// by [`step_overflow_hysteresis`] to the post-tick
    /// state. Wired to [`super::mesh::MeshBlobAdapter`]'s
    /// `overflow_active` field in the production path.
    pub hysteresis_active: &'a AtomicBool,
    /// Clock value used to decay heat-registry rates. Pass
    /// `Instant::now()` in production; tests can fix this
    /// for reproducibility.
    pub now: Instant,
}
870
871/// Drive one overflow tick.
872///
873/// Composes the hysteresis state machine + the controller's
874/// candidate computation + the sink's push action into a
875/// single async entry point. Operators call this from a
876/// periodic task at `config.tick_interval_ms` cadence; the
877/// function is idempotent against repeated calls (the
878/// hysteresis state filters out spurious ticks).
879///
880/// Returns a [`BlobOverflowTickReport`] with per-reason
881/// counters. Operators aggregate the report into Prometheus
882/// metrics via
883/// [`super::metrics::BlobMetrics::record_overflow_tick`].
884pub async fn drive_blob_overflow_tick(
885    controller: &BlobOverflowController<'_>,
886    sink: &dyn OverflowPushSink,
887    observation: OverflowTickObservation<'_>,
888    size_for_hash: impl Fn([u8; 32]) -> Option<u64>,
889) -> BlobOverflowTickReport {
890    let OverflowTickObservation {
891        disk_used_bytes,
892        disk_total_bytes,
893        hysteresis_active,
894        now,
895    } = observation;
896    let mut report = BlobOverflowTickReport::default();
897    let disk_ratio = if disk_total_bytes == 0 {
898        // Adapter without a configured disk-cap reports
899        // `disk_total = 0`; the safe default is "never
900        // fire" rather than "always fire" (the latter
901        // would push the moment any chunk lands on a
902        // mis-configured node).
903        0.0
904    } else {
905        disk_used_bytes as f64 / disk_total_bytes as f64
906    };
907    report.disk_ratio_at_start = disk_ratio;
908    report.was_active_at_start = hysteresis_active.load(Ordering::Relaxed);
909
910    let fire = step_overflow_hysteresis(
911        hysteresis_active,
912        disk_ratio,
913        controller.config.high_water_ratio,
914        controller.config.low_water_ratio,
915    );
916    report.is_active_at_end = fire;
917
918    // Master switch gate. Even if disk crossed the high
919    // water mark, a disabled overflow config means we
920    // never push. Pin this so toggling `enabled = false`
921    // is a hard stop.
922    if !controller.config.enabled || !fire {
923        report.disk_ratio_at_end = disk_ratio;
924        return report;
925    }
926
927    // Sender-side self-check (plan § "Open design questions"
928    // #5): the controller's `config.enabled` says the operator
929    // wants overflow on, but the wire-level contract is
930    // gated on `cap.blob.overflow` propagating through the
931    // capability index. If the local node hasn't yet
932    // advertised the tag (announce hasn't fired since
933    // `set_overflow_enabled(true)`, or `local_caps` was
934    // never rebuilt), every push would round-trip an RPC
935    // just to get rejected `SenderNotOverflowing` by every
936    // peer. Skip the tick cleanly until the tag is visible
937    // on the sender's own caps snapshot; the next
938    // `announce_capabilities` rebroadcast resolves the race.
939    let local_blob = BlobCapability::from_capability_set(controller.local_caps);
940    if !local_blob.overflow_enabled {
941        tracing::debug!(
942            "blob overflow: master switch on but local cap.blob.overflow not yet advertised; \
943             skipping tick until announce_capabilities propagates the tag"
944        );
945        report.disk_ratio_at_end = disk_ratio;
946        return report;
947    }
948
949    // Compute candidates in one pass. `candidate_batch`
950    // tracks the no-target count inside its target-selection
951    // loop, so truncated-by-`max_pushes_per_tick` hashes never
952    // bump the counter (they were never tried).
953    let batch = controller.candidate_batch(now, &size_for_hash);
954    report.rejected_no_target = batch.no_target_count as u64;
955
956    // Fire pushes. `max_pushes_per_tick = 0` is a valid
957    // "trigger only, no real pushes" mode — the candidates
958    // list will be empty so we drop straight through.
959    for candidate in batch.candidates {
960        match sink
961            .push(
962                candidate.hash,
963                candidate.size_bytes,
964                candidate.target_node_id,
965            )
966            .await
967        {
968            Ok(()) => {
969                report.admitted += 1;
970                report.pushed_bytes = report.pushed_bytes.saturating_add(candidate.size_bytes);
971            }
972            Err(e) => {
973                tracing::trace!(
974                    error = ?e,
975                    hash = ?candidate.hash,
976                    target = candidate.target_node_id,
977                    "blob overflow: push failed; counted"
978                );
979                report.push_errors += 1;
980            }
981        }
982    }
983
984    // disk_ratio_at_end stays equal to start in P2; P3
985    // wires the durability watermark + a fresh disk-stat
986    // poll to surface the post-tick reclaim.
987    report.disk_ratio_at_end = disk_ratio;
988    report
989}
990
991/// `local` scope covers `peer` iff a push from a node with
992/// scope `local` can land on a node with scope `peer`. The
993/// rule mirrors the migration controller's
994/// `scope_at_least_as_narrow`: a Zone-scoped local node can
995/// push to Zone / Region / Mesh peers (the peer's scope
996/// covers the local one), but a Mesh-scoped local node
997/// can't push to a Node-scoped peer (the peer's scope is
998/// narrower and won't accept the cross-scope artifact).
999///
1000/// `local == Mesh` covers any peer scope (mesh is the widest
1001/// scope — any peer is reachable). `local == Node` is the
1002/// degenerate case: only same-node receivers qualify; in
1003/// practice this is the "never push" config.
1004fn scope_covers(local: TopologyScope, peer: TopologyScope) -> bool {
1005    use TopologyScope::*;
1006    matches!(
1007        (local, peer),
1008        (Mesh, _) | (Region, Region | Mesh) | (Zone, Zone | Region | Mesh) | (Node, Node)
1009    )
1010}
1011
1012#[cfg(test)]
1013mod tests {
1014    use super::*;
1015    use crate::adapter::net::behavior::capability::CapabilityAnnouncement;
1016    use crate::adapter::net::dataforts::gravity::BlobHeatRegistry;
1017    use crate::adapter::net::identity::EntityId;
1018    use std::sync::atomic::AtomicU64;
1019    use std::time::Duration;
1020
1021    fn hex64(byte: u8) -> ([u8; 32], String) {
1022        let mut h = [0u8; 32];
1023        h.fill(byte);
1024        let hex: String = h.iter().map(|b| format!("{:02x}", b)).collect();
1025        (h, hex)
1026    }
1027
1028    /// Build a `CapabilitySet` for an overflow-enabled peer
1029    /// with `disk_free_gb` headroom and mesh-wide gravity.
1030    fn overflow_peer_caps(disk_free_gb: u64) -> CapabilitySet {
1031        CapabilitySet::new()
1032            .add_tag("dataforts.blob.storage")
1033            .add_tag("dataforts.blob.disk_total_gb=100")
1034            .add_tag(format!("dataforts.blob.disk_free_gb={}", disk_free_gb))
1035            .add_tag("dataforts.blob.overflow")
1036            .add_tag("dataforts.gravity.enabled")
1037            .add_tag("dataforts.gravity.scope=mesh")
1038            .add_tag("dataforts.gravity.proximity=128")
1039    }
1040
1041    /// Local node config: overflow-enabled, mesh scope.
1042    fn overflow_enabled_local_caps() -> CapabilitySet {
1043        CapabilitySet::new()
1044            .add_tag("dataforts.blob.storage")
1045            .add_tag("dataforts.blob.overflow")
1046            .add_tag("dataforts.gravity.enabled")
1047            .add_tag("dataforts.gravity.scope=mesh")
1048            .add_tag("dataforts.gravity.proximity=128")
1049    }
1050
1051    /// One recorded push call captured by the mock sink:
1052    /// `(hash, size, target_node_id)`. Named so the type
1053    /// stays readable in the `OverflowPushRecorder` field
1054    /// + the `calls()` snapshot return.
1055    type RecordedPushCall = ([u8; 32], u64, u64);
1056
1057    /// Shared call-log container the recorder mutates from
1058    /// inside an `&self` push method. `Arc<Mutex<Vec<_>>>`
1059    /// across clones so a test can hand a clone to the
1060    /// sink + inspect from the test body.
1061    type RecordedCallLog = Arc<parking_lot::Mutex<Vec<RecordedPushCall>>>;
1062
1063    /// Recorder sink — records every push call's
1064    /// `(hash, size, target)` tuple. The `fail_count` toggle
1065    /// lets tests inject sink errors to exercise the
1066    /// `push_errors` counter.
1067    struct OverflowPushRecorder {
1068        calls: RecordedCallLog,
1069        fail_count: Arc<AtomicU64>,
1070    }
1071
1072    impl OverflowPushRecorder {
1073        fn new() -> Self {
1074            Self {
1075                calls: Arc::new(parking_lot::Mutex::new(Vec::new())),
1076                fail_count: Arc::new(AtomicU64::new(0)),
1077            }
1078        }
1079
1080        fn calls(&self) -> Vec<RecordedPushCall> {
1081            self.calls.lock().clone()
1082        }
1083    }
1084
1085    #[async_trait]
1086    impl OverflowPushSink for OverflowPushRecorder {
1087        async fn push(
1088            &self,
1089            hash: [u8; 32],
1090            size_bytes: u64,
1091            target_node_id: u64,
1092        ) -> Result<(), BlobError> {
1093            if self.fail_count.load(Ordering::Relaxed) > 0 {
1094                self.fail_count.fetch_sub(1, Ordering::Relaxed);
1095                return Err(BlobError::NotFound("simulated push failure".to_string()));
1096            }
1097            self.calls.lock().push((hash, size_bytes, target_node_id));
1098            Ok(())
1099        }
1100    }
1101
1102    /// Build a `BlobHeatRegistry` with a list of `(hash, rate)`
1103    /// pairs at the supplied `now` instant. Each entry is
1104    /// freshly seeded and bumped to its target rate via direct
1105    /// counter access.
1106    fn heat_registry_with(
1107        now: Instant,
1108        entries: &[([u8; 32], f64)],
1109    ) -> Arc<parking_lot::Mutex<BlobHeatRegistry>> {
1110        let mut reg = BlobHeatRegistry::new();
1111        for (hash, rate) in entries {
1112            let counter = reg.entry_mut(*hash, Duration::from_secs(60), now);
1113            // Bump `*rate` times to reach the target rate;
1114            // each bump adds 1.0 after decay. `now` is the
1115            // same for every bump so no decay happens.
1116            for _ in 0..(*rate as usize) {
1117                counter.bump(now);
1118            }
1119        }
1120        Arc::new(parking_lot::Mutex::new(reg))
1121    }
1122
1123    /// Refcount table where every supplied hash is
1124    /// `refcount = 0, !pinned` — eligible for overflow.
1125    /// `store_observed` is the cheapest way to land an
1126    /// entry at refcount 0 + a recorded `first_seen` time.
1127    fn refcount_with_zero(hashes: &[[u8; 32]], now_ms: u64) -> BlobRefcountTable {
1128        let rc = BlobRefcountTable::new();
1129        for h in hashes {
1130            rc.store_observed(*h, 0, now_ms);
1131        }
1132        rc
1133    }
1134
1135    fn cap_index_with(peers: &[(u64, [u8; 32], CapabilitySet)]) -> Fold<CapabilityFold> {
1136        let fold = Fold::<CapabilityFold>::with_sweep_interval(std::time::Duration::ZERO);
1137        for (idx, (node_id, entity_bytes, caps)) in peers.iter().enumerate() {
1138            let entity = EntityId::from_bytes(*entity_bytes);
1139            capability_bridge::apply_legacy_announcement(
1140                &fold,
1141                CapabilityAnnouncement::new(*node_id, entity, 1 + idx as u64, caps.clone()),
1142            )
1143            .expect("apply legacy announcement in fixture");
1144        }
1145        fold
1146    }
1147
1148    // ========================================================================
1149    // step_overflow_hysteresis (pure-logic state machine)
1150    // ========================================================================
1151
1152    #[test]
1153    fn hysteresis_fires_above_high_water() {
1154        let active = AtomicBool::new(false);
1155        assert!(step_overflow_hysteresis(&active, 0.90, 0.85, 0.70));
1156        assert!(active.load(Ordering::Relaxed));
1157    }
1158
1159    #[test]
1160    fn hysteresis_clears_below_low_water() {
1161        let active = AtomicBool::new(true);
1162        assert!(!step_overflow_hysteresis(&active, 0.65, 0.85, 0.70));
1163        assert!(!active.load(Ordering::Relaxed));
1164    }
1165
1166    #[test]
1167    fn hysteresis_holds_state_in_band() {
1168        // Between low (0.70) and high (0.85): hold prior
1169        // state regardless of which boundary was last
1170        // crossed.
1171        let active = AtomicBool::new(true);
1172        assert!(step_overflow_hysteresis(&active, 0.80, 0.85, 0.70));
1173        assert!(active.load(Ordering::Relaxed));
1174
1175        let inactive = AtomicBool::new(false);
1176        assert!(!step_overflow_hysteresis(&inactive, 0.80, 0.85, 0.70));
1177        assert!(!inactive.load(Ordering::Relaxed));
1178    }
1179
1180    #[test]
1181    fn hysteresis_boundary_inclusive() {
1182        // disk_ratio == high_water fires (>=);
1183        // disk_ratio == low_water clears (<=).
1184        let active = AtomicBool::new(false);
1185        assert!(step_overflow_hysteresis(&active, 0.85, 0.85, 0.70));
1186        let active2 = AtomicBool::new(true);
1187        assert!(!step_overflow_hysteresis(&active2, 0.70, 0.85, 0.70));
1188    }
1189
1190    // ========================================================================
1191    // BlobOverflowController::candidates
1192    // ========================================================================
1193
1194    #[test]
1195    fn controller_candidates_returns_coldest_first() {
1196        // Three hashes with rates 0.0 / 1.0 / 5.0 →
1197        // ordering A (cold) / B (warm) / C (hot).
1198        let now = Instant::now();
1199        let (a, _) = hex64(0xAA);
1200        let (b, _) = hex64(0xBB);
1201        let (c, _) = hex64(0xCC);
1202        let heat = heat_registry_with(now, &[(a, 0.0), (b, 1.0), (c, 5.0)]);
1203        let refcount = refcount_with_zero(&[a, b, c], 1_000_000);
1204        let peer = (99u64, [0x11; 32], overflow_peer_caps(50));
1205        let fold = cap_index_with(&[peer]);
1206        let local = overflow_enabled_local_caps();
1207        let cfg = OverflowConfig {
1208            enabled: true,
1209            max_pushes_per_tick: 16,
1210            ..Default::default()
1211        };
1212        let controller = BlobOverflowController::new(&local, &fold, &heat, &refcount, &cfg);
1213
1214        let cands = controller.candidates(now, |_| Some(1024));
1215        assert_eq!(cands.len(), 3);
1216        // a (rate 0.0) first; c (rate 5.0) last.
1217        assert_eq!(cands[0].hash, a);
1218        assert_eq!(cands[2].hash, c);
1219    }
1220
1221    #[test]
1222    fn controller_skips_pinned_hashes() {
1223        let now = Instant::now();
1224        let (a, _) = hex64(0xAA);
1225        let (b, _) = hex64(0xBB);
1226        let heat = heat_registry_with(now, &[(a, 0.0), (b, 0.0)]);
1227        let refcount = BlobRefcountTable::new();
1228        refcount.store_observed(a, 0, 1_000_000);
1229        refcount.pin(a, 1_000_000);
1230        refcount.store_observed(b, 0, 1_000_000);
1231        let peer = (99u64, [0x11; 32], overflow_peer_caps(50));
1232        let fold = cap_index_with(&[peer]);
1233        let local = overflow_enabled_local_caps();
1234        let cfg = OverflowConfig {
1235            enabled: true,
1236            max_pushes_per_tick: 16,
1237            ..Default::default()
1238        };
1239        let controller = BlobOverflowController::new(&local, &fold, &heat, &refcount, &cfg);
1240
1241        let cands = controller.candidates(now, |_| Some(1024));
1242        // Pinned `a` skipped; only unpinned `b` surfaces.
1243        assert_eq!(cands.len(), 1);
1244        assert_eq!(cands[0].hash, b);
1245    }
1246
1247    #[test]
1248    fn controller_skips_hashes_with_nonzero_refcount() {
1249        let now = Instant::now();
1250        let (a, _) = hex64(0xAA);
1251        let heat = heat_registry_with(now, &[(a, 0.0)]);
1252        let refcount = BlobRefcountTable::new();
1253        refcount.incr(a, 1_000_000); // refcount = 1, not droppable
1254        let peer = (99u64, [0x11; 32], overflow_peer_caps(50));
1255        let fold = cap_index_with(&[peer]);
1256        let local = overflow_enabled_local_caps();
1257        let cfg = OverflowConfig {
1258            enabled: true,
1259            max_pushes_per_tick: 16,
1260            ..Default::default()
1261        };
1262        let controller = BlobOverflowController::new(&local, &fold, &heat, &refcount, &cfg);
1263        assert!(controller.candidates(now, |_| Some(1024)).is_empty());
1264    }
1265
1266    #[test]
1267    fn controller_picks_highest_disk_free_target() {
1268        // Two peers, both overflow-enabled. Peer 99 has 40
1269        // GiB free; peer 88 has 80 GiB free. Greedy spread
1270        // → peer 88 wins.
1271        let now = Instant::now();
1272        let (a, _) = hex64(0xAA);
1273        let heat = heat_registry_with(now, &[(a, 0.0)]);
1274        let refcount = refcount_with_zero(&[a], 1_000_000);
1275        let peer_low = (99u64, [0x11; 32], overflow_peer_caps(40));
1276        let peer_high = (88u64, [0x22; 32], overflow_peer_caps(80));
1277        let fold = cap_index_with(&[peer_low, peer_high]);
1278        let local = overflow_enabled_local_caps();
1279        let cfg = OverflowConfig {
1280            enabled: true,
1281            max_pushes_per_tick: 16,
1282            ..Default::default()
1283        };
1284        let controller = BlobOverflowController::new(&local, &fold, &heat, &refcount, &cfg);
1285
1286        let cands = controller.candidates(now, |_| Some(1024));
1287        assert_eq!(cands.len(), 1);
1288        assert_eq!(cands[0].target_node_id, 88);
1289    }
1290
1291    #[test]
1292    fn controller_skips_peers_without_overflow_tag() {
1293        // Peer has storage + disk + scope BUT no overflow
1294        // tag → not a valid target.
1295        let now = Instant::now();
1296        let (a, _) = hex64(0xAA);
1297        let heat = heat_registry_with(now, &[(a, 0.0)]);
1298        let refcount = refcount_with_zero(&[a], 1_000_000);
1299        let no_overflow_peer_caps = CapabilitySet::new()
1300            .add_tag("dataforts.blob.storage")
1301            .add_tag("dataforts.blob.disk_total_gb=100")
1302            .add_tag("dataforts.blob.disk_free_gb=80")
1303            .add_tag("dataforts.gravity.enabled")
1304            .add_tag("dataforts.gravity.scope=mesh")
1305            .add_tag("dataforts.gravity.proximity=128");
1306        let peer = (99u64, [0x11; 32], no_overflow_peer_caps);
1307        let fold = cap_index_with(&[peer]);
1308        let local = overflow_enabled_local_caps();
1309        let cfg = OverflowConfig {
1310            enabled: true,
1311            max_pushes_per_tick: 16,
1312            ..Default::default()
1313        };
1314        let controller = BlobOverflowController::new(&local, &fold, &heat, &refcount, &cfg);
1315        assert!(controller.candidates(now, |_| Some(1024)).is_empty());
1316    }
1317
1318    #[test]
1319    fn controller_skips_peers_with_insufficient_disk() {
1320        // Peer has 1 GiB free; we're pushing a 4 GiB blob →
1321        // no target.
1322        let now = Instant::now();
1323        let (a, _) = hex64(0xAA);
1324        let heat = heat_registry_with(now, &[(a, 0.0)]);
1325        let refcount = refcount_with_zero(&[a], 1_000_000);
1326        let peer = (99u64, [0x11; 32], overflow_peer_caps(1));
1327        let fold = cap_index_with(&[peer]);
1328        let local = overflow_enabled_local_caps();
1329        let cfg = OverflowConfig {
1330            enabled: true,
1331            max_pushes_per_tick: 16,
1332            ..Default::default()
1333        };
1334        let controller = BlobOverflowController::new(&local, &fold, &heat, &refcount, &cfg);
1335        let four_gib: u64 = 4 * (1 << 30);
1336        assert!(controller.candidates(now, |_| Some(four_gib)).is_empty());
1337    }
1338
1339    #[test]
1340    fn controller_truncates_to_max_pushes_per_tick() {
1341        let now = Instant::now();
1342        let hashes: Vec<[u8; 32]> = (0..5).map(|i| hex64(i as u8).0).collect();
1343        let entries: Vec<([u8; 32], f64)> = hashes.iter().map(|h| (*h, 0.0)).collect();
1344        let heat = heat_registry_with(now, &entries);
1345        let refcount = refcount_with_zero(&hashes, 1_000_000);
1346        let peer = (99u64, [0x11; 32], overflow_peer_caps(50));
1347        let fold = cap_index_with(&[peer]);
1348        let local = overflow_enabled_local_caps();
1349        let cfg = OverflowConfig {
1350            enabled: true,
1351            max_pushes_per_tick: 2,
1352            ..Default::default()
1353        };
1354        let controller = BlobOverflowController::new(&local, &fold, &heat, &refcount, &cfg);
1355        let cands = controller.candidates(now, |_| Some(1024));
1356        assert_eq!(
1357            cands.len(),
1358            2,
1359            "max_pushes_per_tick caps the candidate list"
1360        );
1361    }
1362
1363    // ========================================================================
1364    // drive_blob_overflow_tick — end-to-end against the recorder sink
1365    // ========================================================================
1366
1367    #[tokio::test]
1368    async fn tick_no_op_when_below_low_water() {
1369        let now = Instant::now();
1370        let (a, _) = hex64(0xAA);
1371        let heat = heat_registry_with(now, &[(a, 0.0)]);
1372        let refcount = refcount_with_zero(&[a], 1_000_000);
1373        let peer = (99u64, [0x11; 32], overflow_peer_caps(50));
1374        let fold = cap_index_with(&[peer]);
1375        let local = overflow_enabled_local_caps();
1376        let cfg = OverflowConfig {
1377            enabled: true,
1378            ..Default::default()
1379        };
1380        let controller = BlobOverflowController::new(&local, &fold, &heat, &refcount, &cfg);
1381        let active = AtomicBool::new(false);
1382        let sink = OverflowPushRecorder::new();
1383
1384        // disk_ratio = 0.50 — below low_water (0.70).
1385        let report = drive_blob_overflow_tick(
1386            &controller,
1387            &sink,
1388            OverflowTickObservation {
1389                disk_used_bytes: 500,
1390                disk_total_bytes: 1000,
1391                hysteresis_active: &active,
1392                now,
1393            },
1394            |_| Some(1024),
1395        )
1396        .await;
1397        assert_eq!(report.admitted, 0);
1398        assert!(!report.is_active_at_end);
1399        assert_eq!(sink.calls().len(), 0);
1400    }
1401
1402    #[tokio::test]
1403    async fn tick_fires_above_high_water_and_pushes_to_recorder() {
1404        let now = Instant::now();
1405        let (a, _) = hex64(0xAA);
1406        let heat = heat_registry_with(now, &[(a, 0.0)]);
1407        let refcount = refcount_with_zero(&[a], 1_000_000);
1408        let peer = (99u64, [0x11; 32], overflow_peer_caps(50));
1409        let fold = cap_index_with(&[peer]);
1410        let local = overflow_enabled_local_caps();
1411        let cfg = OverflowConfig {
1412            enabled: true,
1413            ..Default::default()
1414        };
1415        let controller = BlobOverflowController::new(&local, &fold, &heat, &refcount, &cfg);
1416        let active = AtomicBool::new(false);
1417        let sink = OverflowPushRecorder::new();
1418
1419        // disk_ratio = 0.90 — above high_water (0.85).
1420        let report = drive_blob_overflow_tick(
1421            &controller,
1422            &sink,
1423            OverflowTickObservation {
1424                disk_used_bytes: 900,
1425                disk_total_bytes: 1000,
1426                hysteresis_active: &active,
1427                now,
1428            },
1429            |_| Some(1024),
1430        )
1431        .await;
1432        assert_eq!(report.admitted, 1);
1433        assert!(report.is_active_at_end);
1434        assert_eq!(report.pushed_bytes, 1024);
1435        let calls = sink.calls();
1436        assert_eq!(calls.len(), 1);
1437        assert_eq!(calls[0].0, a);
1438        assert_eq!(calls[0].2, 99);
1439    }
1440
1441    #[tokio::test]
1442    async fn tick_master_switch_off_skips_pushes_even_above_high_water() {
1443        let now = Instant::now();
1444        let (a, _) = hex64(0xAA);
1445        let heat = heat_registry_with(now, &[(a, 0.0)]);
1446        let refcount = refcount_with_zero(&[a], 1_000_000);
1447        let peer = (99u64, [0x11; 32], overflow_peer_caps(50));
1448        let fold = cap_index_with(&[peer]);
1449        let local = overflow_enabled_local_caps();
1450        let cfg = OverflowConfig {
1451            enabled: false, // master switch off
1452            ..Default::default()
1453        };
1454        let controller = BlobOverflowController::new(&local, &fold, &heat, &refcount, &cfg);
1455        let active = AtomicBool::new(false);
1456        let sink = OverflowPushRecorder::new();
1457
1458        let report = drive_blob_overflow_tick(
1459            &controller,
1460            &sink,
1461            OverflowTickObservation {
1462                disk_used_bytes: 900,
1463                disk_total_bytes: 1000,
1464                hysteresis_active: &active,
1465                now,
1466            },
1467            |_| Some(1024),
1468        )
1469        .await;
1470        // Hysteresis still transitions (the disk state machine
1471        // is independent of the master switch — operators
1472        // dashboarding the active gauge should see it climb
1473        // before they enable). Pushes don't fire.
1474        assert!(report.is_active_at_end);
1475        assert_eq!(report.admitted, 0);
1476        assert_eq!(sink.calls().len(), 0);
1477    }
1478
1479    #[tokio::test]
1480    async fn tick_records_push_errors_when_sink_fails() {
1481        let now = Instant::now();
1482        let (a, _) = hex64(0xAA);
1483        let heat = heat_registry_with(now, &[(a, 0.0)]);
1484        let refcount = refcount_with_zero(&[a], 1_000_000);
1485        let peer = (99u64, [0x11; 32], overflow_peer_caps(50));
1486        let fold = cap_index_with(&[peer]);
1487        let local = overflow_enabled_local_caps();
1488        let cfg = OverflowConfig {
1489            enabled: true,
1490            ..Default::default()
1491        };
1492        let controller = BlobOverflowController::new(&local, &fold, &heat, &refcount, &cfg);
1493        let active = AtomicBool::new(false);
1494        let sink = OverflowPushRecorder::new();
1495        sink.fail_count.store(1, Ordering::Relaxed);
1496
1497        let report = drive_blob_overflow_tick(
1498            &controller,
1499            &sink,
1500            OverflowTickObservation {
1501                disk_used_bytes: 900,
1502                disk_total_bytes: 1000,
1503                hysteresis_active: &active,
1504                now,
1505            },
1506            |_| Some(1024),
1507        )
1508        .await;
1509        assert_eq!(report.admitted, 0);
1510        assert_eq!(report.push_errors, 1);
1511        assert_eq!(report.pushed_bytes, 0);
1512    }
1513
1514    #[tokio::test]
1515    async fn tick_records_no_target_when_no_overflow_enabled_peer() {
1516        let now = Instant::now();
1517        let (a, _) = hex64(0xAA);
1518        let heat = heat_registry_with(now, &[(a, 0.0)]);
1519        let refcount = refcount_with_zero(&[a], 1_000_000);
1520        // Peer has no overflow tag.
1521        let no_overflow_peer_caps = CapabilitySet::new()
1522            .add_tag("dataforts.blob.storage")
1523            .add_tag("dataforts.blob.disk_total_gb=100")
1524            .add_tag("dataforts.blob.disk_free_gb=80")
1525            .add_tag("dataforts.gravity.enabled")
1526            .add_tag("dataforts.gravity.scope=mesh");
1527        let peer = (99u64, [0x11; 32], no_overflow_peer_caps);
1528        let fold = cap_index_with(&[peer]);
1529        let local = overflow_enabled_local_caps();
1530        let cfg = OverflowConfig {
1531            enabled: true,
1532            ..Default::default()
1533        };
1534        let controller = BlobOverflowController::new(&local, &fold, &heat, &refcount, &cfg);
1535        let active = AtomicBool::new(false);
1536        let sink = OverflowPushRecorder::new();
1537
1538        let report = drive_blob_overflow_tick(
1539            &controller,
1540            &sink,
1541            OverflowTickObservation {
1542                disk_used_bytes: 900,
1543                disk_total_bytes: 1000,
1544                hysteresis_active: &active,
1545                now,
1546            },
1547            |_| Some(1024),
1548        )
1549        .await;
1550        assert_eq!(report.admitted, 0);
1551        assert_eq!(report.rejected_no_target, 1);
1552        assert_eq!(sink.calls().len(), 0);
1553    }
1554
1555    #[tokio::test]
1556    async fn tick_skips_when_local_overflow_tag_not_advertised() {
1557        // `OverflowConfig.enabled = true` but `local_caps`
1558        // doesn't carry `dataforts.blob.overflow` — the
1559        // operator flipped the master switch on the adapter
1560        // but `announce_capabilities` hasn't rebuilt the
1561        // local caps snapshot yet. Sender-side self-check
1562        // (plan § Open design Q #5) must skip the tick:
1563        // every push would round-trip an RPC and come back
1564        // `Rejected(SenderNotOverflowing)`, wasting wire
1565        // and bumping `push_errors` without making progress.
1566        let now = Instant::now();
1567        let (a, _) = hex64(0xAA);
1568        let heat = heat_registry_with(now, &[(a, 0.0)]);
1569        let refcount = refcount_with_zero(&[a], 1_000_000);
1570        let peer = (99u64, [0x11; 32], overflow_peer_caps(50));
1571        let fold = cap_index_with(&[peer]);
1572        // Local caps WITHOUT `dataforts.blob.overflow` tag.
1573        let local = CapabilitySet::new()
1574            .add_tag("dataforts.blob.storage")
1575            .add_tag("dataforts.gravity.enabled")
1576            .add_tag("dataforts.gravity.scope=mesh")
1577            .add_tag("dataforts.gravity.proximity=128");
1578        let cfg = OverflowConfig {
1579            enabled: true,
1580            ..Default::default()
1581        };
1582        let controller = BlobOverflowController::new(&local, &fold, &heat, &refcount, &cfg);
1583        let active = AtomicBool::new(false);
1584        let sink = OverflowPushRecorder::new();
1585
1586        let report = drive_blob_overflow_tick(
1587            &controller,
1588            &sink,
1589            OverflowTickObservation {
1590                disk_used_bytes: 900,
1591                disk_total_bytes: 1000,
1592                hysteresis_active: &active,
1593                now,
1594            },
1595            |_| Some(1024),
1596        )
1597        .await;
1598        // Hysteresis still flips (disk genuinely crossed
1599        // the high-water mark — the gauge should reflect
1600        // that), but no pushes fire and no rejections
1601        // count.
1602        assert!(report.is_active_at_end);
1603        assert_eq!(report.admitted, 0);
1604        assert_eq!(report.push_errors, 0);
1605        assert_eq!(report.rejected_no_target, 0);
1606        assert_eq!(sink.calls().len(), 0);
1607    }
1608
1609    #[tokio::test]
1610    async fn tick_no_target_excludes_truncated_hashes() {
1611        // Pre-pick count exceeds `max_pushes_per_tick`: every
1612        // attempted hash finds a target (so `rejected_no_target`
1613        // must stay 0), the truncated tail is never tried, and
1614        // exactly `max_pushes_per_tick` pushes fire. Regression
1615        // against the prior `pre_pick - candidates.len()` math
1616        // that conflated truncation with no-target.
1617        let now = Instant::now();
1618        let hashes: Vec<[u8; 32]> = (0..5).map(|i| hex64(i as u8).0).collect();
1619        let entries: Vec<([u8; 32], f64)> = hashes.iter().map(|h| (*h, 0.0)).collect();
1620        let heat = heat_registry_with(now, &entries);
1621        let refcount = refcount_with_zero(&hashes, 1_000_000);
1622        let peer = (99u64, [0x11; 32], overflow_peer_caps(80));
1623        let fold = cap_index_with(&[peer]);
1624        let local = overflow_enabled_local_caps();
1625        let cfg = OverflowConfig {
1626            enabled: true,
1627            max_pushes_per_tick: 2,
1628            ..Default::default()
1629        };
1630        let controller = BlobOverflowController::new(&local, &fold, &heat, &refcount, &cfg);
1631        let active = AtomicBool::new(false);
1632        let sink = OverflowPushRecorder::new();
1633
1634        let report = drive_blob_overflow_tick(
1635            &controller,
1636            &sink,
1637            OverflowTickObservation {
1638                disk_used_bytes: 900,
1639                disk_total_bytes: 1000,
1640                hysteresis_active: &active,
1641                now,
1642            },
1643            |_| Some(1024),
1644        )
1645        .await;
1646        assert_eq!(report.admitted, 2);
1647        assert_eq!(
1648            report.rejected_no_target, 0,
1649            "truncated hashes (never attempted) must NOT bump rejected_no_target"
1650        );
1651        assert_eq!(sink.calls().len(), 2);
1652    }
1653
1654    #[tokio::test]
1655    async fn tick_no_target_counts_only_attempted_failures() {
1656        // Mix: two hashes need 4 GiB; peer offers 80 GiB so
1657        // both find targets. Two more hashes need 100 GiB; no
1658        // peer can take them → both bump `rejected_no_target`.
1659        // With max_pushes_per_tick=3 the loop stops after the
1660        // 3rd successful push attempt, so we should see
1661        // admitted=2 and no_target=2 (both attempted) — NOT a
1662        // capped diff that would mis-attribute the truncation.
1663        let now = Instant::now();
1664        // Order by hash bytes (sort is coldest-first, ties by
1665        // hash). Use distinct first bytes so order is
1666        // predictable.
1667        let (small1, _) = hex64(0x01);
1668        let (big1, _) = hex64(0x02);
1669        let (small2, _) = hex64(0x03);
1670        let (big2, _) = hex64(0x04);
1671        let heat = heat_registry_with(
1672            now,
1673            &[(small1, 0.0), (big1, 0.0), (small2, 0.0), (big2, 0.0)],
1674        );
1675        let refcount = refcount_with_zero(&[small1, big1, small2, big2], 1_000_000);
1676        let peer = (99u64, [0x11; 32], overflow_peer_caps(80));
1677        let fold = cap_index_with(&[peer]);
1678        let local = overflow_enabled_local_caps();
1679        let cfg = OverflowConfig {
1680            enabled: true,
1681            max_pushes_per_tick: 3,
1682            ..Default::default()
1683        };
1684        let controller = BlobOverflowController::new(&local, &fold, &heat, &refcount, &cfg);
1685        let active = AtomicBool::new(false);
1686        let sink = OverflowPushRecorder::new();
1687
1688        let size_for_hash = move |h: [u8; 32]| -> Option<u64> {
1689            if h == big1 || h == big2 {
1690                Some(100 * (1 << 30)) // 100 GiB — over peer's free
1691            } else {
1692                Some(1024) // tiny
1693            }
1694        };
1695        let report = drive_blob_overflow_tick(
1696            &controller,
1697            &sink,
1698            OverflowTickObservation {
1699                disk_used_bytes: 900,
1700                disk_total_bytes: 1000,
1701                hysteresis_active: &active,
1702                now,
1703            },
1704            size_for_hash,
1705        )
1706        .await;
1707        // 2 small hashes fit (admitted), 2 big hashes have no
1708        // target. The loop never hits the `max=3` cap because
1709        // there are only 4 candidates total.
1710        assert_eq!(report.admitted, 2);
1711        assert_eq!(report.rejected_no_target, 2);
1712    }
1713
1714    #[tokio::test]
1715    async fn tick_zero_disk_total_never_fires() {
1716        // disk_total = 0 → ratio = 0.0 → always below high
1717        // water. Defends against misconfigured nodes that
1718        // would push the moment any chunk lands.
1719        let now = Instant::now();
1720        let (a, _) = hex64(0xAA);
1721        let heat = heat_registry_with(now, &[(a, 0.0)]);
1722        let refcount = refcount_with_zero(&[a], 1_000_000);
1723        let peer = (99u64, [0x11; 32], overflow_peer_caps(50));
1724        let fold = cap_index_with(&[peer]);
1725        let local = overflow_enabled_local_caps();
1726        let cfg = OverflowConfig {
1727            enabled: true,
1728            ..Default::default()
1729        };
1730        let controller = BlobOverflowController::new(&local, &fold, &heat, &refcount, &cfg);
1731        let active = AtomicBool::new(false);
1732        let sink = OverflowPushRecorder::new();
1733
1734        let report = drive_blob_overflow_tick(
1735            &controller,
1736            &sink,
1737            OverflowTickObservation {
1738                disk_used_bytes: 500,
1739                disk_total_bytes: 0, // disk_total = 0
1740                hysteresis_active: &active,
1741                now,
1742            },
1743            |_| Some(1024),
1744        )
1745        .await;
1746        assert_eq!(report.disk_ratio_at_start, 0.0);
1747        assert!(!report.is_active_at_end);
1748        assert_eq!(sink.calls().len(), 0);
1749    }
1750
1751    // ========================================================================
1752    // scope_covers
1753    // ========================================================================
1754
1755    #[test]
1756    fn scope_covers_mesh_covers_everything() {
1757        use TopologyScope::*;
1758        for peer in [Node, Zone, Region, Mesh] {
1759            assert!(scope_covers(Mesh, peer));
1760        }
1761    }
1762
1763    #[test]
1764    fn scope_covers_zone_does_not_cover_node() {
1765        // Zone-scoped sender can't push to a Node-scoped
1766        // peer (peer is narrower; won't accept cross-scope).
1767        assert!(!scope_covers(TopologyScope::Zone, TopologyScope::Node));
1768        // But Zone-scoped sender CAN push to Zone / Region /
1769        // Mesh peers (peer's scope covers the sender's).
1770        assert!(scope_covers(TopologyScope::Zone, TopologyScope::Zone));
1771        assert!(scope_covers(TopologyScope::Zone, TopologyScope::Region));
1772        assert!(scope_covers(TopologyScope::Zone, TopologyScope::Mesh));
1773    }
1774
1775    // ========================================================================
1776    // Wire types (P3) — postcard round-trip
1777    //
1778    // The receive side decodes `OverflowPush` from the nRPC payload and
1779    // encodes `OverflowPushAck` back; the sender's
1780    // `MeshNodeOverflowPushSink` does the inverse. Verify the encode +
1781    // decode are total inverses for every typed variant so a sender +
1782    // receiver on different builds can't observe wire-format divergence.
1783    // ========================================================================
1784
1785    #[test]
1786    fn overflow_push_request_round_trips_postcard() {
1787        let req = OverflowPush {
1788            blob_hash: [0xAA; 32],
1789            size_bytes: 4 * (1 << 20),
1790            sender_node_id: 0xDEAD_BEEF_u64,
1791        };
1792        let bytes = postcard::to_allocvec(&req).expect("encode");
1793        let decoded: OverflowPush = postcard::from_bytes(&bytes).expect("decode");
1794        assert_eq!(decoded, req);
1795    }
1796
1797    #[test]
1798    fn overflow_push_ack_accepted_round_trips() {
1799        let ack = OverflowPushAck::Accepted;
1800        let bytes = postcard::to_allocvec(&ack).expect("encode");
1801        let decoded: OverflowPushAck = postcard::from_bytes(&bytes).expect("decode");
1802        assert_eq!(decoded, ack);
1803    }
1804
1805    #[test]
1806    fn overflow_push_ack_rejected_carries_typed_reason() {
1807        // Every `OverflowReject` variant round-trips inside
1808        // the ack. Operators dashboard the typed reason on
1809        // the sender side; the wire form must preserve it.
1810        for reason in [
1811            OverflowReject::NoStorageCap,
1812            OverflowReject::NotParticipating,
1813            OverflowReject::SenderNotOverflowing,
1814            OverflowReject::Unhealthy,
1815            OverflowReject::ScopeMismatch,
1816            OverflowReject::InsufficientDisk,
1817        ] {
1818            let ack = OverflowPushAck::Rejected(reason);
1819            let bytes = postcard::to_allocvec(&ack).expect("encode");
1820            let decoded: OverflowPushAck = postcard::from_bytes(&bytes).expect("decode");
1821            assert_eq!(decoded, ack, "ack with {:?} must round-trip", reason);
1822        }
1823    }
1824
1825    #[test]
1826    fn overflow_push_ack_open_chunk_failed_round_trips() {
1827        let ack = OverflowPushAck::OpenChunkFailed;
1828        let bytes = postcard::to_allocvec(&ack).expect("encode");
1829        let decoded: OverflowPushAck = postcard::from_bytes(&bytes).expect("decode");
1830        assert_eq!(decoded, ack);
1831    }
1832
1833    #[test]
1834    fn overflow_push_service_name_is_stable() {
1835        // Pin the wire-level service-name token. A change here
1836        // would silently break sender/receiver compatibility
1837        // across builds (both pieces are gated by feature flag
1838        // but ship in the same crate).
1839        assert_eq!(OVERFLOW_PUSH_SERVICE, "dataforts.blob.overflow_push");
1840    }
1841}