use super::error::BlobError;
use crate::adapter::net::behavior::{
is_blob_storage_unhealthy, BlobCapability, CapabilitySet, GravityCapability, GreedyCapability,
TopologyScope,
};
use crate::adapter::net::channel::{AuthGuard, ChannelName};
/// Decides whether the local node should pull a blob offered by a publisher.
///
/// The gates are evaluated in a fixed order and the first one that fails
/// determines the rejection reason:
///
/// 1. the local capability set must report blob `storage`;
/// 2. the local greedy capability must be `enabled`;
/// 3. the local greedy `proximity` must be non-zero;
/// 4. local blob storage must not be flagged unhealthy;
/// 5. the local greedy scope must be compatible with the publisher's
///    advertised scopes (see `scope_allows_cross`).
///
/// Returns [`PullBlobVerdict::Admit`] when every gate passes, otherwise
/// [`PullBlobVerdict::Reject`] carrying the first failing reason.
pub fn should_pull_blob(
    local_caps: &CapabilitySet,
    publisher_caps: &CapabilitySet,
) -> PullBlobVerdict {
    let blob = BlobCapability::from_capability_set(local_caps);
    let greedy = GreedyCapability::from_capability_set(local_caps);

    // The `else if` chain keeps the later (helper-calling) gates lazy: they
    // only run once every earlier gate has passed.
    let rejection = if !blob.storage {
        Some(PullBlobReject::NoStorageCap)
    } else if !greedy.enabled {
        Some(PullBlobReject::GreedyDisabled)
    } else if greedy.proximity == 0 {
        Some(PullBlobReject::ProximityZero)
    } else if is_blob_storage_unhealthy(local_caps) {
        Some(PullBlobReject::Unhealthy)
    } else if !scope_allows_cross(greedy.scope, publisher_caps) {
        Some(PullBlobReject::ScopeMismatch)
    } else {
        None
    };

    rejection.map_or(PullBlobVerdict::Admit, PullBlobVerdict::Reject)
}
/// Outcome of [`should_pull_blob`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PullBlobVerdict {
    /// Every pull gate passed.
    Admit,
    /// A gate failed; the payload names the first one that did.
    Reject(PullBlobReject),
}
/// Reason a pull was rejected by `should_pull_blob`.
///
/// Variants are listed in the order the corresponding gates are evaluated.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PullBlobReject {
    /// The local capability set does not report blob storage.
    NoStorageCap,
    /// The local greedy capability is not enabled.
    GreedyDisabled,
    /// The local greedy proximity is zero.
    ProximityZero,
    /// Local blob storage is flagged unhealthy.
    Unhealthy,
    /// The local greedy scope is not compatible with the publisher's scopes.
    ScopeMismatch,
}
/// Decides whether a blob of `blob_size_bytes` may be migrated to a target node.
///
/// The gates are evaluated in a fixed order and the first one that fails
/// determines the rejection reason:
///
/// 1. the target capability set must report blob `storage`;
/// 2. the target gravity capability must be `enabled`;
/// 3. the target gravity `proximity` must be non-zero;
/// 4. target blob storage must not be flagged unhealthy;
/// 5. the target gravity scope must be compatible with the publisher's
///    advertised scopes (see `scope_allows_cross`);
/// 6. the target's `disk_free_gb` must be at least the blob size expressed in
///    units of 2^30 bytes, rounded up.
///
/// Returns [`MigrateBlobVerdict::Admit`] when every gate passes, otherwise
/// [`MigrateBlobVerdict::Reject`] carrying the first failing reason.
///
/// NOTE(review): unlike `should_accept_overflow_from`, no minimum size floor
/// is applied here, so a `blob_size_bytes` of 0 requires 0 free units and
/// passes the disk gate even when `disk_free_gb` is 0 — confirm this is
/// intended.
pub fn should_migrate_blob_to(
    target_caps: &CapabilitySet,
    publisher_caps: &CapabilitySet,
    blob_size_bytes: u64,
) -> MigrateBlobVerdict {
    const BYTES_PER_GIB: u64 = 1 << 30;

    let blob = BlobCapability::from_capability_set(target_caps);
    let gravity = GravityCapability::from_capability_set(target_caps);

    // The `else if` chain keeps the later gates lazy: they only run once
    // every earlier gate has passed.
    let rejection = if !blob.storage {
        Some(MigrateBlobReject::NoStorageCap)
    } else if !gravity.enabled {
        Some(MigrateBlobReject::GravityDisabled)
    } else if gravity.proximity == 0 {
        Some(MigrateBlobReject::ProximityZero)
    } else if is_blob_storage_unhealthy(target_caps) {
        Some(MigrateBlobReject::Unhealthy)
    } else if !scope_allows_cross(gravity.scope, publisher_caps) {
        Some(MigrateBlobReject::ScopeMismatch)
    } else if blob.disk_free_gb < blob_size_bytes.div_ceil(BYTES_PER_GIB) {
        // Round up so a partially used unit still counts as a whole one.
        Some(MigrateBlobReject::InsufficientDisk)
    } else {
        None
    };

    rejection.map_or(MigrateBlobVerdict::Admit, MigrateBlobVerdict::Reject)
}
/// Outcome of [`should_migrate_blob_to`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MigrateBlobVerdict {
    /// Every migration gate passed.
    Admit,
    /// A gate failed; the payload names the first one that did.
    Reject(MigrateBlobReject),
}
/// Reason a migration was rejected by `should_migrate_blob_to`.
///
/// Variants are listed in the order the corresponding gates are evaluated.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MigrateBlobReject {
    /// The target capability set does not report blob storage.
    NoStorageCap,
    /// The target gravity capability is not enabled.
    GravityDisabled,
    /// The target gravity proximity is zero.
    ProximityZero,
    /// Target blob storage is flagged unhealthy.
    Unhealthy,
    /// The target gravity scope is not compatible with the publisher's scopes.
    ScopeMismatch,
    /// The target's free disk is smaller than the rounded-up blob size.
    InsufficientDisk,
}
/// Decides whether the local node should accept an overflow blob of
/// `blob_size_bytes` from a sender.
///
/// The gates are evaluated in a fixed order and the first one that fails
/// determines the rejection reason:
///
/// 1. the local capability set must report blob `storage`;
/// 2. the local blob capability must have `overflow_enabled`;
/// 3. the sender's blob capability must also have `overflow_enabled`;
/// 4. local blob storage must not be flagged unhealthy;
/// 5. the local gravity scope must be compatible with the sender's
///    advertised scopes (see `scope_allows_cross`);
/// 6. the local `disk_free_gb` must be at least the blob size — floored at
///    `BLOB_CHUNK_SIZE_BYTES` — expressed in units of 2^30 bytes, rounded up.
///
/// Returns [`OverflowVerdict::Admit`] when every gate passes, otherwise
/// [`OverflowVerdict::Reject`] carrying the first failing reason.
pub fn should_accept_overflow_from(
    local_caps: &CapabilitySet,
    sender_caps: &CapabilitySet,
    blob_size_bytes: u64,
) -> OverflowVerdict {
    const BYTES_PER_GIB: u64 = 1 << 30;

    let receiver_blob = BlobCapability::from_capability_set(local_caps);
    let origin_blob = BlobCapability::from_capability_set(sender_caps);
    let receiver_gravity = GravityCapability::from_capability_set(local_caps);

    // The `else if` chain keeps the later gates lazy: they only run once
    // every earlier gate has passed.
    let rejection = if !receiver_blob.storage {
        Some(OverflowReject::NoStorageCap)
    } else if !receiver_blob.overflow_enabled {
        Some(OverflowReject::NotParticipating)
    } else if !origin_blob.overflow_enabled {
        Some(OverflowReject::SenderNotOverflowing)
    } else if is_blob_storage_unhealthy(local_caps) {
        Some(OverflowReject::Unhealthy)
    } else if !scope_allows_cross(receiver_gravity.scope, sender_caps) {
        Some(OverflowReject::ScopeMismatch)
    } else {
        // Never size the request below one chunk, then round up to whole
        // 2^30-byte units before comparing against the free-disk figure.
        let floored_bytes = blob_size_bytes.max(super::blob_ref::BLOB_CHUNK_SIZE_BYTES);
        let units_needed = floored_bytes.div_ceil(BYTES_PER_GIB);
        if receiver_blob.disk_free_gb < units_needed {
            Some(OverflowReject::InsufficientDisk)
        } else {
            None
        }
    };

    rejection.map_or(OverflowVerdict::Admit, OverflowVerdict::Reject)
}
/// Outcome of [`should_accept_overflow_from`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OverflowVerdict {
    /// Every overflow gate passed.
    Admit,
    /// A gate failed; the payload names the first one that did.
    Reject(OverflowReject),
}
/// Reason an overflow blob was rejected by `should_accept_overflow_from`.
///
/// Variants are listed in the order the corresponding gates are evaluated.
/// The type is serde-serializable; keep variant names and order stable.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum OverflowReject {
    /// The local capability set does not report blob storage.
    NoStorageCap,
    /// The local node has not enabled overflow.
    NotParticipating,
    /// The sender has not enabled overflow.
    SenderNotOverflowing,
    /// Local blob storage is flagged unhealthy.
    Unhealthy,
    /// The local gravity scope is not compatible with the sender's scopes.
    ScopeMismatch,
    /// Local free disk is smaller than the rounded-up (chunk-floored) blob size.
    InsufficientDisk,
}
/// Checks that `origin_hash` is authorized on `channel` according to `guard`.
///
/// Delegates the decision to `AuthGuard::is_authorized_full`.
///
/// # Errors
///
/// Returns [`BlobError::Unauthorized`] with a message containing the origin
/// hash in hexadecimal when the guard does not authorize the origin.
pub fn auth_allows_blob_op(
    guard: &AuthGuard,
    origin_hash: u64,
    channel: &ChannelName,
) -> Result<(), BlobError> {
    if !guard.is_authorized_full(origin_hash, channel) {
        let message = format!("origin {origin_hash:#x} not authorized");
        return Err(BlobError::Unauthorized(message));
    }
    Ok(())
}
/// Returns `true` when `local_scope` is compatible with the scopes the
/// publisher advertises.
///
/// A local scope of `Mesh` is always accepted. Otherwise the local scope must
/// satisfy `scope_at_least_as_narrow` against the publisher's greedy scope or
/// its gravity scope (either one is enough).
fn scope_allows_cross(local_scope: TopologyScope, publisher_caps: &CapabilitySet) -> bool {
    if let TopologyScope::Mesh = local_scope {
        return true;
    }

    let greedy_scope = GreedyCapability::from_capability_set(publisher_caps).scope;
    let gravity_scope = GravityCapability::from_capability_set(publisher_caps).scope;

    scope_at_least_as_narrow(local_scope, greedy_scope)
        || scope_at_least_as_narrow(local_scope, gravity_scope)
}
fn scope_at_least_as_narrow(local: TopologyScope, publisher: TopologyScope) -> bool {
use TopologyScope::*;
matches!(
(local, publisher),
(Node, _) | (Zone, Zone | Region | Mesh) | (Region, Region | Mesh) | (Mesh, Mesh)
)
}
#[cfg(test)]
mod tests {
    //! Unit tests for the pull / migrate / overflow admission gates, the auth
    //! check, and the scope-ordering helper.

