sim_lib_stream_core/spine/
queue.rs1use std::{
13 collections::VecDeque,
14 sync::{Condvar, Mutex},
15 time::Duration,
16};
17
18use sim_kernel::{Error, Result};
19
20use crate::{BackpressureOutcome, BufferOverflowPolicy, BufferPolicy};
21
22use super::StreamItem;
23
24#[derive(Clone, Debug, Default, PartialEq, Eq)]
33pub struct StreamStats {
34 pub pushed: u64,
36 pub accepted: u64,
39 pub yielded: u64,
41 pub dropped_newest: u64,
43 pub dropped_oldest: u64,
45 pub overflow_errors: u64,
47 pub rejected: u64,
49 pub timeouts: u64,
51 pub timed_out: u64,
53 pub blocked: u64,
55 pub closed: bool,
57 pub cancelled: bool,
59}
60
61#[derive(Clone, Debug, PartialEq, Eq)]
69pub enum PushResult {
70 Accepted,
72 DroppedNewest(StreamItem),
75 DroppedOldest(StreamItem),
78 Rejected(StreamItem),
81 Closed(StreamItem),
83}
84
85impl PushResult {
86 pub fn outcome(&self) -> BackpressureOutcome {
88 match self {
89 Self::Accepted => BackpressureOutcome::Accepted,
90 Self::DroppedNewest(_) => BackpressureOutcome::DroppedNewest,
91 Self::DroppedOldest(_) => BackpressureOutcome::DroppedOldest,
92 Self::Rejected(_) => BackpressureOutcome::Rejected,
93 Self::Closed(_) => BackpressureOutcome::Closed,
94 }
95 }
96}
97
98pub(super) struct PullSpine {
99 state: Mutex<PullState>,
100}
101
102struct PullState {
103 items: VecDeque<StreamItem>,
104 closed: bool,
105 stats: StreamStats,
106}
107
108impl PullSpine {
109 pub(super) fn new(items: Vec<StreamItem>) -> Self {
110 Self {
111 state: Mutex::new(PullState {
112 items: items.into(),
113 closed: false,
114 stats: StreamStats::default(),
115 }),
116 }
117 }
118
119 pub(super) fn next(&self) -> Result<Option<StreamItem>> {
120 let mut state = self
121 .state
122 .lock()
123 .map_err(|_| Error::PoisonedLock("pull stream"))?;
124 if state.closed {
125 return Ok(None);
126 }
127 let item = state.items.pop_front();
128 if item.is_some() {
129 state.stats.yielded += 1;
130 } else {
131 state.closed = true;
132 }
133 Ok(item)
134 }
135
136 pub(super) fn peek(&self) -> Result<Option<StreamItem>> {
137 let state = self
138 .state
139 .lock()
140 .map_err(|_| Error::PoisonedLock("pull stream"))?;
141 Ok((!state.closed)
142 .then(|| state.items.front().cloned())
143 .flatten())
144 }
145
146 pub(super) fn is_done(&self) -> Result<bool> {
147 let state = self
148 .state
149 .lock()
150 .map_err(|_| Error::PoisonedLock("pull stream"))?;
151 Ok(state.closed || state.items.is_empty())
152 }
153
154 pub(super) fn close(&self) -> Result<()> {
155 let mut state = self
156 .state
157 .lock()
158 .map_err(|_| Error::PoisonedLock("pull stream"))?;
159 state.closed = true;
160 state.stats.closed = true;
161 Ok(())
162 }
163
164 pub(super) fn cancel(&self) -> Result<()> {
165 let mut state = self
166 .state
167 .lock()
168 .map_err(|_| Error::PoisonedLock("pull stream"))?;
169 state.items.clear();
170 state.closed = true;
171 state.stats.closed = true;
172 state.stats.cancelled = true;
173 Ok(())
174 }
175
176 pub(super) fn stats(&self) -> Result<StreamStats> {
177 let state = self
178 .state
179 .lock()
180 .map_err(|_| Error::PoisonedLock("pull stream"))?;
181 Ok(state.stats.clone())
182 }
183
184 pub(super) fn depth(&self) -> Result<usize> {
185 let state = self
186 .state
187 .lock()
188 .map_err(|_| Error::PoisonedLock("pull stream"))?;
189 Ok(state.items.len())
190 }
191}
192
193pub(super) struct PushSpine {
194 policy: BufferPolicy,
195 state: Mutex<PushState>,
196 ready: Condvar,
197}
198
199struct PushState {
200 queue: VecDeque<StreamItem>,
201 closed: bool,
202 stats: StreamStats,
203}
204
205impl PushSpine {
206 pub(super) fn new(policy: BufferPolicy) -> Self {
207 Self {
208 policy,
209 state: Mutex::new(PushState {
210 queue: VecDeque::new(),
211 closed: false,
212 stats: StreamStats::default(),
213 }),
214 ready: Condvar::new(),
215 }
216 }
217
218 pub(super) fn push(&self, item: StreamItem) -> Result<PushResult> {
219 let mut state = self
220 .state
221 .lock()
222 .map_err(|_| Error::PoisonedLock("push stream"))?;
223 state.stats.pushed += 1;
224 if state.closed {
225 state.stats.closed = true;
226 return Ok(PushResult::Closed(item));
227 }
228 if state.queue.len() < self.policy.capacity() {
229 state.queue.push_back(item);
230 state.stats.accepted += 1;
231 self.ready.notify_one();
232 return Ok(PushResult::Accepted);
233 }
234 match self.policy.overflow() {
235 BufferOverflowPolicy::DropNewest => {
236 state.stats.dropped_newest += 1;
237 Ok(PushResult::DroppedNewest(item))
238 }
239 BufferOverflowPolicy::DropOldest => {
240 let dropped = state.queue.pop_front().ok_or_else(|| {
241 Error::Eval("stream queue overflowed without an oldest item".to_owned())
242 })?;
243 state.queue.push_back(item);
244 state.stats.dropped_oldest += 1;
245 self.ready.notify_one();
246 Ok(PushResult::DroppedOldest(dropped))
247 }
248 BufferOverflowPolicy::Error => {
249 state.stats.overflow_errors += 1;
250 state.stats.rejected += 1;
251 Ok(PushResult::Rejected(item))
252 }
253 }
254 }
255
256 pub(super) fn next(&self) -> Result<Option<StreamItem>> {
257 let mut state = self
258 .state
259 .lock()
260 .map_err(|_| Error::PoisonedLock("push stream"))?;
261 let item = state.queue.pop_front();
262 if item.is_some() {
263 state.stats.yielded += 1;
264 }
265 Ok(item)
266 }
267
268 pub(super) fn next_timeout(&self, timeout: Duration) -> Result<Option<StreamItem>> {
269 let mut state = self
270 .state
271 .lock()
272 .map_err(|_| Error::PoisonedLock("push stream"))?;
273 if state.queue.is_empty() && !state.closed {
274 let (guard, wait) = self
275 .ready
276 .wait_timeout(state, timeout)
277 .map_err(|_| Error::PoisonedLock("push stream"))?;
278 state = guard;
279 if wait.timed_out() && state.queue.is_empty() {
280 state.stats.timeouts += 1;
281 state.stats.timed_out += 1;
282 return Ok(None);
283 }
284 }
285 let item = state.queue.pop_front();
286 if item.is_some() {
287 state.stats.yielded += 1;
288 }
289 Ok(item)
290 }
291
292 pub(super) fn peek(&self) -> Result<Option<StreamItem>> {
293 let state = self
294 .state
295 .lock()
296 .map_err(|_| Error::PoisonedLock("push stream"))?;
297 Ok(state.queue.front().cloned())
298 }
299
300 pub(super) fn is_done(&self) -> Result<bool> {
301 let state = self
302 .state
303 .lock()
304 .map_err(|_| Error::PoisonedLock("push stream"))?;
305 Ok(state.closed && state.queue.is_empty())
306 }
307
308 pub(super) fn close(&self) -> Result<()> {
309 let mut state = self
310 .state
311 .lock()
312 .map_err(|_| Error::PoisonedLock("push stream"))?;
313 state.closed = true;
314 state.stats.closed = true;
315 self.ready.notify_all();
316 Ok(())
317 }
318
319 pub(super) fn cancel(&self) -> Result<()> {
320 let mut state = self
321 .state
322 .lock()
323 .map_err(|_| Error::PoisonedLock("push stream"))?;
324 state.queue.clear();
325 state.closed = true;
326 state.stats.closed = true;
327 state.stats.cancelled = true;
328 self.ready.notify_all();
329 Ok(())
330 }
331
332 pub(super) fn stats(&self) -> Result<StreamStats> {
333 let state = self
334 .state
335 .lock()
336 .map_err(|_| Error::PoisonedLock("push stream"))?;
337 Ok(state.stats.clone())
338 }
339
340 pub(super) fn depth(&self) -> Result<usize> {
341 let state = self
342 .state
343 .lock()
344 .map_err(|_| Error::PoisonedLock("push stream"))?;
345 Ok(state.queue.len())
346 }
347}