1use std::{
2 collections::VecDeque,
3 sync::{Arc, Mutex},
4};
5
6use sim_kernel::{Args, ClassRef, Cx, Error, Expr, Object, Result, Stream, Symbol, Value};
7
8use crate::{FrameEnvelope, FrameKind, ServerFrame, StreamSink, eval_reply_from_frame};
9
10pub(crate) fn stream_handle_from_value(value: &Value) -> Option<&StreamHandle> {
11 value.object().downcast_ref::<StreamHandle>()
12}
13
14pub(crate) fn evaluated_stream_handle(cx: &mut Cx, expr: &Expr) -> Result<StreamHandle> {
15 let value = cx.eval_expr(expr.clone())?;
16 stream_handle_from_value(&value)
17 .cloned()
18 .ok_or(Error::TypeMismatch {
19 expected: "stream handle",
20 found: "non-stream",
21 })
22}
23
24pub(crate) fn stream_handle_arg<'a>(
25 args: &'a Args,
26 index: usize,
27 message: &'static str,
28) -> Result<&'a StreamHandle> {
29 let Some(value) = args.values().get(index) else {
30 return Err(Error::Eval(message.to_owned()));
31 };
32 value
33 .object()
34 .downcast_ref::<StreamHandle>()
35 .ok_or(Error::TypeMismatch {
36 expected: "stream handle",
37 found: "non-stream",
38 })
39}
40
41#[derive(Clone, Default)]
42pub struct StreamHandle {
47 state: Arc<Mutex<StreamState>>,
48}
49
50#[derive(Default)]
51struct StreamState {
52 chunks: VecDeque<Value>,
53 done: bool,
54 cancelled: bool,
55 error: Option<String>,
56}
57
58impl StreamHandle {
59 pub(crate) fn push(&self, value: Value) -> Result<()> {
60 let mut state = self
61 .state
62 .lock()
63 .map_err(|_| Error::PoisonedLock("stream handle"))?;
64 if !state.cancelled && !state.done {
65 state.chunks.push_back(value);
66 }
67 Ok(())
68 }
69
70 pub(crate) fn finish(&self) -> Result<()> {
71 let mut state = self
72 .state
73 .lock()
74 .map_err(|_| Error::PoisonedLock("stream handle"))?;
75 state.done = true;
76 Ok(())
77 }
78
79 pub(crate) fn finish_with_error(&self, message: String) {
80 if let Ok(mut state) = self.state.lock() {
81 state.error = Some(message);
82 state.done = true;
83 }
84 }
85
86 fn next_item(&self, _cx: &mut Cx) -> Result<Option<Value>> {
87 let mut state = self
88 .state
89 .lock()
90 .map_err(|_| Error::PoisonedLock("stream handle"))?;
91 if let Some(error) = state.error.take() {
92 return Err(Error::Eval(error));
93 }
94 Ok(state.chunks.pop_front())
95 }
96
97 pub(crate) fn next(&self, cx: &mut Cx) -> Result<Value> {
98 Ok(self.next_item(cx)?.unwrap_or(cx.factory().nil()?))
99 }
100
101 pub(crate) fn cancel(&self) {
102 if let Ok(mut state) = self.state.lock() {
103 state.cancelled = true;
104 state.done = true;
105 state.chunks.clear();
106 }
107 }
108
109 pub(crate) fn is_done(&self) -> bool {
110 self.state.lock().map(|state| state.done).unwrap_or(true)
111 }
112
113 pub(crate) fn buffered_len(&self) -> usize {
114 self.state
115 .lock()
116 .map(|state| state.chunks.len())
117 .unwrap_or(0)
118 }
119
120 pub(crate) fn is_cancelled(&self) -> bool {
121 self.state
122 .lock()
123 .map(|state| state.cancelled)
124 .unwrap_or(true)
125 }
126
127 pub fn buffered_values(&self) -> Result<Vec<Value>> {
129 self.state
130 .lock()
131 .map(|state| state.chunks.iter().cloned().collect())
132 .map_err(|_| Error::PoisonedLock("stream handle"))
133 }
134}
135
136impl Object for StreamHandle {
137 fn display(&self, _cx: &mut Cx) -> Result<String> {
138 Ok("#<server-stream>".to_owned())
139 }
140
141 fn as_any(&self) -> &dyn std::any::Any {
142 self
143 }
144}
145
146impl sim_kernel::ObjectCompat for StreamHandle {
147 fn class(&self, cx: &mut Cx) -> Result<ClassRef> {
148 cx.factory().class_stub(
149 sim_kernel::ClassId(0),
150 Symbol::qualified("server", "StreamHandle"),
151 )
152 }
153 fn as_expr(&self, cx: &mut Cx) -> Result<Expr> {
154 self.as_table(cx)?.object().as_expr(cx)
155 }
156 fn as_stream(&self) -> Option<&dyn Stream> {
157 Some(self)
158 }
159 fn as_table(&self, cx: &mut Cx) -> Result<Value> {
160 cx.factory().table(vec![
161 (
162 Symbol::new("kind"),
163 cx.factory().symbol(Symbol::new("stream-handle"))?,
164 ),
165 (
166 Symbol::new("buffered"),
167 cx.factory().string(self.buffered_len().to_string())?,
168 ),
169 (Symbol::new("done"), cx.factory().bool(self.is_done())?),
170 (
171 Symbol::new("cancelled"),
172 cx.factory().bool(self.is_cancelled())?,
173 ),
174 ])
175 }
176}
177
178impl Stream for StreamHandle {
179 fn next(&self, cx: &mut Cx) -> Result<Option<Value>> {
180 self.next_item(cx)
181 }
182
183 fn close(&self, _cx: &mut Cx) -> Result<()> {
184 self.cancel();
185 Ok(())
186 }
187}
188
189pub fn stream_frame_to_value(cx: &mut Cx, frame: &ServerFrame) -> Result<Option<Value>> {
194 match &frame.kind {
195 FrameKind::Response => Ok(Some(eval_reply_from_frame(cx, frame)?.value)),
196 FrameKind::StreamChunk => {
197 let expr = stream_frame_to_expr(cx, frame)?.ok_or_else(|| {
198 Error::Eval("stream chunk frame did not decode to a payload".to_owned())
199 })?;
200 Ok(Some(cx.eval_expr(expr)?))
201 }
202 FrameKind::StreamStart | FrameKind::StreamEnd => Ok(None),
203 _ => Err(unsupported_stream_frame_error(&frame.kind)),
204 }
205}
206
207pub fn stream_frame_to_expr(cx: &mut Cx, frame: &ServerFrame) -> Result<Option<Expr>> {
212 match &frame.kind {
213 FrameKind::Response => {
214 let value = eval_reply_from_frame(cx, frame)?.value;
215 Ok(Some(value.object().as_expr(cx)?))
216 }
217 FrameKind::StreamChunk => Ok(Some(
218 frame.decode_expr(cx, sim_kernel::ReadPolicy::default())?,
219 )),
220 FrameKind::StreamStart | FrameKind::StreamEnd => Ok(None),
221 _ => Err(unsupported_stream_frame_error(&frame.kind)),
222 }
223}
224
225pub fn stream_frame_from_expr(
231 cx: &mut Cx,
232 codec: Symbol,
233 kind: FrameKind,
234 expr: &Expr,
235 envelope: FrameEnvelope,
236) -> Result<ServerFrame> {
237 match kind {
238 FrameKind::StreamStart | FrameKind::StreamChunk => {
239 let mut frame = ServerFrame::from_expr(
240 cx,
241 codec,
242 kind,
243 expr,
244 envelope.consistency,
245 envelope.required_capabilities.clone(),
246 envelope.trace,
247 )?;
248 frame.envelope = envelope;
249 Ok(frame)
250 }
251 _ => Err(Error::Eval(format!(
252 "stream expression frame helper cannot build frame kind {}",
253 kind.as_symbol()
254 ))),
255 }
256}
257
258pub fn stream_chunk_frame_from_expr(
260 cx: &mut Cx,
261 codec: Symbol,
262 expr: &Expr,
263 envelope: FrameEnvelope,
264) -> Result<ServerFrame> {
265 stream_frame_from_expr(cx, codec, FrameKind::StreamChunk, expr, envelope)
266}
267
268pub fn stream_end_frame(codec: Symbol, envelope: FrameEnvelope) -> ServerFrame {
270 ServerFrame::new(codec, FrameKind::StreamEnd, envelope, Vec::new())
271}
272
273fn unsupported_stream_frame_error(kind: &FrameKind) -> Error {
274 Error::Eval(format!(
275 "stream sink cannot buffer frame kind {}",
276 kind.as_symbol()
277 ))
278}
279
280pub struct BufferedStreamSink {
282 handle: Arc<StreamHandle>,
283}
284
285impl BufferedStreamSink {
286 pub fn new(handle: Arc<StreamHandle>) -> Self {
288 Self { handle }
289 }
290}
291
292impl StreamSink for BufferedStreamSink {
293 fn chunk(&mut self, cx: &mut Cx, frame: ServerFrame) -> Result<()> {
294 let closes_stream = matches!(frame.kind, FrameKind::StreamEnd);
295 if let Some(value) = stream_frame_to_value(cx, &frame)? {
296 self.handle.push(value)?;
297 }
298 if closes_stream {
299 self.handle.finish()?;
300 }
301 Ok(())
302 }
303
304 fn end(&mut self, _cx: &mut Cx) -> Result<()> {
305 self.handle.finish()
306 }
307}
308
309#[cfg(test)]
310mod tests;