use std::sync::Arc;
use crate::adapter::net::behavior::capability::{CapabilityIndex, CapabilitySet};
use crate::adapter::net::behavior::dataforts_capabilities::{
GravityCapability, GreedyCapability, TopologyScope,
};
use crate::adapter::net::behavior::tag::{AxisSeparator, Tag};
use crate::adapter::net::behavior::TaxonomyAxis;
use super::adapter::BlobAdapter;
use super::admission::{should_migrate_blob_to, MigrateBlobReject, MigrateBlobVerdict};
use super::blob_ref::BlobRef;
fn scope_at_least_as_narrow(a: TopologyScope, b: TopologyScope) -> bool {
use TopologyScope::*;
matches!(
(a, b),
(Node, _) | (Zone, Zone | Region | Mesh) | (Region, Region | Mesh) | (Mesh, Mesh)
)
}
/// Picks the narrower of two scopes.
///
/// Returns `a` whenever `a` is at least as narrow as `b` (so `a` wins ties);
/// otherwise returns `b`.
fn narrower_scope(a: TopologyScope, b: TopologyScope) -> TopologyScope {
    if !scope_at_least_as_narrow(a, b) {
        return b;
    }
    a
}
/// Rewrites the Dataforts `greedy.scope` and `gravity.scope` tags of `caps`.
///
/// Every existing Dataforts-axis tag with one of those two keys is removed,
/// then one fresh tag per key is inserted carrying the wire string of the
/// corresponding floor. All other tags are left untouched.
fn narrow_scope_tags_in(
    caps: &mut CapabilitySet,
    gravity_floor: TopologyScope,
    greedy_floor: TopologyScope,
) {
    // Drop whatever scope claims the set currently carries.
    caps.tags.retain(|t| {
        !matches!(
            t,
            Tag::AxisValue { axis, key, .. }
                if *axis == TaxonomyAxis::Dataforts
                    && (key == "greedy.scope" || key == "gravity.scope")
        )
    });

    let scope_tag = |key: &str, scope: TopologyScope| Tag::AxisValue {
        axis: TaxonomyAxis::Dataforts,
        key: key.to_string(),
        value: scope.as_wire_str().to_string(),
        separator: AxisSeparator::Eq,
    };
    caps.tags.insert(scope_tag("greedy.scope", greedy_floor));
    caps.tags.insert(scope_tag("gravity.scope", gravity_floor));
}
/// Decodes one lowercase hexadecimal ASCII digit into its 4-bit value.
///
/// Accepts `0`-`9` and `a`-`f` only; uppercase digits and every other byte
/// yield `None`.
#[inline]
fn nibble_lowercase(b: u8) -> Option<u8> {
    if b.is_ascii_digit() {
        return Some(b - b'0');
    }
    if (b'a'..=b'f').contains(&b) {
        return Some(b - b'a' + 10);
    }
    None
}
/// Parses a blob heat tag into `(hash, rate)`.
///
/// The tag must be `Tag::Reserved` with prefix `"heat:"` and a body shaped
/// like `blob:<64 lowercase hex chars>=<rate>`. Returns `None` when the tag is
/// any other variant or prefix, the `blob:` marker or `=` is missing, the hex
/// part is not exactly 64 lowercase hex digits, or the rate does not parse as
/// a finite `f64`.
pub fn parse_blob_heat_tag(tag: &Tag) -> Option<([u8; 32], f64)> {
    let body = match tag {
        Tag::Reserved { prefix, body } if prefix == "heat:" => body,
        _ => return None,
    };
    // Split at the first '=': left side is the hash, right side the rate.
    let (hex, rate_str) = body.strip_prefix("blob:")?.split_once('=')?;
    if hex.len() != 64 || hex.bytes().any(|b| b.is_ascii_uppercase()) {
        return None;
    }

    let mut hash = [0u8; 32];
    // 64 bytes of hex give exactly 32 two-byte chunks, one per output byte.
    for (byte, pair) in hash.iter_mut().zip(hex.as_bytes().chunks_exact(2)) {
        let hi = nibble_lowercase(pair[0])?;
        let lo = nibble_lowercase(pair[1])?;
        *byte = (hi << 4) | lo;
    }

    let rate = rate_str.parse::<f64>().ok()?;
    if rate.is_finite() {
        Some((hash, rate))
    } else {
        None
    }
}
/// One blob that a peer advertised heat for, as produced by
/// `BlobMigrationController::candidates`.
#[derive(Debug, Clone)]
pub struct BlobMigrationCandidate {
/// 32-byte blob hash decoded from the heat tag.
pub hash: [u8; 32],
/// Node id of the peer whose capability set carried the heat tag.
pub publisher_node_id: u64,
/// Copy of that peer's capability set; its `greedy.scope` / `gravity.scope`
/// tags may have been rewritten to the per-hash scope floor.
pub publisher_caps: CapabilitySet,
/// Rate value parsed from the heat tag (always finite).
pub rate: f64,
}
/// Borrowing view over the local capabilities and the capability index, used
/// to enumerate blob migration candidates.
pub struct BlobMigrationController<'a> {
/// Capability set of the local node. Stored here; `candidates` itself does
/// not read it.
pub local_caps: &'a CapabilitySet,
/// Index of known peers' capability sets that `candidates` scans.
pub capability_index: &'a CapabilityIndex,
}
impl<'a> BlobMigrationController<'a> {
    /// Creates a controller borrowing the local capability set and the index.
    pub fn new(local_caps: &'a CapabilitySet, capability_index: &'a CapabilityIndex) -> Self {
        Self {
            local_caps,
            capability_index,
        }
    }

