Skip to main content

arcly_stream/
testing.rs

1//! In-memory helpers for exercising the engine in downstream tests.
2//!
3//! None of this is required at runtime; it exists so consumers can verify
4//! their `MediaSource`/`MediaSink`/`Observer` implementations against a real
5//! engine without standing up a socket.
6
7use crate::bus::StreamEvent;
8use crate::observe::Observer;
9use crate::traits::{MediaSink, StorageBackend};
10use crate::{CodecId, MediaFrame, Result, StreamError, StreamKey};
11use async_trait::async_trait;
12use bytes::Bytes;
13use dashmap::DashMap;
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::sync::{Arc, Mutex};
16
17/// Build a video [`MediaFrame`] at `pts` (ms), keyframe or delta. A terse
18/// fixture for tests and examples.
19pub fn video_frame(pts: i64, keyframe: bool) -> MediaFrame {
20    MediaFrame::new_video(
21        pts,
22        pts,
23        Bytes::from_static(b"vframe"),
24        CodecId::H264,
25        keyframe,
26    )
27}
28
29/// Build an audio [`MediaFrame`] at `pts` (ms).
30pub fn audio_frame(pts: i64) -> MediaFrame {
31    MediaFrame::new_audio(pts, Bytes::from_static(b"aframe"), CodecId::AAC)
32}
33
34/// An in-memory [`StorageBackend`] for testing packagers and recorders without
35/// touching the filesystem. Cheap to clone (shared object map).
36#[derive(Default, Clone)]
37pub struct InMemoryStorage {
38    objects: Arc<DashMap<String, Bytes>>,
39}
40
41impl InMemoryStorage {
42    /// A new, empty store.
43    pub fn new() -> Self {
44        Self::default()
45    }
46
47    /// Number of stored objects.
48    pub fn len(&self) -> usize {
49        self.objects.len()
50    }
51
52    /// Whether the store is empty.
53    pub fn is_empty(&self) -> bool {
54        self.objects.is_empty()
55    }
56}
57
58#[async_trait]
59impl StorageBackend for InMemoryStorage {
60    async fn put(&self, key: &str, data: Bytes) -> Result<()> {
61        self.objects.insert(key.to_string(), data);
62        Ok(())
63    }
64    async fn get(&self, key: &str) -> Result<Bytes> {
65        self.objects
66            .get(key)
67            .map(|v| v.clone())
68            .ok_or_else(|| StreamError::StorageNotFound(key.to_string()))
69    }
70    async fn delete(&self, key: &str) -> Result<()> {
71        self.objects.remove(key);
72        Ok(())
73    }
74    async fn list(&self, prefix: &str) -> Result<Vec<String>> {
75        Ok(self
76            .objects
77            .iter()
78            .filter(|e| e.key().starts_with(prefix))
79            .map(|e| e.key().clone())
80            .collect())
81    }
82}
83
84/// A [`MediaSink`] that records every frame it receives. Useful for asserting
85/// what a pipeline produced.
86#[derive(Default, Clone)]
87pub struct CollectingSink {
88    frames: Arc<Mutex<Vec<MediaFrame>>>,
89}
90
91impl CollectingSink {
92    pub fn new() -> Self {
93        Self::default()
94    }
95
96    /// Snapshot the frames collected so far.
97    pub fn frames(&self) -> Vec<MediaFrame> {
98        self.frames
99            .lock()
100            .expect("collecting sink mutex poisoned")
101            .clone()
102    }
103
104    pub fn len(&self) -> usize {
105        self.frames
106            .lock()
107            .expect("collecting sink mutex poisoned")
108            .len()
109    }
110
111    pub fn is_empty(&self) -> bool {
112        self.frames
113            .lock()
114            .expect("collecting sink mutex poisoned")
115            .is_empty()
116    }
117}
118
119#[async_trait]
120impl MediaSink for CollectingSink {
121    async fn send_frame(&mut self, frame: MediaFrame) -> Result<()> {
122        self.frames
123            .lock()
124            .expect("collecting sink mutex poisoned")
125            .push(frame);
126        Ok(())
127    }
128}
129
130/// A [`MediaSink`] that discards everything — the `/dev/null` of sinks.
131#[derive(Debug, Default, Clone, Copy)]
132pub struct NullSink;
133
134#[async_trait]
135impl MediaSink for NullSink {
136    async fn send_frame(&mut self, _frame: MediaFrame) -> Result<()> {
137        Ok(())
138    }
139}
140
/// An [`Observer`] that counts the callbacks it receives.
///
/// Cloning is cheap: clones share the same counters, so one copy can be handed
/// to the engine while another is kept for assertions.
///
/// Implements `Debug` so the current counter values can be printed in
/// assertion messages.
#[derive(Debug, Default, Clone)]
pub struct CountingObserver {
    /// Counters shared by every clone of this observer.
    inner: Arc<CountingInner>,
}

/// Counter storage behind [`CountingObserver`].
#[derive(Debug, Default)]
struct CountingInner {
    /// Incremented once per `on_frame` callback.
    frames: AtomicU64,
    /// Incremented once per `on_publish_started` callback.
    publish_started: AtomicU64,
    /// Incremented once per `on_publish_ended` callback.
    publish_ended: AtomicU64,
    /// Incremented once per `on_event` callback.
    events: AtomicU64,
    /// Running sum of the `skipped` values passed to `on_subscriber_lagged`.
    lagged: AtomicU64,
}

impl CountingObserver {
    /// Create an observer with every counter at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Number of `on_frame` callbacks seen so far.
    pub fn frames(&self) -> u64 {
        self.inner.frames.load(Ordering::Relaxed)
    }

    /// Number of `on_publish_started` callbacks seen so far.
    pub fn publish_started(&self) -> u64 {
        self.inner.publish_started.load(Ordering::Relaxed)
    }

    /// Number of `on_publish_ended` callbacks seen so far.
    pub fn publish_ended(&self) -> u64 {
        self.inner.publish_ended.load(Ordering::Relaxed)
    }

    /// Number of `on_event` callbacks seen so far.
    pub fn events(&self) -> u64 {
        self.inner.events.load(Ordering::Relaxed)
    }

    /// Total frames reported dropped via [`Observer::on_subscriber_lagged`].
    pub fn lagged(&self) -> u64 {
        self.inner.lagged.load(Ordering::Relaxed)
    }
}
179
180impl Observer for CountingObserver {
181    fn on_event(&self, _event: &StreamEvent) {
182        self.inner.events.fetch_add(1, Ordering::Relaxed);
183    }
184    fn on_publish_started(&self, _app: &str) {
185        self.inner.publish_started.fetch_add(1, Ordering::Relaxed);
186    }
187    fn on_publish_ended(&self, _app: &str) {
188        self.inner.publish_ended.fetch_add(1, Ordering::Relaxed);
189    }
190    fn on_frame(&self, _key: &StreamKey, _frame: &MediaFrame) {
191        self.inner.frames.fetch_add(1, Ordering::Relaxed);
192    }
193    fn on_subscriber_lagged(&self, _key: &StreamKey, skipped: u64) {
194        self.inner.lagged.fetch_add(skipped, Ordering::Relaxed);
195    }
196}