sim-lib-stream-fabric 0.1.0

Content-addressed distributed evaluation for remote stream realization.
Documentation
use std::{
    io::{BufRead, BufReader},
    process::{Child, Command, Stdio},
    sync::{Arc, Mutex},
    thread,
    time::{Duration, Instant},
};

use sim_codec_binary::BinaryCodecLib;
use sim_kernel::{
    CapabilityName, Consistency, Cx, DefaultFactory, EagerPolicy, EvalFabric, EvalFabricRef,
    EvalMode, EvalReply, EvalRequest, Expr, Result, Symbol, Value, eval_remote_capability,
};
use sim_lib_server::{Connection, ServerAddress, connect_transport_site};
use sim_lib_stream_fabric::{
    ContentAddressedFabric, ContentKey, ContentPeer, EvalCassette, EvalCassetteLedger,
};

/// In-memory ledger used by this test as the backing store for an
/// `EvalCassette`: appended `(key, reply)` pairs are kept in a
/// mutex-guarded vector and never leave the process.
#[derive(Default)]
struct MemoryLedger {
    /// Recorded pairs, in the order they were appended.
    entries: Mutex<Vec<(ContentKey, EvalReply)>>,
}

impl EvalCassetteLedger for MemoryLedger {
    /// Stores a copy of `key` and `reply` at the end of the in-memory log.
    ///
    /// Always returns `Ok(())`; panics only if the mutex is poisoned.
    fn append_eval_result(&self, key: &ContentKey, reply: &EvalReply) -> Result<()> {
        let mut recorded = self.entries.lock().unwrap();
        let entry = (key.clone(), reply.clone());
        recorded.push(entry);
        Ok(())
    }

    /// Returns a snapshot (clone) of every pair appended so far, oldest first.
    ///
    /// Always returns `Ok(..)`; panics only if the mutex is poisoned.
    fn replay_eval_results(&self) -> Result<Vec<(ContentKey, EvalReply)>> {
        let recorded = self.entries.lock().unwrap();
        let snapshot = recorded.clone();
        Ok(snapshot)
    }
}

/// A spawned fixture process together with the node label it was started
/// with. The `Drop` impl kills and reaps the process, so a guard going out of
/// scope (including during a panic unwind) does not leave the child running.
struct ChildGuard {
    /// Node name handed to the fixture through `--node`.
    label: &'static str,
    /// Handle to the running fixture process.
    child: Child,
}

impl ChildGuard {
    /// Launches the `sim-fabric-cadr-fixture` binary as node `label`.
    ///
    /// Each `(expr, value)` pair in `seeds` is forwarded as a
    /// `--hold <expr> <value>` argument triple. The child's stderr is piped
    /// so that [`ChildGuard::port`] can read it; stdout is discarded.
    ///
    /// # Panics
    ///
    /// Panics if the fixture process cannot be started.
    fn spawn(label: &'static str, seeds: &[(&str, &str)]) -> Self {
        let mut fixture = Command::new(env!("CARGO_BIN_EXE_sim-fabric-cadr-fixture"));
        fixture
            .arg("--serve-held")
            .args(["--node", label])
            .args(["--request-limit", "16"])
            .stderr(Stdio::piped())
            .stdout(Stdio::null());
        for &(expr, value) in seeds {
            fixture.arg("--hold").arg(expr).arg(value);
        }
        Self {
            label,
            child: fixture.spawn().expect("cadr fixture starts"),
        }
    }

    /// The node label this fixture was spawned with.
    fn label(&self) -> &'static str {
        self.label
    }

    /// Takes ownership of the child's piped stderr and waits for the fixture
    /// to announce its port on it.
    ///
    /// # Panics
    ///
    /// Panics if stderr has already been taken (for example by an earlier
    /// call), or if the readiness wait itself panics.
    fn port(&mut self) -> u16 {
        wait_for_cadr_ready(self.child.stderr.take().expect("fixture stderr is piped"))
    }

    /// Kills the child and reaps it. Both steps are best-effort: errors
    /// returned by `kill` and `wait` are deliberately ignored, so calling
    /// this repeatedly (or from `Drop` after an explicit call) does not panic.
    fn kill(&mut self) {
        let fixture = &mut self.child;
        let _ = fixture.kill();
        let _ = fixture.wait();
    }
}

