sim-lib-server 0.1.0

SIM workspace package for the sim-lib-server crate.
Documentation
use super::*;

/// Starts a server on the `local` address, then calls `server/suspend`,
/// `server/resume`, and `server/stop` with that server value, asserting the
/// status carried by each returned value. Finishes by inspecting the
/// `server/health` table produced for the same server value.
#[test]
fn lifecycle_functions_change_and_report_status() {
    let mut cx = cx();
    install_server_lib(&mut cx).unwrap();

    // Build the `server/start` call expression before handing it to the evaluator.
    let start_call = Expr::Call {
        operator: Box::new(Expr::Symbol(Symbol::qualified("server", "start"))),
        args: vec![
            Expr::Symbol(Symbol::new(":address")),
            Expr::Symbol(Symbol::new("local")),
            Expr::Symbol(Symbol::new(":name")),
            Expr::Symbol(Symbol::qualified("demo", "srv")),
        ],
    };
    let handle = cx.eval_expr(start_call).unwrap();
    assert_eq!(
        handle.object().downcast_ref::<Server>().unwrap().status(),
        ServerStatus::Running
    );

    // Each lifecycle call below receives a clone of the value returned by start.
    let suspend_fn = Symbol::qualified("server", "suspend");
    let after_suspend = cx
        .call_function(&suspend_fn, sim_kernel::Args::new(vec![handle.clone()]))
        .unwrap();
    let suspended_server = after_suspend.object().downcast_ref::<Server>().unwrap();
    assert_eq!(suspended_server.status(), ServerStatus::Suspended);

    let resume_fn = Symbol::qualified("server", "resume");
    let after_resume = cx
        .call_function(&resume_fn, sim_kernel::Args::new(vec![handle.clone()]))
        .unwrap();
    let resumed_server = after_resume.object().downcast_ref::<Server>().unwrap();
    assert_eq!(resumed_server.status(), ServerStatus::Running);

    let stop_fn = Symbol::qualified("server", "stop");
    let after_stop = cx
        .call_function(&stop_fn, sim_kernel::Args::new(vec![handle.clone()]))
        .unwrap();
    let stopped_server = after_stop.object().downcast_ref::<Server>().unwrap();
    assert_eq!(stopped_server.status(), ServerStatus::Stopped);

    // The last call consumes the handle itself instead of another clone.
    let health_fn = Symbol::qualified("server", "health");
    let health = cx
        .call_function(&health_fn, sim_kernel::Args::new(vec![handle]))
        .unwrap();
    let entries = match health.object().as_expr(&mut cx).unwrap() {
        Expr::Map(entries) => entries,
        _ => panic!("server/health should produce a table expression"),
    };

    let status_field = table_field(&entries, "status");
    assert_eq!(status_field, Some(Expr::Symbol(Symbol::new("stopped"))));

    let sessions_field = table_field(&entries, "sessions");
    assert_eq!(sessions_field, Some(Expr::String("0".to_owned())));

    // The test expects no `listening` entry in this health table.
    let listening_field = table_field(&entries, "listening");
    assert_eq!(listening_field, None);
}

