sim_lib_stream_core/spine/event_source.rs
1//! Event-source adapter that feeds a stream's packets into the kernel run
2//! ledger.
3//!
4//! [`StreamEventSource`] wraps a [`StreamValue`] and implements the kernel
5//! [`EventSource`] contract: each pull drains one packet into a sequenced
6//! packet event under a run [`Ref`], and a terminal `done` event is emitted
7//! once -- and only once -- the stream reports itself done. The kernel defines
8//! the `EventSource` protocol; this adapter supplies the stream-specific
9//! behavior of turning spine packets into ledger events.
10
11use std::sync::{Arc, Mutex};
12
13use sim_kernel::{Cx, Error, Event, EventSource, Ref, Result};
14
15use super::StreamValue;
16
17/// Bridges a [`StreamValue`] to the kernel [`EventSource`] contract.
18///
19/// Holds the stream, the run [`Ref`] that events are attributed to, and the
20/// sequencing state. Construct one with [`StreamEventSource::new`] (or
21/// [`StreamValue::event_source`]) and pull events through the [`EventSource`]
22/// implementation.
23///
24/// [`StreamValue::event_source`]: super::StreamValue::event_source
25pub struct StreamEventSource {
26 stream: Arc<StreamValue>,
27 run: Ref,
28 state: Mutex<EventSourceState>,
29}
30
31struct EventSourceState {
32 next_seq: u64,
33 done_sent: bool,
34}
35
36impl StreamEventSource {
37 /// Creates an event source over `stream`, attributing emitted events to
38 /// `run` and numbering them from `start_seq`.
39 pub fn new(stream: Arc<StreamValue>, run: Ref, start_seq: u64) -> Self {
40 Self {
41 stream,
42 run,
43 state: Mutex::new(EventSourceState {
44 next_seq: start_seq,
45 done_sent: false,
46 }),
47 }
48 }
49}
50
51impl EventSource for StreamEventSource {
52 fn next(&self, cx: &mut Cx) -> Result<Option<Event>> {
53 if let Some(item) = self.stream.next_packet()? {
54 let mut state = self
55 .state
56 .lock()
57 .map_err(|_| Error::PoisonedLock("stream event source"))?;
58 let event = item.chunk_event(cx, self.run.clone(), state.next_seq)?;
59 state.next_seq = state.next_seq.saturating_add(1);
60 return Ok(Some(event));
61 }
62 let mut state = self
63 .state
64 .lock()
65 .map_err(|_| Error::PoisonedLock("stream event source"))?;
66 if self.stream.is_done()? && !state.done_sent {
67 state.done_sent = true;
68 let event = Event::done(self.run.clone(), state.next_seq)?;
69 state.next_seq = state.next_seq.saturating_add(1);
70 Ok(Some(event))
71 } else {
72 Ok(None)
73 }
74 }
75
76 fn close(&self, _cx: &mut Cx) -> Result<()> {
77 self.stream.cancel()
78 }
79}