Skip to main content

sim_lib_stream_core/spine/
event_source.rs

1//! Event-source adapter that feeds a stream's packets into the kernel run
2//! ledger.
3//!
4//! [`StreamEventSource`] wraps a [`StreamValue`] and implements the kernel
5//! [`EventSource`] contract: each pull drains one packet into a sequenced
6//! packet event under a run [`Ref`], and a terminal `done` event is emitted
7//! once -- and only once -- the stream reports itself done. The kernel defines
8//! the `EventSource` protocol; this adapter supplies the stream-specific
9//! behavior of turning spine packets into ledger events.
10
11use std::sync::{Arc, Mutex};
12
13use sim_kernel::{Cx, Error, Event, EventSource, Ref, Result};
14
15use super::StreamValue;
16
17/// Bridges a [`StreamValue`] to the kernel [`EventSource`] contract.
18///
19/// Holds the stream, the run [`Ref`] that events are attributed to, and the
20/// sequencing state. Construct one with [`StreamEventSource::new`] (or
21/// [`StreamValue::event_source`]) and pull events through the [`EventSource`]
22/// implementation.
23///
24/// [`StreamValue::event_source`]: super::StreamValue::event_source
25pub struct StreamEventSource {
26    stream: Arc<StreamValue>,
27    run: Ref,
28    state: Mutex<EventSourceState>,
29}
30
31struct EventSourceState {
32    next_seq: u64,
33    done_sent: bool,
34}
35
36impl StreamEventSource {
37    /// Creates an event source over `stream`, attributing emitted events to
38    /// `run` and numbering them from `start_seq`.
39    pub fn new(stream: Arc<StreamValue>, run: Ref, start_seq: u64) -> Self {
40        Self {
41            stream,
42            run,
43            state: Mutex::new(EventSourceState {
44                next_seq: start_seq,
45                done_sent: false,
46            }),
47        }
48    }
49}
50
51impl EventSource for StreamEventSource {
52    fn next(&self, cx: &mut Cx) -> Result<Option<Event>> {
53        if let Some(item) = self.stream.next_packet()? {
54            let mut state = self
55                .state
56                .lock()
57                .map_err(|_| Error::PoisonedLock("stream event source"))?;
58            let event = item.chunk_event(cx, self.run.clone(), state.next_seq)?;
59            state.next_seq = state.next_seq.saturating_add(1);
60            return Ok(Some(event));
61        }
62        let mut state = self
63            .state
64            .lock()
65            .map_err(|_| Error::PoisonedLock("stream event source"))?;
66        if self.stream.is_done()? && !state.done_sent {
67            state.done_sent = true;
68            let event = Event::done(self.run.clone(), state.next_seq)?;
69            state.next_seq = state.next_seq.saturating_add(1);
70            Ok(Some(event))
71        } else {
72            Ok(None)
73        }
74    }
75
76    fn close(&self, _cx: &mut Cx) -> Result<()> {
77        self.stream.cancel()
78    }
79}