    use super::*;
    use crate::adapter::net::behavior::{CapabilitySet, Tag};

    // ---- fixtures -------------------------------------------------------

    /// Spelling of a scope as it appears in `dataforts.*.scope=` tags.
    fn scope_tag_value(scope: TopologyScope) -> &'static str {
        match scope {
            TopologyScope::Node => "node",
            TopologyScope::Zone => "zone",
            TopologyScope::Region => "region",
            TopologyScope::Mesh => "mesh",
        }
    }

    /// Inserts the reserved tag that the health check treats as "unhealthy".
    fn mark_storage_unhealthy(caps: &mut CapabilitySet) {
        caps.tags.insert(Tag::Reserved {
            prefix: "dataforts:".to_owned(),
            body: "blob-storage-unhealthy".to_owned(),
        });
    }

    /// Local node with blob storage and the greedy capability enabled.
    fn participating_local_node(scope: TopologyScope, proximity: u8) -> CapabilitySet {
        CapabilitySet::new()
            .add_tag("dataforts.blob.storage")
            .add_tag("dataforts.blob.disk_total_gb=100")
            .add_tag("dataforts.blob.disk_free_gb=50")
            .add_tag("dataforts.greedy.enabled")
            .add_tag(format!("dataforts.greedy.scope={}", scope_tag_value(scope)))
            .add_tag(format!("dataforts.greedy.proximity={}", proximity))
    }

    /// Node with blob storage and the gravity capability enabled.
    fn participating_gravity_node(
        scope: TopologyScope,
        proximity: u8,
        disk_free_gb: u64,
    ) -> CapabilitySet {
        CapabilitySet::new()
            .add_tag("dataforts.blob.storage")
            .add_tag("dataforts.blob.disk_total_gb=100")
            .add_tag(format!("dataforts.blob.disk_free_gb={}", disk_free_gb))
            .add_tag("dataforts.gravity.enabled")
            .add_tag(format!("dataforts.gravity.scope={}", scope_tag_value(scope)))
            .add_tag(format!("dataforts.gravity.proximity={}", proximity))
    }

    /// Publisher advertising mesh scope for both greedy and gravity.
    fn publisher_with_mesh_scope() -> CapabilitySet {
        CapabilitySet::new()
            .add_tag("dataforts.greedy.scope=mesh")
            .add_tag("dataforts.gravity.scope=mesh")
    }

    /// Publisher advertising `scope` for both greedy and gravity.
    fn publisher_with_scope(scope: TopologyScope) -> CapabilitySet {
        let scope_str = scope_tag_value(scope);
        CapabilitySet::new()
            .add_tag(format!("dataforts.greedy.scope={}", scope_str))
            .add_tag(format!("dataforts.gravity.scope={}", scope_str))
    }

    /// Gravity node that has additionally opted in to overflow.
    fn overflow_enabled_node(scope: TopologyScope, disk_free_gb: u64) -> CapabilitySet {
        participating_gravity_node(scope, 128, disk_free_gb).add_tag("dataforts.blob.overflow")
    }

    /// Sender that advertises `scope` and has opted in to overflow.
    fn overflow_enabled_sender(scope: TopologyScope) -> CapabilitySet {
        publisher_with_scope(scope).add_tag("dataforts.blob.overflow")
    }

    // ---- should_pull_blob -----------------------------------------------

    #[test]
    fn pull_admits_participating_local_with_mesh_publisher() {
        let local = participating_local_node(TopologyScope::Mesh, 128);
        let publisher = publisher_with_mesh_scope();
        assert_eq!(should_pull_blob(&local, &publisher), PullBlobVerdict::Admit);
    }

    #[test]
    fn pull_rejects_no_storage_cap() {
        let local = CapabilitySet::new();
        let publisher = publisher_with_mesh_scope();
        assert_eq!(
            should_pull_blob(&local, &publisher),
            PullBlobVerdict::Reject(PullBlobReject::NoStorageCap)
        );
    }

    #[test]
    fn pull_rejects_greedy_disabled() {
        let local = CapabilitySet::new().add_tag("dataforts.blob.storage");
        let publisher = publisher_with_mesh_scope();
        assert_eq!(
            should_pull_blob(&local, &publisher),
            PullBlobVerdict::Reject(PullBlobReject::GreedyDisabled)
        );
    }

    #[test]
    fn pull_rejects_proximity_zero() {
        // No proximity tag at all: the test expects this to read as zero.
        let local = CapabilitySet::new()
            .add_tag("dataforts.blob.storage")
            .add_tag("dataforts.greedy.enabled")
            .add_tag("dataforts.greedy.scope=mesh");
        let publisher = publisher_with_mesh_scope();
        assert_eq!(
            should_pull_blob(&local, &publisher),
            PullBlobVerdict::Reject(PullBlobReject::ProximityZero)
        );
    }

    #[test]
    fn pull_rejects_unhealthy_local() {
        let mut local = participating_local_node(TopologyScope::Mesh, 128);
        mark_storage_unhealthy(&mut local);
        let publisher = publisher_with_mesh_scope();
        assert_eq!(
            should_pull_blob(&local, &publisher),
            PullBlobVerdict::Reject(PullBlobReject::Unhealthy)
        );
    }

    #[test]
    fn pull_admits_when_local_zone_and_publisher_mesh_covers_it() {
        let local = participating_local_node(TopologyScope::Zone, 128);
        let publisher = publisher_with_scope(TopologyScope::Mesh);
        assert_eq!(should_pull_blob(&local, &publisher), PullBlobVerdict::Admit);
    }

    #[test]
    fn pull_admits_when_local_zone_and_publisher_makes_no_scope_claim() {
        let local = participating_local_node(TopologyScope::Zone, 128);
        let publisher = CapabilitySet::new();
        assert_eq!(should_pull_blob(&local, &publisher), PullBlobVerdict::Admit);
    }

    #[test]
    fn pull_rejects_when_publisher_scope_narrower_than_local() {
        // Zone-scoped local vs. a publisher that only advertises node scope.
        let local = participating_local_node(TopologyScope::Zone, 128);
        let publisher = publisher_with_scope(TopologyScope::Node);
        assert_eq!(
            should_pull_blob(&local, &publisher),
            PullBlobVerdict::Reject(PullBlobReject::ScopeMismatch)
        );
    }

    // ---- should_migrate_blob_to -----------------------------------------

    #[test]
    fn migrate_admits_target_with_disk_and_caps() {
        let target = participating_gravity_node(TopologyScope::Mesh, 128, 100);
        let publisher = publisher_with_mesh_scope();
        assert_eq!(
            should_migrate_blob_to(&target, &publisher, 1024),
            MigrateBlobVerdict::Admit
        );
    }

    #[test]
    fn migrate_rejects_no_blob_storage() {
        let target = CapabilitySet::new().add_tag("dataforts.gravity.enabled");
        let publisher = publisher_with_mesh_scope();
        assert_eq!(
            should_migrate_blob_to(&target, &publisher, 1024),
            MigrateBlobVerdict::Reject(MigrateBlobReject::NoStorageCap)
        );
    }

    #[test]
    fn migrate_rejects_gravity_disabled() {
        let target = CapabilitySet::new()
            .add_tag("dataforts.blob.storage")
            .add_tag("dataforts.blob.disk_free_gb=100");
        let publisher = publisher_with_mesh_scope();
        assert_eq!(
            should_migrate_blob_to(&target, &publisher, 1024),
            MigrateBlobVerdict::Reject(MigrateBlobReject::GravityDisabled)
        );
    }

    #[test]
    fn migrate_rejects_insufficient_disk() {
        let target = participating_gravity_node(TopologyScope::Mesh, 128, 2);
        let publisher = publisher_with_mesh_scope();
        let ten_gib: u64 = 10 * (1 << 30);
        assert_eq!(
            should_migrate_blob_to(&target, &publisher, ten_gib),
            MigrateBlobVerdict::Reject(MigrateBlobReject::InsufficientDisk)
        );
    }

    #[test]
    fn migrate_disk_check_rounds_up() {
        let publisher = publisher_with_mesh_scope();
        let one_and_a_half_gib: u64 = (1 << 30) + (1 << 29);

        // 1.5 GiB rounds up to 2, so 1 free is not enough...
        let target = participating_gravity_node(TopologyScope::Mesh, 128, 1);
        assert_eq!(
            should_migrate_blob_to(&target, &publisher, one_and_a_half_gib),
            MigrateBlobVerdict::Reject(MigrateBlobReject::InsufficientDisk)
        );

        // ...but 2 free is.
        let target2 = participating_gravity_node(TopologyScope::Mesh, 128, 2);
        assert_eq!(
            should_migrate_blob_to(&target2, &publisher, one_and_a_half_gib),
            MigrateBlobVerdict::Admit
        );
    }

    #[test]
    fn migrate_rejects_unhealthy_target() {
        let mut target = participating_gravity_node(TopologyScope::Mesh, 128, 100);
        mark_storage_unhealthy(&mut target);
        let publisher = publisher_with_mesh_scope();
        assert_eq!(
            should_migrate_blob_to(&target, &publisher, 1024),
            MigrateBlobVerdict::Reject(MigrateBlobReject::Unhealthy)
        );
    }

    #[test]
    fn migrate_rejects_when_publisher_scope_narrower_than_target() {
        // Zone-scoped target vs. a publisher that only advertises node scope.
        let target = participating_gravity_node(TopologyScope::Zone, 128, 100);
        let publisher = publisher_with_scope(TopologyScope::Node);
        assert_eq!(
            should_migrate_blob_to(&target, &publisher, 1024),
            MigrateBlobVerdict::Reject(MigrateBlobReject::ScopeMismatch)
        );
    }

    // ---- auth_allows_blob_op --------------------------------------------

    #[test]
    fn auth_admits_when_origin_authorized_for_channel() {
        let guard = AuthGuard::new();
        let origin = 0xDEAD_BEEF_u64;
        let channel = ChannelName::new("dataforts/test/auth").unwrap();
        guard.allow_channel(origin, &channel);
        assert!(auth_allows_blob_op(&guard, origin, &channel).is_ok());
    }

    #[test]
    fn auth_rejects_when_origin_not_authorized() {
        let guard = AuthGuard::new();
        let channel = ChannelName::new("dataforts/test/auth").unwrap();
        let err = auth_allows_blob_op(&guard, 0xDEAD, &channel).unwrap_err();
        assert!(matches!(err, BlobError::Unauthorized(_)));
    }

    #[test]
    fn auth_rejects_when_origin_authorized_for_different_channel() {
        let guard = AuthGuard::new();
        let allowed = ChannelName::new("allowed/channel").unwrap();
        let other = ChannelName::new("other/channel").unwrap();
        let origin = 0xC0FFEE_u64;
        guard.allow_channel(origin, &allowed);
        let err = auth_allows_blob_op(&guard, origin, &other).unwrap_err();
        assert!(matches!(err, BlobError::Unauthorized(_)));
    }

    #[test]
    fn arc_authguard_compiles() {
        // Compile-time check: an `Arc<AuthGuard>` can be passed by reference.
        use std::sync::Arc;
        let guard: Arc<AuthGuard> = Arc::new(AuthGuard::new());
        let channel = ChannelName::new("dataforts/test").unwrap();
        let _ = auth_allows_blob_op(&guard, 0, &channel);
    }

    // ---- scope_at_least_as_narrow ---------------------------------------

    #[test]
    fn scope_node_is_narrowest() {
        use TopologyScope::*;
        assert!(scope_at_least_as_narrow(Node, Node));
        assert!(scope_at_least_as_narrow(Node, Zone));
        assert!(scope_at_least_as_narrow(Node, Region));
        assert!(scope_at_least_as_narrow(Node, Mesh));
    }

    #[test]
    fn scope_zone_admits_zone_region_mesh() {
        use TopologyScope::*;
        assert!(scope_at_least_as_narrow(Zone, Zone));
        assert!(scope_at_least_as_narrow(Zone, Region));
        assert!(scope_at_least_as_narrow(Zone, Mesh));
        assert!(!scope_at_least_as_narrow(Zone, Node));
    }

    #[test]
    fn scope_region_admits_region_and_mesh_only() {
        use TopologyScope::*;
        assert!(scope_at_least_as_narrow(Region, Region));
        assert!(scope_at_least_as_narrow(Region, Mesh));
        assert!(!scope_at_least_as_narrow(Region, Zone));
        assert!(!scope_at_least_as_narrow(Region, Node));
    }

    #[test]
    fn scope_mesh_only_admits_mesh() {
        use TopologyScope::*;
        assert!(scope_at_least_as_narrow(Mesh, Mesh));
        assert!(!scope_at_least_as_narrow(Mesh, Region));
        assert!(!scope_at_least_as_narrow(Mesh, Zone));
        assert!(!scope_at_least_as_narrow(Mesh, Node));
    }

    // ---- should_accept_overflow_from ------------------------------------

    #[test]
    fn overflow_admits_when_both_sides_opted_in() {
        let local = overflow_enabled_node(TopologyScope::Mesh, 100);
        let sender = overflow_enabled_sender(TopologyScope::Mesh);
        assert_eq!(
            should_accept_overflow_from(&local, &sender, 1024),
            OverflowVerdict::Admit
        );
    }

    #[test]
    fn overflow_rejects_when_local_has_no_storage_cap() {
        let local = CapabilitySet::new().add_tag("dataforts.blob.overflow");
        let sender = overflow_enabled_sender(TopologyScope::Mesh);
        assert_eq!(
            should_accept_overflow_from(&local, &sender, 1024),
            OverflowVerdict::Reject(OverflowReject::NoStorageCap)
        );
    }

    #[test]
    fn overflow_rejects_when_local_not_participating() {
        let local = participating_gravity_node(TopologyScope::Mesh, 128, 100);
        let sender = overflow_enabled_sender(TopologyScope::Mesh);
        // Sanity-check the fixture: storage is present but overflow is off.
        assert!(!BlobCapability::from_capability_set(&local).overflow_enabled);
        assert_eq!(
            should_accept_overflow_from(&local, &sender, 1024),
            OverflowVerdict::Reject(OverflowReject::NotParticipating)
        );
    }

    #[test]
    fn overflow_rejects_when_sender_not_overflowing() {
        let local = overflow_enabled_node(TopologyScope::Mesh, 100);
        let sender = publisher_with_mesh_scope();
        assert_eq!(
            should_accept_overflow_from(&local, &sender, 1024),
            OverflowVerdict::Reject(OverflowReject::SenderNotOverflowing)
        );
    }

    #[test]
    fn overflow_rejects_when_local_unhealthy() {
        let mut local = overflow_enabled_node(TopologyScope::Mesh, 100);
        mark_storage_unhealthy(&mut local);
        let sender = overflow_enabled_sender(TopologyScope::Mesh);
        assert_eq!(
            should_accept_overflow_from(&local, &sender, 1024),
            OverflowVerdict::Reject(OverflowReject::Unhealthy)
        );
    }

    #[test]
    fn overflow_rejects_when_sender_scope_outside_local() {
        let local = overflow_enabled_node(TopologyScope::Zone, 100);
        let sender = CapabilitySet::new()
            .add_tag("dataforts.gravity.scope=node")
            .add_tag("dataforts.greedy.scope=node")
            .add_tag("dataforts.blob.overflow");
        assert_eq!(
            should_accept_overflow_from(&local, &sender, 1024),
            OverflowVerdict::Reject(OverflowReject::ScopeMismatch)
        );
    }

    #[test]
    fn overflow_rejects_when_insufficient_disk() {
        let local = overflow_enabled_node(TopologyScope::Mesh, 2);
        let sender = overflow_enabled_sender(TopologyScope::Mesh);
        let ten_gib: u64 = 10 * (1 << 30);
        assert_eq!(
            should_accept_overflow_from(&local, &sender, ten_gib),
            OverflowVerdict::Reject(OverflowReject::InsufficientDisk)
        );
    }

    #[test]
    fn overflow_disk_gate_rounds_up() {
        let one_and_a_half_gib: u64 = (1 << 30) + (1 << 29);
        let sender = overflow_enabled_sender(TopologyScope::Mesh);

        let tight = overflow_enabled_node(TopologyScope::Mesh, 1);
        assert_eq!(
            should_accept_overflow_from(&tight, &sender, one_and_a_half_gib),
            OverflowVerdict::Reject(OverflowReject::InsufficientDisk)
        );

        let loose = overflow_enabled_node(TopologyScope::Mesh, 2);
        assert_eq!(
            should_accept_overflow_from(&loose, &sender, one_and_a_half_gib),
            OverflowVerdict::Admit
        );
    }

    #[test]
    fn overflow_reject_ordering_storage_before_overflow_opt_in() {
        // Neither storage nor overflow is set locally: storage must win.
        let local = CapabilitySet::new();
        let sender = overflow_enabled_sender(TopologyScope::Mesh);
        assert_eq!(
            should_accept_overflow_from(&local, &sender, 1024),
            OverflowVerdict::Reject(OverflowReject::NoStorageCap)
        );
    }

    #[test]
    fn overflow_reject_ordering_local_overflow_before_sender_overflow() {
        // Neither side has opted in: the local opt-in check must win.
        let local = participating_gravity_node(TopologyScope::Mesh, 128, 100);
        let sender = publisher_with_mesh_scope();
        assert_eq!(
            should_accept_overflow_from(&local, &sender, 1024),
            OverflowVerdict::Reject(OverflowReject::NotParticipating)
        );
    }
}