    /// Lists one candidate per parseable blob heat tag found in the index.
    ///
    /// Works in two passes. The first pass walks every node in the index,
    /// collects `(node, hash, rate)` for each blob heat tag, and tracks per
    /// hash the narrowest gravity and greedy scope among advertisers that have
    /// the respective capability enabled. The second pass clones each
    /// advertiser's capability set and, when a floor exists for the hash,
    /// rewrites the clone's scope tags to that floor. A side without a floor
    /// keeps the scope the advertiser's own set resolves to.
    pub fn candidates(&self) -> Vec<BlobMigrationCandidate> {
        use std::collections::HashMap;

        /// Narrowest scopes seen so far for one blob hash.
        #[derive(Default)]
        struct ScopeFloor {
            gravity: Option<TopologyScope>,
            greedy: Option<TopologyScope>,
        }

        let mut sightings: Vec<(u64, [u8; 32], f64)> = Vec::new();
        let mut floors: HashMap<[u8; 32], ScopeFloor> = HashMap::new();
        let mut advertiser_caps: HashMap<u64, CapabilitySet> = HashMap::new();

        for node_id in self.capability_index.all_nodes() {
            let caps = match self.capability_index.get(node_id) {
                Some(found) => found,
                None => continue,
            };
            let gravity = GravityCapability::from_capability_set(&caps);
            let greedy = GreedyCapability::from_capability_set(&caps);
            let sightings_before = sightings.len();

            for tag in &caps.tags {
                let (hash, rate) = match parse_blob_heat_tag(tag) {
                    Some(parsed) => parsed,
                    None => continue,
                };
                let floor = floors.entry(hash).or_default();
                // Only advertisers with the capability enabled may move the floor.
                if gravity.enabled {
                    floor.gravity = Some(
                        floor
                            .gravity
                            .map_or(gravity.scope, |prev| narrower_scope(prev, gravity.scope)),
                    );
                }
                if greedy.enabled {
                    floor.greedy = Some(
                        floor
                            .greedy
                            .map_or(greedy.scope, |prev| narrower_scope(prev, greedy.scope)),
                    );
                }
                sightings.push((node_id, hash, rate));
            }

            // Keep the capability set only for nodes that advertised a blob.
            if sightings.len() > sightings_before {
                advertiser_caps.insert(node_id, caps);
            }
        }

        let mut out = Vec::with_capacity(sightings.len());
        for (node_id, hash, rate) in sightings {
            let mut caps = match advertiser_caps.get(&node_id) {
                Some(cached) => cached.clone(),
                None => continue,
            };
            if let Some(floor) = floors.get(&hash) {
                match (floor.gravity, floor.greedy) {
                    (None, None) => {}
                    (gravity_floor, greedy_floor) => {
                        // A missing side falls back to the advertiser's own scope.
                        let gravity_scope = gravity_floor.unwrap_or_else(|| {
                            GravityCapability::from_capability_set(&caps).scope
                        });
                        let greedy_scope = greedy_floor
                            .unwrap_or_else(|| GreedyCapability::from_capability_set(&caps).scope);
                        narrow_scope_tags_in(&mut caps, gravity_scope, greedy_scope);
                    }
                }
            }
            out.push(BlobMigrationCandidate {
                hash,
                publisher_node_id: node_id,
                publisher_caps: caps,
                rate,
            });
        }
        out
    }
}
/// Maximum number of prefetch attempts a single publisher node may trigger in
/// one migration tick; admits beyond it are counted as `skipped_peer_budget`.
pub const DEFAULT_MIGRATION_PER_PEER_BUDGET_PER_TICK: usize = 32;
/// Counters describing the outcome of one blob migration tick.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct BlobMigrationTickReport {
/// Prefetch calls that returned `Ok`.
pub admitted: u64,
/// Rejections with reason `MigrateBlobReject::NoStorageCap`.
pub rejected_no_storage: u64,
/// Rejections with reason `MigrateBlobReject::GravityDisabled`.
pub rejected_gravity_disabled: u64,
/// Rejections with reason `MigrateBlobReject::ProximityZero`.
pub rejected_proximity_zero: u64,
/// Rejections with reason `MigrateBlobReject::Unhealthy`.
pub rejected_unhealthy: u64,
/// Rejections with reason `MigrateBlobReject::ScopeMismatch`.
pub rejected_scope_mismatch: u64,
/// Rejections with reason `MigrateBlobReject::InsufficientDisk`.
pub rejected_insufficient_disk: u64,
/// Candidates skipped because the size resolver returned `None`.
pub skipped_unknown_size: u64,
/// Prefetch calls that returned an error (logged, not propagated).
pub prefetch_errors: u64,
/// Admitted blobs skipped because the publisher's per-tick budget was spent.
pub skipped_peer_budget: u64,
}
impl BlobMigrationTickReport {
    /// Sum of all six `rejected_*` counters. Skips and prefetch errors are not
    /// included.
    pub fn total_rejected(&self) -> u64 {
        [
            self.rejected_no_storage,
            self.rejected_gravity_disabled,
            self.rejected_proximity_zero,
            self.rejected_unhealthy,
            self.rejected_scope_mismatch,
            self.rejected_insufficient_disk,
        ]
        .iter()
        .sum()
    }

