use sim_kernel::{Error, Expr, Result, Symbol};
use crate::{
BufferPolicy, StreamItem, StreamMedia, StreamMetadata, StreamStats, StreamValue,
TransportProfile,
};
/// Status value carried by a stream inspector snapshot.
///
/// Every variant maps to a fixed kebab-case label through `wire_label`.
/// Only `Cancelled`, `BufferOverflow`, `Ended` and `Live` are ever produced
/// by `from_stats`; the remaining variants are not derived from stats in
/// this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamInspectorStatus {
    /// Wire label `live`; the fallback when no other condition applies.
    Live,
    /// Wire label `ended`.
    Ended,
    /// Wire label `cancelled`.
    Cancelled,
    /// Wire label `buffer-overflow`.
    BufferOverflow,
    /// Wire label `disconnected`.
    Disconnected,
    /// Wire label `reconnecting`.
    Reconnecting,
    /// Wire label `refused-profile`.
    RefusedProfile,
    /// Wire label `faulted`.
    Faulted,
}
impl StreamInspectorStatus {
    /// Returns the fixed kebab-case label for this status.
    pub fn wire_label(self) -> &'static str {
        match self {
            Self::Live => "live",
            Self::Ended => "ended",
            Self::Cancelled => "cancelled",
            Self::BufferOverflow => "buffer-overflow",
            Self::Disconnected => "disconnected",
            Self::Reconnecting => "reconnecting",
            Self::RefusedProfile => "refused-profile",
            Self::Faulted => "faulted",
        }
    }

    /// Builds the qualified symbol for this status in the
    /// `stream/inspector-status` namespace, named by `wire_label`.
    pub fn symbol(self) -> Symbol {
        let label = self.wire_label();
        Symbol::qualified("stream/inspector-status", label)
    }

    /// Derives a status from stream counters.
    ///
    /// Checks are applied in priority order:
    /// 1. `stats.cancelled` yields `Cancelled`.
    /// 2. Any non-zero `dropped_newest`, `dropped_oldest`, `overflow_errors`
    ///    or `rejected` counter yields `BufferOverflow`.
    /// 3. `done` or `stats.closed` yields `Ended`.
    /// 4. Otherwise the stream is `Live`.
    pub fn from_stats(stats: &StreamStats, done: bool) -> Self {
        if stats.cancelled {
            return Self::Cancelled;
        }

        let overflowed = stats.dropped_newest > 0
            || stats.dropped_oldest > 0
            || stats.overflow_errors > 0
            || stats.rejected > 0;
        if overflowed {
            return Self::BufferOverflow;
        }

        let finished = done || stats.closed;
        if finished {
            Self::Ended
        } else {
            Self::Live
        }
    }
}
/// Point-in-time view of a stream as exposed by the inspector.
///
/// `to_expr` serialises every field into a map entry.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamInspectorSnapshot {
    /// Stream identifier, copied from the stream metadata.
    pub stream_id: Symbol,
    /// Route symbol supplied by the caller.
    pub route: Symbol,
    /// Media kind, taken from the stream metadata.
    pub media: StreamMedia,
    /// Transport profile symbol supplied by the caller.
    pub profile: Symbol,
    /// Clock symbol, copied from the stream metadata.
    pub clock: Symbol,
    /// Inspector status at the time of the snapshot.
    pub status: StreamInspectorStatus,
    /// Buffer policy, copied from the stream metadata.
    pub buffer: BufferPolicy,
    /// Queue depth supplied by the caller.
    pub queue_depth: usize,
    /// Saturating sum of the `dropped_newest` and `dropped_oldest` counters.
    pub dropped_count: u64,
    /// Last observed sequence number, or `None` when not available.
    pub last_sequence: Option<u64>,
    /// Diagnostic symbols supplied by the caller.
    pub recent_diagnostics: Vec<Symbol>,
}
impl StreamInspectorSnapshot {
#[allow(clippy::too_many_arguments)]
pub fn new(
metadata: &StreamMetadata,
route: Symbol,
profile: Symbol,
status: StreamInspectorStatus,
queue_depth: usize,
stats: &StreamStats,
last_sequence: Option<u64>,
recent_diagnostics: Vec<Symbol>,
) -> Self {
Self {
stream_id: metadata.id().clone(),
route,
media: metadata.media(),
profile,
clock: metadata.clock().clone(),
status,
buffer: metadata.buffer().clone(),
queue_depth,
dropped_count: stats.dropped_newest.saturating_add(stats.dropped_oldest),
last_sequence,
recent_diagnostics,
}
}
pub fn from_stream_value(
stream: &StreamValue,
route: Symbol,
profile: &TransportProfile,
recent_diagnostics: Vec<Symbol>,
) -> Result<Self> {
let stats = stream.stats()?;
let queue_depth = stream.queue_depth()?;
let observed = stats
.accepted
.max(stats.yielded.saturating_add(queue_depth as u64));
let last_sequence = observed.checked_sub(1);
let status = StreamInspectorStatus::from_stats(&stats, stream.is_done()?);
Ok(Self::new(
stream.metadata(),
route,
profile.name().clone(),
status,
queue_depth,
&stats,
last_sequence,
recent_diagnostics,
))
}
pub fn to_expr(&self) -> Expr {
Expr::Map(vec![
(
Expr::Symbol(Symbol::new("inspector")),
Expr::Symbol(stream_inspector_model_symbol()),
),
(
Expr::Symbol(Symbol::new("id")),
Expr::Symbol(self.stream_id.clone()),
),
(
Expr::Symbol(Symbol::new("route")),
Expr::Symbol(self.route.clone()),
),
(
Expr::Symbol(Symbol::new("media")),
Expr::Symbol(self.media.symbol()),
),
(
Expr::Symbol(Symbol::new("profile")),
Expr::Symbol(self.profile.clone()),
),
(
Expr::Symbol(Symbol::new("clock")),
Expr::Symbol(self.clock.clone()),
),
(
Expr::Symbol(Symbol::new("status")),
Expr::Symbol(self.status.symbol()),
),
(Expr::Symbol(Symbol::new("buffer")), self.buffer.to_expr()),
(
Expr::Symbol(Symbol::new("queue-depth")),
Expr::String(self.queue_depth.to_string()),
),
(
Expr::Symbol(Symbol::new("dropped-count")),
Expr::String(self.dropped_count.to_string()),
),
(
Expr::Symbol(Symbol::new("last-sequence")),
optional_u64_expr(self.last_sequence),
),
(
Expr::Symbol(Symbol::new("recent-diagnostics")),
Expr::List(
self.recent_diagnostics
.iter()
.cloned()
.map(Expr::Symbol)
.collect(),
),
),
])
}
}
/// Kind of fault that a `StreamFaultPlan` can inject.
///
/// `StreamFaultPlan::apply` only rewrites the item list for `Drop`,
/// `Reorder`, `Duplicate` and `Delay`; the other kinds just record a
/// diagnostic there.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamFaultKind {
    /// Wire label `drop`.
    Drop,
    /// Wire label `reorder`.
    Reorder,
    /// Wire label `duplicate`.
    Duplicate,
    /// Wire label `delay`.
    Delay,
    /// Wire label `cancel`.
    Cancel,
    /// Wire label `timeout`.
    Timeout,
    /// Wire label `disconnect`.
    Disconnect,
    /// Wire label `reconnect`.
    Reconnect,
    /// Wire label `unsupported-profile`.
    UnsupportedProfile,
}
impl StreamFaultKind {
pub fn wire_label(self) -> &'static str {
match self {
Self::Drop => "drop",
Self::Reorder => "reorder",
Self::Duplicate => "duplicate",
Self::Delay => "delay",
Self::Cancel => "cancel",
Self::Timeout => "timeout",
Self::Disconnect => "disconnect",
Self::Reconnect => "reconnect",
Self::UnsupportedProfile => "unsupported-profile",
}
}
pub fn symbol(self) -> Symbol {
Symbol::qualified("stream/fault", self.wire_label())
}
pub fn from_symbol(symbol: &Symbol) -> Result<Self> {
match symbol.as_qualified_str().as_str() {
"drop" | "stream/fault/drop" => Ok(Self::Drop),
"reorder" | "stream/fault/reorder" => Ok(Self::Reorder),
"duplicate" | "stream/fault/duplicate" => Ok(Self::Duplicate),
"delay" | "stream/fault/delay" => Ok(Self::Delay),
"cancel" | "stream/fault/cancel" => Ok(Self::Cancel),
"timeout" | "stream/fault/timeout" => Ok(Self::Timeout),
"disconnect" | "stream/fault/disconnect" => Ok(Self::Disconnect),
"reconnect" | "stream/fault/reconnect" => Ok(Self::Reconnect),
"unsupported-profile" | "stream/fault/unsupported-profile" => {
Ok(Self::UnsupportedProfile)
}
other => Err(Error::Eval(format!("unknown stream fault {other}"))),
}
}
}
/// One fault to inject: what kind, and how many times / how far.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamFaultSpec {
    /// Which fault to apply.
    pub kind: StreamFaultKind,
    /// Magnitude of the fault. `StreamFaultSpec::new` clamps this to at
    /// least 1, but the field is public and can be set to any value.
    pub count: usize,
}
impl StreamFaultSpec {
    /// Creates a fault spec, raising a `count` of zero to one.
    pub fn new(kind: StreamFaultKind, count: usize) -> Self {
        let count = if count == 0 { 1 } else { count };
        Self { kind, count }
    }
}
/// Ordered list of faults to apply to a batch of stream items.
///
/// The default plan contains no faults.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StreamFaultPlan {
    /// Faults in the order they are applied.
    faults: Vec<StreamFaultSpec>,
}
/// Outcome of applying a `StreamFaultPlan` to a batch of items.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamFaultResult {
    /// Items remaining after all faults were applied.
    pub items: Vec<StreamItem>,
    /// One fault symbol per applied fault, in application order.
    pub diagnostics: Vec<Symbol>,
}
impl StreamFaultPlan {
    /// Creates a plan that applies `faults` in the given order.
    pub fn new(faults: Vec<StreamFaultSpec>) -> Self {
        Self { faults }
    }

