use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Instant;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use super::admission::OverflowReject;
use super::error::BlobError;
use super::mesh::OverflowConfig;
use super::refcount::BlobRefcountTable;
use crate::adapter::net::behavior::capability::CapabilityIndex;
use crate::adapter::net::behavior::{
is_blob_storage_unhealthy, BlobCapability, CapabilitySet, GravityCapability, TopologyScope,
};
use crate::adapter::net::dataforts::gravity::BlobHeatRegistry;
/// Service-name string for the overflow-push RPC. A unit test in this file
/// pins the exact value, so treat it as a stable identifier.
/// NOTE(review): where this name is registered or dialled is not visible here.
pub const OVERFLOW_PUSH_SERVICE: &str = "dataforts.blob.overflow_push";
/// Request body of an overflow push. The RPC handler in this file decodes it
/// with postcard and then runs the admission check on it.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct OverflowPush {
/// 32-byte hash identifying the blob being offered.
pub blob_hash: [u8; 32],
/// Size of the blob in bytes; fed to the receiver's admission check.
pub size_bytes: u64,
/// Node id the sender reports for itself; the receiver uses it to look up
/// the sender's capabilities in its capability index.
pub sender_node_id: u64,
}
/// Reply to an [`OverflowPush`], serialized with postcard.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum OverflowPushAck {
/// The push was admitted and the receiver's prefetch of the blob succeeded.
Accepted,
/// The admission check refused the push; carries the typed reason.
Rejected(OverflowReject),
/// The push was admitted but the receiver's prefetch of the blob failed.
OpenChunkFailed,
}
/// Result of one candidate-selection pass of `BlobOverflowController`.
#[derive(Clone, Debug, Default)]
pub struct OverflowCandidateBatch {
/// Blobs selected for pushing, each already paired with a target peer.
pub candidates: Vec<BlobOverflowCandidate>,
/// Number of examined blobs for which no eligible target peer was found.
/// Blobs never examined because the per-tick cap was reached are not counted.
pub no_target_count: usize,
}
/// One blob chosen for overflow together with the peer it should be pushed to.
#[derive(Clone, Debug)]
pub struct BlobOverflowCandidate {
/// 32-byte hash of the blob.
pub hash: [u8; 32],
/// Blob size in bytes, as reported by the caller-supplied size lookup.
pub size_bytes: u64,
/// Node id of the selected target peer.
pub target_node_id: u64,
/// Capability set of the selected target as read from the capability index.
pub target_caps: CapabilitySet,
/// Heat rate of the blob after decay to the selection instant; lower is colder.
pub cold_rate: f64,
}
/// Summary of a single `drive_blob_overflow_tick` run.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct BlobOverflowTickReport {
/// Number of pushes for which the sink returned `Ok`.
pub admitted: u64,
/// Number of examined blobs that had no eligible target peer.
pub rejected_no_target: u64,
/// Number of pushes for which the sink returned an error.
pub push_errors: u64,
/// Hysteresis flag value read before this tick updated it.
pub was_active_at_start: bool,
/// Hysteresis flag value after this tick's update.
pub is_active_at_end: bool,
/// Used/total disk ratio computed from the tick's observation.
pub disk_ratio_at_start: f64,
/// Currently always set to the same value as `disk_ratio_at_start`; the
/// tick does not re-measure disk usage after pushing.
pub disk_ratio_at_end: f64,
/// Sum (saturating) of `size_bytes` over the successful pushes.
pub pushed_bytes: u64,
}
/// Advances the overflow hysteresis flag by one observation and returns the
/// resulting state.
///
/// * `disk_ratio >= high_water` switches the flag on (this wins if both
///   thresholds are crossed at once).
/// * `disk_ratio <= low_water` switches it off.
/// * Anything in between (including a NaN ratio, which satisfies neither
///   comparison) keeps the previous state.
///
/// The flag is only written when its value actually changes.
pub fn step_overflow_hysteresis(
    active: &AtomicBool,
    disk_ratio: f64,
    high_water: f64,
    low_water: f64,
) -> bool {
    let previous = active.load(Ordering::Relaxed);

    let at_or_above_high = disk_ratio >= high_water;
    let at_or_below_low = disk_ratio <= low_water;

    // On when the high mark is reached; otherwise stay on only if we were
    // already on and have not dropped to the low mark.
    let next = at_or_above_high || (previous && !at_or_below_low);

    if next != previous {
        active.store(next, Ordering::Relaxed);
    }
    next
}
/// Receiver-side handler for overflow-push requests (only built with the
/// `cortex` feature).
#[cfg(feature = "cortex")]
pub struct OverflowPushHandler {
/// Mesh node used to read the capability index and the local capability snapshot.
pub mesh: Arc<crate::adapter::net::MeshNode>,
/// Blob adapter used to record rejections and to prefetch admitted blobs.
pub adapter: Arc<super::mesh::MeshBlobAdapter>,
}
#[cfg(feature = "cortex")]
impl OverflowPushHandler {
    /// Creates a handler from the mesh node and blob adapter it operates on.
    pub fn new(
        mesh: Arc<crate::adapter::net::MeshNode>,
        adapter: Arc<super::mesh::MeshBlobAdapter>,
    ) -> Self {
        Self { mesh, adapter }
    }