    /// Bumps the counter that corresponds to the rejection reason `r`.
    fn record_reject(&mut self, r: MigrateBlobReject) {
        let counter = match r {
            MigrateBlobReject::NoStorageCap => &mut self.rejected_no_storage,
            MigrateBlobReject::GravityDisabled => &mut self.rejected_gravity_disabled,
            MigrateBlobReject::ProximityZero => &mut self.rejected_proximity_zero,
            MigrateBlobReject::Unhealthy => &mut self.rejected_unhealthy,
            MigrateBlobReject::ScopeMismatch => &mut self.rejected_scope_mismatch,
            MigrateBlobReject::InsufficientDisk => &mut self.rejected_insufficient_disk,
        };
        *counter += 1;
    }
}
/// Runs one migration tick: enumerates candidates from `capability_index`,
/// gates each through `should_migrate_blob_to`, and prefetches the admitted
/// ones through `adapter`.
///
/// * `size_for_hash` resolves a blob's size; `None` skips the candidate and
///   counts it as `skipped_unknown_size`.
/// * Each publisher node gets at most
///   `DEFAULT_MIGRATION_PER_PEER_BUDGET_PER_TICK` prefetch attempts; the
///   budget is consumed before the prefetch, so failed prefetches count
///   against it too.
/// * Prefetch errors are traced and counted, never propagated.
///
/// No de-duplication by hash happens here: a hash advertised by several
/// peers is attempted once per advertiser.
pub async fn drive_blob_migration_tick<A, F>(
    local_caps: &CapabilitySet,
    capability_index: &CapabilityIndex,
    adapter: &A,
    size_for_hash: F,
) -> BlobMigrationTickReport
where
    A: BlobAdapter + ?Sized,
    F: Fn([u8; 32]) -> Option<u64>,
{
    use std::collections::HashMap;

    let mut report = BlobMigrationTickReport::default();
    let mut attempts_by_peer: HashMap<u64, usize> = HashMap::new();
    let candidates = BlobMigrationController::new(local_caps, capability_index).candidates();

    for candidate in candidates {
        let size = match size_for_hash(candidate.hash) {
            Some(known) => known,
            None => {
                report.skipped_unknown_size += 1;
                continue;
            }
        };

        if let MigrateBlobVerdict::Reject(reason) =
            should_migrate_blob_to(local_caps, &candidate.publisher_caps, size)
        {
            report.record_reject(reason);
            continue;
        }

        // Admitted: charge the publisher's budget before touching the adapter.
        let used = attempts_by_peer
            .entry(candidate.publisher_node_id)
            .or_insert(0);
        if *used >= DEFAULT_MIGRATION_PER_PEER_BUDGET_PER_TICK {
            report.skipped_peer_budget += 1;
            continue;
        }
        *used += 1;

        let blob_ref = BlobRef::small(
            format!("mesh://{}", hex32(&candidate.hash)),
            candidate.hash,
            size,
        );
        if let Err(e) = adapter.prefetch(&blob_ref).await {
            tracing::trace!(
                error = ?e,
                hash = ?candidate.hash,
                "blob migration: prefetch failed; counted"
            );
            report.prefetch_errors += 1;
        } else {
            report.admitted += 1;
        }
    }
    report
}
/// Convenience wrapper around `drive_blob_migration_tick` for callers holding
/// the adapter as `Arc<dyn BlobAdapter>`.
pub async fn drive_blob_migration_tick_arc<F>(
    local_caps: &CapabilitySet,
    capability_index: &CapabilityIndex,
    adapter: Arc<dyn BlobAdapter>,
    size_for_hash: F,
) -> BlobMigrationTickReport
where
    F: Fn([u8; 32]) -> Option<u64>,
{
    // Borrow the trait object out of the Arc; the Arc stays alive for the call.
    let adapter_ref: &dyn BlobAdapter = &*adapter;
    drive_blob_migration_tick(local_caps, capability_index, adapter_ref, size_for_hash).await
}
/// Sibling chunks returned by a manifest resolver: `(hash, size)` pairs.
pub type ManifestSiblings = Vec<([u8; 32], u64)>;
/// Migration tick that also expands each admitted candidate into its
/// manifest siblings.
///
/// Behaves like `drive_blob_migration_tick` for the candidate itself, with
/// one addition: hashes whose prefetch succeeded earlier in this tick are
/// skipped. After the candidate's prefetch attempt (successful or not),
/// `manifest_resolver` is asked for sibling `(hash, size)` pairs. Each sibling
/// not yet prefetched is gated against the same publisher's capabilities,
/// charged against the same per-peer budget, and prefetched. Only successful
/// prefetches are remembered for de-duplication, so rejected or failed hashes
/// stay eligible when they come up again.
pub async fn drive_blob_migration_tick_with_manifest_resolver<A, F, M>(
    local_caps: &CapabilitySet,
    capability_index: &CapabilityIndex,
    adapter: &A,
    size_for_hash: F,
    manifest_resolver: M,
) -> BlobMigrationTickReport
where
    A: BlobAdapter + ?Sized,
    F: Fn([u8; 32]) -> Option<u64>,
    M: Fn([u8; 32]) -> Option<ManifestSiblings>,
{
    use std::collections::{HashMap, HashSet};

    let mut report = BlobMigrationTickReport::default();
    let mut already_prefetched: HashSet<[u8; 32]> = HashSet::new();
    let mut attempts_by_peer: HashMap<u64, usize> = HashMap::new();
    let candidates = BlobMigrationController::new(local_caps, capability_index).candidates();

    for candidate in candidates {
        let head_size = match size_for_hash(candidate.hash) {
            Some(known) => known,
            None => {
                report.skipped_unknown_size += 1;
                continue;
            }
        };
        if already_prefetched.contains(&candidate.hash) {
            continue;
        }
        if let MigrateBlobVerdict::Reject(reason) =
            should_migrate_blob_to(local_caps, &candidate.publisher_caps, head_size)
        {
            report.record_reject(reason);
            continue;
        }

        let used = attempts_by_peer
            .entry(candidate.publisher_node_id)
            .or_insert(0);
        if *used >= DEFAULT_MIGRATION_PER_PEER_BUDGET_PER_TICK {
            report.skipped_peer_budget += 1;
            continue;
        }
        *used += 1;

        let head_ref = BlobRef::small(
            format!("mesh://{}", hex32(&candidate.hash)),
            candidate.hash,
            head_size,
        );
        match adapter.prefetch(&head_ref).await {
            Ok(()) => {
                report.admitted += 1;
                already_prefetched.insert(candidate.hash);
            }
            Err(e) => {
                tracing::trace!(
                    error = ?e,
                    hash = ?candidate.hash,
                    "blob migration: prefetch failed; counted"
                );
                report.prefetch_errors += 1;
            }
        }

        // Sibling expansion runs regardless of the head's prefetch outcome.
        if let Some(siblings) = manifest_resolver(candidate.hash) {
            for (sibling_hash, sibling_size) in siblings {
                if already_prefetched.contains(&sibling_hash) {
                    continue;
                }
                if let MigrateBlobVerdict::Reject(reason) =
                    should_migrate_blob_to(local_caps, &candidate.publisher_caps, sibling_size)
                {
                    report.record_reject(reason);
                    continue;
                }

                // Siblings draw from the advertising publisher's budget.
                let sibling_used = attempts_by_peer
                    .entry(candidate.publisher_node_id)
                    .or_insert(0);
                if *sibling_used >= DEFAULT_MIGRATION_PER_PEER_BUDGET_PER_TICK {
                    report.skipped_peer_budget += 1;
                    continue;
                }
                *sibling_used += 1;

                let sibling_ref = BlobRef::small(
                    format!("mesh://{}", hex32(&sibling_hash)),
                    sibling_hash,
                    sibling_size,
                );
                match adapter.prefetch(&sibling_ref).await {
                    Ok(()) => {
                        report.admitted += 1;
                        already_prefetched.insert(sibling_hash);
                    }
                    Err(e) => {
                        tracing::trace!(
                            error = ?e,
                            hash = ?sibling_hash,
                            "blob migration: manifest sibling prefetch failed"
                        );
                        report.prefetch_errors += 1;
                    }
                }
            }
        }
    }
    report
}
use super::hex32;
#[cfg(test)]
mod tests {
use super::super::error::BlobError;
use super::*;
use crate::adapter::net::behavior::tag::AxisSeparator;
use crate::adapter::net::behavior::TaxonomyAxis;
use crate::adapter::net::identity::EntityId;
/// Test helper: a 32-byte hash whose first byte is `seed` (rest zero),
/// together with its 64-character lowercase hex encoding.
fn hex64(seed: u8) -> ([u8; 32], String) {
    let mut hash = [0u8; 32];
    hash[0] = seed;
    let hex: String = hash.iter().map(|byte| format!("{:02x}", byte)).collect();
    (hash, hex)
}
// A well-formed blob heat tag decodes to the original hash and rate.
#[test]
fn parse_blob_heat_tag_round_trip() {
    let (expected_hash, hex) = hex64(0x42);
    let body = format!("blob:{}=0.75", hex);
    let tag = Tag::Reserved {
        prefix: "heat:".to_string(),
        body,
    };
    let parsed = parse_blob_heat_tag(&tag);
    let (got_hash, rate) = parsed.expect("must parse");
    assert_eq!(got_hash, expected_hash);
    assert!((rate - 0.75).abs() < 1e-6);
}
// A `heat:` tag without the `blob:` marker is not a blob heat tag.
#[test]
fn parse_blob_heat_tag_rejects_chain_heat_shape() {
    let prefix = "heat:".to_string();
    let body = "deadbeefcafebabe=0.50".to_string();
    let chain_tag = Tag::Reserved { prefix, body };
    let parsed = parse_blob_heat_tag(&chain_tag);
    assert!(parsed.is_none());
}
// Only `Tag::Reserved` can be a blob heat tag; axis tags never parse.
#[test]
fn parse_blob_heat_tag_rejects_axis_tag() {
    let key = "blob.storage".to_string();
    let value = "1".to_string();
    let axis = Tag::AxisValue {
        axis: TaxonomyAxis::Dataforts,
        key,
        value,
        separator: AxisSeparator::Eq,
    };
    let parsed = parse_blob_heat_tag(&axis);
    assert!(parsed.is_none());
}
// Non-hex characters and too-short hashes are both rejected.
#[test]
fn parse_blob_heat_tag_rejects_malformed_hex() {
    // First body has non-hex characters, second is valid hex but too short.
    for body in ["blob:zzzz=0.50", "blob:dead=0.50"] {
        let tag = Tag::Reserved {
            prefix: "heat:".to_string(),
            body: body.to_string(),
        };
        assert!(parse_blob_heat_tag(&tag).is_none());
    }
}
// Hex decoding reproduces several byte patterns exactly and rejects a
// 64-character body whose first character is not a hex digit.
#[test]
fn parse_blob_heat_tag_decode_matches_from_str_radix_byte_for_byte() {
    let mut ascending = [0u8; 32];
    for (slot, value) in ascending.iter_mut().zip(0u8..) {
        *slot = value;
    }
    let mixed: [u8; 32] = [
        0xde, 0xad, 0xbe, 0xef, 0xca, 0xfe, 0xba, 0xbe, 0xf0, 0x0d, 0xfa, 0xce, 0x1b, 0xad, 0xd0,
        0x0d, 0x12, 0x34, 0x56, 0x78, 0x9a, 0xbc, 0xde, 0xf0, 0xfe, 0xdc, 0xba, 0x98, 0x76, 0x54,
        0x32, 0x10,
    ];
    let cases: [[u8; 32]; 4] = [[0x00; 32], [0xFF; 32], ascending, mixed];

    for expected in &cases {
        let hex: String = expected.iter().map(|b| format!("{:02x}", b)).collect();
        let tag = Tag::Reserved {
            prefix: "heat:".to_string(),
            body: format!("blob:{}=1.00", hex),
        };
        let (got, _rate) = parse_blob_heat_tag(&tag).expect("must parse");
        assert_eq!(&got, expected, "table-lookup decode must match legacy");
    }

    // Right length, but the leading 'g' is not a hex digit.
    let invalid_body = format!("blob:g{}=1.00", "0".repeat(63));
    let bad = Tag::Reserved {
        prefix: "heat:".to_string(),
        body: invalid_body,
    };
    assert!(parse_blob_heat_tag(&bad).is_none());
}
// Every rate that parses to a non-finite `f64` must be rejected: NaN, both
// infinities, and a decimal literal that overflows to infinity.
#[test]
fn parse_blob_heat_tag_rejects_non_finite_rate() {
    let (_h, hex) = hex64(0x01);
    for rate in ["NaN", "inf", "-inf", "1e999"] {
        let tag = Tag::Reserved {
            prefix: "heat:".to_string(),
            body: format!("blob:{}={}", hex, rate),
        };
        assert!(
            parse_blob_heat_tag(&tag).is_none(),
            "rate {:?} must be rejected as non-finite",
            rate
        );
    }
}
/// Test helper: a fresh `CapabilityIndex` holding a single announcement for
/// `peer_id` with the given capabilities. The entity id is `peer_seed`
/// repeated 32 times.
fn index_with_peer_heat(
    peer_id: u64,
    peer_caps: CapabilitySet,
    peer_seed: u8,
) -> CapabilityIndex {
    use crate::adapter::net::behavior::capability::CapabilityAnnouncement;

    let index = CapabilityIndex::new();
    let announcement = CapabilityAnnouncement::new(
        peer_id,
        EntityId::from_bytes([peer_seed; 32]),
        1,
        peer_caps,
    );
    index.index(announcement);
    index
}
// Two heat tags on one peer yield two candidates attributed to that peer.
#[test]
fn controller_lists_one_candidate_per_blob_heat_tag() {
    let (h1, hex1) = hex64(0x10);
    let (h2, hex2) = hex64(0x20);
    let heat_tag = |hex: &str, rate: &str| Tag::Reserved {
        prefix: "heat:".to_string(),
        body: format!("blob:{}={}", hex, rate),
    };

    let mut peer_caps = CapabilitySet::default();
    peer_caps.tags.insert(heat_tag(&hex1, "0.50"));
    peer_caps.tags.insert(heat_tag(&hex2, "0.30"));

    let index = index_with_peer_heat(99, peer_caps, 0xAA);
    let local = CapabilitySet::default();
    let candidates = BlobMigrationController::new(&local, &index).candidates();

    let lists = |hash: [u8; 32], rate: f64| {
        candidates
            .iter()
            .any(|c| c.hash == hash && (c.rate - rate).abs() < 1e-6)
    };
    assert_eq!(candidates.len(), 2);
    assert!(lists(h1, 0.50));
    assert!(lists(h2, 0.30));
    assert!(candidates.iter().all(|c| c.publisher_node_id == 99));
}
// A `heat:` tag without the `blob:` marker produces no candidates.
#[test]
fn controller_ignores_chain_heat_shape_tags() {
    let chain_heat = Tag::Reserved {
        prefix: "heat:".to_string(),
        body: "deadbeefcafebabe=0.50".to_string(),
    };
    let mut peer_caps = CapabilitySet::default();
    peer_caps.tags.insert(chain_heat);

    let index = index_with_peer_heat(7, peer_caps, 0xBB);
    let local = CapabilitySet::default();
    let controller = BlobMigrationController::new(&local, &index);
    assert!(controller.candidates().is_empty());
}
/// Test adapter state: a shared counter of prefetch calls and a flag that
/// selects whether prefetches should fail.
struct PrefetchRecorder {
    calls: std::sync::Arc<std::sync::atomic::AtomicU64>,
    fail: bool,
}

impl PrefetchRecorder {
    /// Shared constructor: zeroed call counter plus the requested outcome.
    fn with_outcome(fail: bool) -> Self {
        let calls = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
        Self { calls, fail }
    }

