1use sim_kernel::{Error, Expr, Result, Symbol};
14
15use crate::{
16 BufferPolicy, StreamItem, StreamMedia, StreamMetadata, StreamStats, StreamValue,
17 TransportProfile,
18};
19
20#[derive(Clone, Copy, Debug, PartialEq, Eq)]
26pub enum StreamInspectorStatus {
27 Live,
29 Ended,
31 Cancelled,
33 BufferOverflow,
35 Disconnected,
37 Reconnecting,
39 RefusedProfile,
41 Faulted,
43}
44
45impl StreamInspectorStatus {
46 pub fn wire_label(self) -> &'static str {
48 match self {
49 Self::Live => "live",
50 Self::Ended => "ended",
51 Self::Cancelled => "cancelled",
52 Self::BufferOverflow => "buffer-overflow",
53 Self::Disconnected => "disconnected",
54 Self::Reconnecting => "reconnecting",
55 Self::RefusedProfile => "refused-profile",
56 Self::Faulted => "faulted",
57 }
58 }
59
60 pub fn symbol(self) -> Symbol {
62 Symbol::qualified("stream/inspector-status", self.wire_label())
63 }
64
65 pub fn from_stats(stats: &StreamStats, done: bool) -> Self {
71 if stats.cancelled {
72 Self::Cancelled
73 } else if stats.dropped_newest > 0
74 || stats.dropped_oldest > 0
75 || stats.overflow_errors > 0
76 || stats.rejected > 0
77 {
78 Self::BufferOverflow
79 } else if done || stats.closed {
80 Self::Ended
81 } else {
82 Self::Live
83 }
84 }
85}
86
87#[derive(Clone, Debug, PartialEq, Eq)]
94pub struct StreamInspectorSnapshot {
95 pub stream_id: Symbol,
97 pub route: Symbol,
99 pub media: StreamMedia,
101 pub profile: Symbol,
103 pub clock: Symbol,
105 pub status: StreamInspectorStatus,
107 pub buffer: BufferPolicy,
109 pub queue_depth: usize,
111 pub dropped_count: u64,
113 pub last_sequence: Option<u64>,
115 pub recent_diagnostics: Vec<Symbol>,
117}
118
119impl StreamInspectorSnapshot {
120 #[allow(clippy::too_many_arguments)]
122 pub fn new(
123 metadata: &StreamMetadata,
124 route: Symbol,
125 profile: Symbol,
126 status: StreamInspectorStatus,
127 queue_depth: usize,
128 stats: &StreamStats,
129 last_sequence: Option<u64>,
130 recent_diagnostics: Vec<Symbol>,
131 ) -> Self {
132 Self {
133 stream_id: metadata.id().clone(),
134 route,
135 media: metadata.media(),
136 profile,
137 clock: metadata.clock().clone(),
138 status,
139 buffer: metadata.buffer().clone(),
140 queue_depth,
141 dropped_count: stats.dropped_newest.saturating_add(stats.dropped_oldest),
142 last_sequence,
143 recent_diagnostics,
144 }
145 }
146
147 pub fn from_stream_value(
153 stream: &StreamValue,
154 route: Symbol,
155 profile: &TransportProfile,
156 recent_diagnostics: Vec<Symbol>,
157 ) -> Result<Self> {
158 let stats = stream.stats()?;
159 let queue_depth = stream.queue_depth()?;
160 let observed = stats
161 .accepted
162 .max(stats.yielded.saturating_add(queue_depth as u64));
163 let last_sequence = observed.checked_sub(1);
164 let status = StreamInspectorStatus::from_stats(&stats, stream.is_done()?);
165 Ok(Self::new(
166 stream.metadata(),
167 route,
168 profile.name().clone(),
169 status,
170 queue_depth,
171 &stats,
172 last_sequence,
173 recent_diagnostics,
174 ))
175 }
176
177 pub fn to_expr(&self) -> Expr {
179 Expr::Map(vec![
180 (
181 Expr::Symbol(Symbol::new("inspector")),
182 Expr::Symbol(stream_inspector_model_symbol()),
183 ),
184 (
185 Expr::Symbol(Symbol::new("id")),
186 Expr::Symbol(self.stream_id.clone()),
187 ),
188 (
189 Expr::Symbol(Symbol::new("route")),
190 Expr::Symbol(self.route.clone()),
191 ),
192 (
193 Expr::Symbol(Symbol::new("media")),
194 Expr::Symbol(self.media.symbol()),
195 ),
196 (
197 Expr::Symbol(Symbol::new("profile")),
198 Expr::Symbol(self.profile.clone()),
199 ),
200 (
201 Expr::Symbol(Symbol::new("clock")),
202 Expr::Symbol(self.clock.clone()),
203 ),
204 (
205 Expr::Symbol(Symbol::new("status")),
206 Expr::Symbol(self.status.symbol()),
207 ),
208 (Expr::Symbol(Symbol::new("buffer")), self.buffer.to_expr()),
209 (
210 Expr::Symbol(Symbol::new("queue-depth")),
211 Expr::String(self.queue_depth.to_string()),
212 ),
213 (
214 Expr::Symbol(Symbol::new("dropped-count")),
215 Expr::String(self.dropped_count.to_string()),
216 ),
217 (
218 Expr::Symbol(Symbol::new("last-sequence")),
219 optional_u64_expr(self.last_sequence),
220 ),
221 (
222 Expr::Symbol(Symbol::new("recent-diagnostics")),
223 Expr::List(
224 self.recent_diagnostics
225 .iter()
226 .cloned()
227 .map(Expr::Symbol)
228 .collect(),
229 ),
230 ),
231 ])
232 }
233}
234
235#[derive(Clone, Copy, Debug, PartialEq, Eq)]
242pub enum StreamFaultKind {
243 Drop,
245 Reorder,
247 Duplicate,
249 Delay,
251 Cancel,
253 Timeout,
255 Disconnect,
257 Reconnect,
259 UnsupportedProfile,
261}
262
263impl StreamFaultKind {
264 pub fn wire_label(self) -> &'static str {
266 match self {
267 Self::Drop => "drop",
268 Self::Reorder => "reorder",
269 Self::Duplicate => "duplicate",
270 Self::Delay => "delay",
271 Self::Cancel => "cancel",
272 Self::Timeout => "timeout",
273 Self::Disconnect => "disconnect",
274 Self::Reconnect => "reconnect",
275 Self::UnsupportedProfile => "unsupported-profile",
276 }
277 }
278
279 pub fn symbol(self) -> Symbol {
281 Symbol::qualified("stream/fault", self.wire_label())
282 }
283
284 pub fn from_symbol(symbol: &Symbol) -> Result<Self> {
289 match symbol.as_qualified_str().as_str() {
290 "drop" | "stream/fault/drop" => Ok(Self::Drop),
291 "reorder" | "stream/fault/reorder" => Ok(Self::Reorder),
292 "duplicate" | "stream/fault/duplicate" => Ok(Self::Duplicate),
293 "delay" | "stream/fault/delay" => Ok(Self::Delay),
294 "cancel" | "stream/fault/cancel" => Ok(Self::Cancel),
295 "timeout" | "stream/fault/timeout" => Ok(Self::Timeout),
296 "disconnect" | "stream/fault/disconnect" => Ok(Self::Disconnect),
297 "reconnect" | "stream/fault/reconnect" => Ok(Self::Reconnect),
298 "unsupported-profile" | "stream/fault/unsupported-profile" => {
299 Ok(Self::UnsupportedProfile)
300 }
301 other => Err(Error::Eval(format!("unknown stream fault {other}"))),
302 }
303 }
304}
305
306#[derive(Clone, Debug, PartialEq, Eq)]
308pub struct StreamFaultSpec {
309 pub kind: StreamFaultKind,
311 pub count: usize,
313}
314
315impl StreamFaultSpec {
316 pub fn new(kind: StreamFaultKind, count: usize) -> Self {
318 Self {
319 kind,
320 count: count.max(1),
321 }
322 }
323}
324
325#[derive(Clone, Debug, Default, PartialEq, Eq)]
327pub struct StreamFaultPlan {
328 faults: Vec<StreamFaultSpec>,
329}
330
331#[derive(Clone, Debug, PartialEq, Eq)]
333pub struct StreamFaultResult {
334 pub items: Vec<StreamItem>,
336 pub diagnostics: Vec<Symbol>,
338}
339
340impl StreamFaultPlan {
341 pub fn new(faults: Vec<StreamFaultSpec>) -> Self {
343 Self { faults }
344 }
345
346 pub fn faults(&self) -> &[StreamFaultSpec] {
348 &self.faults
349 }
350
351 pub fn apply(&self, items: &[StreamItem]) -> StreamFaultResult {
357 let mut items = items.to_vec();
358 let mut diagnostics = Vec::new();
359 for fault in &self.faults {
360 diagnostics.push(fault.kind.symbol());
361 match fault.kind {
362 StreamFaultKind::Drop => {
363 let remove = fault.count.min(items.len());
364 items.drain(0..remove);
365 }
366 StreamFaultKind::Reorder => {
367 if items.len() > 1 {
368 items.swap(0, 1);
369 }
370 }
371 StreamFaultKind::Duplicate => {
372 if let Some(item) = items.first().cloned() {
373 for _ in 0..fault.count {
374 items.insert(0, item.clone());
375 }
376 }
377 }
378 StreamFaultKind::Delay => {
379 if !items.is_empty() {
380 let rotate = fault.count.min(items.len());
381 items.rotate_left(rotate);
382 }
383 }
384 StreamFaultKind::Cancel
385 | StreamFaultKind::Timeout
386 | StreamFaultKind::Disconnect
387 | StreamFaultKind::Reconnect
388 | StreamFaultKind::UnsupportedProfile => {}
389 }
390 }
391 StreamFaultResult { items, diagnostics }
392 }
393
394 pub fn to_expr(&self) -> Expr {
396 Expr::List(
397 self.faults
398 .iter()
399 .map(|fault| {
400 Expr::Map(vec![
401 (
402 Expr::Symbol(Symbol::new("fault")),
403 Expr::Symbol(fault.kind.symbol()),
404 ),
405 (
406 Expr::Symbol(Symbol::new("count")),
407 Expr::String(fault.count.to_string()),
408 ),
409 ])
410 })
411 .collect(),
412 )
413 }
414}
415
416pub fn stream_inspector_model_symbol() -> Symbol {
418 Symbol::qualified("stream/inspector", "v1")
419}
420
421pub fn stream_inspector_route_local_symbol() -> Symbol {
423 Symbol::qualified("stream/route", "local")
424}
425
426pub fn stream_inspector_status_symbols() -> [Symbol; 8] {
428 [
429 StreamInspectorStatus::Live.symbol(),
430 StreamInspectorStatus::Ended.symbol(),
431 StreamInspectorStatus::Cancelled.symbol(),
432 StreamInspectorStatus::BufferOverflow.symbol(),
433 StreamInspectorStatus::Disconnected.symbol(),
434 StreamInspectorStatus::Reconnecting.symbol(),
435 StreamInspectorStatus::RefusedProfile.symbol(),
436 StreamInspectorStatus::Faulted.symbol(),
437 ]
438}
439
440pub fn stream_fault_symbols() -> [Symbol; 9] {
442 [
443 StreamFaultKind::Drop.symbol(),
444 StreamFaultKind::Reorder.symbol(),
445 StreamFaultKind::Duplicate.symbol(),
446 StreamFaultKind::Delay.symbol(),
447 StreamFaultKind::Cancel.symbol(),
448 StreamFaultKind::Timeout.symbol(),
449 StreamFaultKind::Disconnect.symbol(),
450 StreamFaultKind::Reconnect.symbol(),
451 StreamFaultKind::UnsupportedProfile.symbol(),
452 ]
453}
454
455pub fn ensure_fault_supported(kind: StreamFaultKind) -> Result<()> {
457 if stream_fault_symbols().contains(&kind.symbol()) {
458 Ok(())
459 } else {
460 Err(Error::Eval("unsupported stream fault".to_owned()))
461 }
462}
463
464fn optional_u64_expr(value: Option<u64>) -> Expr {
465 value
466 .map(|value| Expr::String(value.to_string()))
467 .unwrap_or(Expr::Nil)
468}