    /// Processes one overflow-push request and produces the ack to send back.
    ///
    /// The sender's capabilities are looked up by the node id carried in the
    /// request (an unknown sender yields the default capability set) and
    /// checked against the local capability snapshot. A rejection is recorded
    /// on the adapter and returned with its reason. On admission the blob is
    /// prefetched through the adapter under a `mesh://<hex hash>` reference;
    /// a prefetch failure is logged and reported as `OpenChunkFailed`.
    ///
    /// NOTE(review): the sender id comes from the request body, not from any
    /// transport-level identity visible here — confirm that is intended.
    pub async fn handle(&self, request: OverflowPush) -> OverflowPushAck {
        use super::adapter::BlobAdapter;
        use super::admission::{should_accept_overflow_from, OverflowVerdict};
        use super::blob_ref::BlobRef;
        use std::fmt::Write;

        let OverflowPush {
            blob_hash,
            size_bytes,
            sender_node_id,
        } = request;

        let sender_caps = self
            .mesh
            .capability_index_arc()
            .get(sender_node_id)
            .unwrap_or_default();
        let local_caps = self.mesh.user_caps_snapshot();

        // Guard clause: bail out early when admission refuses the push.
        if let OverflowVerdict::Reject(reason) =
            should_accept_overflow_from(&local_caps, &sender_caps, size_bytes)
        {
            self.adapter.record_overflow_reject(reason);
            return OverflowPushAck::Rejected(reason);
        }

        // Lower-case hex rendering of the hash, two digits per byte.
        let hex = blob_hash
            .iter()
            .fold(String::with_capacity(64), |mut acc, byte| {
                let _ = write!(acc, "{:02x}", byte);
                acc
            });

        let blob_ref = BlobRef::small(format!("mesh://{}", hex), blob_hash, size_bytes);
        let Err(e) = self.adapter.prefetch(&blob_ref).await else {
            return OverflowPushAck::Accepted;
        };

        tracing::warn!(
            error = %e,
            hash = %hex,
            sender = sender_node_id,
            "overflow push: prefetch failed after admit",
        );
        OverflowPushAck::OpenChunkFailed
    }
}
#[cfg(feature = "cortex")]
#[async_trait]
impl crate::adapter::net::cortex::RpcHandler for OverflowPushHandler {
    /// RPC entry point: decodes a postcard `OverflowPush` from the request
    /// body, runs [`OverflowPushHandler::handle`], and returns the
    /// postcard-encoded ack with an `Ok` status and no headers.
    ///
    /// Both decode and encode failures are surfaced as
    /// `RpcHandlerError::Internal`.
    async fn call(
        &self,
        ctx: crate::adapter::net::cortex::RpcContext,
    ) -> Result<
        crate::adapter::net::cortex::RpcResponsePayload,
        crate::adapter::net::cortex::RpcHandlerError,
    > {
        use crate::adapter::net::cortex::{RpcHandlerError, RpcResponsePayload, RpcStatus};

        let request: OverflowPush = match postcard::from_bytes(&ctx.payload.body) {
            Ok(decoded) => decoded,
            Err(e) => {
                return Err(RpcHandlerError::Internal(format!(
                    "overflow push: decode failed: {e}"
                )));
            }
        };

        let ack = self.handle(request).await;

        let encoded = match postcard::to_allocvec(&ack) {
            Ok(buf) => buf,
            Err(e) => {
                return Err(RpcHandlerError::Internal(format!(
                    "overflow push: encode ack failed: {e}"
                )));
            }
        };

        Ok(RpcResponsePayload {
            status: RpcStatus::Ok,
            headers: Vec::new(),
            body: bytes::Bytes::from(encoded),
        })
    }
}
/// Sender-side `OverflowPushSink` that delivers pushes through a mesh node
/// (only built with the `cortex` feature).
#[cfg(feature = "cortex")]
pub struct MeshNodeOverflowPushSink {
/// Mesh node whose `send_overflow_push` is used to reach the target peer.
pub mesh: Arc<crate::adapter::net::MeshNode>,
}
#[cfg(feature = "cortex")]
impl MeshNodeOverflowPushSink {
/// Wraps the given mesh node as an overflow-push sink.
pub fn new(mesh: Arc<crate::adapter::net::MeshNode>) -> Self {
Self { mesh }
}
}
#[cfg(feature = "cortex")]
#[async_trait]
impl OverflowPushSink for MeshNodeOverflowPushSink {
    /// Sends one overflow push to `target_node_id` via the mesh node.
    ///
    /// A transport error from `send_overflow_push` is propagated with `?`.
    /// An `Accepted` ack maps to `Ok(())`; any other ack is turned into a
    /// `BlobError::Backend` whose message names the target and the outcome.
    async fn push(
        &self,
        hash: [u8; 32],
        size_bytes: u64,
        target_node_id: u64,
    ) -> Result<(), BlobError> {
        let ack = self
            .mesh
            .send_overflow_push(target_node_id, hash, size_bytes)
            .await?;

        // Describe the failure, or return early on success.
        let failure = match ack {
            OverflowPushAck::Accepted => return Ok(()),
            OverflowPushAck::Rejected(reason) => {
                format!("overflow push to {target_node_id:#x} rejected: {reason:?}")
            }
            OverflowPushAck::OpenChunkFailed => {
                format!("overflow push to {target_node_id:#x} admitted but chunk open failed")
            }
        };
        Err(BlobError::Backend(failure))
    }
}
/// Abstraction over "deliver one overflow push to a peer". The tick driver
/// only talks to this trait, which lets tests substitute a recording sink.
#[async_trait]
pub trait OverflowPushSink: Send + Sync {
/// Pushes the blob identified by `hash` (of `size_bytes` bytes) to
/// `target_node_id`. `Ok(())` means the push was accepted; any `Err` is
/// counted by the tick driver as a push error.
async fn push(
&self,
hash: [u8; 32],
size_bytes: u64,
target_node_id: u64,
) -> Result<(), BlobError>;
}
/// Borrowed view over everything needed to choose overflow candidates for one
/// tick. It owns nothing; all fields are references supplied by the caller.
pub struct BlobOverflowController<'a> {
/// Local node's capability set; source of the local gravity scope and of
/// the local overflow-enabled flag checked by the tick driver.
pub local_caps: &'a CapabilitySet,
/// Index of known nodes' capabilities; scanned to find a push target.
pub capability_index: &'a CapabilityIndex,
/// Per-blob heat counters, guarded by a mutex.
pub heat_registry: &'a Arc<parking_lot::Mutex<BlobHeatRegistry>>,
/// Refcount/pin table; pinned or still-referenced blobs are never candidates.
pub refcount: &'a BlobRefcountTable,
/// Overflow settings (enable switch, water marks, per-tick push cap).
pub config: &'a OverflowConfig,
}
impl<'a> BlobOverflowController<'a> {
    /// Bundles the borrowed inputs used for candidate selection.
    pub fn new(
        local_caps: &'a CapabilitySet,
        capability_index: &'a CapabilityIndex,
        heat_registry: &'a Arc<parking_lot::Mutex<BlobHeatRegistry>>,
        refcount: &'a BlobRefcountTable,
        config: &'a OverflowConfig,
    ) -> Self {
        Self {
            local_caps,
            capability_index,
            heat_registry,
            refcount,
            config,
        }
    }

    /// Convenience wrapper around [`Self::candidate_batch`] that returns only
    /// the selected candidates and drops the no-target count.
    pub fn candidates(
        &self,
        now: Instant,
        size_for_hash: impl Fn([u8; 32]) -> Option<u64>,
    ) -> Vec<BlobOverflowCandidate> {
        self.candidate_batch(now, size_for_hash).candidates
    }

