// dreamwell_runtime/sync/stage.rs

//! Staging buffer — holds authority events until they are applied at a tick boundary.
//!
//! Events from the authority adapter are staged here during the `PollAuthority` phase,
//! then applied atomically during the `ApplyAuthorityEvents` phase.
5
6use crate::authority::AuthorityEvent;
7
8/// Staging buffer for authority events.
9/// Events are pushed during poll, consumed during apply.
10pub struct StagingBuffer {
11    events: Vec<AuthorityEvent>,
12}
13
14impl StagingBuffer {
15    pub fn new() -> Self {
16        Self {
17            events: Vec::with_capacity(64),
18        }
19    }
20
21    /// Stage an event for later application.
22    pub fn push(&mut self, event: AuthorityEvent) {
23        self.events.push(event);
24    }
25
26    /// Stage multiple events.
27    pub fn extend(&mut self, events: impl IntoIterator<Item = AuthorityEvent>) {
28        self.events.extend(events);
29    }
30
31    /// Drain all staged events for tick-boundary application.
32    pub fn drain(&mut self) -> std::vec::Drain<'_, AuthorityEvent> {
33        self.events.drain(..)
34    }
35
36    /// Number of staged events.
37    pub fn len(&self) -> usize {
38        self.events.len()
39    }
40
41    /// Whether the buffer is empty.
42    pub fn is_empty(&self) -> bool {
43        self.events.is_empty()
44    }
45
46    /// Discard all staged events.
47    pub fn clear(&mut self) {
48        self.events.clear();
49    }
50}
51
52impl Default for StagingBuffer {
53    fn default() -> Self {
54        Self::new()
55    }
56}
57
#[cfg(test)]
mod tests {
    use super::*;

    /// Pushed events come back out of `drain` in the order they were staged,
    /// and the buffer is empty — and still usable — afterwards.
    #[test]
    fn push_and_drain() {
        let mut buf = StagingBuffer::new();
        buf.push(AuthorityEvent::Ack { seq: 1 });
        buf.push(AuthorityEvent::Ack { seq: 2 });
        assert_eq!(buf.len(), 2);

        let drained: Vec<_> = buf.drain().collect();
        assert_eq!(drained.len(), 2);
        // Pattern matching avoids requiring `PartialEq` on `AuthorityEvent`.
        assert!(matches!(drained[0], AuthorityEvent::Ack { seq: 1 }));
        assert!(matches!(drained[1], AuthorityEvent::Ack { seq: 2 }));
        assert!(buf.is_empty());

        // The buffer accepts new events after a drain.
        buf.push(AuthorityEvent::Ack { seq: 3 });
        assert_eq!(buf.len(), 1);
    }

    /// `extend` stages every event from the iterator, in iteration order.
    #[test]
    fn extend_from_iter() {
        let mut buf = StagingBuffer::new();
        let events = vec![
            AuthorityEvent::Ack { seq: 1 },
            AuthorityEvent::Reject {
                seq: 2,
                reason: "test".into(),
            },
        ];
        buf.extend(events);
        assert_eq!(buf.len(), 2);

        let drained: Vec<_> = buf.drain().collect();
        assert!(matches!(drained[0], AuthorityEvent::Ack { seq: 1 }));
        assert!(matches!(drained[1], AuthorityEvent::Reject { seq: 2, .. }));
    }

    /// `clear` drops everything that was staged.
    #[test]
    fn clear_discards() {
        let mut buf = StagingBuffer::new();
        buf.push(AuthorityEvent::Ack { seq: 1 });
        buf.clear();
        assert!(buf.is_empty());
        assert_eq!(buf.len(), 0);
    }

    /// A default-constructed buffer starts out empty.
    #[test]
    fn default_is_empty() {
        assert!(StagingBuffer::default().is_empty());
    }
}