    /// Returns the faults in application order.
    pub fn faults(&self) -> &[StreamFaultSpec] {
        &self.faults
    }

    /// Applies every fault in order to a copy of `items`.
    ///
    /// One diagnostic symbol is recorded per fault, whether or not the fault
    /// changed the item list. Per-kind behaviour:
    ///
    /// - `Drop`: removes up to `count` items from the front.
    /// - `Reorder`: swaps the first two items when at least two exist
    ///   (`count` is not consulted).
    /// - `Duplicate`: prepends `count` copies of the first item, if any.
    /// - `Delay`: rotates the list left by `count`, clamped to its length.
    /// - `Cancel`, `Timeout`, `Disconnect`, `Reconnect`,
    ///   `UnsupportedProfile`: leave the items untouched.
    ///
    /// The input slice is never modified.
    pub fn apply(&self, items: &[StreamItem]) -> StreamFaultResult {
        let mut items = items.to_vec();
        // Exactly one diagnostic is pushed per fault.
        let mut diagnostics = Vec::with_capacity(self.faults.len());

        for fault in &self.faults {
            diagnostics.push(fault.kind.symbol());
            match fault.kind {
                StreamFaultKind::Drop => {
                    let remove = fault.count.min(items.len());
                    items.drain(..remove);
                }
                StreamFaultKind::Reorder => {
                    if items.len() > 1 {
                        items.swap(0, 1);
                    }
                }
                StreamFaultKind::Duplicate => {
                    if let Some(item) = items.first().cloned() {
                        // Insert all copies at the front in a single pass
                        // instead of shifting the whole vector once per copy.
                        items.splice(0..0, std::iter::repeat(item).take(fault.count));
                    }
                }
                StreamFaultKind::Delay => {
                    if !items.is_empty() {
                        // `rotate_left` panics when asked to rotate past the
                        // end, so clamp to the current length.
                        let rotate = fault.count.min(items.len());
                        items.rotate_left(rotate);
                    }
                }
                // These kinds only surface as diagnostics here.
                StreamFaultKind::Cancel
                | StreamFaultKind::Timeout
                | StreamFaultKind::Disconnect
                | StreamFaultKind::Reconnect
                | StreamFaultKind::UnsupportedProfile => {}
            }
        }

        StreamFaultResult { items, diagnostics }
    }