    /// Selects the blobs to push away this tick, coldest first, each paired
    /// with a target peer.
    ///
    /// 1. Snapshot `(hash, decayed rate)` for every heat-registry entry while
    ///    holding the registry lock; the lock is released before any further
    ///    work.
    /// 2. Keep only hashes that are present in the refcount table, are not
    ///    pinned, have a zero refcount, and for which `size_for_hash` returns
    ///    a size.
    /// 3. Sort ascending by decayed rate, breaking ties by hash bytes so the
    ///    order is deterministic.
    /// 4. Walk the sorted list, asking [`Self::pick_target`] for a peer, until
    ///    `config.max_pushes_per_tick` candidates have been collected.
    ///
    /// `no_target_count` counts only hashes that were actually examined and
    /// had no eligible peer. A cap of zero yields an empty batch because the
    /// cap is checked before each hash is examined.
    pub fn candidate_batch(
        &self,
        now: Instant,
        size_for_hash: impl Fn([u8; 32]) -> Option<u64>,
    ) -> OverflowCandidateBatch {
        let snap: Vec<([u8; 32], f64)> = {
            let guard = self.heat_registry.lock();
            guard
                .iter()
                .map(|(h, c)| {
                    let elapsed = now.saturating_duration_since(c.last_update());
                    let half_life_s = c.half_life().as_secs_f64();
                    // A zero half-life or zero rate cannot be decayed; use
                    // the stored rate unchanged.
                    let rate = if half_life_s == 0.0 || c.rate() == 0.0 {
                        c.rate()
                    } else {
                        let half_lives = elapsed.as_secs_f64() / half_life_s;
                        // Past 64 half-lives the rate is treated as exactly 0.
                        if half_lives > 64.0 {
                            0.0
                        } else {
                            c.rate() * 0.5_f64.powf(half_lives)
                        }
                    };
                    (*h, rate)
                })
                .collect()
        };

        let mut filtered: Vec<([u8; 32], f64, u64)> = snap
            .into_iter()
            .filter_map(|(h, rate)| {
                let entry = self.refcount.get(&h)?;
                if entry.pinned || entry.refcount > 0 {
                    return None;
                }
                let size = size_for_hash(h)?;
                Some((h, rate, size))
            })
            .collect();

        // `total_cmp` gives a genuine total order even if a rate is NaN;
        // `partial_cmp(..).unwrap_or(Equal)` does not, and `sort_by` may
        // produce an unspecified order or panic on a non-total comparator.
        filtered.sort_by(|a, b| a.1.total_cmp(&b.1).then_with(|| a.0.cmp(&b.0)));

        let local_gravity = GravityCapability::from_capability_set(self.local_caps);
        let mut candidates: Vec<BlobOverflowCandidate> = Vec::new();
        let mut no_target_count: usize = 0;

        for (hash, cold_rate, size_bytes) in filtered {
            // Checked before examining the hash so that a cap of 0 selects
            // nothing and hashes beyond the cap are never counted.
            if candidates.len() >= self.config.max_pushes_per_tick {
                break;
            }
            match self.pick_target(size_bytes, local_gravity.scope) {
                Some((target_node_id, target_caps)) => {
                    candidates.push(BlobOverflowCandidate {
                        hash,
                        size_bytes,
                        target_node_id,
                        target_caps,
                        cold_rate,
                    });
                }
                None => no_target_count += 1,
            }
        }

        OverflowCandidateBatch {
            candidates,
            no_target_count,
        }
    }