/// Starts a server whose `:address` is `(coroutine)` and whose `:thread` is
/// `(coroutine main)`, backed by a registered `YieldingSiteFn` site, and then
/// walks through: initial coroutine status, connect, request, resume, the
/// `server/lisp` rendering, cancellation, and the final coroutine status.
#[test]
fn coroutine_servers_yield_resume_and_cancel() {
    let mut cx = cx();
    install_server_lib(&mut cx).unwrap();
    cx.grant(eval_fabric_capability());
    cx.grant(eval_remote_capability());

    // Small factories for expressions that are needed more than once.
    let server_ref = || Expr::Symbol(Symbol::qualified("test", "coro-server"));
    let coroutine_on_main = || {
        Expr::List(vec![
            Expr::Symbol(Symbol::new("coroutine")),
            Expr::Symbol(Symbol::new("main")),
        ])
    };

    // Register the site under `test/yield-site` so the start call can name it.
    let yield_site = cx.factory().opaque(Arc::new(YieldingSiteFn)).unwrap();
    cx.registry_mut()
        .register_value(Symbol::qualified("test", "yield-site"), yield_site)
        .unwrap();

    let start_call = Expr::Call {
        operator: Box::new(Expr::Symbol(Symbol::qualified("server", "start"))),
        args: vec![
            Expr::Symbol(Symbol::new(":address")),
            Expr::List(vec![Expr::Symbol(Symbol::new("coroutine"))]),
            Expr::Symbol(Symbol::new(":thread")),
            coroutine_on_main(),
            Expr::Symbol(Symbol::new(":site")),
            Expr::Symbol(Symbol::qualified("test", "yield-site")),
        ],
    };
    let server = cx.eval_expr(start_call).unwrap();
    cx.registry_mut()
        .register_value(Symbol::qualified("test", "coro-server"), server.clone())
        .unwrap();

    // Status right after start.
    let status_fn = cx
        .resolve_function(&Symbol::qualified("server", "coroutine-status"))
        .unwrap();
    let initial_status = cx.call_exprs(status_fn, vec![server_ref()]).unwrap();
    assert_eq!(
        initial_status.object().as_expr(&mut cx).unwrap(),
        Expr::Symbol(Symbol::new("suspended"))
    );

    // Open a connection and register it as `test/coro-conn`.
    let connect_fn = cx
        .resolve_function(&Symbol::qualified("server", "connect"))
        .unwrap();
    let connection = cx.call_exprs(connect_fn, vec![server_ref()]).unwrap();
    cx.registry_mut()
        .register_value(Symbol::qualified("test", "coro-conn"), connection)
        .unwrap();

    // A request carrying "first" is expected to come back as "first".
    let request_fn = cx
        .resolve_function(&Symbol::qualified("server", "request"))
        .unwrap();
    let request_args = vec![
        Expr::Symbol(Symbol::qualified("test", "coro-conn")),
        Expr::String("first".to_owned()),
    ];
    let first_reply = cx.call_exprs(request_fn, request_args).unwrap();
    assert_eq!(
        first_reply.object().as_expr(&mut cx).unwrap(),
        Expr::String("first".to_owned())
    );

    // Resuming with "second" is expected to come back as "second".
    let resume_fn = cx
        .resolve_function(&Symbol::qualified("server", "resume"))
        .unwrap();
    let resume_args = vec![server_ref(), Expr::String("second".to_owned())];
    let second_reply = cx.call_exprs(resume_fn, resume_args).unwrap();
    assert_eq!(
        second_reply.object().as_expr(&mut cx).unwrap(),
        Expr::String("second".to_owned())
    );

    // `server/lisp` must return a call whose arguments contain the
    // `:thread` keyword immediately followed by `(coroutine main)`.
    let lisp_fn = Symbol::qualified("server", "lisp");
    let rendered = cx
        .call_function(&lisp_fn, sim_kernel::Args::new(vec![server.clone()]))
        .unwrap();
    let call_args = match rendered.object().as_expr(&mut cx).unwrap() {
        Expr::Call { args, .. } => args,
        _ => panic!("server/lisp should return a call expression"),
    };
    let mentions_thread_mode = call_args
        .iter()
        .zip(call_args.iter().skip(1))
        .any(|(key, value)| {
            *key == Expr::Symbol(Symbol::new(":thread")) && *value == coroutine_on_main()
        });
    assert!(mentions_thread_mode);

    let cancel_fn = cx
        .resolve_function(&Symbol::qualified("server", "cancel-coroutine"))
        .unwrap();
    cx.call_exprs(cancel_fn, vec![server_ref()]).unwrap();

    // Status after cancellation.
    let final_status_fn = cx
        .resolve_function(&Symbol::qualified("server", "coroutine-status"))
        .unwrap();
    let final_status = cx.call_exprs(final_status_fn, vec![server_ref()]).unwrap();
    assert_eq!(
        final_status.object().as_expr(&mut cx).unwrap(),
        Expr::Symbol(Symbol::new("done"))
    );
}

