net-mesh 0.24.0

High-performance, schema-agnostic, backend-agnostic event bus
Documentation
//! End-to-end integration tests for the data-gravity layer
//! (DATAFORTS_PLAN § Phase 4).
//!
//! Wires two `MeshNode`s + a `Redex` on the observer side. Node A
//! is the publisher; Node B has greedy + gravity enabled. The
//! tests assert that reads on B drive heat emissions, which
//! propagate via the capability-announcement bus and surface in
//! A's local capability index as `heat:<hex>=<rate>` reserved
//! tags.
//!
//! Run: `cargo test --features dataforts --test dataforts_gravity_e2e`

#![cfg(feature = "dataforts")]

use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use bytes::Bytes;

use net::adapter::net::behavior::capability::CapabilitySet;
use net::adapter::net::behavior::placement::IntentRegistry;
use net::adapter::net::behavior::tag::Tag;
use net::adapter::net::channel::{ChannelName, ChannelPublisher, PublishConfig};
use net::adapter::net::dataforts::{
    synthesize_cache_channel_name, DataGravityPolicy, GreedyConfig, IntentMatchPolicy,
};
use net::adapter::net::redex::Redex;
use net::adapter::net::{EntityKeypair, MeshNode, MeshNodeConfig, SocketBufferConfig};

const TEST_BUFFER_SIZE: usize = 256 * 1024;
const PSK: [u8; 32] = [0x42u8; 32];

fn test_config() -> MeshNodeConfig {
    let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
    let mut cfg = MeshNodeConfig::new(addr, PSK)
        .with_heartbeat_interval(Duration::from_millis(200))
        .with_session_timeout(Duration::from_secs(5))
        .with_handshake(3, Duration::from_secs(2))
        .with_capability_gc_interval(Duration::from_millis(250));
    cfg.socket_buffers = SocketBufferConfig {
        send_buffer_size: TEST_BUFFER_SIZE,
        recv_buffer_size: TEST_BUFFER_SIZE,
    };
    cfg
}

async fn build_node() -> Arc<MeshNode> {
    let cfg = test_config();
    let keypair = EntityKeypair::generate();
    Arc::new(MeshNode::new(keypair, cfg).await.expect("MeshNode::new"))
}

async fn handshake(a: &Arc<MeshNode>, b: &Arc<MeshNode>) {
    let a_id = a.node_id();
    let b_id = b.node_id();
    let b_pub = *b.public_key();
    let b_addr = b.local_addr();
    let b_clone = b.clone();
    let accept = tokio::spawn(async move { b_clone.accept(a_id).await });
    a.connect(b_addr, &b_pub, b_id).await.expect("connect");
    accept.await.expect("accept task").expect("accept");
    a.start();
    b.start();
    let _ = a_id;
    let _ = b_id;
}

fn cn(s: &str) -> ChannelName {
    ChannelName::new(s).unwrap()
}

/// Returns `true` iff `caps` carries any `heat:<hex>=*` tag for
/// the supplied lowercase 16-hex `origin_hash`.
fn has_heat_tag(caps: &CapabilitySet, hex: &str) -> bool {
    caps.tags.iter().any(|t| match t {
        Tag::Reserved { prefix, body } if prefix == "heat:" => {
            body.starts_with(hex) && matches!(body.as_bytes().get(hex.len()), Some(b'='))
        }
        _ => false,
    })
}

