sim-lib-server 0.1.0-rc.1

SIM workspace package for sim lib server.
Documentation
#[cfg(all(
    feature = "server-net-http",
    any(feature = "trigger-telegram", feature = "trigger-matrix")
))]
use std::{net::TcpListener, thread, time::Duration};

#[cfg(all(
    feature = "server-net-http",
    any(feature = "trigger-telegram", feature = "trigger-matrix")
))]
use super::*;

#[cfg(all(
    feature = "server-net-http",
    any(feature = "trigger-telegram", feature = "trigger-matrix")
))]
fn wait_until(timeout_ms: u64, predicate: impl Fn() -> bool) {
    let start = std::time::Instant::now();
    while start.elapsed() < Duration::from_millis(timeout_ms) {
        if predicate() {
            return;
        }
        thread::sleep(Duration::from_millis(10));
    }
    panic!("timed out waiting for trigger effect");
}

#[cfg(all(
    feature = "server-net-http",
    any(feature = "trigger-telegram", feature = "trigger-matrix")
))]
fn bind_loopback_listener() -> Option<TcpListener> {
    let mut last_error = None;
    for _ in 0..10 {
        match TcpListener::bind(("127.0.0.1", 0)) {
            Ok(listener) => return Some(listener),
            Err(error) if error.kind() == std::io::ErrorKind::PermissionDenied => {
                last_error = Some(error);
                thread::sleep(Duration::from_millis(25));
            }
            Err(error) => panic!("failed to bind loopback listener: {error}"),
        }
    }
    eprintln!(
        "skipping real http trigger bind in this environment: {}",
        last_error
            .map(|error| error.to_string())
            .unwrap_or_else(|| "unknown error".to_owned())
    );
    None
}

#[cfg(all(
    feature = "server-net-http",
    any(feature = "trigger-telegram", feature = "trigger-matrix")
))]
fn install_recording_decoder(cx: &mut sim_kernel::Cx) -> Arc<Mutex<Vec<String>>> {
    let seen = Arc::new(Mutex::new(Vec::new()));
    let record = cx
        .factory()
        .opaque(Arc::new(RecordFn { seen: seen.clone() }))
        .unwrap();
    let decoder = cx.factory().opaque(Arc::new(DecodeRecordFn)).unwrap();
    cx.registry_mut()
        .register_value(Symbol::qualified("test", "record"), record)
        .unwrap();
    cx.registry_mut()
        .register_value(Symbol::qualified("test", "decode-record"), decoder)
        .unwrap();
    seen
}

#[cfg(all(
    feature = "server-net-http",
    any(feature = "trigger-telegram", feature = "trigger-matrix")
))]
fn start_server(cx: &mut sim_kernel::Cx, name: &str) {
    let server = cx
        .eval_expr(Expr::Call {
            operator: Box::new(Expr::Symbol(Symbol::qualified("server", "start"))),
            args: vec![],
        })
        .unwrap();
    cx.registry_mut()
        .register_value(Symbol::qualified("test", name), server)
        .unwrap();
}

#[cfg(all(
    feature = "server-net-http",
    any(feature = "trigger-telegram", feature = "trigger-matrix")
))]
fn register_trigger(cx: &mut sim_kernel::Cx, server_name: &str, source: Expr) -> sim_kernel::Value {
    cx.call_exprs(
        cx.resolve_function(&Symbol::qualified("server", "trigger"))
            .unwrap(),
        vec![
            Expr::Symbol(Symbol::qualified("test", server_name)),
            Expr::Symbol(Symbol::new(":source")),
            quoted(source),
            Expr::Symbol(Symbol::new(":decode")),
            Expr::Symbol(Symbol::qualified("test", "decode-record")),
        ],
    )
    .unwrap()
}

#[cfg(all(
    feature = "server-net-http",
    any(feature = "trigger-telegram", feature = "trigger-matrix")
))]
fn stop_server(cx: &mut sim_kernel::Cx, name: &str) {
    cx.call_exprs(
        cx.resolve_function(&Symbol::qualified("server", "stop"))
            .unwrap(),
        vec![Expr::Symbol(Symbol::qualified("test", name))],
    )
    .unwrap();
}