impl Drop for ChildGuard {
    fn drop(&mut self) {
        self.kill();
    }
}

/// End-to-end check across three fixture processes:
///
/// 1. content seeded only on `node-b` is resolved through the fabric and
///    recorded in the client's local cassette;
/// 2. after `node-b` and `node-c` are killed, the same request still yields
///    the same value;
/// 3. a request nobody was seeded with fails with a "no holder" error and
///    leaves the local cassette untouched.
#[test]
fn three_process_store_resolves_by_content_id_and_replays_after_two_losses() {
    // Spawn every node first, then wait for each to report its port.
    let mut node_a = ChildGuard::spawn("node-a", &[]);
    let mut node_b = ChildGuard::spawn("node-b", &[("shared", "from-b")]);
    let mut node_c = ChildGuard::spawn("node-c", &[]);
    let (port_a, port_b, port_c) = (node_a.port(), node_b.port(), node_c.port());

    let mut cx = client_cx();
    let cassette = Arc::new(EvalCassette::new(Arc::new(MemoryLedger::default())));
    let client_name = Symbol::new("client");
    let peers = vec![
        ContentPeer::new(Symbol::new(node_a.label()), connect_serve(&mut cx, port_a)),
        ContentPeer::new(Symbol::new(node_b.label()), connect_serve(&mut cx, port_b)),
        ContentPeer::new(Symbol::new(node_c.label()), connect_serve(&mut cx, port_c)),
    ];
    let fabric = ContentAddressedFabric::new(client_name, cassette.clone(), peers);

    // First realization: only node-b was seeded with "shared".
    let shared_request = request("shared");
    let shared_key = ContentKey::from_request(&shared_request);
    let first_reply = fabric.realize(&mut cx, shared_request.clone()).unwrap();
    assert_eq!(value_display(&mut cx, &first_reply.value), "from-b");
    assert!(
        cassette.get(&shared_key).is_some(),
        "client becomes a holder after a TCP peer hit"
    );

    // Take down the seeded node and one other, then ask again.
    node_b.kill();
    node_c.kill();
    let second_reply = fabric.realize(&mut cx, shared_request).unwrap();
    assert_eq!(value_display(&mut cx, &second_reply.value), "from-b");

    // Content that no node was seeded with must be rejected, not invented.
    let unknown_request = request("never-seen");
    let unknown_key = ContentKey::from_request(&unknown_request);
    let error = match fabric.realize(&mut cx, unknown_request) {
        Err(error) => error,
        Ok(_) => panic!("unknown content must fail closed"),
    };
    assert!(error.to_string().contains("no holder"));
    assert!(
        cassette.get(&unknown_key).is_none(),
        "unknown content must not pollute the local cassette"
    );

    node_a.kill();
}

/// Opens a connection to the fixture listening on `port` and wraps it in an
/// `Arc` so it can be handed to a `ContentPeer` as an `EvalFabricRef`.
fn connect_serve(cx: &mut Cx, port: u16) -> EvalFabricRef {
    let link = connection(cx, port);
    Arc::new(link)
}

/// Builds a TCP `Connection` to the loopback host on `port`.
///
/// The transport site is connected first using the codec list from
/// `codecs()`; the codec it selects is then passed on to `Connection::new`.
///
/// # Panics
///
/// Panics if connecting the transport site or constructing the connection
/// fails.
fn connection(cx: &mut Cx, port: u16) -> Connection {
    let host = loopback_host();
    let endpoint = ServerAddress::Tcp { host, port };
    let (site, selected) = connect_transport_site(cx, endpoint.clone(), codecs()).unwrap();
    let link = Connection::new(endpoint, selected, codecs(), site);
    link.unwrap()
}

