#![cfg(feature = "dataforts")]
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use bytes::Bytes;
use net::adapter::net::behavior::capability::CapabilitySet;
use net::adapter::net::behavior::placement::IntentRegistry;
use net::adapter::net::behavior::tag::Tag;
use net::adapter::net::channel::{ChannelName, ChannelPublisher, PublishConfig};
use net::adapter::net::dataforts::{
synthesize_cache_channel_name, DataGravityPolicy, GreedyConfig, IntentMatchPolicy,
};
use net::adapter::net::redex::Redex;
use net::adapter::net::{EntityKeypair, MeshNode, MeshNodeConfig, SocketBufferConfig};
// Size in bytes (256 KiB) applied to both the send and receive socket buffers
// of every node built through `test_config`.
const TEST_BUFFER_SIZE: usize = 256 * 1024;
// Fixed 32-byte value (every byte 0x42) handed to `MeshNodeConfig::new` for all
// test nodes; presumably the mesh pre-shared key — confirm against
// `MeshNodeConfig`.
const PSK: [u8; 32] = [0x42u8; 32];
/// Builds the `MeshNodeConfig` shared by every node in these tests.
///
/// The node binds to `127.0.0.1:0`, uses the file-level `PSK`, and gets short
/// timing values (200 ms heartbeat, 5 s session timeout, handshake settings
/// `(3, 2 s)`, 250 ms capability GC interval). Both socket buffers are set to
/// `TEST_BUFFER_SIZE`.
fn test_config() -> MeshNodeConfig {
    let bind_addr = "127.0.0.1:0".parse::<SocketAddr>().unwrap();

    let socket_buffers = SocketBufferConfig {
        send_buffer_size: TEST_BUFFER_SIZE,
        recv_buffer_size: TEST_BUFFER_SIZE,
    };

    let mut config = MeshNodeConfig::new(bind_addr, PSK)
        .with_heartbeat_interval(Duration::from_millis(200))
        .with_session_timeout(Duration::from_secs(5))
        .with_handshake(3, Duration::from_secs(2))
        .with_capability_gc_interval(Duration::from_millis(250));
    config.socket_buffers = socket_buffers;
    config
}
/// Creates a mesh node from `test_config()` and a freshly generated keypair,
/// returning it behind an `Arc`.
///
/// Panics with `"MeshNode::new"` if node construction fails.
async fn build_node() -> Arc<MeshNode> {
    let config = test_config();
    let identity = EntityKeypair::generate();
    let node = MeshNode::new(identity, config)
        .await
        .expect("MeshNode::new");
    Arc::new(node)
}
/// Connects `a` to `b` and starts both nodes.
///
/// `b`'s `accept(a_id)` runs on a spawned task so that it is in flight while
/// `a.connect(..)` is awaited on the current task. Once the connect call and
/// the accept task have both succeeded, `start()` is called on `a` and then
/// on `b`.
///
/// Panics if the connect call fails, if the accept task cannot be joined, or
/// if the accept call itself returns an error.
async fn handshake(a: &Arc<MeshNode>, b: &Arc<MeshNode>) {
    let a_id = a.node_id();
    let b_id = b.node_id();
    let b_pub = *b.public_key();
    let b_addr = b.local_addr();

    // The spawned task needs its own handle to `b`.
    let b_clone = b.clone();
    let accept = tokio::spawn(async move { b_clone.accept(a_id).await });

    a.connect(b_addr, &b_pub, b_id).await.expect("connect");
    accept.await.expect("accept task").expect("accept");

    a.start();
    b.start();
}
/// Shorthand for constructing a `ChannelName` from `s` in tests.
///
/// Panics (via `unwrap`) when `ChannelName::new` does not yield a name.
fn cn(s: &str) -> ChannelName {
    let parsed = ChannelName::new(s);
    parsed.unwrap()
}
/// Reports whether `caps` contains a reserved `heat:` tag keyed by `hex`.
///
/// A tag qualifies when it is a `Tag::Reserved` whose prefix equals `"heat:"`
/// and whose body is `hex` immediately followed by `=`. Requiring the `=`
/// right after `hex` means a body whose key merely begins with `hex` does not
/// count as a match.
fn has_heat_tag(caps: &CapabilitySet, hex: &str) -> bool {
    for tag in caps.tags.iter() {
        if let Tag::Reserved { prefix, body } = tag {
            if prefix == "heat:" {
                // What follows `hex` in the body must start with the `=` separator.
                let key_matches =
                    matches!(body.strip_prefix(hex), Some(rest) if rest.starts_with('='));
                if key_matches {
                    return true;
                }
            }
        }
    }
    false
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn read_hot_chain_emits_heat_tag_into_local_caps() {
let node_a = build_node().await;
let node_b = build_node().await;
handshake(&node_a, &node_b).await;
let redex_b = Arc::new(Redex::new());
let cfg = GreedyConfig::default().with_intent_match(IntentMatchPolicy::Disabled);
redex_b
.enable_greedy_dataforts(
node_b.clone(),
cfg,
Arc::new(CapabilitySet::default()),
IntentRegistry::new(),
)
.expect("enable greedy");
redex_b
.enable_gravity_for_greedy(
node_b.clone(),
DataGravityPolicy::default(),
Duration::from_millis(50),
)
.expect("enable gravity");
let name = cn("dataforts/test/hot-chain");
node_b
.subscribe_channel(node_a.node_id(), name.clone())
.await
.expect("subscribe");
let publisher = ChannelPublisher::new(name.clone(), PublishConfig::default());
const N: u64 = 4;
for i in 0..N {
node_a
.publish(&publisher, Bytes::from(format!("event-{i}")))
.await
.expect("publish");
}
let runtime = redex_b.greedy_runtime().expect("runtime");
let synth = synthesize_cache_channel_name(name.wire_hash());
let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
while tokio::time::Instant::now() < deadline {
if runtime.contains(&synth) && runtime.cached_bytes() >= N {
break;
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
assert!(
runtime.contains(&synth),
"cache must populate before heat reads"
);
for _ in 0..16 {
let _ = redex_b.greedy_cache_for(&name);
}
let _entry_present = runtime.cache_file(&synth).map(|_| ());
let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
let mut observed_heat = false;
while tokio::time::Instant::now() < deadline {
for _ in 0..8 {
let _ = redex_b.greedy_cache_for(&name);
}
runtime.gravity_tick().await;
let b_caps_self = node_b.test_capability_fold_get(node_b.node_id());
if b_caps_self
.tags
.iter()
.any(|t| matches!(t, Tag::Reserved { prefix, .. } if prefix == "heat:"))
{
observed_heat = true;
break;
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
assert!(
observed_heat,
"B's self-view of its own caps must carry a heat: tag after \
read-driven bumps + a gravity_tick"
);
redex_b.disable_gravity_for_greedy();
redex_b.disable_greedy_dataforts();
}
/// Negative control: with greedy caching enabled on node B but gravity never
/// enabled, cache lookups must not cause any `heat:` reserved tag to appear
/// in B's folded view of its own capabilities.
///
/// The test asserts that the cache actually populated before issuing the
/// lookups. Without that guard, the final "no heat tag" check could pass
/// without any lookup ever hitting a cached entry.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn greedy_without_gravity_emits_no_heat_tags() {
    let node_a = build_node().await;
    let node_b = build_node().await;
    handshake(&node_a, &node_b).await;

    // Greedy caching only; `enable_gravity_for_greedy` is deliberately not called.
    let redex_b = Arc::new(Redex::new());
    redex_b
        .enable_greedy_dataforts(
            node_b.clone(),
            GreedyConfig::default().with_intent_match(IntentMatchPolicy::Disabled),
            Arc::new(CapabilitySet::default()),
            IntentRegistry::new(),
        )
        .expect("enable greedy");
    let runtime = redex_b.greedy_runtime().expect("runtime");
    assert!(!runtime.gravity_enabled());

    let name = cn("dataforts/test/no-gravity");
    node_b
        .subscribe_channel(node_a.node_id(), name.clone())
        .await
        .expect("subscribe");

    let publisher = ChannelPublisher::new(name.clone(), PublishConfig::default());
    for i in 0..4 {
        node_a
            .publish(&publisher, Bytes::from(format!("event-{i}")))
            .await
            .expect("publish");
    }

    // Poll (up to 3 s) for the synthesized cache channel to appear.
    let synth = synthesize_cache_channel_name(name.wire_hash());
    let deadline = tokio::time::Instant::now() + Duration::from_secs(3);
    while tokio::time::Instant::now() < deadline {
        if runtime.contains(&synth) {
            break;
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
    // Fail loudly if the cache never populated, instead of continuing with
    // lookups that cannot hit a cached entry.
    assert!(
        runtime.contains(&synth),
        "cache must populate before heat reads"
    );

    // Lookups whose results are intentionally discarded.
    for _ in 0..16 {
        let _ = redex_b.greedy_cache_for(&name);
    }

    // Leave time for a tag to surface if one were (incorrectly) emitted.
    tokio::time::sleep(Duration::from_millis(500)).await;

    let b_caps_self = node_b.test_capability_fold_get(node_b.node_id());
    let has_heat = b_caps_self
        .tags
        .iter()
        .any(|t| matches!(t, Tag::Reserved { prefix, .. } if prefix == "heat:"));
    assert!(
        !has_heat,
        "gravity-disabled greedy must not emit any heat: tag"
    );

    redex_b.disable_greedy_dataforts();
}
/// Calls `has_heat_tag` once on a default `CapabilitySet` and discards the
/// result.
///
/// Presumably exists only so `has_heat_tag` has a caller; nothing in this
/// file calls this function, hence the `dead_code` allowance.
#[allow(dead_code)]
fn _force_use_has_heat_tag() {
    let empty_caps = CapabilitySet::default();
    let _ = has_heat_tag(&empty_caps, "deadbeef");
}