sim-lib-server 0.1.0-rc.1

SIM workspace package for sim lib server.
Documentation
use std::{io::Cursor, path::PathBuf, sync::Arc};

use sim_kernel::{Consistency, EvalRequest, Expr, ReadPolicy, Symbol, eval_remote_capability};

use crate::{
    EvalSite, FrameEnvelope, FrameKind, LocalEvalSite, ServerAddress, ServerFrame,
    eval_request_from_frame, server_frame_from_request,
};

use super::super::{
    ConnectionTransport, LocalTransport, TransportEndpoint, connect_transport_site,
    connect_transport_site_with_loopback, decode_transport_frame, encode_transport_frame,
    read_frame_from, register_endpoint, require_connect_capabilities, require_start_capabilities,
    route_frame_bytes, unregister_endpoint, write_frame_to,
};
use super::support::{codecs, cx};

#[test]
fn transport_frame_round_trips_negotiate_header() {
    let frame = ServerFrame {
        version: 1,
        codec: Symbol::qualified("codec", "binary"),
        msg_id: Some(7),
        correlate: Some(3),
        kind: FrameKind::Negotiate {
            codecs: vec![
                Symbol::qualified("codec", "binary"),
                Symbol::qualified("codec", "lisp"),
            ],
        },
        envelope: FrameEnvelope::default(),
        payload: b"abc".to_vec(),
    };

    let decoded = decode_transport_frame(&encode_transport_frame(&frame).unwrap()).unwrap();
    assert_eq!(decoded, frame);
}

#[test]
fn stream_helpers_round_trip_transport_frames() {
    let frame = ServerFrame {
        version: 1,
        codec: Symbol::qualified("codec", "binary"),
        msg_id: Some(7),
        correlate: Some(3),
        kind: FrameKind::Negotiate {
            codecs: vec![Symbol::qualified("codec", "binary")],
        },
        envelope: FrameEnvelope::default(),
        payload: b"abc".to_vec(),
    };

    let mut cursor = Cursor::new(Vec::new());
    write_frame_to(&mut cursor, &frame).unwrap();
    cursor.set_position(0);

    let decoded = read_frame_from(&mut cursor).unwrap().unwrap();
    assert_eq!(decoded, frame);
    assert!(read_frame_from(&mut cursor).unwrap().is_none());
}

#[test]
fn transport_frame_rejects_truncated_prefix() {
    let err = decode_transport_frame(&[0, 1, 2]).unwrap_err();
    assert!(format!("{err}").contains("truncated transport frame prefix"));
}

#[test]
fn stream_helpers_reject_truncated_prefix() {
    let mut cursor = Cursor::new(vec![0, 1, 2]);
    let err = read_frame_from(&mut cursor).unwrap_err();
    assert!(format!("{err}").contains("truncated transport frame prefix"));
}

#[test]
fn transport_frame_rejects_oversized_lengths_before_allocation() {
    let mut bytes = Vec::new();
    bytes.extend_from_slice(&(u32::MAX).to_be_bytes());
    bytes.extend_from_slice(&1u32.to_be_bytes());
    bytes.extend_from_slice(&[0u8; 8]);
    let err = decode_transport_frame(&bytes).unwrap_err();
    assert!(format!("{err}").contains("size limit"));
}

#[test]
fn local_transport_routes_through_eval_site() {
    let mut cx = cx();
    cx.grant(eval_remote_capability());
    let site = Arc::new(LocalEvalSite::new(ServerAddress::Local, codecs()));
    let mut transport = LocalTransport::new(ServerAddress::Local, site.clone());
    let frame = server_frame_from_request(
        &mut cx,
        &Symbol::qualified("codec", "binary"),
        EvalRequest {
            expr: Expr::Bool(true),
            mode: sim_kernel::EvalMode::Eval,
            result_shape: None,
            answer_limit: None,
            stream_buffer: None,
            stream: false,
            required_capabilities: Vec::new(),
            deadline: None,
            consistency: Consistency::LocalFirst,
            trace: false,
        },
    )
    .unwrap();
    transport.send_frame(&mut cx, frame).unwrap();
    let reply = transport.recv_frame(&mut cx, None).unwrap().unwrap();
    assert_eq!(reply.kind, FrameKind::Response);
}