    /// Chooses the peer to receive a blob of `size_bytes`, or `None` if no
    /// node in the capability index qualifies.
    ///
    /// A node qualifies when it advertises blob storage with overflow
    /// enabled, reports at least `ceil(size_bytes / 2^30)` GB free, is not
    /// flagged unhealthy, and its gravity scope is covered by `local_scope`.
    /// Among qualifying nodes the one with the most free disk wins; ties go
    /// to the lowest node id.
    ///
    /// NOTE(review): nothing here excludes the local node should it appear in
    /// the index, and free space is not reduced for earlier picks in the same
    /// tick — confirm both are acceptable.
    fn pick_target(
        &self,
        size_bytes: u64,
        local_scope: TopologyScope,
    ) -> Option<(u64, CapabilitySet)> {
        let required_gb = size_bytes.div_ceil(1 << 30);
        // (free GB, node id, caps) of the best node seen so far.
        let mut best: Option<(u64, u64, CapabilitySet)> = None;

        for node_id in self.capability_index.all_nodes() {
            let Some(caps) = self.capability_index.get(node_id) else {
                continue;
            };
            let peer_blob = BlobCapability::from_capability_set(&caps);
            if !peer_blob.storage || !peer_blob.overflow_enabled {
                continue;
            }
            if peer_blob.disk_free_gb < required_gb {
                continue;
            }
            if is_blob_storage_unhealthy(&caps) {
                continue;
            }
            let peer_gravity = GravityCapability::from_capability_set(&caps);
            if !scope_covers(local_scope, peer_gravity.scope) {
                continue;
            }

            let free_gb = peer_blob.disk_free_gb;
            let replaces_best = match &best {
                None => true,
                Some((best_free, best_id, _)) => {
                    free_gb > *best_free || (free_gb == *best_free && node_id < *best_id)
                }
            };
            if replaces_best {
                best = Some((free_gb, node_id, caps));
            }
        }

        best.map(|(_, node_id, caps)| (node_id, caps))
    }
}
/// Bundle of borrowed inputs for an overflow tick.
/// NOTE(review): this struct is not referenced by any other code visible in
/// this file; its consumer, if any, lives elsewhere.
pub struct OverflowTickContext<'a> {
/// Index of known nodes' capabilities.
pub capability_index: &'a CapabilityIndex,
/// Per-blob heat counters, guarded by a mutex.
pub heat_registry: &'a Arc<parking_lot::Mutex<BlobHeatRegistry>>,
/// Sink used to deliver pushes.
pub sink: &'a dyn OverflowPushSink,
/// Local node's capability set.
pub local_caps: &'a CapabilitySet,
/// Bytes of disk currently in use.
pub disk_used_bytes: u64,
/// Total disk capacity in bytes.
pub disk_total_bytes: u64,
}
/// Per-tick measurements handed to `drive_blob_overflow_tick`.
pub struct OverflowTickObservation<'a> {
/// Bytes of disk currently in use (numerator of the disk ratio).
pub disk_used_bytes: u64,
/// Total disk capacity in bytes; zero makes the tick use a ratio of 0.0.
pub disk_total_bytes: u64,
/// Hysteresis flag owned by the caller; read and updated by the tick.
pub hysteresis_active: &'a AtomicBool,
/// Instant at which heat rates are evaluated for candidate selection.
pub now: Instant,
}
pub async fn drive_blob_overflow_tick(
controller: &BlobOverflowController<'_>,
sink: &dyn OverflowPushSink,
observation: OverflowTickObservation<'_>,
size_for_hash: impl Fn([u8; 32]) -> Option<u64>,
) -> BlobOverflowTickReport {
let OverflowTickObservation {
disk_used_bytes,
disk_total_bytes,
hysteresis_active,
now,
} = observation;
let mut report = BlobOverflowTickReport::default();
let disk_ratio = if disk_total_bytes == 0 {
0.0
} else {
disk_used_bytes as f64 / disk_total_bytes as f64
};
report.disk_ratio_at_start = disk_ratio;
report.was_active_at_start = hysteresis_active.load(Ordering::Relaxed);
let fire = step_overflow_hysteresis(
hysteresis_active,
disk_ratio,
controller.config.high_water_ratio,
controller.config.low_water_ratio,
);
report.is_active_at_end = fire;
if !controller.config.enabled || !fire {
report.disk_ratio_at_end = disk_ratio;
return report;
}
let local_blob = BlobCapability::from_capability_set(controller.local_caps);
if !local_blob.overflow_enabled {
tracing::debug!(
"blob overflow: master switch on but local cap.blob.overflow not yet advertised; \
skipping tick until announce_capabilities propagates the tag"
);
report.disk_ratio_at_end = disk_ratio;
return report;
}
let batch = controller.candidate_batch(now, &size_for_hash);
report.rejected_no_target = batch.no_target_count as u64;
for candidate in batch.candidates {
match sink
.push(
candidate.hash,
candidate.size_bytes,
candidate.target_node_id,
)
.await
{
Ok(()) => {
report.admitted += 1;
report.pushed_bytes = report.pushed_bytes.saturating_add(candidate.size_bytes);
}
Err(e) => {
tracing::trace!(
error = ?e,
hash = ?candidate.hash,
target = candidate.target_node_id,
"blob overflow: push failed; counted"
);
report.push_errors += 1;
}
}
}
report.disk_ratio_at_end = disk_ratio;
report
}
/// Returns whether a peer advertising scope `peer` is an acceptable target
/// for a node whose own scope is `local`.
///
/// * `Mesh` accepts every peer scope.
/// * `Region` accepts `Region` and `Mesh`.
/// * `Zone` accepts `Zone`, `Region` and `Mesh`.
/// * `Node` accepts only `Node`.
fn scope_covers(local: TopologyScope, peer: TopologyScope) -> bool {
    use TopologyScope::*;
    match (local, peer) {
        (Mesh, _) => true,
        (Region, Region | Mesh) => true,
        (Zone, Zone | Region | Mesh) => true,
        (Node, Node) => true,
        _ => false,
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::adapter::net::behavior::capability::CapabilityAnnouncement;
use crate::adapter::net::dataforts::gravity::BlobHeatRegistry;
use crate::adapter::net::identity::EntityId;
use std::sync::atomic::AtomicU64;
use std::time::Duration;
/// Test helper: builds a 32-byte hash filled with `byte` and returns it
/// together with its 64-character lower-case hex rendering.
fn hex64(byte: u8) -> ([u8; 32], String) {
    use std::fmt::Write;

    let digest = [byte; 32];
    let mut rendered = String::with_capacity(2 * digest.len());
    for octet in digest {
        let _ = write!(rendered, "{:02x}", octet);
    }
    (digest, rendered)
}
/// Test fixture: capability set of a peer that advertises blob storage,
/// overflow participation, mesh-wide gravity scope, a 100 GB disk and the
/// given amount of free space.
fn overflow_peer_caps(disk_free_gb: u64) -> CapabilitySet {
CapabilitySet::new()
.add_tag("dataforts.blob.storage")
.add_tag("dataforts.blob.disk_total_gb=100")
.add_tag(format!("dataforts.blob.disk_free_gb={}", disk_free_gb))
.add_tag("dataforts.blob.overflow")
.add_tag("dataforts.gravity.enabled")
.add_tag("dataforts.gravity.scope=mesh")
.add_tag("dataforts.gravity.proximity=128")
}
/// Test fixture: local capability set with storage and overflow advertised
/// and a mesh-wide gravity scope (no disk-size tags).
fn overflow_enabled_local_caps() -> CapabilitySet {
CapabilitySet::new()
.add_tag("dataforts.blob.storage")
.add_tag("dataforts.blob.overflow")
.add_tag("dataforts.gravity.enabled")
.add_tag("dataforts.gravity.scope=mesh")
.add_tag("dataforts.gravity.proximity=128")
}
// One recorded sink call: (blob hash, size in bytes, target node id).
type RecordedPushCall = ([u8; 32], u64, u64);
// Shared, mutex-guarded log of recorded sink calls.
type RecordedCallLog = Arc<parking_lot::Mutex<Vec<RecordedPushCall>>>;
/// Test double for `OverflowPushSink` that records successful pushes and can
/// be told to fail a number of upcoming calls.
struct OverflowPushRecorder {
// Log of the pushes that were accepted.
calls: RecordedCallLog,
// Number of upcoming pushes that should fail; decremented on each failure.
fail_count: Arc<AtomicU64>,
}
impl OverflowPushRecorder {
/// Creates a recorder with an empty call log and no scheduled failures.
fn new() -> Self {
Self {
calls: Arc::new(parking_lot::Mutex::new(Vec::new())),
fail_count: Arc::new(AtomicU64::new(0)),
}
}
/// Returns a copy of the calls recorded so far.
fn calls(&self) -> Vec<RecordedPushCall> {
self.calls.lock().clone()
}
}
#[async_trait]
impl OverflowPushSink for OverflowPushRecorder {
    /// While `fail_count` is non-zero, consumes one scheduled failure and
    /// returns a simulated error without recording the call; otherwise
    /// appends the call to the log and succeeds.
    async fn push(
        &self,
        hash: [u8; 32],
        size_bytes: u64,
        target_node_id: u64,
    ) -> Result<(), BlobError> {
        let pending_failures = self.fail_count.load(Ordering::Relaxed);
        if pending_failures == 0 {
            self.calls.lock().push((hash, size_bytes, target_node_id));
            return Ok(());
        }
        self.fail_count.fetch_sub(1, Ordering::Relaxed);
        Err(BlobError::NotFound("simulated push failure".to_string()))
    }
}
/// Test helper: builds a heat registry holding one counter per entry, each
/// with a 60-second half-life and bumped `rate as usize` times at `now`.
fn heat_registry_with(
    now: Instant,
    entries: &[([u8; 32], f64)],
) -> Arc<parking_lot::Mutex<BlobHeatRegistry>> {
    let half_life = Duration::from_secs(60);
    let mut registry = BlobHeatRegistry::new();
    for &(hash, rate) in entries {
        let counter = registry.entry_mut(hash, half_life, now);
        // The fractional part of `rate` is discarded by the cast.
        let bumps = rate as usize;
        for _ in 0..bumps {
            counter.bump(now);
        }
    }
    Arc::new(parking_lot::Mutex::new(registry))
}
/// Test helper: builds a refcount table in which every given hash has been
/// observed with a refcount of zero at `now_ms`.
fn refcount_with_zero(hashes: &[[u8; 32]], now_ms: u64) -> BlobRefcountTable {
    let table = BlobRefcountTable::new();
    hashes.iter().for_each(|hash| {
        table.store_observed(*hash, 0, now_ms);
    });
    table
}
/// Test helper: builds a capability index containing one announcement per
/// `(node id, entity bytes, capability set)` triple. The third announcement
/// argument counts up from 1 in slice order.
fn cap_index_with(peers: &[(u64, [u8; 32], CapabilitySet)]) -> CapabilityIndex {
    let index = CapabilityIndex::new();
    let mut ordinal: u64 = 0;
    for (node_id, entity_bytes, caps) in peers {
        ordinal += 1;
        let entity = EntityId::from_bytes(*entity_bytes);
        index.index(CapabilityAnnouncement::new(
            *node_id,
            entity,
            ordinal,
            caps.clone(),
        ));
    }
    index
}
// A ratio above the high-water mark turns the flag on and returns true.
#[test]
fn hysteresis_fires_above_high_water() {
let active = AtomicBool::new(false);
assert!(step_overflow_hysteresis(&active, 0.90, 0.85, 0.70));
assert!(active.load(Ordering::Relaxed));
}
// A ratio below the low-water mark turns an active flag off.
#[test]
fn hysteresis_clears_below_low_water() {
let active = AtomicBool::new(true);
assert!(!step_overflow_hysteresis(&active, 0.65, 0.85, 0.70));
assert!(!active.load(Ordering::Relaxed));
}
// Between the two marks the previous state is kept, whichever it was.
#[test]
fn hysteresis_holds_state_in_band() {
let active = AtomicBool::new(true);
assert!(step_overflow_hysteresis(&active, 0.80, 0.85, 0.70));
assert!(active.load(Ordering::Relaxed));
let inactive = AtomicBool::new(false);
assert!(!step_overflow_hysteresis(&inactive, 0.80, 0.85, 0.70));
assert!(!inactive.load(Ordering::Relaxed));
}
// Both thresholds are inclusive: exactly-high fires, exactly-low clears.
#[test]
fn hysteresis_boundary_inclusive() {
let active = AtomicBool::new(false);
assert!(step_overflow_hysteresis(&active, 0.85, 0.85, 0.70));
let active2 = AtomicBool::new(true);
assert!(!step_overflow_hysteresis(&active2, 0.70, 0.85, 0.70));
}
// Three blobs bumped 0, 1 and 5 times must come back in ascending heat order.
#[test]
fn controller_candidates_returns_coldest_first() {
let now = Instant::now();
let (a, _) = hex64(0xAA);
let (b, _) = hex64(0xBB);
let (c, _) = hex64(0xCC);
let heat = heat_registry_with(now, &[(a, 0.0), (b, 1.0), (c, 5.0)]);
let refcount = refcount_with_zero(&[a, b, c], 1_000_000);
let peer = (99u64, [0x11; 32], overflow_peer_caps(50));
let index = cap_index_with(&[peer]);
let local = overflow_enabled_local_caps();
let cfg = OverflowConfig {
enabled: true,
max_pushes_per_tick: 16,
..Default::default()
};
let controller = BlobOverflowController::new(&local, &index, &heat, &refcount, &cfg);
let cands = controller.candidates(now, |_| Some(1024));
assert_eq!(cands.len(), 3);
// Coldest (never bumped) first, hottest last.
assert_eq!(cands[0].hash, a);
assert_eq!(cands[2].hash, c);
}
// A pinned blob is never offered as a candidate, even with zero refcount.
#[test]
fn controller_skips_pinned_hashes() {
let now = Instant::now();
let (a, _) = hex64(0xAA);
let (b, _) = hex64(0xBB);
let heat = heat_registry_with(now, &[(a, 0.0), (b, 0.0)]);
let refcount = BlobRefcountTable::new();
refcount.store_observed(a, 0, 1_000_000);
refcount.pin(a, 1_000_000);
refcount.store_observed(b, 0, 1_000_000);
let peer = (99u64, [0x11; 32], overflow_peer_caps(50));
let index = cap_index_with(&[peer]);
let local = overflow_enabled_local_caps();
let cfg = OverflowConfig {
enabled: true,
max_pushes_per_tick: 16,
..Default::default()
};
let controller = BlobOverflowController::new(&local, &index, &heat, &refcount, &cfg);
let cands = controller.candidates(now, |_| Some(1024));
// Only the unpinned blob survives the filter.
assert_eq!(cands.len(), 1);
assert_eq!(cands[0].hash, b);
}
// A blob whose refcount has been incremented is still in use and must not be
// selected.
#[test]
fn controller_skips_hashes_with_nonzero_refcount() {
let now = Instant::now();
let (a, _) = hex64(0xAA);
let heat = heat_registry_with(now, &[(a, 0.0)]);
let refcount = BlobRefcountTable::new();
refcount.incr(a, 1_000_000); let peer = (99u64, [0x11; 32], overflow_peer_caps(50));
let index = cap_index_with(&[peer]);
let local = overflow_enabled_local_caps();
let cfg = OverflowConfig {
enabled: true,
max_pushes_per_tick: 16,
..Default::default()
};
let controller = BlobOverflowController::new(&local, &index, &heat, &refcount, &cfg);
assert!(controller.candidates(now, |_| Some(1024)).is_empty());
}
// With two eligible peers, the one advertising more free disk is chosen.
#[test]
fn controller_picks_highest_disk_free_target() {
let now = Instant::now();
let (a, _) = hex64(0xAA);
let heat = heat_registry_with(now, &[(a, 0.0)]);
let refcount = refcount_with_zero(&[a], 1_000_000);
let peer_low = (99u64, [0x11; 32], overflow_peer_caps(40));
let peer_high = (88u64, [0x22; 32], overflow_peer_caps(80));
let index = cap_index_with(&[peer_low, peer_high]);
let local = overflow_enabled_local_caps();
let cfg = OverflowConfig {
enabled: true,
max_pushes_per_tick: 16,
..Default::default()
};
let controller = BlobOverflowController::new(&local, &index, &heat, &refcount, &cfg);
let cands = controller.candidates(now, |_| Some(1024));
assert_eq!(cands.len(), 1);
// Node 88 advertises 80 GB free versus 40 GB on node 99.
assert_eq!(cands[0].target_node_id, 88);
}
// A peer with storage and free disk but no overflow tag is not a valid
// target, so no candidate is produced.
#[test]
fn controller_skips_peers_without_overflow_tag() {
let now = Instant::now();
let (a, _) = hex64(0xAA);
let heat = heat_registry_with(now, &[(a, 0.0)]);
let refcount = refcount_with_zero(&[a], 1_000_000);
let no_overflow_peer_caps = CapabilitySet::new()
.add_tag("dataforts.blob.storage")
.add_tag("dataforts.blob.disk_total_gb=100")
.add_tag("dataforts.blob.disk_free_gb=80")
.add_tag("dataforts.gravity.enabled")
.add_tag("dataforts.gravity.scope=mesh")
.add_tag("dataforts.gravity.proximity=128");
let peer = (99u64, [0x11; 32], no_overflow_peer_caps);
let index = cap_index_with(&[peer]);
let local = overflow_enabled_local_caps();
let cfg = OverflowConfig {
enabled: true,
max_pushes_per_tick: 16,
..Default::default()
};
let controller = BlobOverflowController::new(&local, &index, &heat, &refcount, &cfg);
assert!(controller.candidates(now, |_| Some(1024)).is_empty());
}
// A 4 GiB blob needs 4 GB free; a peer advertising 1 GB free is skipped.
#[test]
fn controller_skips_peers_with_insufficient_disk() {
let now = Instant::now();
let (a, _) = hex64(0xAA);
let heat = heat_registry_with(now, &[(a, 0.0)]);
let refcount = refcount_with_zero(&[a], 1_000_000);
let peer = (99u64, [0x11; 32], overflow_peer_caps(1));
let index = cap_index_with(&[peer]);
let local = overflow_enabled_local_caps();
let cfg = OverflowConfig {
enabled: true,
max_pushes_per_tick: 16,
..Default::default()
};
let controller = BlobOverflowController::new(&local, &index, &heat, &refcount, &cfg);
let four_gib: u64 = 4 * (1 << 30);
assert!(controller.candidates(now, |_| Some(four_gib)).is_empty());
}
// Five eligible blobs with a cap of two must yield exactly two candidates.
#[test]
fn controller_truncates_to_max_pushes_per_tick() {
let now = Instant::now();
let hashes: Vec<[u8; 32]> = (0..5).map(|i| hex64(i as u8).0).collect();
let entries: Vec<([u8; 32], f64)> = hashes.iter().map(|h| (*h, 0.0)).collect();
let heat = heat_registry_with(now, &entries);
let refcount = refcount_with_zero(&hashes, 1_000_000);
let peer = (99u64, [0x11; 32], overflow_peer_caps(50));
let index = cap_index_with(&[peer]);
let local = overflow_enabled_local_caps();
let cfg = OverflowConfig {
enabled: true,
max_pushes_per_tick: 2,
..Default::default()
};
let controller = BlobOverflowController::new(&local, &index, &heat, &refcount, &cfg);
let cands = controller.candidates(now, |_| Some(1024));
assert_eq!(
cands.len(),
2,
"max_pushes_per_tick caps the candidate list"
);
}
// At 50% disk usage with an inactive flag the tick must not activate or push.
// NOTE(review): relies on the default water marks placing 0.5 below the
// high-water ratio; those defaults are not visible in this file.
#[tokio::test]
async fn tick_no_op_when_below_low_water() {
let now = Instant::now();
let (a, _) = hex64(0xAA);
let heat = heat_registry_with(now, &[(a, 0.0)]);
let refcount = refcount_with_zero(&[a], 1_000_000);
let peer = (99u64, [0x11; 32], overflow_peer_caps(50));
let index = cap_index_with(&[peer]);
let local = overflow_enabled_local_caps();
let cfg = OverflowConfig {
enabled: true,
..Default::default()
};
let controller = BlobOverflowController::new(&local, &index, &heat, &refcount, &cfg);
let active = AtomicBool::new(false);
let sink = OverflowPushRecorder::new();
let report = drive_blob_overflow_tick(
&controller,
&sink,
OverflowTickObservation {
disk_used_bytes: 500,
disk_total_bytes: 1000,
hysteresis_active: &active,
now,
},
|_| Some(1024),
)
.await;
assert_eq!(report.admitted, 0);
assert!(!report.is_active_at_end);
assert_eq!(sink.calls().len(), 0);
}
// At 90% disk usage the tick activates and pushes the single cold blob to the
// only eligible peer; the report and the recorder must agree.
#[tokio::test]
async fn tick_fires_above_high_water_and_pushes_to_recorder() {
let now = Instant::now();
let (a, _) = hex64(0xAA);
let heat = heat_registry_with(now, &[(a, 0.0)]);
let refcount = refcount_with_zero(&[a], 1_000_000);
let peer = (99u64, [0x11; 32], overflow_peer_caps(50));
let index = cap_index_with(&[peer]);
let local = overflow_enabled_local_caps();
let cfg = OverflowConfig {
enabled: true,
..Default::default()
};
let controller = BlobOverflowController::new(&local, &index, &heat, &refcount, &cfg);
let active = AtomicBool::new(false);
let sink = OverflowPushRecorder::new();
let report = drive_blob_overflow_tick(
&controller,
&sink,
OverflowTickObservation {
disk_used_bytes: 900,
disk_total_bytes: 1000,
hysteresis_active: &active,
now,
},
|_| Some(1024),
)
.await;
assert_eq!(report.admitted, 1);
assert!(report.is_active_at_end);
assert_eq!(report.pushed_bytes, 1024);
let calls = sink.calls();
assert_eq!(calls.len(), 1);
// Recorded tuple is (hash, size, target node id).
assert_eq!(calls[0].0, a);
assert_eq!(calls[0].2, 99);
}
// With the config switch off the hysteresis flag is still stepped (and turns
// on at 90% usage), but nothing is pushed.
#[tokio::test]
async fn tick_master_switch_off_skips_pushes_even_above_high_water() {
let now = Instant::now();
let (a, _) = hex64(0xAA);
let heat = heat_registry_with(now, &[(a, 0.0)]);
let refcount = refcount_with_zero(&[a], 1_000_000);
let peer = (99u64, [0x11; 32], overflow_peer_caps(50));
let index = cap_index_with(&[peer]);
let local = overflow_enabled_local_caps();
let cfg = OverflowConfig {
enabled: false, ..Default::default()
};
let controller = BlobOverflowController::new(&local, &index, &heat, &refcount, &cfg);
let active = AtomicBool::new(false);
let sink = OverflowPushRecorder::new();
let report = drive_blob_overflow_tick(
&controller,
&sink,
OverflowTickObservation {
disk_used_bytes: 900,
disk_total_bytes: 1000,
hysteresis_active: &active,
now,
},
|_| Some(1024),
)
.await;
assert!(report.is_active_at_end);
assert_eq!(report.admitted, 0);
assert_eq!(sink.calls().len(), 0);
}
// A failing sink call is counted in push_errors and contributes nothing to
// admitted or pushed_bytes.
#[tokio::test]
async fn tick_records_push_errors_when_sink_fails() {
let now = Instant::now();
let (a, _) = hex64(0xAA);
let heat = heat_registry_with(now, &[(a, 0.0)]);
let refcount = refcount_with_zero(&[a], 1_000_000);
let peer = (99u64, [0x11; 32], overflow_peer_caps(50));
let index = cap_index_with(&[peer]);
let local = overflow_enabled_local_caps();
let cfg = OverflowConfig {
enabled: true,
..Default::default()
};
let controller = BlobOverflowController::new(&local, &index, &heat, &refcount, &cfg);
let active = AtomicBool::new(false);
let sink = OverflowPushRecorder::new();
// Schedule exactly one simulated failure.
sink.fail_count.store(1, Ordering::Relaxed);
let report = drive_blob_overflow_tick(
&controller,
&sink,
OverflowTickObservation {
disk_used_bytes: 900,
disk_total_bytes: 1000,
hysteresis_active: &active,
now,
},
|_| Some(1024),
)
.await;
assert_eq!(report.admitted, 0);
assert_eq!(report.push_errors, 1);
assert_eq!(report.pushed_bytes, 0);
}
// When the only peer lacks the overflow tag, the examined blob is reported as
// rejected_no_target and the sink is never called.
#[tokio::test]
async fn tick_records_no_target_when_no_overflow_enabled_peer() {
let now = Instant::now();
let (a, _) = hex64(0xAA);
let heat = heat_registry_with(now, &[(a, 0.0)]);
let refcount = refcount_with_zero(&[a], 1_000_000);
let no_overflow_peer_caps = CapabilitySet::new()
.add_tag("dataforts.blob.storage")
.add_tag("dataforts.blob.disk_total_gb=100")
.add_tag("dataforts.blob.disk_free_gb=80")
.add_tag("dataforts.gravity.enabled")
.add_tag("dataforts.gravity.scope=mesh");
let peer = (99u64, [0x11; 32], no_overflow_peer_caps);
let index = cap_index_with(&[peer]);
let local = overflow_enabled_local_caps();
let cfg = OverflowConfig {
enabled: true,
..Default::default()
};
let controller = BlobOverflowController::new(&local, &index, &heat, &refcount, &cfg);
let active = AtomicBool::new(false);
let sink = OverflowPushRecorder::new();
let report = drive_blob_overflow_tick(
&controller,
&sink,
OverflowTickObservation {
disk_used_bytes: 900,
disk_total_bytes: 1000,
hysteresis_active: &active,
now,
},
|_| Some(1024),
)
.await;
assert_eq!(report.admitted, 0);
assert_eq!(report.rejected_no_target, 1);
assert_eq!(sink.calls().len(), 0);
}
// If the local capability set does not advertise overflow, the tick returns
// before candidate selection: the flag is active but every counter stays zero.
#[tokio::test]
async fn tick_skips_when_local_overflow_tag_not_advertised() {
let now = Instant::now();
let (a, _) = hex64(0xAA);
let heat = heat_registry_with(now, &[(a, 0.0)]);
let refcount = refcount_with_zero(&[a], 1_000_000);
let peer = (99u64, [0x11; 32], overflow_peer_caps(50));
let index = cap_index_with(&[peer]);
// Same as the usual local fixture but without "dataforts.blob.overflow".
let local = CapabilitySet::new()
.add_tag("dataforts.blob.storage")
.add_tag("dataforts.gravity.enabled")
.add_tag("dataforts.gravity.scope=mesh")
.add_tag("dataforts.gravity.proximity=128");
let cfg = OverflowConfig {
enabled: true,
..Default::default()
};
let controller = BlobOverflowController::new(&local, &index, &heat, &refcount, &cfg);
let active = AtomicBool::new(false);
let sink = OverflowPushRecorder::new();
let report = drive_blob_overflow_tick(
&controller,
&sink,
OverflowTickObservation {
disk_used_bytes: 900,
disk_total_bytes: 1000,
hysteresis_active: &active,
now,
},
|_| Some(1024),
)
.await;
assert!(report.is_active_at_end);
assert_eq!(report.admitted, 0);
assert_eq!(report.push_errors, 0);
assert_eq!(report.rejected_no_target, 0);
assert_eq!(sink.calls().len(), 0);
}
// Blobs left unexamined because the per-tick cap was reached are not counted
// as "no target": five eligible blobs, cap of two, zero rejections.
#[tokio::test]
async fn tick_no_target_excludes_truncated_hashes() {
let now = Instant::now();
let hashes: Vec<[u8; 32]> = (0..5).map(|i| hex64(i as u8).0).collect();
let entries: Vec<([u8; 32], f64)> = hashes.iter().map(|h| (*h, 0.0)).collect();
let heat = heat_registry_with(now, &entries);
let refcount = refcount_with_zero(&hashes, 1_000_000);
let peer = (99u64, [0x11; 32], overflow_peer_caps(80));
let index = cap_index_with(&[peer]);
let local = overflow_enabled_local_caps();
let cfg = OverflowConfig {
enabled: true,
max_pushes_per_tick: 2,
..Default::default()
};
let controller = BlobOverflowController::new(&local, &index, &heat, &refcount, &cfg);
let active = AtomicBool::new(false);
let sink = OverflowPushRecorder::new();
let report = drive_blob_overflow_tick(
&controller,
&sink,
OverflowTickObservation {
disk_used_bytes: 900,
disk_total_bytes: 1000,
hysteresis_active: &active,
now,
},
|_| Some(1024),
)
.await;
assert_eq!(report.admitted, 2);
assert_eq!(
report.rejected_no_target, 0,
"truncated hashes (never attempted) must NOT bump rejected_no_target"
);
assert_eq!(sink.calls().len(), 2);
}
// Mixes blobs that fit the peer (1 KiB) with blobs that cannot (100 GiB vs.
// 80 GB free). All rates are equal, so the hash tie-break orders them
// small1, big1, small2, big2; with a cap of three, both small blobs are
// admitted and both big ones are examined and counted as no-target.
#[tokio::test]
async fn tick_no_target_counts_only_attempted_failures() {
let now = Instant::now();
let (small1, _) = hex64(0x01);
let (big1, _) = hex64(0x02);
let (small2, _) = hex64(0x03);
let (big2, _) = hex64(0x04);
let heat = heat_registry_with(
now,
&[(small1, 0.0), (big1, 0.0), (small2, 0.0), (big2, 0.0)],
);
let refcount = refcount_with_zero(&[small1, big1, small2, big2], 1_000_000);
let peer = (99u64, [0x11; 32], overflow_peer_caps(80));
let index = cap_index_with(&[peer]);
let local = overflow_enabled_local_caps();
let cfg = OverflowConfig {
enabled: true,
max_pushes_per_tick: 3,
..Default::default()
};
let controller = BlobOverflowController::new(&local, &index, &heat, &refcount, &cfg);
let active = AtomicBool::new(false);
let sink = OverflowPushRecorder::new();
// Size lookup: the two "big" hashes are 100 GiB, everything else 1 KiB.
let size_for_hash = move |h: [u8; 32]| -> Option<u64> {
if h == big1 || h == big2 {
Some(100 * (1 << 30)) } else {
Some(1024) }
};
let report = drive_blob_overflow_tick(
&controller,
&sink,
OverflowTickObservation {
disk_used_bytes: 900,
disk_total_bytes: 1000,
hysteresis_active: &active,
now,
},
size_for_hash,
)
.await;
assert_eq!(report.admitted, 2);
assert_eq!(report.rejected_no_target, 2);
}
// A zero total-disk reading is treated as a ratio of 0.0 (no division by
// zero), so the tick neither activates nor pushes.
#[tokio::test]
async fn tick_zero_disk_total_never_fires() {
let now = Instant::now();
let (a, _) = hex64(0xAA);
let heat = heat_registry_with(now, &[(a, 0.0)]);
let refcount = refcount_with_zero(&[a], 1_000_000);
let peer = (99u64, [0x11; 32], overflow_peer_caps(50));
let index = cap_index_with(&[peer]);
let local = overflow_enabled_local_caps();
let cfg = OverflowConfig {
enabled: true,
..Default::default()
};
let controller = BlobOverflowController::new(&local, &index, &heat, &refcount, &cfg);
let active = AtomicBool::new(false);
let sink = OverflowPushRecorder::new();
let report = drive_blob_overflow_tick(
&controller,
&sink,
OverflowTickObservation {
disk_used_bytes: 500,
disk_total_bytes: 0, hysteresis_active: &active,
now,
},
|_| Some(1024),
)
.await;
assert_eq!(report.disk_ratio_at_start, 0.0);
assert!(!report.is_active_at_end);
assert_eq!(sink.calls().len(), 0);
}
// A local scope of Mesh accepts peers of every scope.
#[test]
fn scope_covers_mesh_covers_everything() {
use TopologyScope::*;
for peer in [Node, Zone, Region, Mesh] {
assert!(scope_covers(Mesh, peer));
}
}
// A local scope of Zone accepts Zone, Region and Mesh peers but not Node peers.
#[test]
fn scope_covers_zone_does_not_cover_node() {
assert!(!scope_covers(TopologyScope::Zone, TopologyScope::Node));
assert!(scope_covers(TopologyScope::Zone, TopologyScope::Zone));
assert!(scope_covers(TopologyScope::Zone, TopologyScope::Region));
assert!(scope_covers(TopologyScope::Zone, TopologyScope::Mesh));
}
// The request survives a postcard encode/decode round trip unchanged.
#[test]
fn overflow_push_request_round_trips_postcard() {
let req = OverflowPush {
blob_hash: [0xAA; 32],
size_bytes: 4 * (1 << 20),
sender_node_id: 0xDEAD_BEEF_u64,
};
let bytes = postcard::to_allocvec(&req).expect("encode");
let decoded: OverflowPush = postcard::from_bytes(&bytes).expect("decode");
assert_eq!(decoded, req);
}
// The Accepted ack survives a postcard round trip.
#[test]
fn overflow_push_ack_accepted_round_trips() {
let ack = OverflowPushAck::Accepted;
let bytes = postcard::to_allocvec(&ack).expect("encode");
let decoded: OverflowPushAck = postcard::from_bytes(&bytes).expect("decode");
assert_eq!(decoded, ack);
}
// Each listed rejection reason survives a postcard round trip inside a
// Rejected ack, so the sender can see the typed reason.
#[test]
fn overflow_push_ack_rejected_carries_typed_reason() {
for reason in [
OverflowReject::NoStorageCap,
OverflowReject::NotParticipating,
OverflowReject::SenderNotOverflowing,
OverflowReject::Unhealthy,
OverflowReject::ScopeMismatch,
OverflowReject::InsufficientDisk,
] {
let ack = OverflowPushAck::Rejected(reason);
let bytes = postcard::to_allocvec(&ack).expect("encode");
let decoded: OverflowPushAck = postcard::from_bytes(&bytes).expect("decode");
assert_eq!(decoded, ack, "ack with {:?} must round-trip", reason);
}
}
// The OpenChunkFailed ack survives a postcard round trip.
#[test]
fn overflow_push_ack_open_chunk_failed_round_trips() {
let ack = OverflowPushAck::OpenChunkFailed;
let bytes = postcard::to_allocvec(&ack).expect("encode");
let decoded: OverflowPushAck = postcard::from_bytes(&bytes).expect("decode");
assert_eq!(decoded, ack);
}
// Pins the service-name string so an accidental rename fails the build's tests.
#[test]
fn overflow_push_service_name_is_stable() {
assert_eq!(OVERFLOW_PUSH_SERVICE, "dataforts.blob.overflow_push");
}
}