// sim_lib_server/stream_support.rs

1use std::{
2    collections::VecDeque,
3    sync::{Arc, Mutex},
4};
5
6use sim_kernel::{Args, ClassRef, Cx, Error, Expr, Object, Result, Stream, Symbol, Value};
7
8use crate::{FrameEnvelope, FrameKind, ServerFrame, StreamSink, eval_reply_from_frame};
9
10pub(crate) fn stream_handle_from_value(value: &Value) -> Option<&StreamHandle> {
11    value.object().downcast_ref::<StreamHandle>()
12}
13
14pub(crate) fn evaluated_stream_handle(cx: &mut Cx, expr: &Expr) -> Result<StreamHandle> {
15    let value = cx.eval_expr(expr.clone())?;
16    stream_handle_from_value(&value)
17        .cloned()
18        .ok_or(Error::TypeMismatch {
19            expected: "stream handle",
20            found: "non-stream",
21        })
22}
23
24pub(crate) fn stream_handle_arg<'a>(
25    args: &'a Args,
26    index: usize,
27    message: &'static str,
28) -> Result<&'a StreamHandle> {
29    let Some(value) = args.values().get(index) else {
30        return Err(Error::Eval(message.to_owned()));
31    };
32    value
33        .object()
34        .downcast_ref::<StreamHandle>()
35        .ok_or(Error::TypeMismatch {
36            expected: "stream handle",
37            found: "non-stream",
38        })
39}
40
41#[derive(Clone, Default)]
42/// Buffering, cancellable handle over a server stream's chunk queue.
43///
44/// Cloning shares the same underlying buffer; usable as a runtime stream
45/// object.
46pub struct StreamHandle {
47    state: Arc<Mutex<StreamState>>,
48}
49
50#[derive(Default)]
51struct StreamState {
52    chunks: VecDeque<Value>,
53    done: bool,
54    cancelled: bool,
55    error: Option<String>,
56}
57
58impl StreamHandle {
59    pub(crate) fn push(&self, value: Value) -> Result<()> {
60        let mut state = self
61            .state
62            .lock()
63            .map_err(|_| Error::PoisonedLock("stream handle"))?;
64        if !state.cancelled && !state.done {
65            state.chunks.push_back(value);
66        }
67        Ok(())
68    }
69
70    pub(crate) fn finish(&self) -> Result<()> {
71        let mut state = self
72            .state
73            .lock()
74            .map_err(|_| Error::PoisonedLock("stream handle"))?;
75        state.done = true;
76        Ok(())
77    }
78
79    pub(crate) fn finish_with_error(&self, message: String) {
80        if let Ok(mut state) = self.state.lock() {
81            state.error = Some(message);
82            state.done = true;
83        }
84    }
85
86    fn next_item(&self, _cx: &mut Cx) -> Result<Option<Value>> {
87        let mut state = self
88            .state
89            .lock()
90            .map_err(|_| Error::PoisonedLock("stream handle"))?;
91        if let Some(error) = state.error.take() {
92            return Err(Error::Eval(error));
93        }
94        Ok(state.chunks.pop_front())
95    }
96
97    pub(crate) fn next(&self, cx: &mut Cx) -> Result<Value> {
98        Ok(self.next_item(cx)?.unwrap_or(cx.factory().nil()?))
99    }
100
101    pub(crate) fn cancel(&self) {
102        if let Ok(mut state) = self.state.lock() {
103            state.cancelled = true;
104            state.done = true;
105            state.chunks.clear();
106        }
107    }
108
109    pub(crate) fn is_done(&self) -> bool {
110        self.state.lock().map(|state| state.done).unwrap_or(true)
111    }
112
113    pub(crate) fn buffered_len(&self) -> usize {
114        self.state
115            .lock()
116            .map(|state| state.chunks.len())
117            .unwrap_or(0)
118    }
119
120    pub(crate) fn is_cancelled(&self) -> bool {
121        self.state
122            .lock()
123            .map(|state| state.cancelled)
124            .unwrap_or(true)
125    }
126
127    /// Returns a snapshot copy of the currently buffered chunk values.
128    pub fn buffered_values(&self) -> Result<Vec<Value>> {
129        self.state
130            .lock()
131            .map(|state| state.chunks.iter().cloned().collect())
132            .map_err(|_| Error::PoisonedLock("stream handle"))
133    }
134}
135
136impl Object for StreamHandle {
137    fn display(&self, _cx: &mut Cx) -> Result<String> {
138        Ok("#<server-stream>".to_owned())
139    }
140
141    fn as_any(&self) -> &dyn std::any::Any {
142        self
143    }
144}
145
146impl sim_kernel::ObjectCompat for StreamHandle {
147    fn class(&self, cx: &mut Cx) -> Result<ClassRef> {
148        cx.factory().class_stub(
149            sim_kernel::ClassId(0),
150            Symbol::qualified("server", "StreamHandle"),
151        )
152    }
153    fn as_expr(&self, cx: &mut Cx) -> Result<Expr> {
154        self.as_table(cx)?.object().as_expr(cx)
155    }
156    fn as_stream(&self) -> Option<&dyn Stream> {
157        Some(self)
158    }
159    fn as_table(&self, cx: &mut Cx) -> Result<Value> {
160        cx.factory().table(vec![
161            (
162                Symbol::new("kind"),
163                cx.factory().symbol(Symbol::new("stream-handle"))?,
164            ),
165            (
166                Symbol::new("buffered"),
167                cx.factory().string(self.buffered_len().to_string())?,
168            ),
169            (Symbol::new("done"), cx.factory().bool(self.is_done())?),
170            (
171                Symbol::new("cancelled"),
172                cx.factory().bool(self.is_cancelled())?,
173            ),
174        ])
175    }
176}
177
178impl Stream for StreamHandle {
179    fn next(&self, cx: &mut Cx) -> Result<Option<Value>> {
180        self.next_item(cx)
181    }
182
183    fn close(&self, _cx: &mut Cx) -> Result<()> {
184        self.cancel();
185        Ok(())
186    }
187}
188
189/// Converts a stream frame into the value it carries, if any.
190///
191/// Response and stream-chunk frames yield a value; stream start/end frames
192/// yield `None`; other frame kinds are an error.
193pub fn stream_frame_to_value(cx: &mut Cx, frame: &ServerFrame) -> Result<Option<Value>> {
194    match &frame.kind {
195        FrameKind::Response => Ok(Some(eval_reply_from_frame(cx, frame)?.value)),
196        FrameKind::StreamChunk => {
197            let expr = stream_frame_to_expr(cx, frame)?.ok_or_else(|| {
198                Error::Eval("stream chunk frame did not decode to a payload".to_owned())
199            })?;
200            Ok(Some(cx.eval_expr(expr)?))
201        }
202        FrameKind::StreamStart | FrameKind::StreamEnd => Ok(None),
203        _ => Err(unsupported_stream_frame_error(&frame.kind)),
204    }
205}
206
207/// Converts a stream frame into the expression it carries, if any.
208///
209/// Response and stream-chunk frames yield an expression; stream start/end
210/// frames yield `None`; other frame kinds are an error.
211pub fn stream_frame_to_expr(cx: &mut Cx, frame: &ServerFrame) -> Result<Option<Expr>> {
212    match &frame.kind {
213        FrameKind::Response => {
214            let value = eval_reply_from_frame(cx, frame)?.value;
215            Ok(Some(value.object().as_expr(cx)?))
216        }
217        FrameKind::StreamChunk => Ok(Some(
218            frame.decode_expr(cx, sim_kernel::ReadPolicy::default())?,
219        )),
220        FrameKind::StreamStart | FrameKind::StreamEnd => Ok(None),
221        _ => Err(unsupported_stream_frame_error(&frame.kind)),
222    }
223}
224
225/// Builds a stream boundary or chunk frame from an expression payload.
226///
227/// New stream-producing code should use this helper for `StreamStart` and
228/// `StreamChunk` frames so envelope handling stays consistent across server,
229/// agent, and stream-fabric adapters.
230pub fn stream_frame_from_expr(
231    cx: &mut Cx,
232    codec: Symbol,
233    kind: FrameKind,
234    expr: &Expr,
235    envelope: FrameEnvelope,
236) -> Result<ServerFrame> {
237    match kind {
238        FrameKind::StreamStart | FrameKind::StreamChunk => {
239            let mut frame = ServerFrame::from_expr(
240                cx,
241                codec,
242                kind,
243                expr,
244                envelope.consistency,
245                envelope.required_capabilities.clone(),
246                envelope.trace,
247            )?;
248            frame.envelope = envelope;
249            Ok(frame)
250        }
251        _ => Err(Error::Eval(format!(
252            "stream expression frame helper cannot build frame kind {}",
253            kind.as_symbol()
254        ))),
255    }
256}
257
258/// Builds the standard `StreamChunk` frame for one expression payload.
259pub fn stream_chunk_frame_from_expr(
260    cx: &mut Cx,
261    codec: Symbol,
262    expr: &Expr,
263    envelope: FrameEnvelope,
264) -> Result<ServerFrame> {
265    stream_frame_from_expr(cx, codec, FrameKind::StreamChunk, expr, envelope)
266}
267
268/// Builds the standard `StreamEnd` frame for a stream envelope.
269pub fn stream_end_frame(codec: Symbol, envelope: FrameEnvelope) -> ServerFrame {
270    ServerFrame::new(codec, FrameKind::StreamEnd, envelope, Vec::new())
271}
272
273fn unsupported_stream_frame_error(kind: &FrameKind) -> Error {
274    Error::Eval(format!(
275        "stream sink cannot buffer frame kind {}",
276        kind.as_symbol()
277    ))
278}
279
280/// [`StreamSink`] that buffers received chunks into a [`StreamHandle`].
281pub struct BufferedStreamSink {
282    handle: Arc<StreamHandle>,
283}
284
285impl BufferedStreamSink {
286    /// Creates a sink that pushes chunks into `handle`.
287    pub fn new(handle: Arc<StreamHandle>) -> Self {
288        Self { handle }
289    }
290}
291
292impl StreamSink for BufferedStreamSink {
293    fn chunk(&mut self, cx: &mut Cx, frame: ServerFrame) -> Result<()> {
294        let closes_stream = matches!(frame.kind, FrameKind::StreamEnd);
295        if let Some(value) = stream_frame_to_value(cx, &frame)? {
296            self.handle.push(value)?;
297        }
298        if closes_stream {
299            self.handle.finish()?;
300        }
301        Ok(())
302    }
303
304    fn end(&mut self, _cx: &mut Cx) -> Result<()> {
305        self.handle.finish()
306    }
307}
308
309#[cfg(test)]
310mod tests;