#[cfg(all(feature = "server-net-http", feature = "trigger-telegram"))]
#[test]
fn telegram_trigger_polls_real_http_endpoint() {
    let Some(listener) = bind_loopback_listener() else {
        return;
    };
    let base_url = format!(
        "http://127.0.0.1:{}/telegram",
        listener.local_addr().unwrap().port()
    );
    let server_thread = spawn_json_http_server(listener, move |count, request| {
        assert_eq!(request.method, "GET");
        assert!(
            request
                .path
                .starts_with("/telegram/botbot-token/getUpdates?")
        );
        if count == 0 {
            json_response(
                br#"{"ok":true,"result":[{"update_id":41,"message":{"chat":{"id":"other"},"text":"skip"}},{"update_id":42,"message":{"chat":{"id":"chat-7"},"text":"hello telegram"}}]}"#
                    .to_vec(),
            )
        } else {
            json_response(br#"{"ok":true,"result":[]}"#.to_vec())
        }
    });

    let mut cx = cx();
    install_server_lib(&mut cx).unwrap();
    cx.grant_named("telegram-bot");
    let seen = install_recording_decoder(&mut cx);
    start_server(&mut cx, "telegram-real-server");

    register_trigger(
        &mut cx,
        "telegram-real-server",
        Expr::List(vec![
            Expr::Symbol(Symbol::new("telegram")),
            Expr::Symbol(Symbol::new(":chat-id")),
            Expr::String("chat-7".to_owned()),
            Expr::Symbol(Symbol::new(":bot")),
            Expr::String("bot-token".to_owned()),
            Expr::Symbol(Symbol::new(":base-url")),
            Expr::String(base_url),
        ]),
    );

    wait_until(1_000, || {
        seen.lock()
            .expect("telegram seen mutex poisoned")
            .first()
            .map(|text| text.contains("\"update_id\":42"))
            .unwrap_or(false)
    });

    stop_server(&mut cx, "telegram-real-server");
    server_thread.join().unwrap();
}

#[cfg(all(feature = "server-net-http", feature = "trigger-matrix"))]
#[test]
fn matrix_trigger_polls_real_http_sync_endpoint() {
    let Some(listener) = bind_loopback_listener() else {
        return;
    };
    let base_url = format!(
        "http://127.0.0.1:{}/matrix",
        listener.local_addr().unwrap().port()
    );
    let server_thread = spawn_json_http_server(listener, move |count, request| {
        assert_eq!(request.method, "GET");
        assert!(request.path.starts_with("/matrix/sync?"));
        if count == 0 {
            json_response(
                br#"{"next_batch":"batch-1","rooms":{"join":{"!room:example":{"timeline":{"events":[{"type":"m.room.message","content":{"body":"matrix hello"}}]}}}}}"#
                    .to_vec(),
            )
        } else {
            json_response(
                br#"{"next_batch":"batch-2","rooms":{"join":{"!room:example":{"timeline":{"events":[]}}}}}"#
                    .to_vec(),
            )
        }
    });

    let mut cx = cx();
    install_server_lib(&mut cx).unwrap();
    cx.grant_named("matrix-bot");
    let seen = install_recording_decoder(&mut cx);
    start_server(&mut cx, "matrix-real-server");

    register_trigger(
        &mut cx,
        "matrix-real-server",
        Expr::List(vec![
            Expr::Symbol(Symbol::new("matrix")),
            Expr::Symbol(Symbol::new(":room-id")),
            Expr::String("!room:example".to_owned()),
            Expr::Symbol(Symbol::new(":base-url")),
            Expr::String(base_url),
        ]),
    );

    wait_until(1_000, || {
        seen.lock()
            .expect("matrix seen mutex poisoned")
            .first()
            .map(|text| text.contains("matrix hello"))
            .unwrap_or(false)
    });

    stop_server(&mut cx, "matrix-real-server");
    server_thread.join().unwrap();
}

#[cfg(all(
    feature = "server-net-http",
    any(feature = "trigger-telegram", feature = "trigger-matrix")
))]
fn spawn_json_http_server(
    listener: TcpListener,
    handler: impl Fn(usize, crate::http::HttpRequest) -> crate::http::HttpResponse + Send + 'static,
) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        listener.set_nonblocking(true).unwrap();
        let deadline = std::time::Instant::now() + Duration::from_secs(2);
        let mut handled = 0usize;
        while std::time::Instant::now() < deadline {
            match listener.accept() {
                Ok((mut stream, _)) => {
                    if let Some(request) = crate::http::read_request(&mut stream).unwrap() {
                        let response = handler(handled, request);
                        crate::http::write_response(&mut stream, &response).unwrap();
                        handled += 1;
                    }
                }
                Err(error) if error.kind() == std::io::ErrorKind::WouldBlock => {
                    thread::sleep(Duration::from_millis(10));
                }
                Err(error) => panic!("http test server accept failed: {error}"),
            }
        }
    })
}

#[cfg(all(
    feature = "server-net-http",
    any(feature = "trigger-telegram", feature = "trigger-matrix")
))]
fn json_response(body: Vec<u8>) -> crate::http::HttpResponse {
    crate::http::HttpResponse {
        status: 200,
        headers: vec![("Content-Type".to_owned(), "application/json".to_owned())],
        body,
    }
}