1mod event_source;
18mod queue;
19
20use std::{sync::Arc, time::Duration};
21
22use sim_citizen_derive::non_citizen;
23use sim_kernel::{
24 CORE_SEQUENCE_CLASS_ID, ClassRef, Cx, Error, Event, Object, ObjectCompat, Ref, Result,
25 Sequence, SequenceItem, Symbol, Tick, Value, stream_surface::stream_packet_event,
26 validate_ticks,
27};
28
29use crate::{StreamMetadata, StreamPacket, publish_metadata_claims};
30
31pub use event_source::StreamEventSource;
32use queue::{PullSpine, PushSpine};
33pub use queue::{PushResult, StreamStats};
34
35#[derive(Clone, Debug, PartialEq, Eq)]
42pub struct StreamItem {
43 packet: StreamPacket,
44 ticks: Vec<Tick>,
45}
46
47impl StreamItem {
48 pub fn new(packet: StreamPacket) -> Self {
50 Self {
51 packet,
52 ticks: Vec::new(),
53 }
54 }
55
56 pub fn with_ticks(packet: StreamPacket, ticks: Vec<Tick>) -> Result<Self> {
60 validate_ticks(&ticks)?;
61 Ok(Self { packet, ticks })
62 }
63
64 pub fn packet(&self) -> &StreamPacket {
66 &self.packet
67 }
68
69 pub fn ticks(&self) -> &[Tick] {
71 &self.ticks
72 }
73
74 pub fn packet_value(&self, cx: &mut Cx) -> Result<Value> {
76 cx.factory().expr(self.packet.to_expr())
77 }
78
79 pub fn sequence_item(&self, cx: &mut Cx) -> Result<SequenceItem> {
81 SequenceItem::with_ticks(self.packet_value(cx)?, self.ticks.clone())
82 }
83
84 pub fn chunk_event(&self, cx: &mut Cx, run: Ref, seq: u64) -> Result<Event> {
86 let payload = self.packet.intern_ref(cx)?;
87 stream_packet_event(run, seq, self.ticks.clone(), payload)
88 }
89}
90
91#[non_citizen(
111 reason = "live stream spine; reconstruct stream/Packet and stream/Metadata descriptors then realize separately",
112 kind = "handle",
113 descriptor = "stream/Packet"
114)]
115pub struct StreamValue {
116 metadata: StreamMetadata,
117 spine: StreamSpine,
118}
119
120enum StreamSpine {
121 Pull(PullSpine),
122 Push(PushSpine),
123}
124
125impl StreamValue {
126 pub fn pull(metadata: StreamMetadata, items: Vec<StreamItem>) -> Self {
131 Self {
132 metadata,
133 spine: StreamSpine::Pull(PullSpine::new(items)),
134 }
135 }
136
137 pub fn push(metadata: StreamMetadata) -> Self {
143 Self {
144 spine: StreamSpine::Push(PushSpine::new(metadata.buffer().clone())),
145 metadata,
146 }
147 }
148
149 pub fn metadata(&self) -> &StreamMetadata {
151 &self.metadata
152 }
153
154 pub fn publish_claims(&self, cx: &mut Cx, subject: Ref) -> Result<()> {
156 publish_metadata_claims(cx, subject, &self.metadata)
157 }
158
159 pub fn push_packet(&self, item: StreamItem) -> Result<PushResult> {
163 match &self.spine {
164 StreamSpine::Pull(_) => Err(Error::Eval(
165 "cannot push packets into a pull stream".to_owned(),
166 )),
167 StreamSpine::Push(spine) => spine.push(item),
168 }
169 }
170
171 pub fn close_push(&self) -> Result<()> {
173 match &self.spine {
174 StreamSpine::Pull(spine) => spine.close(),
175 StreamSpine::Push(spine) => spine.close(),
176 }
177 }
178
179 pub fn next_packet(&self) -> Result<Option<StreamItem>> {
184 match &self.spine {
185 StreamSpine::Pull(spine) => spine.next(),
186 StreamSpine::Push(spine) => spine.next(),
187 }
188 }
189
190 pub fn next_packet_timeout(&self, timeout: Duration) -> Result<Option<StreamItem>> {
196 match &self.spine {
197 StreamSpine::Pull(spine) => spine.next(),
198 StreamSpine::Push(spine) => spine.next_timeout(timeout),
199 }
200 }
201
202 pub fn peek_packet(&self) -> Result<Option<StreamItem>> {
204 match &self.spine {
205 StreamSpine::Pull(spine) => spine.peek(),
206 StreamSpine::Push(spine) => spine.peek(),
207 }
208 }
209
210 pub fn is_done(&self) -> Result<bool> {
212 match &self.spine {
213 StreamSpine::Pull(spine) => spine.is_done(),
214 StreamSpine::Push(spine) => spine.is_done(),
215 }
216 }
217
218 pub fn take_packets(&self, limit: usize) -> Result<Vec<StreamItem>> {
220 let mut out = Vec::new();
221 for _ in 0..limit {
222 let Some(item) = self.next_packet()? else {
223 break;
224 };
225 out.push(item);
226 }
227 Ok(out)
228 }
229
230 pub fn run_events(&self, cx: &mut Cx, run: Ref, start_seq: u64) -> Result<Vec<Event>> {
236 let mut seq = start_seq;
237 let mut out = Vec::new();
238 while let Some(item) = self.next_packet()? {
239 out.push(item.chunk_event(cx, run.clone(), seq)?);
240 seq = seq.saturating_add(1);
241 }
242 if self.is_done()? {
243 out.push(Event::done(run, seq)?);
244 }
245 Ok(out)
246 }
247
248 pub fn cancel(&self) -> Result<()> {
250 match &self.spine {
251 StreamSpine::Pull(spine) => spine.cancel(),
252 StreamSpine::Push(spine) => spine.cancel(),
253 }
254 }
255
256 pub fn stats(&self) -> Result<StreamStats> {
258 match &self.spine {
259 StreamSpine::Pull(spine) => spine.stats(),
260 StreamSpine::Push(spine) => spine.stats(),
261 }
262 }
263
264 pub fn queue_depth(&self) -> Result<usize> {
266 match &self.spine {
267 StreamSpine::Pull(spine) => spine.depth(),
268 StreamSpine::Push(spine) => spine.depth(),
269 }
270 }
271
272 pub fn event_source(self: &Arc<Self>, run: Ref, start_seq: u64) -> Arc<StreamEventSource> {
275 Arc::new(StreamEventSource::new(Arc::clone(self), run, start_seq))
276 }
277}
278
279impl Object for StreamValue {
280 fn display(&self, _cx: &mut Cx) -> Result<String> {
281 Ok(format!("#<stream {}>", self.metadata.id()))
282 }
283
284 fn as_any(&self) -> &dyn std::any::Any {
285 self
286 }
287}
288
289impl ObjectCompat for StreamValue {
290 fn class(&self, cx: &mut Cx) -> Result<ClassRef> {
291 cx.factory().class_stub(
292 CORE_SEQUENCE_CLASS_ID,
293 Symbol::qualified("stream", "Stream"),
294 )
295 }
296
297 fn as_sequence(&self) -> Option<&dyn Sequence> {
298 Some(self)
299 }
300}
301
302impl Sequence for StreamValue {
303 fn next_item(&self, cx: &mut Cx) -> Result<Option<SequenceItem>> {
304 self.next_packet()?
305 .map(|item| item.sequence_item(cx))
306 .transpose()
307 }
308
309 fn close(&self, _cx: &mut Cx) -> Result<()> {
310 self.cancel()
311 }
312
313 fn peek_item(&self, cx: &mut Cx) -> Result<Option<SequenceItem>> {
314 self.peek_packet()?
315 .map(|item| item.sequence_item(cx))
316 .transpose()
317 }
318
319 fn is_done(&self, _cx: &mut Cx) -> Result<bool> {
320 self.is_done()
321 }
322}
323
324pub fn stream_next_bang(stream: &StreamValue) -> Result<Option<StreamItem>> {
326 stream.next_packet()
327}
328
329pub fn stream_peek_bang(stream: &StreamValue) -> Result<Option<StreamItem>> {
331 stream.peek_packet()
332}
333
334pub fn stream_done_q(stream: &StreamValue) -> Result<bool> {
336 stream.is_done()
337}
338
339pub fn stream_take(stream: &StreamValue, limit: usize) -> Result<Vec<StreamItem>> {
341 stream.take_packets(limit)
342}
343
344pub fn stream_run_bang(
347 stream: &StreamValue,
348 cx: &mut Cx,
349 run: Ref,
350 start_seq: u64,
351) -> Result<Vec<Event>> {
352 stream.run_events(cx, run, start_seq)
353}
354
355pub fn stream_cancel_bang(stream: &StreamValue) -> Result<()> {
357 stream.cancel()
358}
359
360pub fn stream_stats(stream: &StreamValue) -> Result<StreamStats> {
362 stream.stats()
363}
364
365pub fn stream_metadata(stream: &StreamValue) -> &StreamMetadata {
367 stream.metadata()
368}
369
370pub fn stream_next_symbol() -> Symbol {
372 Symbol::qualified("stream", "next!")
373}
374
375pub fn stream_peek_symbol() -> Symbol {
377 Symbol::qualified("stream", "peek!")
378}
379
380pub fn stream_done_symbol() -> Symbol {
382 Symbol::qualified("stream", "done?")
383}
384
385pub fn stream_take_symbol() -> Symbol {
387 Symbol::qualified("stream", "take")
388}
389
390pub fn stream_run_symbol() -> Symbol {
392 Symbol::qualified("stream", "run!")
393}
394
395pub fn stream_cancel_symbol() -> Symbol {
397 Symbol::qualified("stream", "cancel!")
398}
399
400pub fn stream_stats_symbol() -> Symbol {
402 Symbol::qualified("stream", "stats")
403}
404
405pub fn stream_metadata_symbol() -> Symbol {
407 Symbol::qualified("stream", "metadata")
408}