1use sim_kernel::{Error, Expr, Result, Symbol};
13pub(crate) use sim_value::kind::expr_kind;
14
15#[derive(Clone, Copy, Debug, PartialEq, Eq)]
17pub enum BackpressureOutcome {
18 Accepted,
20 DroppedNewest,
22 DroppedOldest,
25 Blocked,
27 TimedOut,
29 Rejected,
31 Closed,
33}
34
35impl BackpressureOutcome {
36 pub fn wire_label(self) -> &'static str {
38 match self {
39 Self::Accepted => "accepted",
40 Self::DroppedNewest => "dropped-newest",
41 Self::DroppedOldest => "dropped-oldest",
42 Self::Blocked => "blocked",
43 Self::TimedOut => "timed-out",
44 Self::Rejected => "rejected",
45 Self::Closed => "closed",
46 }
47 }
48
49 pub fn symbol(self) -> Symbol {
51 Symbol::qualified("stream/backpressure", self.wire_label())
52 }
53
54 pub fn from_symbol(symbol: &Symbol) -> Result<Self> {
59 match symbol.as_qualified_str().as_str() {
60 "accepted" | "stream/backpressure/accepted" => Ok(Self::Accepted),
61 "dropped-newest" | "stream/backpressure/dropped-newest" => Ok(Self::DroppedNewest),
62 "dropped-oldest" | "stream/backpressure/dropped-oldest" => Ok(Self::DroppedOldest),
63 "blocked" | "stream/backpressure/blocked" => Ok(Self::Blocked),
64 "timed-out" | "stream/backpressure/timed-out" => Ok(Self::TimedOut),
65 "rejected" | "stream/backpressure/rejected" => Ok(Self::Rejected),
66 "closed" | "stream/backpressure/closed" => Ok(Self::Closed),
67 other => Err(Error::Eval(format!(
68 "unknown stream backpressure outcome {other}"
69 ))),
70 }
71 }
72}
73
74#[derive(Clone, Copy, Debug, PartialEq, Eq)]
76pub enum BufferOverflowPolicy {
77 DropNewest,
79 DropOldest,
81 Error,
83}
84
85impl BufferOverflowPolicy {
86 pub fn symbol(self) -> Symbol {
88 match self {
89 Self::DropNewest => Symbol::qualified("stream/overflow", "drop-newest"),
90 Self::DropOldest => Symbol::qualified("stream/overflow", "drop-oldest"),
91 Self::Error => Symbol::qualified("stream/overflow", "error"),
92 }
93 }
94
95 pub fn from_symbol(symbol: &Symbol) -> Result<Self> {
99 match symbol.as_qualified_str().as_str() {
100 "stream/overflow/drop-newest" => Ok(Self::DropNewest),
101 "stream/overflow/drop-oldest" => Ok(Self::DropOldest),
102 "stream/overflow/error" => Ok(Self::Error),
103 other => Err(Error::Eval(format!(
104 "unknown stream buffer overflow policy {other}"
105 ))),
106 }
107 }
108}
109
110#[derive(Clone, Debug, PartialEq, Eq)]
127pub struct BufferPolicy {
128 capacity: usize,
129 overflow: BufferOverflowPolicy,
130}
131
132impl BufferPolicy {
133 pub fn bounded(capacity: usize) -> Result<Self> {
138 Self::bounded_with_overflow(capacity, BufferOverflowPolicy::DropNewest)
139 }
140
141 pub fn bounded_with_overflow(capacity: usize, overflow: BufferOverflowPolicy) -> Result<Self> {
145 if capacity == 0 {
146 return Err(Error::Eval(
147 "stream buffer capacity must be greater than zero".to_owned(),
148 ));
149 }
150 Ok(Self { capacity, overflow })
151 }
152
153 pub fn capacity(&self) -> usize {
155 self.capacity
156 }
157
158 pub fn overflow(&self) -> BufferOverflowPolicy {
160 self.overflow
161 }
162
163 pub fn symbol(&self) -> Symbol {
165 Symbol::qualified("stream/buffer", format!("bounded-{}", self.capacity))
166 }
167
168 pub fn to_expr(&self) -> Expr {
171 Expr::Map(vec![
172 (
173 Expr::Symbol(Symbol::new("capacity")),
174 Expr::String(self.capacity.to_string()),
175 ),
176 (
177 Expr::Symbol(Symbol::new("overflow")),
178 Expr::Symbol(self.overflow.symbol()),
179 ),
180 ])
181 }
182
183 pub fn from_expr(expr: &Expr) -> Result<Self> {
189 let Expr::Map(entries) = expr else {
190 return Err(Error::TypeMismatch {
191 expected: "stream buffer policy map",
192 found: expr_kind(expr),
193 });
194 };
195 let capacity = string_field(entries, "capacity")?
196 .parse::<usize>()
197 .map_err(|err| Error::Eval(format!("invalid stream buffer capacity: {err}")))?;
198 let overflow = BufferOverflowPolicy::from_symbol(symbol_field(entries, "overflow")?)?;
199 Self::bounded_with_overflow(capacity, overflow)
200 }
201}
202
203pub(crate) fn string_field<'a>(entries: &'a [(Expr, Expr)], name: &str) -> Result<&'a str> {
204 match field(entries, name)? {
205 Expr::String(value) => Ok(value),
206 other => Err(Error::TypeMismatch {
207 expected: "string field",
208 found: expr_kind(other),
209 }),
210 }
211}
212
213pub(crate) fn symbol_field<'a>(entries: &'a [(Expr, Expr)], name: &str) -> Result<&'a Symbol> {
214 match field(entries, name)? {
215 Expr::Symbol(value) => Ok(value),
216 other => Err(Error::TypeMismatch {
217 expected: "symbol field",
218 found: expr_kind(other),
219 }),
220 }
221}
222
223pub(crate) fn field<'a>(entries: &'a [(Expr, Expr)], name: &str) -> Result<&'a Expr> {
224 entries
225 .iter()
226 .find_map(|(key, value)| match key {
227 Expr::Symbol(symbol) if symbol.namespace.is_none() && symbol.name.as_ref() == name => {
228 Some(value)
229 }
230 _ => None,
231 })
232 .ok_or_else(|| Error::Eval(format!("stream value missing {name} field")))
233}