Skip to main content

sim_lib_stream_core/spine/
queue.rs

1//! Internal queues backing a [`StreamValue`], plus the observable counters and
2//! push outcomes they expose.
3//!
4//! A stream value is driven by one of two spines: a `PullSpine` over a fixed
5//! buffer of pre-built items, or a `PushSpine` whose bounded queue is fed by an
6//! external producer under a [`BufferPolicy`]. Both spines are private to the
7//! [`super`] module; the runtime-visible surface they share is [`StreamStats`]
8//! (lifetime counters) and [`PushResult`] (the per-push backpressure decision).
9//!
10//! [`StreamValue`]: super::StreamValue
11
12use std::{
13    collections::VecDeque,
14    sync::{Condvar, Mutex},
15    time::Duration,
16};
17
18use sim_kernel::{Error, Result};
19
20use crate::{BackpressureOutcome, BufferOverflowPolicy, BufferPolicy};
21
22use super::StreamItem;
23
/// Running tallies kept by a stream spine over its whole lifetime.
///
/// The numeric fields are only ever incremented and the two flags are only
/// ever set, so a snapshot (as returned by [`StreamValue::stats`]) describes
/// everything that has happened to the stream so far: how many packets were
/// pushed, how many made it through, and how many were lost to backpressure.
///
/// [`StreamValue::stats`]: super::StreamValue::stats
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StreamStats {
    /// Every packet offered through push; counted before the outcome of the
    /// push is decided.
    pub pushed: u64,
    /// Packets that found free capacity and were queued. Overflow drops,
    /// rejections, and pushes onto a closed stream are not counted here; a
    /// packet queued by evicting the oldest one is tallied under
    /// `dropped_oldest` instead.
    pub accepted: u64,
    /// Packets removed from the spine by a pull that returned an item.
    pub yielded: u64,
    /// Incoming packets discarded because the queue was full under the
    /// drop-newest policy.
    pub dropped_newest: u64,
    /// Queued packets evicted from the front to make room under the
    /// drop-oldest policy.
    pub dropped_oldest: u64,
    /// Overflows encountered while the error policy was in force.
    pub overflow_errors: u64,
    /// Pushes turned away and reported as [`PushResult::Rejected`].
    pub rejected: u64,
    /// Timed waits that ran out before any packet became available.
    pub timeouts: u64,
    /// Timed pulls that came back empty because their wait timed out.
    pub timed_out: u64,
    /// Pulls that had to block waiting for a packet.
    ///
    /// NOTE(review): no code path visible in this module advances this
    /// counter; confirm whether it is meant to be maintained.
    pub blocked: u64,
    /// Set once the stream has been closed to further input.
    pub closed: bool,
    /// Set once the stream has been cancelled, i.e. closed with its buffered
    /// items discarded.
    pub cancelled: bool,
}
60
61/// Outcome of pushing a single packet into a push stream.
62///
63/// The variant records what the spine did with the packet under the active
64/// [`BufferPolicy`]. Every variant except [`PushResult::Accepted`] carries the
65/// packet back so the caller can route the rejected or evicted item elsewhere.
66/// Map a result to its kernel-facing [`BackpressureOutcome`] with
67/// [`PushResult::outcome`].
68#[derive(Clone, Debug, PartialEq, Eq)]
69pub enum PushResult {
70    /// The packet was admitted to the queue.
71    Accepted,
72    /// The queue was full; this newest packet was dropped under the drop-newest
73    /// policy.
74    DroppedNewest(StreamItem),
75    /// The queue was full; the oldest queued packet was evicted to make room,
76    /// and that evicted packet is returned here.
77    DroppedOldest(StreamItem),
78    /// The queue was full and the error policy refused the push; the packet is
79    /// returned unaccepted.
80    Rejected(StreamItem),
81    /// The stream was already closed; the packet is returned unaccepted.
82    Closed(StreamItem),
83}
84
85impl PushResult {
86    /// Maps this push result to the kernel-facing [`BackpressureOutcome`].
87    pub fn outcome(&self) -> BackpressureOutcome {
88        match self {
89            Self::Accepted => BackpressureOutcome::Accepted,
90            Self::DroppedNewest(_) => BackpressureOutcome::DroppedNewest,
91            Self::DroppedOldest(_) => BackpressureOutcome::DroppedOldest,
92            Self::Rejected(_) => BackpressureOutcome::Rejected,
93            Self::Closed(_) => BackpressureOutcome::Closed,
94        }
95    }
96}
97
/// Spine that serves items out of a fixed, pre-built buffer.
///
/// All state lives behind a single mutex so every method can take `&self`.
pub(super) struct PullSpine {
    /// Remaining items, closed flag, and counters, guarded together.
    state: Mutex<PullState>,
}
101
/// Mutex-protected interior of a [`PullSpine`].
struct PullState {
    /// Items not yet yielded, in the order they will be returned.
    items: VecDeque<StreamItem>,
    /// Set by `close`/`cancel`, and by `next` once it finds the buffer empty.
    closed: bool,
    /// Counters and terminal flags reported by `stats`.
    stats: StreamStats,
}
107
108impl PullSpine {
109    pub(super) fn new(items: Vec<StreamItem>) -> Self {
110        Self {
111            state: Mutex::new(PullState {
112                items: items.into(),
113                closed: false,
114                stats: StreamStats::default(),
115            }),
116        }
117    }
118
119    pub(super) fn next(&self) -> Result<Option<StreamItem>> {
120        let mut state = self
121            .state
122            .lock()
123            .map_err(|_| Error::PoisonedLock("pull stream"))?;
124        if state.closed {
125            return Ok(None);
126        }
127        let item = state.items.pop_front();
128        if item.is_some() {
129            state.stats.yielded += 1;
130        } else {
131            state.closed = true;
132        }
133        Ok(item)
134    }
135
136    pub(super) fn peek(&self) -> Result<Option<StreamItem>> {
137        let state = self
138            .state
139            .lock()
140            .map_err(|_| Error::PoisonedLock("pull stream"))?;
141        Ok((!state.closed)
142            .then(|| state.items.front().cloned())
143            .flatten())
144    }
145
146    pub(super) fn is_done(&self) -> Result<bool> {
147        let state = self
148            .state
149            .lock()
150            .map_err(|_| Error::PoisonedLock("pull stream"))?;
151        Ok(state.closed || state.items.is_empty())
152    }
153
154    pub(super) fn close(&self) -> Result<()> {
155        let mut state = self
156            .state
157            .lock()
158            .map_err(|_| Error::PoisonedLock("pull stream"))?;
159        state.closed = true;
160        state.stats.closed = true;
161        Ok(())
162    }
163
164    pub(super) fn cancel(&self) -> Result<()> {
165        let mut state = self
166            .state
167            .lock()
168            .map_err(|_| Error::PoisonedLock("pull stream"))?;
169        state.items.clear();
170        state.closed = true;
171        state.stats.closed = true;
172        state.stats.cancelled = true;
173        Ok(())
174    }
175
176    pub(super) fn stats(&self) -> Result<StreamStats> {
177        let state = self
178            .state
179            .lock()
180            .map_err(|_| Error::PoisonedLock("pull stream"))?;
181        Ok(state.stats.clone())
182    }
183
184    pub(super) fn depth(&self) -> Result<usize> {
185        let state = self
186            .state
187            .lock()
188            .map_err(|_| Error::PoisonedLock("pull stream"))?;
189        Ok(state.items.len())
190    }
191}
192
/// Spine whose queue is filled by pushes and bounded by a [`BufferPolicy`].
pub(super) struct PushSpine {
    /// Supplies the queue capacity and the overflow behavior used by `push`.
    policy: BufferPolicy,
    /// Queue, closed flag, and counters, guarded together.
    state: Mutex<PushState>,
    /// Notified when a push queues a packet and when the spine is closed or
    /// cancelled; `next_timeout` waits on it.
    ready: Condvar,
}
198
/// Mutex-protected interior of a [`PushSpine`].
struct PushState {
    /// Packets queued and not yet pulled, oldest at the front.
    queue: VecDeque<StreamItem>,
    /// Set by `close`/`cancel`; once set, pushes are returned as closed.
    closed: bool,
    /// Counters and terminal flags reported by `stats`.
    stats: StreamStats,
}
204
205impl PushSpine {
206    pub(super) fn new(policy: BufferPolicy) -> Self {
207        Self {
208            policy,
209            state: Mutex::new(PushState {
210                queue: VecDeque::new(),
211                closed: false,
212                stats: StreamStats::default(),
213            }),
214            ready: Condvar::new(),
215        }
216    }
217
218    pub(super) fn push(&self, item: StreamItem) -> Result<PushResult> {
219        let mut state = self
220            .state
221            .lock()
222            .map_err(|_| Error::PoisonedLock("push stream"))?;
223        state.stats.pushed += 1;
224        if state.closed {
225            state.stats.closed = true;
226            return Ok(PushResult::Closed(item));
227        }
228        if state.queue.len() < self.policy.capacity() {
229            state.queue.push_back(item);
230            state.stats.accepted += 1;
231            self.ready.notify_one();
232            return Ok(PushResult::Accepted);
233        }
234        match self.policy.overflow() {
235            BufferOverflowPolicy::DropNewest => {
236                state.stats.dropped_newest += 1;
237                Ok(PushResult::DroppedNewest(item))
238            }
239            BufferOverflowPolicy::DropOldest => {
240                let dropped = state.queue.pop_front().ok_or_else(|| {
241                    Error::Eval("stream queue overflowed without an oldest item".to_owned())
242                })?;
243                state.queue.push_back(item);
244                state.stats.dropped_oldest += 1;
245                self.ready.notify_one();
246                Ok(PushResult::DroppedOldest(dropped))
247            }
248            BufferOverflowPolicy::Error => {
249                state.stats.overflow_errors += 1;
250                state.stats.rejected += 1;
251                Ok(PushResult::Rejected(item))
252            }
253        }
254    }
255
256    pub(super) fn next(&self) -> Result<Option<StreamItem>> {
257        let mut state = self
258            .state
259            .lock()
260            .map_err(|_| Error::PoisonedLock("push stream"))?;
261        let item = state.queue.pop_front();
262        if item.is_some() {
263            state.stats.yielded += 1;
264        }
265        Ok(item)
266    }
267
268    pub(super) fn next_timeout(&self, timeout: Duration) -> Result<Option<StreamItem>> {
269        let mut state = self
270            .state
271            .lock()
272            .map_err(|_| Error::PoisonedLock("push stream"))?;
273        if state.queue.is_empty() && !state.closed {
274            let (guard, wait) = self
275                .ready
276                .wait_timeout(state, timeout)
277                .map_err(|_| Error::PoisonedLock("push stream"))?;
278            state = guard;
279            if wait.timed_out() && state.queue.is_empty() {
280                state.stats.timeouts += 1;
281                state.stats.timed_out += 1;
282                return Ok(None);
283            }
284        }
285        let item = state.queue.pop_front();
286        if item.is_some() {
287            state.stats.yielded += 1;
288        }
289        Ok(item)
290    }
291
292    pub(super) fn peek(&self) -> Result<Option<StreamItem>> {
293        let state = self
294            .state
295            .lock()
296            .map_err(|_| Error::PoisonedLock("push stream"))?;
297        Ok(state.queue.front().cloned())
298    }
299
300    pub(super) fn is_done(&self) -> Result<bool> {
301        let state = self
302            .state
303            .lock()
304            .map_err(|_| Error::PoisonedLock("push stream"))?;
305        Ok(state.closed && state.queue.is_empty())
306    }
307
308    pub(super) fn close(&self) -> Result<()> {
309        let mut state = self
310            .state
311            .lock()
312            .map_err(|_| Error::PoisonedLock("push stream"))?;
313        state.closed = true;
314        state.stats.closed = true;
315        self.ready.notify_all();
316        Ok(())
317    }
318
319    pub(super) fn cancel(&self) -> Result<()> {
320        let mut state = self
321            .state
322            .lock()
323            .map_err(|_| Error::PoisonedLock("push stream"))?;
324        state.queue.clear();
325        state.closed = true;
326        state.stats.closed = true;
327        state.stats.cancelled = true;
328        self.ready.notify_all();
329        Ok(())
330    }
331
332    pub(super) fn stats(&self) -> Result<StreamStats> {
333        let state = self
334            .state
335            .lock()
336            .map_err(|_| Error::PoisonedLock("push stream"))?;
337        Ok(state.stats.clone())
338    }
339
340    pub(super) fn depth(&self) -> Result<usize> {
341        let state = self
342            .state
343            .lock()
344            .map_err(|_| Error::PoisonedLock("push stream"))?;
345        Ok(state.queue.len())
346    }
347}