sim-lib-server 0.1.0-rc.1

SIM workspace package for sim lib server.
Documentation
use std::sync::Arc;

use sim_codec_lisp::LispCodecLib;
use sim_kernel::{Consistency, DefaultFactory, EagerPolicy, EvalReply};

use super::*;
use crate::{FrameEnvelope, server_frame_from_reply};

fn cx() -> Cx {
    let mut cx = Cx::new(Arc::new(EagerPolicy), Arc::new(DefaultFactory));
    let lisp = LispCodecLib::new(cx.registry_mut().fresh_codec_id()).unwrap();
    cx.load_lib(&lisp).unwrap();
    cx
}

fn codec() -> Symbol {
    Symbol::qualified("codec", "lisp")
}

fn response_frame(cx: &mut Cx, text: &str) -> ServerFrame {
    let value = cx.factory().string(text.to_owned()).unwrap();
    server_frame_from_reply(
        cx,
        &codec(),
        EvalReply {
            value,
            diagnostics: Vec::new(),
            trace: None,
        },
        Consistency::LocalFirst,
    )
    .unwrap()
}

fn chunk_frame(cx: &mut Cx, text: &str) -> ServerFrame {
    stream_chunk_frame_from_expr(
        cx,
        codec(),
        &Expr::String(text.to_owned()),
        FrameEnvelope {
            consistency: Consistency::LocalFirst,
            ..FrameEnvelope::default()
        },
    )
    .unwrap()
}

#[test]
fn stream_frame_helpers_decode_payloads_and_skip_boundaries() {
    let mut cx = cx();
    let start = ServerFrame::new(
        codec(),
        FrameKind::StreamStart,
        FrameEnvelope::default(),
        Vec::new(),
    );
    assert_eq!(stream_frame_to_expr(&mut cx, &start).unwrap(), None);
    assert!(stream_frame_to_value(&mut cx, &start).unwrap().is_none());

    let chunk = chunk_frame(&mut cx, "chunk");
    assert_eq!(
        stream_frame_to_expr(&mut cx, &chunk).unwrap(),
        Some(Expr::String("chunk".to_owned()))
    );
    assert_eq!(
        stream_frame_to_value(&mut cx, &chunk)
            .unwrap()
            .unwrap()
            .object()
            .as_expr(&mut cx)
            .unwrap(),
        Expr::String("chunk".to_owned())
    );

    let response = response_frame(&mut cx, "fallback");
    assert_eq!(
        stream_frame_to_expr(&mut cx, &response).unwrap(),
        Some(Expr::String("fallback".to_owned()))
    );
    assert_eq!(
        stream_frame_to_value(&mut cx, &response)
            .unwrap()
            .unwrap()
            .object()
            .as_expr(&mut cx)
            .unwrap(),
        Expr::String("fallback".to_owned())
    );

    let end = ServerFrame::new(
        codec(),
        FrameKind::StreamEnd,
        FrameEnvelope::default(),
        Vec::new(),
    );
    assert_eq!(stream_frame_to_expr(&mut cx, &end).unwrap(), None);
    assert!(stream_frame_to_value(&mut cx, &end).unwrap().is_none());
}

#[test]
fn stream_frame_builders_roundtrip_payloads_and_reject_non_stream_kinds() {
    let mut cx = cx();
    let payload = Expr::String("payload".to_owned());
    let envelope = FrameEnvelope {
        consistency: Consistency::LocalFirst,
        trace: true,
        ..FrameEnvelope::default()
    };
    let start = stream_frame_from_expr(
        &mut cx,
        codec(),
        FrameKind::StreamStart,
        &payload,
        envelope.clone(),
    )
    .unwrap();
    assert_eq!(start.kind, FrameKind::StreamStart);
    assert_eq!(start.envelope, envelope);
    assert_eq!(stream_frame_to_expr(&mut cx, &start).unwrap(), None);

    let chunk =
        stream_chunk_frame_from_expr(&mut cx, codec(), &payload, FrameEnvelope::default()).unwrap();
    assert_eq!(chunk.kind, FrameKind::StreamChunk);
    assert_eq!(
        stream_frame_to_expr(&mut cx, &chunk).unwrap(),
        Some(payload.clone())
    );

    let end = stream_end_frame(codec(), FrameEnvelope::default());
    assert_eq!(end.kind, FrameKind::StreamEnd);
    assert!(stream_frame_to_expr(&mut cx, &end).unwrap().is_none());

    let err = stream_frame_from_expr(
        &mut cx,
        codec(),
        FrameKind::Response,
        &payload,
        FrameEnvelope::default(),
    )
    .unwrap_err();
    assert!(err.to_string().contains("cannot build frame kind response"));
}

#[test]
fn buffered_sink_handles_response_fallback_and_idempotent_end() {
    let mut cx = cx();
    let handle = Arc::new(StreamHandle::default());
    let mut sink = BufferedStreamSink::new(handle.clone());

    let fallback = response_frame(&mut cx, "fallback");
    sink.chunk(&mut cx, fallback).unwrap();
    assert_eq!(handle.buffered_len(), 1);
    sink.end(&mut cx).unwrap();
    assert!(handle.is_done());
    sink.end(&mut cx).unwrap();
    assert!(handle.is_done());
    assert_eq!(
        handle
            .next(&mut cx)
            .unwrap()
            .object()
            .as_expr(&mut cx)
            .unwrap(),
        Expr::String("fallback".to_owned())
    );
}

#[test]
fn buffered_sink_closes_on_stream_end_before_sink_end() {
    let mut cx = cx();
    let handle = Arc::new(StreamHandle::default());
    let mut sink = BufferedStreamSink::new(handle.clone());

    sink.chunk(
        &mut cx,
        ServerFrame::new(
            codec(),
            FrameKind::StreamStart,
            FrameEnvelope::default(),
            Vec::new(),
        ),
    )
    .unwrap();
    let chunk = chunk_frame(&mut cx, "chunk");
    sink.chunk(&mut cx, chunk).unwrap();
    sink.chunk(
        &mut cx,
        ServerFrame::new(
            codec(),
            FrameKind::StreamEnd,
            FrameEnvelope::default(),
            Vec::new(),
        ),
    )
    .unwrap();

    assert!(handle.is_done());
    assert_eq!(handle.buffered_len(), 1);
    sink.end(&mut cx).unwrap();
    assert!(handle.is_done());
    assert_eq!(handle.buffered_len(), 1);
    assert_eq!(
        handle
            .next(&mut cx)
            .unwrap()
            .object()
            .as_expr(&mut cx)
            .unwrap(),
        Expr::String("chunk".to_owned())
    );
}