/// Waits for the fixture to report readiness on `stderr` and returns the port
/// it announced.
///
/// Readiness is a line of the form `CADR_READY <port>`; every other line is
/// skipped.
///
/// Lines are read on a helper thread and handed over through a channel, so
/// the five-second deadline is enforced even while a read is blocked. Reading
/// inline would only check the deadline between lines, which lets a fixture
/// that stays alive but silent hang the test indefinitely. This is why the
/// reader has to be `Send + 'static`.
///
/// Once this function returns, the helper thread stops at its next line (its
/// send fails because the receiver is gone) and drops the reader.
///
/// # Panics
///
/// Panics if
/// - no readiness line arrives within five seconds,
/// - the stream ends before a readiness line is seen,
/// - a line cannot be read, or
/// - the text after `CADR_READY ` is not a valid `u16`.
fn wait_for_cadr_ready(stderr: impl std::io::Read + Send + 'static) -> u16 {
    let deadline = Instant::now() + Duration::from_secs(5);
    let (line_tx, line_rx) = std::sync::mpsc::channel::<std::io::Result<String>>();
    thread::spawn(move || {
        for line in BufReader::new(stderr).lines() {
            // A failed send means nobody is waiting any more; stop reading.
            if line_tx.send(line).is_err() {
                break;
            }
        }
        // Falling out of the loop drops the sender, which the waiting side
        // observes as `Disconnected` (stream ended without readiness).
    });
    loop {
        let remaining = deadline.saturating_duration_since(Instant::now());
        let line = match line_rx.recv_timeout(remaining) {
            Ok(line) => line.expect("fixture readiness line is readable"),
            Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
                panic!("fixture did not report readiness")
            }
            Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
                panic!("fixture exited before reporting readiness")
            }
        };
        if let Some(port) = line.strip_prefix("CADR_READY ") {
            return port.parse().expect("fixture prints a numeric port");
        }
    }
}

/// Creates the client-side evaluation context used by the test.
///
/// The context is built with `EagerPolicy` and `DefaultFactory`, has the
/// binary codec library loaded under a freshly allocated codec id, and is
/// granted the named `"network"` capability plus the remote-eval capability.
///
/// # Panics
///
/// Panics if the binary codec library fails to load.
fn client_cx() -> Cx {
    let mut context = Cx::new(Arc::new(EagerPolicy), Arc::new(DefaultFactory));

    let codec_id = context.registry_mut().fresh_codec_id();
    let binary_codec = BinaryCodecLib::new(codec_id);
    context.load_lib(&binary_codec).expect("binary codec loads");

    context.grant_named("network");
    context.grant(eval_remote_capability());
    context
}

/// Builds the `EvalRequest` the test sends for a string expression.
///
/// The request wraps `expr` as `Expr::String`, requires the `"network"`
/// capability, has a 500 ms deadline, uses `Consistency::RemoteOnly` and
/// `EvalMode::Eval`, and leaves streaming, tracing and all optional limits
/// switched off.
fn request(expr: &str) -> EvalRequest {
    let body = Expr::String(String::from(expr));
    let network = CapabilityName::new("network");
    let time_budget = Duration::from_millis(500);
    EvalRequest {
        expr: body,
        result_shape: None,
        required_capabilities: vec![network],
        deadline: Some(time_budget),
        consistency: Consistency::RemoteOnly,
        mode: EvalMode::Eval,
        answer_limit: None,
        stream_buffer: None,
        stream: false,
        trace: false,
    }
}

/// Renders `value` via its object's `display` method.
///
/// # Panics
///
/// Panics if `display` returns an error.
fn value_display(cx: &mut Cx, value: &Value) -> String {
    let rendered = value.object().display(cx);
    rendered.unwrap()
}

/// The codec list offered when connecting: just the qualified symbol built
/// from `"codec"` and `"binary"`.
fn codecs() -> Vec<Symbol> {
    let binary = Symbol::qualified("codec", "binary");
    Vec::from([binary])
}

/// Host string used for every fixture connection: the IPv4 loopback address.
fn loopback_host() -> String {
    String::from("127.0.0.1")
}