sim-lib-server 0.1.0

SIM workspace package for sim lib server.
Documentation
use std::sync::Arc;

use sim_kernel::{Cx, Error, Expr, RawArgs, Result, Symbol, Value};

use crate::helpers::{
    clone_server_cx, connect_target_value, default_server_codec, ensure_installed_codec,
    installed_server_codecs, keyword, symbol_of, usize_from_value,
};
use crate::isolation::IsolatedEvalSite;
use crate::transport::{open_server_transport, require_start_capabilities, start_server_transport};
use crate::{
    Coroutine, CoroutineEvalSite, EvalSite, LocalEvalSite, Server, ServerAddress, ServerRuntime,
    ThreadMode, isolation::IsolationPolicy,
};

pub(crate) fn server_start(cx: &mut Cx, args: RawArgs) -> Result<Value> {
    let options = args.into_exprs();
    if !options.len().is_multiple_of(2) {
        return Err(Error::Eval(
            "server/start options must be key/value pairs".to_owned(),
        ));
    }

    let mut address = ServerAddress::Local;
    let mut codec: Option<Symbol> = None;
    let mut thread = ThreadMode::Coop;
    let mut isolation = IsolationPolicy::default();
    let mut name = None;
    let mut site = None;
    let mut max_inflight = crate::transport::DEFAULT_MAX_INFLIGHT_FRAMES;
    let mut saw_address = false;
    let mut saw_codec = false;
    let mut saw_thread = false;
    let mut saw_isolate = false;
    let mut saw_max_inflight = false;
    let mut spec = Vec::new();

    for pair in options.chunks(2) {
        let key = keyword(&pair[0])?;
        match key.as_str() {
            "address" => {
                address = ServerAddress::from_expr(&pair[1])?;
                saw_address = true;
            }
            "codec" => {
                let parsed = symbol_of(&pair[1], "server/start :codec expects a symbol")?;
                ensure_installed_codec(cx, &parsed)?;
                codec = Some(parsed);
                saw_codec = true;
            }
            "thread" => {
                thread = ThreadMode::from_expr(&pair[1])?;
                saw_thread = true;
            }
            "isolate" => {
                isolation = IsolationPolicy::from_expr(&pair[1])?;
                saw_isolate = true;
            }
            "name" => {
                name = Some(symbol_of(&pair[1], "server/start :name expects a symbol")?);
            }
            "site" => {
                site = Some(cx.eval_expr(pair[1].clone())?);
            }
            "max-inflight" => {
                let value = cx.eval_expr(pair[1].clone())?;
                max_inflight =
                    usize_from_value(cx, value, "server/start :max-inflight expects an integer")?;
                saw_max_inflight = true;
            }
            "capable" | "session" => {}
            other => {
                return Err(Error::Eval(format!(
                    "server/start: unknown option :{other}"
                )));
            }
        }
        spec.push((Symbol::new(key), pair[1].clone()));
    }

    require_start_capabilities(cx, &address)?;
    if !address.transport_available() {
        return Err(Error::Eval(
            "server/start: no transport for that address yet".to_owned(),
        ));
    }
    if !thread.is_available_now() {
        return Err(Error::Eval(
            "server/start: coroutine thread mode supports only main or coop base".to_owned(),
        ));
    }

    let supported = installed_server_codecs(cx);
    let default_codec = match codec {
        Some(codec) => codec,
        None => default_server_codec(&supported)?,
    };
    if !saw_address {
        spec.push((Symbol::new("address"), Expr::Symbol(Symbol::new("local"))));
    }
    if !saw_codec {
        spec.push((Symbol::new("codec"), Expr::Symbol(default_codec.clone())));
    }
    if !saw_thread {
        spec.push((Symbol::new("thread"), thread.as_expr()));
    }
    if !saw_isolate {
        spec.push((Symbol::new("isolate"), Expr::Nil));
    }
    if !saw_max_inflight {
        spec.push((
            Symbol::new("max-inflight"),
            Expr::String(max_inflight.to_string()),
        ));
    }

    let runtime_transport = open_server_transport(address.clone())?;
    if let Some(transport) = runtime_transport.as_ref() {
        address = transport.address().clone();
    }
    let coroutine_backed =
        matches!(address, ServerAddress::Coroutine { .. }) || thread.is_coroutine();
    let site: Arc<dyn EvalSite> = if coroutine_backed {
        let handler = site.ok_or_else(|| {
            Error::Eval("server/start: coroutine servers require :site".to_owned())
        })?;
        if handler.object().as_callable().is_none() {
            return Err(Error::TypeMismatch {
                expected: "callable",
                found: "non-callable",
            });
        }
        let coroutine = Arc::new(Coroutine::new(address.clone(), handler));
        Arc::new(CoroutineEvalSite::new(
            address.clone(),
            supported.clone(),
            coroutine,
        ))
    } else {
        if let Some(target) = site {
            connect_target_value(
                cx,
                target,
                None,
                &[],
                None,
                false,
                IsolationPolicy::default(),
            )?
            .site()
            .clone()
        } else {
            Arc::new(LocalEvalSite::new(address.clone(), supported.clone()))
        }
    };
    let site = IsolatedEvalSite::wrap(site, isolation.clone());
    let runtime = runtime_transport.map(|transport| {
        Arc::new(ServerRuntime::new_with_isolation(
            transport,
            clone_server_cx(cx),
            thread.clone(),
            max_inflight,
            isolation.clone(),
        ))
    });
    let server = Server::with_runtime(
        address,
        default_codec,
        supported,
        thread,
        isolation,
        name,
        site,
        spec,
        runtime,
    )?;
    start_server_transport(&server)?;
    cx.factory().opaque(Arc::new(server))
}