/// For each of the `spawn` and `pool` thread modes: starts a server with that
/// `:thread` value, checks that `server/reflect` reports the same symbol in
/// its `thread` field, and then stops the server.
#[test]
fn spawn_and_pool_thread_modes_start_cleanly() {
    let mut cx = cx();
    install_server_lib(&mut cx).unwrap();

    for &mode in &["spawn", "pool"] {
        let start_call = Expr::Call {
            operator: Box::new(Expr::Symbol(Symbol::qualified("server", "start"))),
            args: vec![
                Expr::Symbol(Symbol::new(":thread")),
                Expr::Symbol(Symbol::new(mode)),
            ],
        };
        let handle = cx.eval_expr(start_call).unwrap();

        // Reflect on a clone so the handle is still available for `server/stop`.
        let reflect_fn = Symbol::qualified("server", "reflect");
        let reflection = cx
            .call_function(&reflect_fn, sim_kernel::Args::new(vec![handle.clone()]))
            .unwrap();
        let entries = match reflection.object().as_expr(&mut cx).unwrap() {
            Expr::Map(entries) => entries,
            _ => panic!("server/reflect should produce a table expression"),
        };
        let reported_thread = table_field(&entries, "thread");
        assert_eq!(reported_thread, Some(Expr::Symbol(Symbol::new(mode))));

        // Only a successful stop matters here; the returned value is discarded.
        let stop_fn = Symbol::qualified("server", "stop");
        let _ = cx
            .call_function(&stop_fn, sim_kernel::Args::new(vec![handle]))
            .unwrap();
    }
}

/// Starting a server with `:thread (coroutine spawn)` must fail with an
/// `Error::Eval` carrying the exact message pinned below.
#[test]
fn unsupported_coroutine_thread_base_reports_current_error() {
    let expected_message = "server/start: coroutine thread mode supports only main or coop base";

    let mut cx = cx();
    install_server_lib(&mut cx).unwrap();

    let thread_spec = Expr::List(vec![
        Expr::Symbol(Symbol::new("coroutine")),
        Expr::Symbol(Symbol::new("spawn")),
    ]);
    let start_call = Expr::Call {
        operator: Box::new(Expr::Symbol(Symbol::qualified("server", "start"))),
        args: vec![Expr::Symbol(Symbol::new(":thread")), thread_spec],
    };

    let failure = cx.eval_expr(start_call).unwrap_err();

    let is_expected_error = matches!(
        failure,
        sim_kernel::Error::Eval(message) if message == expected_message
    );
    assert!(is_expected_error);
}

/// Builds a `Server` around a hand-constructed `ServerRuntime` on which one
/// session was opened, one received and one sent message were noted, and the
/// session codec was updated; then asserts that the health table reports `"1"`
/// for `sessions`, `connections`, `messages-sent`, and `messages-received`.
#[test]
fn server_health_reports_live_runtime_counters() {
    let mut host_cx = cx();
    // Every component of this fixture uses the same in-process address.
    let address = || ServerAddress::InProcess { thread: 7 };

    let codecs = installed_codecs();
    let site = Arc::new(LoopbackSite {
        address: address(),
        codecs: codecs.clone(),
    });
    let transport = Arc::new(crate::transport::RegistryTransport::new(address()));
    let runtime = Arc::new(ServerRuntime::new(
        transport,
        cx(),
        crate::ThreadMode::Spawn,
        crate::transport::DEFAULT_MAX_INFLIGHT_FRAMES,
    ));

    // Drive the runtime directly so it has activity to report.
    let binary_codec = Symbol::qualified("codec", "binary");
    let session_id = runtime
        .open_session(binary_codec, IsolationPolicy::default())
        .unwrap();
    runtime.note_message_received();
    runtime.note_message_sent();
    let json_codec = Symbol::qualified("codec", "json");
    runtime.update_session_codec(session_id, json_codec).unwrap();

    let server = Server::with_runtime(
        address(),
        codecs[0].clone(),
        codecs,
        crate::ThreadMode::Spawn,
        IsolationPolicy::default(),
        None,
        site,
        vec![],
        Some(runtime),
    )
    .unwrap();

    let health = server.health_value(&mut host_cx).unwrap();
    let entries = match health.object().as_expr(&mut host_cx).unwrap() {
        Expr::Map(entries) => entries,
        _ => panic!("server/health should produce a table expression"),
    };

    // All four fields are expected to hold the string "1".
    for field in [
        "sessions",
        "connections",
        "messages-sent",
        "messages-received",
    ] {
        assert_eq!(
            table_field(&entries, field),
            Some(Expr::String("1".to_owned())),
            "unexpected value for health field {field}"
        );
    }
}