/// B's reads on a cached chain drive heat emissions which land
/// in B's own capability set as `heat:<hex>=<rate>` reserved
/// tags. End-to-end demonstration of the gravity-emergent-from-
/// greedy story up to the local-emit point:
///
///   greedy_cache_for → note_read → heat_counter.bump →
///     gravity_tick → mesh.announce_heat → user_caps now carries
///     the heat tag
///
/// Cross-peer propagation (A picks up B's heat tag via the
/// capability-announcement bus) is exercised by the
/// capability-broadcast tests; this scenario pins the
/// gravity-specific path locally on B so a regression in the
/// emit logic is loud regardless of propagation timing.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn read_hot_chain_emits_heat_tag_into_local_caps() {
    let node_a = build_node().await;
    let node_b = build_node().await;
    handshake(&node_a, &node_b).await;

    let redex_b = Arc::new(Redex::new());
    let cfg = GreedyConfig::default().with_intent_match(IntentMatchPolicy::Disabled);
    redex_b
        .enable_greedy_dataforts(
            node_b.clone(),
            cfg,
            Arc::new(CapabilitySet::default()),
            IntentRegistry::new(),
        )
        .expect("enable greedy");

    // Install gravity with a fast tick so the test doesn't have
    // to wait for the default heartbeat-aligned cadence.
    redex_b
        .enable_gravity_for_greedy(
            node_b.clone(),
            DataGravityPolicy::default(),
            Duration::from_millis(50),
        )
        .expect("enable gravity");

    // B subscribes to A's channel + A publishes some events so
    // B's cache has something to read.
    let name = cn("dataforts/test/hot-chain");
    node_b
        .subscribe_channel(node_a.node_id(), name.clone())
        .await
        .expect("subscribe");
    let publisher = ChannelPublisher::new(name.clone(), PublishConfig::default());
    const N: u64 = 4;
    for i in 0..N {
        node_a
            .publish(&publisher, Bytes::from(format!("event-{i}")))
            .await
            .expect("publish");
    }

    // Wait for the cache to populate.
    let runtime = redex_b.greedy_runtime().expect("runtime");
    let synth = synthesize_cache_channel_name(name.wire_hash());
    let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
    while tokio::time::Instant::now() < deadline {
        if runtime.contains(&synth) && runtime.cached_bytes() >= N {
            break;
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
    assert!(
        runtime.contains(&synth),
        "cache must populate before heat reads"
    );

    // Drive reads to bump the heat counter. The first
    // greedy_cache_for hit calls note_read which bumps the heat
    // counter (origin_hash was recorded on the entry by
    // dispatch_event).
    for _ in 0..16 {
        let _ = redex_b.greedy_cache_for(&name);
    }

    // The packet header carries `parsed.header.origin_hash` as a
    // u32 widened to u64 — same value that lands on the cache
    // entry. Compute the hex form the substrate emits to the
    // wire: chain_hex pads to 16 lowercase hex chars.
    // Fetch the origin_hash from the entry via the public
    // GreedyCacheEntry surface — entry is borrowed via the
    // cache.get path. The entry's origin_hash was stored in
    // dispatch_event; we don't have a direct accessor on
    // GreedyRuntime, so the assertion below tests for ANY
    // heat tag in A's capability_index for B (regardless of
    // which chain was hottest).
    let _entry_present = runtime.cache_file(&synth).map(|_| ());

    // Drive reads + tick + poll B's self-view for a heat tag.
    let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
    let mut observed_heat = false;
    while tokio::time::Instant::now() < deadline {
        for _ in 0..8 {
            let _ = redex_b.greedy_cache_for(&name);
        }
        runtime.gravity_tick().await;
        let b_caps_self = node_b.test_capability_fold_get(node_b.node_id());
        if b_caps_self
            .tags
            .iter()
            .any(|t| matches!(t, Tag::Reserved { prefix, .. } if prefix == "heat:"))
        {
            observed_heat = true;
            break;
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
    assert!(
        observed_heat,
        "B's self-view of its own caps must carry a heat: tag after \
         read-driven bumps + a gravity_tick"
    );

    redex_b.disable_gravity_for_greedy();
    redex_b.disable_greedy_dataforts();
}

/// Gravity-disabled greedy still works: reads bump serve_count
/// but no heat tags emit. Pins the "Phase 4 is layered on top of
/// Phase 1, not required by it" invariant.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn greedy_without_gravity_emits_no_heat_tags() {
    let node_a = build_node().await;
    let node_b = build_node().await;
    handshake(&node_a, &node_b).await;

    let redex_b = Arc::new(Redex::new());
    redex_b
        .enable_greedy_dataforts(
            node_b.clone(),
            GreedyConfig::default().with_intent_match(IntentMatchPolicy::Disabled),
            Arc::new(CapabilitySet::default()),
            IntentRegistry::new(),
        )
        .expect("enable greedy");
    // No gravity install.
    let runtime = redex_b.greedy_runtime().expect("runtime");
    assert!(!runtime.gravity_enabled());

    let name = cn("dataforts/test/no-gravity");
    node_b
        .subscribe_channel(node_a.node_id(), name.clone())
        .await
        .expect("subscribe");
    let publisher = ChannelPublisher::new(name.clone(), PublishConfig::default());
    for i in 0..4 {
        node_a
            .publish(&publisher, Bytes::from(format!("event-{i}")))
            .await
            .expect("publish");
    }

    // Pump a few reads on B; without gravity these don't drive
    // any heat emission.
    let synth = synthesize_cache_channel_name(name.wire_hash());
    let deadline = tokio::time::Instant::now() + Duration::from_secs(3);
    while tokio::time::Instant::now() < deadline {
        if runtime.contains(&synth) {
            break;
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
    for _ in 0..16 {
        let _ = redex_b.greedy_cache_for(&name);
    }

    // Wait through several heartbeat windows; assert no heat tag
    // ever appears on B's self-caps.
    tokio::time::sleep(Duration::from_millis(500)).await;
    let b_caps_self = node_b.test_capability_fold_get(node_b.node_id());
    let has_heat = b_caps_self
        .tags
        .iter()
        .any(|t| matches!(t, Tag::Reserved { prefix, .. } if prefix == "heat:"));
    assert!(
        !has_heat,
        "gravity-disabled greedy must not emit any heat: tag"
    );

    redex_b.disable_greedy_dataforts();
}

#[allow(dead_code)]
fn _force_use_has_heat_tag() {
    let _ = has_heat_tag(&CapabilitySet::default(), "deadbeef");
}