    /// Serialises the plan as an `Expr::List` of maps, one per fault, each
    /// with a `fault` symbol entry and a `count` entry rendered as a decimal
    /// string.
    pub fn to_expr(&self) -> Expr {
        let entries = self.faults.iter().map(|fault| {
            Expr::Map(vec![
                (
                    Expr::Symbol(Symbol::new("fault")),
                    Expr::Symbol(fault.kind.symbol()),
                ),
                (
                    Expr::Symbol(Symbol::new("count")),
                    Expr::String(fault.count.to_string()),
                ),
            ])
        });
        Expr::List(entries.collect())
    }
}
/// Returns the symbol identifying the inspector model,
/// `stream/inspector` qualified with `v1`.
pub fn stream_inspector_model_symbol() -> Symbol {
    const NAMESPACE: &str = "stream/inspector";
    const NAME: &str = "v1";
    Symbol::qualified(NAMESPACE, NAME)
}
/// Returns the route symbol `stream/route` qualified with `local`.
pub fn stream_inspector_route_local_symbol() -> Symbol {
    const NAMESPACE: &str = "stream/route";
    const NAME: &str = "local";
    Symbol::qualified(NAMESPACE, NAME)
}
/// Returns the symbol of every `StreamInspectorStatus` variant, in
/// declaration order.
pub fn stream_inspector_status_symbols() -> [Symbol; 8] {
    [
        StreamInspectorStatus::Live,
        StreamInspectorStatus::Ended,
        StreamInspectorStatus::Cancelled,
        StreamInspectorStatus::BufferOverflow,
        StreamInspectorStatus::Disconnected,
        StreamInspectorStatus::Reconnecting,
        StreamInspectorStatus::RefusedProfile,
        StreamInspectorStatus::Faulted,
    ]
    .map(StreamInspectorStatus::symbol)
}
/// Returns the symbol of every `StreamFaultKind` variant, in declaration
/// order.
pub fn stream_fault_symbols() -> [Symbol; 9] {
    [
        StreamFaultKind::Drop,
        StreamFaultKind::Reorder,
        StreamFaultKind::Duplicate,
        StreamFaultKind::Delay,
        StreamFaultKind::Cancel,
        StreamFaultKind::Timeout,
        StreamFaultKind::Disconnect,
        StreamFaultKind::Reconnect,
        StreamFaultKind::UnsupportedProfile,
    ]
    .map(StreamFaultKind::symbol)
}
/// Checks that `kind`'s symbol appears in `stream_fault_symbols()`.
///
/// # Errors
///
/// Returns `Error::Eval("unsupported stream fault")` when the symbol is not
/// in that list.
pub fn ensure_fault_supported(kind: StreamFaultKind) -> Result<()> {
    let supported = stream_fault_symbols();
    let wanted = kind.symbol();
    if supported.iter().any(|candidate| *candidate == wanted) {
        return Ok(());
    }
    Err(Error::Eval("unsupported stream fault".to_owned()))
}
/// Renders an optional counter as a decimal `Expr::String`, or `Expr::Nil`
/// when the value is absent.
fn optional_u64_expr(value: Option<u64>) -> Expr {
    match value {
        Some(number) => Expr::String(number.to_string()),
        None => Expr::Nil,
    }
}