Skip to main content

sim_lib_stream_core/
buffer.rs

1//! Buffer policy values and small expr field-extraction helpers.
2//!
3//! This module supplies the stream fabric's buffering contract:
4//! [`BufferPolicy`] (a bounded capacity plus an overflow rule),
5//! [`BufferOverflowPolicy`] (what to do when a full buffer receives a packet),
6//! and [`BackpressureOutcome`] (the result a producer observes when it offers a
7//! packet). Each carries a stable `stream/*` symbol so the policy round-trips
8//! through the runtime's symbol and [`Expr`] surfaces. The crate-private
9//! `field`/`string_field`/`symbol_field` helpers read named entries out of an
10//! [`Expr::Map`] and are reused by sibling modules that decode stream values.
11
12use sim_kernel::{Error, Expr, Result, Symbol};
13pub(crate) use sim_value::kind::expr_kind;
14
/// Outcome reported to a producer after it offers a packet to a buffered
/// stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackpressureOutcome {
    /// The buffer took the packet.
    Accepted,
    /// The buffer had no room, so the packet being offered was discarded.
    DroppedNewest,
    /// The buffer had no room, so its oldest packet was discarded to make
    /// space for the one being offered.
    DroppedOldest,
    /// The producer must wait for capacity to become available.
    Blocked,
    /// No capacity became available before the offer's time limit elapsed.
    TimedOut,
    /// Policy refused the offer.
    Rejected,
    /// The stream has been closed and takes no more packets.
    Closed,
}
34
35impl BackpressureOutcome {
36    /// Returns the stable wire label for this outcome.
37    pub fn wire_label(self) -> &'static str {
38        match self {
39            Self::Accepted => "accepted",
40            Self::DroppedNewest => "dropped-newest",
41            Self::DroppedOldest => "dropped-oldest",
42            Self::Blocked => "blocked",
43            Self::TimedOut => "timed-out",
44            Self::Rejected => "rejected",
45            Self::Closed => "closed",
46        }
47    }
48
49    /// Returns the `stream/backpressure/<label>` symbol for this outcome.
50    pub fn symbol(self) -> Symbol {
51        Symbol::qualified("stream/backpressure", self.wire_label())
52    }
53
54    /// Parses an outcome from its bare or `stream/backpressure`-qualified
55    /// symbol.
56    ///
57    /// Returns an error for an unrecognized outcome symbol.
58    pub fn from_symbol(symbol: &Symbol) -> Result<Self> {
59        match symbol.as_qualified_str().as_str() {
60            "accepted" | "stream/backpressure/accepted" => Ok(Self::Accepted),
61            "dropped-newest" | "stream/backpressure/dropped-newest" => Ok(Self::DroppedNewest),
62            "dropped-oldest" | "stream/backpressure/dropped-oldest" => Ok(Self::DroppedOldest),
63            "blocked" | "stream/backpressure/blocked" => Ok(Self::Blocked),
64            "timed-out" | "stream/backpressure/timed-out" => Ok(Self::TimedOut),
65            "rejected" | "stream/backpressure/rejected" => Ok(Self::Rejected),
66            "closed" | "stream/backpressure/closed" => Ok(Self::Closed),
67            other => Err(Error::Eval(format!(
68                "unknown stream backpressure outcome {other}"
69            ))),
70        }
71    }
72}
73
/// What a buffer does when it is already full and another packet arrives.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BufferOverflowPolicy {
    /// Discard the arriving (newest) packet.
    DropNewest,
    /// Discard the oldest buffered packet so the arriving one fits.
    DropOldest,
    /// Report the overflow as an error.
    Error,
}
84
85impl BufferOverflowPolicy {
86    /// Returns the `stream/overflow/<rule>` symbol for this policy.
87    pub fn symbol(self) -> Symbol {
88        match self {
89            Self::DropNewest => Symbol::qualified("stream/overflow", "drop-newest"),
90            Self::DropOldest => Symbol::qualified("stream/overflow", "drop-oldest"),
91            Self::Error => Symbol::qualified("stream/overflow", "error"),
92        }
93    }
94
95    /// Parses an overflow policy from its `stream/overflow`-qualified symbol.
96    ///
97    /// Returns an error for an unrecognized overflow symbol.
98    pub fn from_symbol(symbol: &Symbol) -> Result<Self> {
99        match symbol.as_qualified_str().as_str() {
100            "stream/overflow/drop-newest" => Ok(Self::DropNewest),
101            "stream/overflow/drop-oldest" => Ok(Self::DropOldest),
102            "stream/overflow/error" => Ok(Self::Error),
103            other => Err(Error::Eval(format!(
104                "unknown stream buffer overflow policy {other}"
105            ))),
106        }
107    }
108}
109
110/// Buffering contract for a stream: a bounded capacity plus an overflow rule.
111///
112/// A policy is always bounded with a capacity of at least one; the constructors
113/// reject a zero capacity. The overflow rule decides what happens when the
114/// buffer is full.
115///
116/// # Examples
117///
118/// ```
119/// use sim_lib_stream_core::{BufferOverflowPolicy, BufferPolicy};
120///
121/// let policy = BufferPolicy::bounded(8).expect("capacity is nonzero");
122/// assert_eq!(policy.capacity(), 8);
123/// assert_eq!(policy.overflow(), BufferOverflowPolicy::DropNewest);
124/// assert!(BufferPolicy::bounded(0).is_err());
125/// ```
126#[derive(Clone, Debug, PartialEq, Eq)]
127pub struct BufferPolicy {
128    capacity: usize,
129    overflow: BufferOverflowPolicy,
130}
131
132impl BufferPolicy {
133    /// Builds a bounded policy of `capacity` with the default
134    /// [`BufferOverflowPolicy::DropNewest`] overflow rule.
135    ///
136    /// Returns an error if `capacity` is zero.
137    pub fn bounded(capacity: usize) -> Result<Self> {
138        Self::bounded_with_overflow(capacity, BufferOverflowPolicy::DropNewest)
139    }
140
141    /// Builds a bounded policy of `capacity` with an explicit overflow rule.
142    ///
143    /// Returns an error if `capacity` is zero.
144    pub fn bounded_with_overflow(capacity: usize, overflow: BufferOverflowPolicy) -> Result<Self> {
145        if capacity == 0 {
146            return Err(Error::Eval(
147                "stream buffer capacity must be greater than zero".to_owned(),
148            ));
149        }
150        Ok(Self { capacity, overflow })
151    }
152
153    /// Returns the buffer capacity.
154    pub fn capacity(&self) -> usize {
155        self.capacity
156    }
157
158    /// Returns the overflow rule.
159    pub fn overflow(&self) -> BufferOverflowPolicy {
160        self.overflow
161    }
162
163    /// Returns the `stream/buffer/bounded-<capacity>` symbol for this policy.
164    pub fn symbol(&self) -> Symbol {
165        Symbol::qualified("stream/buffer", format!("bounded-{}", self.capacity))
166    }
167
168    /// Encodes this policy as an [`Expr::Map`] with `capacity` and `overflow`
169    /// fields.
170    pub fn to_expr(&self) -> Expr {
171        Expr::Map(vec![
172            (
173                Expr::Symbol(Symbol::new("capacity")),
174                Expr::String(self.capacity.to_string()),
175            ),
176            (
177                Expr::Symbol(Symbol::new("overflow")),
178                Expr::Symbol(self.overflow.symbol()),
179            ),
180        ])
181    }
182
183    /// Decodes a policy from an [`Expr::Map`] produced by
184    /// [`to_expr`](Self::to_expr).
185    ///
186    /// Returns an error if the expression is not a map, a field is missing or
187    /// the wrong type, or the capacity fails to parse.
188    pub fn from_expr(expr: &Expr) -> Result<Self> {
189        let Expr::Map(entries) = expr else {
190            return Err(Error::TypeMismatch {
191                expected: "stream buffer policy map",
192                found: expr_kind(expr),
193            });
194        };
195        let capacity = string_field(entries, "capacity")?
196            .parse::<usize>()
197            .map_err(|err| Error::Eval(format!("invalid stream buffer capacity: {err}")))?;
198        let overflow = BufferOverflowPolicy::from_symbol(symbol_field(entries, "overflow")?)?;
199        Self::bounded_with_overflow(capacity, overflow)
200    }
201}
202
203pub(crate) fn string_field<'a>(entries: &'a [(Expr, Expr)], name: &str) -> Result<&'a str> {
204    match field(entries, name)? {
205        Expr::String(value) => Ok(value),
206        other => Err(Error::TypeMismatch {
207            expected: "string field",
208            found: expr_kind(other),
209        }),
210    }
211}
212
213pub(crate) fn symbol_field<'a>(entries: &'a [(Expr, Expr)], name: &str) -> Result<&'a Symbol> {
214    match field(entries, name)? {
215        Expr::Symbol(value) => Ok(value),
216        other => Err(Error::TypeMismatch {
217            expected: "symbol field",
218            found: expr_kind(other),
219        }),
220    }
221}
222
223pub(crate) fn field<'a>(entries: &'a [(Expr, Expr)], name: &str) -> Result<&'a Expr> {
224    entries
225        .iter()
226        .find_map(|(key, value)| match key {
227            Expr::Symbol(symbol) if symbol.namespace.is_none() && symbol.name.as_ref() == name => {
228                Some(value)
229            }
230            _ => None,
231        })
232        .ok_or_else(|| Error::Eval(format!("stream value missing {name} field")))
233}