use sim_kernel::{Error, Expr, Result, Symbol};
pub(crate) use sim_value::kind::expr_kind;
/// Outcome of a stream backpressure decision.
///
/// Each variant maps to a stable kebab-case wire label (see `wire_label`),
/// which is also the name of its symbol in the `stream/backpressure` namespace.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackpressureOutcome {
    /// Wire label `accepted`.
    Accepted,
    /// Wire label `dropped-newest`.
    DroppedNewest,
    /// Wire label `dropped-oldest`.
    DroppedOldest,
    /// Wire label `blocked`.
    Blocked,
    /// Wire label `timed-out`.
    TimedOut,
    /// Wire label `rejected`.
    Rejected,
    /// Wire label `closed`.
    Closed,
}
impl BackpressureOutcome {
pub fn wire_label(self) -> &'static str {
match self {
Self::Accepted => "accepted",
Self::DroppedNewest => "dropped-newest",
Self::DroppedOldest => "dropped-oldest",
Self::Blocked => "blocked",
Self::TimedOut => "timed-out",
Self::Rejected => "rejected",
Self::Closed => "closed",
}
}
pub fn symbol(self) -> Symbol {
Symbol::qualified("stream/backpressure", self.wire_label())
}
pub fn from_symbol(symbol: &Symbol) -> Result<Self> {
match symbol.as_qualified_str().as_str() {
"accepted" | "stream/backpressure/accepted" => Ok(Self::Accepted),
"dropped-newest" | "stream/backpressure/dropped-newest" => Ok(Self::DroppedNewest),
"dropped-oldest" | "stream/backpressure/dropped-oldest" => Ok(Self::DroppedOldest),
"blocked" | "stream/backpressure/blocked" => Ok(Self::Blocked),
"timed-out" | "stream/backpressure/timed-out" => Ok(Self::TimedOut),
"rejected" | "stream/backpressure/rejected" => Ok(Self::Rejected),
"closed" | "stream/backpressure/closed" => Ok(Self::Closed),
other => Err(Error::Eval(format!(
"unknown stream backpressure outcome {other}"
))),
}
}
}
/// Overflow policy carried by a stream buffer policy.
///
/// Each variant has a symbol in the `stream/overflow` namespace. The variant
/// names suggest what happens when a bounded buffer is full, but the
/// enforcement logic is not part of this type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BufferOverflowPolicy {
    /// Symbol name `drop-newest`.
    DropNewest,
    /// Symbol name `drop-oldest`.
    DropOldest,
    /// Symbol name `error`.
    Error,
}
impl BufferOverflowPolicy {
pub fn symbol(self) -> Symbol {
match self {
Self::DropNewest => Symbol::qualified("stream/overflow", "drop-newest"),
Self::DropOldest => Symbol::qualified("stream/overflow", "drop-oldest"),
Self::Error => Symbol::qualified("stream/overflow", "error"),
}
}
pub fn from_symbol(symbol: &Symbol) -> Result<Self> {
match symbol.as_qualified_str().as_str() {
"stream/overflow/drop-newest" => Ok(Self::DropNewest),
"stream/overflow/drop-oldest" => Ok(Self::DropOldest),
"stream/overflow/error" => Ok(Self::Error),
other => Err(Error::Eval(format!(
"unknown stream buffer overflow policy {other}"
))),
}
}
}
/// Bounded stream buffer configuration: a capacity plus an overflow policy.
///
/// Fields are private; values are created through `BufferPolicy::bounded`,
/// `BufferPolicy::bounded_with_overflow`, or `BufferPolicy::from_expr`, all of
/// which reject a capacity of zero.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BufferPolicy {
    /// Buffer capacity; the constructors guarantee it is greater than zero.
    capacity: usize,
    /// Overflow policy paired with the capacity.
    overflow: BufferOverflowPolicy,
}
impl BufferPolicy {
pub fn bounded(capacity: usize) -> Result<Self> {
Self::bounded_with_overflow(capacity, BufferOverflowPolicy::DropNewest)
}
pub fn bounded_with_overflow(capacity: usize, overflow: BufferOverflowPolicy) -> Result<Self> {
if capacity == 0 {
return Err(Error::Eval(
"stream buffer capacity must be greater than zero".to_owned(),
));
}
Ok(Self { capacity, overflow })
}
pub fn capacity(&self) -> usize {
self.capacity
}
pub fn overflow(&self) -> BufferOverflowPolicy {
self.overflow
}
pub fn symbol(&self) -> Symbol {
Symbol::qualified("stream/buffer", format!("bounded-{}", self.capacity))
}
pub fn to_expr(&self) -> Expr {
Expr::Map(vec![
(
Expr::Symbol(Symbol::new("capacity")),
Expr::String(self.capacity.to_string()),
),
(
Expr::Symbol(Symbol::new("overflow")),
Expr::Symbol(self.overflow.symbol()),
),
])
}
pub fn from_expr(expr: &Expr) -> Result<Self> {
let Expr::Map(entries) = expr else {
return Err(Error::TypeMismatch {
expected: "stream buffer policy map",
found: expr_kind(expr),
});
};
let capacity = string_field(entries, "capacity")?
.parse::<usize>()
.map_err(|err| Error::Eval(format!("invalid stream buffer capacity: {err}")))?;
let overflow = BufferOverflowPolicy::from_symbol(symbol_field(entries, "overflow")?)?;
Self::bounded_with_overflow(capacity, overflow)
}
}
pub(crate) fn string_field<'a>(entries: &'a [(Expr, Expr)], name: &str) -> Result<&'a str> {
match field(entries, name)? {
Expr::String(value) => Ok(value),
other => Err(Error::TypeMismatch {
expected: "string field",
found: expr_kind(other),
}),
}
}
pub(crate) fn symbol_field<'a>(entries: &'a [(Expr, Expr)], name: &str) -> Result<&'a Symbol> {
match field(entries, name)? {
Expr::Symbol(value) => Ok(value),
other => Err(Error::TypeMismatch {
expected: "symbol field",
found: expr_kind(other),
}),
}
}
pub(crate) fn field<'a>(entries: &'a [(Expr, Expr)], name: &str) -> Result<&'a Expr> {
entries
.iter()
.find_map(|(key, value)| match key {
Expr::Symbol(symbol) if symbol.namespace.is_none() && symbol.name.as_ref() == name => {
Some(value)
}
_ => None,
})
.ok_or_else(|| Error::Eval(format!("stream value missing {name} field")))
}