#[test]
fn tcp_transport_uses_registry_loopback_only_when_requested() {
    #[derive(Clone)]
    struct LoopbackFallbackSite {
        address: ServerAddress,
        codecs: Vec<Symbol>,
    }

    impl EvalSite for LoopbackFallbackSite {
        fn site_kind(&self) -> &'static str {
            "loopback-fallback"
        }

        fn address(&self) -> &ServerAddress {
            &self.address
        }

        fn codecs(&self) -> &[Symbol] {
            &self.codecs
        }

        fn answer(
            &self,
            cx: &mut sim_kernel::Cx,
            frame: ServerFrame,
        ) -> sim_kernel::Result<ServerFrame> {
            let expr = eval_request_from_frame(cx, &frame)?.expr;
            let mut reply = ServerFrame::from_expr(
                cx,
                frame.codec.clone(),
                FrameKind::Response,
                &expr,
                frame.envelope.consistency,
                Vec::new(),
                false,
            )?;
            reply.correlate = frame.msg_id;
            Ok(reply)
        }

        fn as_any(&self) -> &dyn std::any::Any {
            self
        }
    }

    let address = ServerAddress::Tcp {
        host: "127.0.0.1".to_owned(),
        port: 65123,
    };
    register_endpoint(TransportEndpoint {
        address: address.clone(),
        site: Arc::new(LoopbackFallbackSite {
            address: address.clone(),
            codecs: codecs(),
        }),
    })
    .unwrap();

    let mut cx = cx();
    cx.grant_named("network");
    let err = connect_transport_site(&mut cx, address.clone(), codecs())
        .err()
        .unwrap();
    assert!(format!("{err}").contains("io"));

    let (client_site, selected) =
        connect_transport_site_with_loopback(&mut cx, address.clone(), codecs(), true).unwrap();
    assert_eq!(selected, Symbol::qualified("codec", "binary"));

    let request = server_frame_from_request(
        &mut cx,
        &selected,
        EvalRequest {
            expr: Expr::String("loopback".to_owned()),
            mode: sim_kernel::EvalMode::Eval,
            result_shape: None,
            answer_limit: None,
            stream_buffer: None,
            stream: false,
            required_capabilities: Vec::new(),
            deadline: None,
            consistency: Consistency::RemoteOnly,
            trace: false,
        },
    )
    .unwrap();
    let reply = client_site.answer(&mut cx, request).unwrap();
    assert_eq!(
        reply.decode_expr(&mut cx, ReadPolicy::default()).unwrap(),
        Expr::String("loopback".to_owned())
    );

    unregister_endpoint(&address).unwrap();
}

#[test]
fn start_and_connect_capabilities_gate_network_surfaces() {
    let cx = cx();
    let tcp = ServerAddress::Tcp {
        host: "127.0.0.1".to_owned(),
        port: 7171,
    };
    let unix = ServerAddress::Unix {
        path: PathBuf::from("/tmp/sim-say-capability.sock"),
    };
    let http = ServerAddress::Http {
        url: "http://127.0.0.1:7171".to_owned(),
    };

    let start_err = require_start_capabilities(&cx, &tcp).unwrap_err();
    assert!(format!("{start_err}").contains("network"));

    let connect_err = require_connect_capabilities(&cx, &tcp).unwrap_err();
    assert!(format!("{connect_err}").contains("network"));

    let unix_start_err = require_start_capabilities(&cx, &unix).unwrap_err();
    assert!(format!("{unix_start_err}").contains("network"));

    let unix_connect_err = require_connect_capabilities(&cx, &unix).unwrap_err();
    assert!(format!("{unix_connect_err}").contains("network"));

    let mut cx = cx;
    cx.grant_named("network");
    let http_err = require_start_capabilities(&cx, &http).unwrap_err();
    assert!(format!("{http_err}").contains("webhook-serve"));
}

#[cfg(not(unix))]
#[test]
fn unix_transports_report_not_available_on_non_unix_targets() {
    let address = ServerAddress::Unix {
        path: PathBuf::from("/tmp/sim-say-non-unix.sock"),
    };
    let err = super::super::open_server_transport(address.clone()).unwrap_err();
    assert!(format!("{err}").contains("unix sockets are not available"));

    let mut cx = cx();
    cx.grant_named("network");
    let err = connect_transport_site(&mut cx, address, codecs()).unwrap_err();
    assert!(format!("{err}").contains("unix sockets are not available"));
}

#[cfg(not(feature = "server-net-http"))]
#[test]
fn http_family_transports_fail_clearly_when_feature_is_disabled() {
    let address = ServerAddress::Http {
        url: "http://127.0.0.1:7171".to_owned(),
    };
    let err = super::super::open_server_transport(address.clone())
        .err()
        .unwrap();
    assert!(format!("{err}").contains("server-net-http"));

    let mut cx = cx();
    cx.grant_named("network");
    let err = connect_transport_site(&mut cx, address, codecs())
        .err()
        .unwrap();
    assert!(format!("{err}").contains("server-net-http"));
}

#[test]
fn wasm_transport_reuses_wasm_frame_limits() {
    let endpoint_site = Arc::new(LocalEvalSite::new(
        ServerAddress::Wasm {
            region: "shared-1".to_owned(),
        },
        codecs(),
    ));
    register_endpoint(TransportEndpoint {
        address: ServerAddress::Wasm {
            region: "shared-1".to_owned(),
        },
        site: endpoint_site.clone(),
    })
    .unwrap();

    let mut cx = cx();
    let request = server_frame_from_request(
        &mut cx,
        &Symbol::qualified("codec", "binary"),
        EvalRequest {
            expr: Expr::Bool(true),
            mode: sim_kernel::EvalMode::Eval,
            result_shape: None,
            answer_limit: None,
            stream_buffer: None,
            stream: false,
            required_capabilities: Vec::new(),
            deadline: None,
            consistency: Consistency::LocalFirst,
            trace: false,
        },
    )
    .unwrap();

    let bytes = encode_transport_frame(&request).unwrap();
    let endpoint_site: Arc<dyn EvalSite> = endpoint_site;
    let reply = route_frame_bytes(&mut cx, &endpoint_site, &bytes).unwrap();
    let decoded = decode_transport_frame(&reply).unwrap();
    assert_eq!(decoded.kind, FrameKind::Response);
}