    /// Recorder with `fail` unset.
    fn new() -> Self {
        Self::with_outcome(false)
    }

    /// Recorder with `fail` set.
    fn failing() -> Self {
        Self::with_outcome(true)
    }
}
// Only `prefetch` is expected to be called by the code under test; every
// other adapter method panics if reached.
#[async_trait::async_trait]
impl BlobAdapter for PrefetchRecorder {
    fn adapter_id(&self) -> &str {
        "test-prefetch-recorder"
    }

    async fn store(&self, _: &BlobRef, _: &[u8]) -> Result<(), BlobError> {
        unreachable!()
    }

    async fn fetch(&self, _: &BlobRef) -> Result<bytes::Bytes, BlobError> {
        unreachable!()
    }

    async fn fetch_range(
        &self,
        _: &BlobRef,
        _: std::ops::Range<u64>,
    ) -> Result<bytes::Bytes, BlobError> {
        unreachable!()
    }

    async fn exists(&self, _: &BlobRef) -> Result<bool, BlobError> {
        unreachable!()
    }

    /// Counts the call, then succeeds or fails according to `self.fail`.
    async fn prefetch(&self, _: &BlobRef) -> Result<(), BlobError> {
        self.calls
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        if !self.fail {
            return Ok(());
        }
        Err(BlobError::Backend("test prefetch failure".into()))
    }
}
/// Test helper: a local capability set with blob storage (100 GB total,
/// `disk_free_gb` free) and gravity enabled at the given scope and proximity.
fn participating_local(scope: &str, proximity: u8, disk_free_gb: u64) -> CapabilitySet {
    let disk_free_tag = format!("dataforts.blob.disk_free_gb={}", disk_free_gb);
    let scope_tag = format!("dataforts.gravity.scope={}", scope);
    let proximity_tag = format!("dataforts.gravity.proximity={}", proximity);
    CapabilitySet::new()
        .add_tag("dataforts.blob.storage")
        .add_tag("dataforts.blob.disk_total_gb=100")
        .add_tag(disk_free_tag)
        .add_tag("dataforts.gravity.enabled")
        .add_tag(scope_tag)
        .add_tag(proximity_tag)
}
/// Test helper: publisher capabilities carrying gravity and greedy scope tags
/// for `scope`, plus one blob heat tag for `hex64(hash_seed)` at `rate`.
fn publisher_caps_with_heat(hash_seed: u8, scope: &str, rate: &str) -> CapabilitySet {
    let hex = hex64(hash_seed).1;
    let heat = Tag::Reserved {
        prefix: "heat:".to_string(),
        body: format!("blob:{}={}", hex, rate),
    };
    let mut caps = CapabilitySet::new()
        .add_tag(format!("dataforts.gravity.scope={}", scope))
        .add_tag(format!("dataforts.greedy.scope={}", scope));
    caps.tags.insert(heat);
    caps
}
#[tokio::test]
async fn drive_tick_admits_and_calls_prefetch_on_participating_local() {
let publisher_caps = publisher_caps_with_heat(0x10, "mesh", "0.75");
let index = index_with_peer_heat(99, publisher_caps, 0xAA);
let local = participating_local("mesh", 128, 50);
let adapter = PrefetchRecorder::new();
let calls = adapter.calls.clone();
let report = drive_blob_migration_tick(&local, &index, &adapter, |_| Some(1024)).await;
assert_eq!(report.admitted, 1);
assert_eq!(calls.load(std::sync::atomic::Ordering::Relaxed), 1);
assert_eq!(report.total_rejected(), 0);
assert_eq!(report.prefetch_errors, 0);
}
#[tokio::test]
async fn drive_tick_rejects_when_local_lacks_blob_storage() {
let publisher_caps = publisher_caps_with_heat(0x20, "mesh", "0.50");
let index = index_with_peer_heat(50, publisher_caps, 0xBB);
let local = CapabilitySet::new()
.add_tag("dataforts.gravity.enabled")
.add_tag("dataforts.gravity.scope=mesh")
.add_tag("dataforts.gravity.proximity=128");
let adapter = PrefetchRecorder::new();
let calls = adapter.calls.clone();
let report = drive_blob_migration_tick(&local, &index, &adapter, |_| Some(1024)).await;
assert_eq!(report.admitted, 0);
assert_eq!(report.rejected_no_storage, 1);
assert_eq!(calls.load(std::sync::atomic::Ordering::Relaxed), 0);
}
#[tokio::test]
async fn drive_tick_skips_when_size_resolver_returns_none() {
let publisher_caps = publisher_caps_with_heat(0x30, "mesh", "0.40");
let index = index_with_peer_heat(50, publisher_caps, 0xCC);
let local = participating_local("mesh", 128, 50);
let adapter = PrefetchRecorder::new();
let calls = adapter.calls.clone();
let report = drive_blob_migration_tick(&local, &index, &adapter, |_| None).await;
assert_eq!(report.skipped_unknown_size, 1);
assert_eq!(report.admitted, 0);
assert_eq!(report.total_rejected(), 0);
assert_eq!(calls.load(std::sync::atomic::Ordering::Relaxed), 0);
}
#[tokio::test]
async fn drive_tick_counts_prefetch_errors_without_propagating() {
let publisher_caps = publisher_caps_with_heat(0x40, "mesh", "0.90");
let index = index_with_peer_heat(50, publisher_caps, 0xDD);
let local = participating_local("mesh", 128, 50);
let adapter = PrefetchRecorder::failing();
let calls = adapter.calls.clone();
let report = drive_blob_migration_tick(&local, &index, &adapter, |_| Some(1024)).await;
assert_eq!(report.admitted, 0);
assert_eq!(report.prefetch_errors, 1);
assert_eq!(calls.load(std::sync::atomic::Ordering::Relaxed), 1);
}
// Two enabled advertisers of the same hash (Zone and Mesh): both resulting
// candidates must carry the narrower Zone scope for gravity and greedy.
#[test]
fn cross_advertiser_scope_is_floored_to_narrowest_claim() {
    use crate::adapter::net::behavior::capability::CapabilityAnnouncement;

    let (hash, hex) = hex64(0xCA);
    // Builds an advertiser with gravity + greedy enabled at `scope`, plus a
    // heat tag for the shared hash at `rate`.
    let advertiser = |scope: TopologyScope, rate: &str| {
        let mut caps = CapabilitySet::new()
            .with_gravity_capability(crate::adapter::net::behavior::GravityCapability {
                enabled: true,
                scope,
                proximity: 128,
            })
            .with_greedy_capability(crate::adapter::net::behavior::GreedyCapability {
                enabled: true,
                scope,
                proximity: 128,
            });
        caps.tags.insert(Tag::Reserved {
            prefix: "heat:".to_string(),
            body: format!("blob:{}={}", hex, rate),
        });
        caps
    };
    let publisher = advertiser(TopologyScope::Zone, "0.50");
    let cache_holder = advertiser(TopologyScope::Mesh, "0.40");

    let index = CapabilityIndex::new();
    index.index(CapabilityAnnouncement::new(
        10,
        EntityId::from_bytes([0xAA; 32]),
        1,
        publisher,
    ));
    index.index(CapabilityAnnouncement::new(
        20,
        EntityId::from_bytes([0xBB; 32]),
        1,
        cache_holder,
    ));

    let local = CapabilitySet::default();
    let candidates = BlobMigrationController::new(&local, &index).candidates();
    assert_eq!(candidates.len(), 2);
    for candidate in &candidates {
        assert_eq!(candidate.hash, hash);
        let gravity = GravityCapability::from_capability_set(&candidate.publisher_caps);
        let greedy = GreedyCapability::from_capability_set(&candidate.publisher_caps);
        assert_eq!(
            gravity.scope,
            TopologyScope::Zone,
            "gravity scope floors to narrowest across all advertisers"
        );
        assert_eq!(
            greedy.scope,
            TopologyScope::Zone,
            "greedy scope floors to narrowest across all advertisers"
        );
    }
}
// A peer that carries a `gravity.scope=node` tag but does not have gravity
// enabled must not pull the per-hash floor below the enabled publisher's Zone.
#[test]
fn scope_narrowing_ignores_unparticipating_peers() {
    let (_, hex) = hex64(0xCB);
    let mut publisher = CapabilitySet::new().with_gravity_capability(
        crate::adapter::net::behavior::GravityCapability {
            enabled: true,
            scope: TopologyScope::Zone,
            proximity: 128,
        },
    );
    publisher.tags.insert(Tag::Reserved {
        prefix: "heat:".to_string(),
        body: format!("blob:{}=0.50", hex),
    });

    // Scope tag only: no enabled marker, so this peer does not participate.
    let mut tag_only = CapabilitySet::new();
    tag_only.tags.insert(Tag::AxisValue {
        axis: TaxonomyAxis::Dataforts,
        key: "gravity.scope".to_string(),
        value: "node".to_string(),
        separator: AxisSeparator::Eq,
    });
    tag_only.tags.insert(Tag::Reserved {
        prefix: "heat:".to_string(),
        body: format!("blob:{}=0.30", hex),
    });

    let index = CapabilityIndex::new();
    index.index(
        crate::adapter::net::behavior::capability::CapabilityAnnouncement::new(
            30,
            EntityId::from_bytes([0xCC; 32]),
            1,
            publisher,
        ),
    );
    index.index(
        crate::adapter::net::behavior::capability::CapabilityAnnouncement::new(
            40,
            EntityId::from_bytes([0xDD; 32]),
            1,
            tag_only,
        ),
    );

    let local = CapabilitySet::default();
    let controller = BlobMigrationController::new(&local, &index);
    let candidates = controller.candidates();
    // Guard against a vacuous pass: the loop below checks nothing when the
    // controller returns no candidates.
    assert_eq!(
        candidates.len(),
        2,
        "both advertisers must surface as candidates"
    );
    for c in &candidates {
        let gravity = GravityCapability::from_capability_set(&c.publisher_caps);
        assert_eq!(
            gravity.scope,
            TopologyScope::Zone,
            "unparticipating peer's tag must not narrow the floor"
        );
    }
}
#[tokio::test]
async fn drive_tick_caps_per_peer_admits_at_budget() {
let mut publisher_caps = CapabilitySet::new()
.add_tag("dataforts.gravity.scope=mesh")
.add_tag("dataforts.greedy.scope=mesh");
let flood = DEFAULT_MIGRATION_PER_PEER_BUDGET_PER_TICK * 2;
for i in 0..flood {
let mut hash = [0u8; 32];
hash[0..8].copy_from_slice(&(i as u64).to_le_bytes());
let mut hex = String::with_capacity(64);
for b in &hash {
use std::fmt::Write;
let _ = write!(hex, "{:02x}", b);
}
publisher_caps.tags.insert(Tag::Reserved {
prefix: "heat:".to_string(),
body: format!("blob:{}=0.50", hex),
});
}
let index = index_with_peer_heat(77, publisher_caps, 0x77);
let local = participating_local("mesh", 128, 1024);
let adapter = PrefetchRecorder::new();
let calls = adapter.calls.clone();
let report = drive_blob_migration_tick(&local, &index, &adapter, |_| Some(1024)).await;
assert_eq!(
report.admitted as usize, DEFAULT_MIGRATION_PER_PEER_BUDGET_PER_TICK,
"per-peer admit budget caps the prefetch fan-out"
);
assert_eq!(
calls.load(std::sync::atomic::Ordering::Relaxed) as usize,
DEFAULT_MIGRATION_PER_PEER_BUDGET_PER_TICK
);
assert_eq!(
report.skipped_peer_budget as usize,
flood - DEFAULT_MIGRATION_PER_PEER_BUDGET_PER_TICK,
"overflow tags route into skipped_peer_budget"
);
}
// Two peers each advertise more blobs than the per-peer budget; each peer's
// budget must be tracked separately, so both reach the cap.
#[tokio::test]
async fn drive_tick_per_peer_budgets_are_independent() {
// Builds a capability set with `count` heat tags; `seed_base` in byte 0 keeps
// the two peers' hashes disjoint, the counter in bytes 1..9 keeps them unique.
let make_caps_with_heat = |scope: &str, seed_base: u8, count: usize| {
let mut caps = CapabilitySet::new()
.add_tag(format!("dataforts.gravity.scope={}", scope))
.add_tag(format!("dataforts.greedy.scope={}", scope));
for i in 0..count {
let mut hash = [0u8; 32];
hash[0] = seed_base;
hash[1..9].copy_from_slice(&(i as u64).to_le_bytes());
let mut hex = String::with_capacity(64);
for b in &hash {
use std::fmt::Write;
let _ = write!(hex, "{:02x}", b);
}
caps.tags.insert(Tag::Reserved {
prefix: "heat:".to_string(),
body: format!("blob:{}=0.50", hex),
});
}
caps
};
// Four blobs over budget per peer.
let flood = DEFAULT_MIGRATION_PER_PEER_BUDGET_PER_TICK + 4;
let index = CapabilityIndex::new();
let entity_a = EntityId::from_bytes([0xA1; 32]);
let entity_b = EntityId::from_bytes([0xB2; 32]);
index.index(
crate::adapter::net::behavior::capability::CapabilityAnnouncement::new(
111,
entity_a,
1,
make_caps_with_heat("mesh", 0xA0, flood),
),
);
index.index(
crate::adapter::net::behavior::capability::CapabilityAnnouncement::new(
222,
entity_b,
1,
make_caps_with_heat("mesh", 0xB0, flood),
),
);
let local = participating_local("mesh", 128, 1024);
let adapter = PrefetchRecorder::new();
let report = drive_blob_migration_tick(&local, &index, &adapter, |_| Some(1024)).await;
assert_eq!(
report.admitted as usize,
2 * DEFAULT_MIGRATION_PER_PEER_BUDGET_PER_TICK,
"two peers each hit their own budget independently"
);
// Each peer overflows by `flood - budget`.
assert_eq!(
report.skipped_peer_budget as usize,
2 * (flood - DEFAULT_MIGRATION_PER_PEER_BUDGET_PER_TICK),
);
}
// A 4 GiB blob against 1 GB of advertised free disk is rejected on the disk gate.
#[tokio::test]
async fn drive_tick_rejects_when_disk_free_insufficient() {
    let four_gib: u64 = 4 * (1 << 30);
    let index = index_with_peer_heat(50, publisher_caps_with_heat(0x50, "mesh", "0.50"), 0xEE);
    let local = participating_local("mesh", 128, 1);
    let adapter = PrefetchRecorder::new();

    let size_of = |_hash: [u8; 32]| Some(four_gib);
    let report = drive_blob_migration_tick(&local, &index, &adapter, size_of).await;

    assert_eq!(report.admitted, 0);
    assert_eq!(report.rejected_insufficient_disk, 1);
}
#[tokio::test]
async fn manifest_resolver_prefetches_every_sibling_chunk() {
let publisher_caps = publisher_caps_with_heat(0x60, "mesh", "0.50");
let index = index_with_peer_heat(50, publisher_caps, 0xFF);
let local = participating_local("mesh", 128, 50);
let adapter = PrefetchRecorder::new();
let calls = adapter.calls.clone();
let (head_hash, _) = hex64(0x60);
let (s1, _) = hex64(0x61);
let (s2, _) = hex64(0x62);
let (s3, _) = hex64(0x63);
let siblings_for_head = vec![(s1, 1024), (s2, 2048), (s3, 4096)];
let report = drive_blob_migration_tick_with_manifest_resolver(
&local,
&index,
&adapter,
|_h| Some(1024),
|h| {
if h == head_hash {
Some(siblings_for_head.clone())
} else {
None
}
},
)
.await;
assert_eq!(report.admitted, 4, "head + 3 siblings = 4 admits");
assert_eq!(calls.load(std::sync::atomic::Ordering::Relaxed), 4);
assert_eq!(report.prefetch_errors, 0);
}
#[tokio::test]
async fn manifest_resolver_dedups_overlapping_sibling_and_candidate_lists() {
let (head_hash, head_hex) = hex64(0x70);
let (sibling_hash, sibling_hex) = hex64(0x71);
let mut publisher = CapabilitySet::new()
.add_tag("dataforts.gravity.scope=mesh")
.add_tag("dataforts.greedy.scope=mesh");
publisher.tags.insert(Tag::Reserved {
prefix: "heat:".to_string(),
body: format!("blob:{}=0.40", head_hex),
});
publisher.tags.insert(Tag::Reserved {
prefix: "heat:".to_string(),
body: format!("blob:{}=0.30", sibling_hex),
});
let index = index_with_peer_heat(99, publisher, 0xAB);
let local = participating_local("mesh", 128, 50);
let adapter = PrefetchRecorder::new();
let calls = adapter.calls.clone();
let report = drive_blob_migration_tick_with_manifest_resolver(
&local,
&index,
&adapter,
|_h| Some(1024),
move |h| {
if h == head_hash {
Some(vec![(head_hash, 1024), (sibling_hash, 2048)])
} else {
None
}
},
)
.await;
assert_eq!(
calls.load(std::sync::atomic::Ordering::Relaxed),
2,
"dedup must collapse head + already-advertised sibling to 2 distinct prefetches"
);
assert_eq!(report.admitted, 2);
}
#[tokio::test]
async fn manifest_resolver_none_falls_through_to_single_chunk_path() {
let publisher_caps = publisher_caps_with_heat(0x80, "mesh", "0.50");
let index = index_with_peer_heat(50, publisher_caps, 0xCD);
let local = participating_local("mesh", 128, 50);
let adapter = PrefetchRecorder::new();
let calls = adapter.calls.clone();
let report = drive_blob_migration_tick_with_manifest_resolver(
&local,
&index,
&adapter,
|_h| Some(1024),
|_h| None,
)
.await;
assert_eq!(report.admitted, 1);
assert_eq!(calls.load(std::sync::atomic::Ordering::Relaxed), 1);
}
#[tokio::test]
async fn manifest_sibling_rejection_routes_into_per_reason_counters() {
let publisher_caps = publisher_caps_with_heat(0x90, "mesh", "0.50");
let index = index_with_peer_heat(50, publisher_caps, 0xEF);
let local = participating_local("mesh", 128, 1); let adapter = PrefetchRecorder::new();
let calls = adapter.calls.clone();
let (head_hash, _) = hex64(0x90);
let (sibling_hash, _) = hex64(0x91);
let four_gib: u64 = 4 * (1 << 30);
let report = drive_blob_migration_tick_with_manifest_resolver(
&local,
&index,
&adapter,
move |h| {
if h == sibling_hash {
Some(four_gib) } else {
Some(1024)
}
},
move |h| {
if h == head_hash {
Some(vec![(sibling_hash, four_gib)])
} else {
None
}
},
)
.await;
assert_eq!(report.admitted, 1, "head admits");
assert_eq!(
report.rejected_insufficient_disk, 1,
"sibling rejects on disk gate"
);
assert_eq!(calls.load(std::sync::atomic::Ordering::Relaxed), 1);
}
// Two publishers advertise different top-level blobs whose manifests share one
// oversized sibling. The sibling is rejected during the first expansion; since
// only successful prefetches enter the dedup set, the second expansion must
// evaluate (and reject) it again.
#[tokio::test]
async fn rejected_sibling_stays_reconsiderable_across_candidates() {
let (a_top_hash, a_hex) = hex64(0xC1);
let (b_top_hash, b_hex) = hex64(0xC2);
let (shared_sibling_hash, _shared_hex) = hex64(0xC3);
let mut a_caps = CapabilitySet::new()
.add_tag("dataforts.gravity.scope=mesh")
.add_tag("dataforts.greedy.scope=mesh");
a_caps.tags.insert(Tag::Reserved {
prefix: "heat:".to_string(),
body: format!("blob:{}=0.50", a_hex),
});
let mut b_caps = CapabilitySet::new()
.add_tag("dataforts.gravity.scope=mesh")
.add_tag("dataforts.greedy.scope=mesh");
b_caps.tags.insert(Tag::Reserved {
prefix: "heat:".to_string(),
body: format!("blob:{}=0.50", b_hex),
});
let index = CapabilityIndex::new();
let entity_a = EntityId::from_bytes([0x11; 32]);
let entity_b = EntityId::from_bytes([0x22; 32]);
index.index(
crate::adapter::net::behavior::capability::CapabilityAnnouncement::new(
100, entity_a, 1, a_caps,
),
);
index.index(
crate::adapter::net::behavior::capability::CapabilityAnnouncement::new(
200, entity_b, 1, b_caps,
),
);
// Local node advertises 1 GB free: enough for the 1 KiB heads, not for the
// 4 GiB shared sibling.
let local = CapabilitySet::new()
.add_tag("dataforts.blob.storage")
.add_tag("dataforts.blob.disk_total_gb=100")
.add_tag("dataforts.blob.disk_free_gb=1")
.add_tag("dataforts.gravity.enabled")
.add_tag("dataforts.gravity.scope=mesh")
.add_tag("dataforts.gravity.proximity=128");
let four_gib: u64 = 4 * (1 << 30);
let adapter = PrefetchRecorder::new();
let calls = adapter.calls.clone();
let report = drive_blob_migration_tick_with_manifest_resolver(
&local,
&index,
&adapter,
move |h| {
if h == shared_sibling_hash {
Some(four_gib) } else {
Some(1024) }
},
// Both top-level blobs resolve to the same single sibling.
move |h| {
if h == a_top_hash || h == b_top_hash {
Some(vec![(shared_sibling_hash, four_gib)])
} else {
None
}
},
)
.await;
// Both heads are admitted and prefetched; the sibling never is.
assert_eq!(report.admitted, 2);
assert_eq!(calls.load(std::sync::atomic::Ordering::Relaxed), 2);
assert_eq!(
report.rejected_insufficient_disk, 2,
"rejected siblings must remain reconsiderable across candidates; \
pre-fix would have been 1 (dedup ate the second expansion)"
);
}
}