sim-lib-server 0.1.0-rc.1

SIM workspace package for sim lib server.
Documentation
use std::sync::Arc;

use sim_kernel::{Consistency, Error, EvalRequest, Expr, NumberLiteral, ReadPolicy, Symbol};

use crate::{
    EvalSite, FrameKind, ServerAddress, ServerFrame, ServerRuntime, ThreadMode,
    eval_request_from_frame, server_frame_from_request,
};

use super::super::{ServerTransport, UnixServerTransport, connect_transport_site, run_accept_loop};
use super::support::{cx, unix_socket_path};

/// Checks that `UnixServerTransport::bind` succeeds on a path where a socket
/// file is already present, and that `shutdown` removes that file again.
///
/// The test returns early (effectively skipping itself) when the preliminary
/// plain `UnixListener::bind` is refused with `PermissionDenied`.
#[test]
fn unix_transport_rebinds_after_stale_socket_file() {
    use std::io::ErrorKind;
    use std::os::unix::net::UnixListener;

    let socket_path = unix_socket_path("stale");

    // Create a socket file with a plain std listener and release the listener
    // right away; the assertion below confirms the file is still on disk.
    match UnixListener::bind(&socket_path) {
        Ok(stale_listener) => drop(stale_listener),
        Err(error) if error.kind() == ErrorKind::PermissionDenied => return,
        Err(error) => panic!("unix bind failed: {error}"),
    }
    assert!(socket_path.exists());

    // Binding the transport on the very same path must succeed even though the
    // leftover file exists, and the file must still be there afterwards.
    let address = ServerAddress::Unix {
        path: socket_path.clone(),
    };
    let transport = UnixServerTransport::bind(address).unwrap();
    assert!(socket_path.exists());

    // Shutting the transport down is expected to clean the socket file up.
    let mut shutdown_cx = cx();
    transport.shutdown(&mut shutdown_cx).unwrap();
    assert!(!socket_path.exists());
}

/// End-to-end round trip over a Unix socket bound by `UnixServerTransport`.
///
/// The server side runs `run_accept_loop` on a spawned thread with a site that
/// advertises the `codec/lisp` and `codec/binary` codecs. The client connects
/// through `connect_transport_site` offering `codec/binary` and `codec/json`;
/// the test asserts that `codec/binary` (the only codec in both lists) is
/// selected, sends the call `(+ 2 2)`, and expects the number `4` back.
/// Finally the runtime is stopped, its threads joined, and the socket file is
/// asserted to be gone after transport shutdown.
///
/// The test returns early when binding fails with a `HostError` whose message
/// contains `PermissionDenied`.
#[test]
fn unix_transport_negotiates_and_answers_over_a_real_socket() {
    /// Builds a number expression in the `numbers/f64` domain from its
    /// canonical text.
    fn f64_literal(canonical: &str) -> Expr {
        Expr::Number(NumberLiteral {
            domain: Symbol::qualified("numbers", "f64"),
            canonical: canonical.to_owned(),
        })
    }

    /// Builds the call expression `+` applied to two `2` literals, which is
    /// both what the client sends and what the server site expects.
    fn two_plus_two() -> Expr {
        Expr::Call {
            operator: Box::new(Expr::Symbol(Symbol::new("+"))),
            args: vec![f64_literal("2"), f64_literal("2")],
        }
    }

    /// Server-side site that checks the incoming expression and always
    /// replies with the literal `4`.
    #[derive(Clone)]
    struct UnixAnswerSite {
        address: ServerAddress,
        codecs: Vec<Symbol>,
    }

    impl EvalSite for UnixAnswerSite {
        fn site_kind(&self) -> &'static str {
            "unix-answer"
        }

        fn address(&self) -> &ServerAddress {
            &self.address
        }

        fn codecs(&self) -> &[Symbol] {
            &self.codecs
        }

        fn answer(
            &self,
            cx: &mut sim_kernel::Cx,
            frame: ServerFrame,
        ) -> sim_kernel::Result<ServerFrame> {
            // Decode the request and insist it is exactly the expected call.
            let received = eval_request_from_frame(cx, &frame)?.expr;
            assert_eq!(received, two_plus_two());

            // Answer with the same codec and consistency as the request and
            // tie the reply to the request's message id.
            let answer_expr = f64_literal("4");
            let mut response = ServerFrame::from_expr(
                cx,
                frame.codec.clone(),
                FrameKind::Response,
                &answer_expr,
                frame.envelope.consistency,
                Vec::new(),
                false,
            )?;
            response.correlate = frame.msg_id;
            Ok(response)
        }

        fn as_any(&self) -> &dyn std::any::Any {
            self
        }
    }

    // --- Server side -------------------------------------------------------
    let socket_path = unix_socket_path("cross-socket");
    let bind_result = UnixServerTransport::bind(ServerAddress::Unix {
        path: socket_path.clone(),
    });
    let transport = match bind_result {
        Ok(bound) => Arc::new(bound),
        Err(Error::HostError(message)) if message.contains("PermissionDenied") => return,
        Err(error) => panic!("unix bind failed: {error}"),
    };
    let runtime = Arc::new(ServerRuntime::new(
        transport,
        cx(),
        ThreadMode::Spawn,
        crate::transport::DEFAULT_MAX_INFLIGHT_FRAMES,
    ));
    let server_site = Arc::new(UnixAnswerSite {
        address: ServerAddress::Unix {
            path: socket_path.clone(),
        },
        codecs: vec![
            Symbol::qualified("codec", "lisp"),
            Symbol::qualified("codec", "binary"),
        ],
    });

    // The accept loop gets its own handles to the runtime and the site; the
    // join handle is handed to the runtime so it can be joined during teardown.
    let accept_runtime = runtime.clone();
    let accept_site = server_site.clone();
    let accept_thread = std::thread::spawn(move || run_accept_loop(accept_runtime, accept_site));
    runtime.set_accept_thread(accept_thread).unwrap();

    // --- Client side -------------------------------------------------------
    let mut client_cx = cx();
    client_cx.grant_named("network");
    let offered_codecs = vec![
        Symbol::qualified("codec", "binary"),
        Symbol::qualified("codec", "json"),
    ];
    let (client_site, selected) = connect_transport_site(
        &mut client_cx,
        ServerAddress::Unix {
            path: socket_path.clone(),
        },
        offered_codecs,
    )
    .unwrap();
    assert_eq!(selected, Symbol::qualified("codec", "binary"));

    let eval_request = EvalRequest {
        expr: two_plus_two(),
        mode: sim_kernel::EvalMode::Eval,
        result_shape: None,
        answer_limit: None,
        stream_buffer: None,
        stream: false,
        required_capabilities: Vec::new(),
        deadline: None,
        consistency: Consistency::RemoteOnly,
        trace: false,
    };
    let request_frame = server_frame_from_request(&mut client_cx, &selected, eval_request).unwrap();
    let reply = client_site.answer(&mut client_cx, request_frame).unwrap();
    let decoded = reply
        .decode_expr(&mut client_cx, ReadPolicy::default())
        .unwrap();
    assert_eq!(decoded, f64_literal("4"));

    // --- Teardown ----------------------------------------------------------
    // Release the client first, then stop the runtime, join its threads, and
    // shut the transport down; afterwards the socket file must be gone.
    drop(client_site);
    runtime.begin_stop();
    runtime.join_accept_thread().unwrap();
    runtime.join_worker_threads().unwrap();
    runtime
        .with_cx(|cx| runtime.transport().shutdown(cx))
        .unwrap();
    assert!(!socket_path.exists());
}