/// Builds a `Server` around a `ServerRuntime` with a single session opened
/// using `codec/binary`, then asserts that `sessions_value` yields a
/// one-element list whose table reports that codec as `negotiated-codec` and
/// `closed` as `false`.
#[test]
fn server_sessions_lists_runtime_session_tables() {
    let mut host_cx = cx();
    // Every component of this fixture uses the same in-process address.
    let address = || ServerAddress::InProcess { thread: 9 };

    let codecs = installed_codecs();
    let site = Arc::new(LoopbackSite {
        address: address(),
        codecs: codecs.clone(),
    });
    let transport = Arc::new(crate::transport::RegistryTransport::new(address()));
    let runtime = Arc::new(ServerRuntime::new(
        transport,
        cx(),
        crate::ThreadMode::Spawn,
        crate::transport::DEFAULT_MAX_INFLIGHT_FRAMES,
    ));

    // Open the one session that the listing should contain.
    let binary_codec = Symbol::qualified("codec", "binary");
    runtime
        .open_session(binary_codec, IsolationPolicy::default())
        .unwrap();

    let default_codec = codecs[0].clone();
    let server = Server::with_runtime(
        address(),
        default_codec,
        codecs,
        crate::ThreadMode::Spawn,
        IsolationPolicy::default(),
        None,
        site,
        vec![],
        Some(runtime),
    )
    .unwrap();

    let listing = server.sessions_value(&mut host_cx).unwrap();
    let items = match listing.object().as_expr(&mut host_cx).unwrap() {
        Expr::List(items) => items,
        _ => panic!("server/sessions should return a list"),
    };
    assert_eq!(items.len(), 1);

    let entries = match items[0].clone() {
        Expr::Map(entries) => entries,
        _ => panic!("server/sessions entries should be tables"),
    };
    let negotiated = table_field(&entries, "negotiated-codec");
    assert_eq!(
        negotiated,
        Some(Expr::Symbol(Symbol::qualified("codec", "binary")))
    );
    let closed_flag = table_field(&entries, "closed");
    assert_eq!(closed_flag, Some(Expr::Bool(false)));
}

/// Connects to the quoted `local` address, checks that the connection's
/// session starts with `closed == false`, then calls `server/close` and checks
/// that the returned connection's session has `closed == true`.
#[test]
fn server_close_marks_connection_session_closed() {
    let mut cx = cx();
    install_server_lib(&mut cx).unwrap();

    let connect_fn = cx
        .resolve_function(&Symbol::qualified("server", "connect"))
        .unwrap();
    let target = quoted(Expr::Symbol(Symbol::new("local")));
    let connection = cx.call_exprs(connect_fn, vec![target]).unwrap();

    // A freshly opened connection must not report a closed session.
    let open_connection = connection.object().downcast_ref::<Connection>().unwrap();
    let closed_before = open_connection.session().closed;
    assert!(!closed_before);

    let close_fn = Symbol::qualified("server", "close");
    let close_result = cx
        .call_function(&close_fn, sim_kernel::Args::new(vec![connection.clone()]))
        .unwrap();

    // The assertion inspects the value returned by `server/close`.
    let closed_connection = close_result.object().downcast_ref::<Connection>().unwrap();
    let closed_after = closed_connection.session().closed;
    assert!(closed_after);
}