Skip to main content

arcly_stream/
testing.rs

//! In-memory helpers for exercising the engine in downstream tests.
//!
//! None of this is required at runtime; it exists so consumers can verify
//! their `MediaSource`/`MediaSink`/`Observer` implementations against a real
//! engine without standing up a socket.
6
7use crate::bus::StreamEvent;
8use crate::observe::Observer;
9use crate::traits::{MediaSink, StorageBackend};
10use crate::{CodecId, MediaFrame, Result, StreamError, StreamKey};
11use async_trait::async_trait;
12use bytes::Bytes;
13use dashmap::DashMap;
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::sync::{Arc, Mutex};
16
17/// Build a video [`MediaFrame`] at `pts` (ms), keyframe or delta. A terse
18/// fixture for tests and examples.
19pub fn video_frame(pts: i64, keyframe: bool) -> MediaFrame {
20    MediaFrame::new_video(
21        pts,
22        pts,
23        Bytes::from_static(b"vframe"),
24        CodecId::H264,
25        keyframe,
26    )
27}
28
29/// Build an audio [`MediaFrame`] at `pts` (ms).
30pub fn audio_frame(pts: i64) -> MediaFrame {
31    MediaFrame::new_audio(pts, Bytes::from_static(b"aframe"), CodecId::AAC)
32}
33
34/// An in-memory [`StorageBackend`] for testing packagers and recorders without
35/// touching the filesystem. Cheap to clone (shared object map).
36#[derive(Default, Clone)]
37pub struct InMemoryStorage {
38    objects: Arc<DashMap<String, Bytes>>,
39}
40
41impl InMemoryStorage {
42    /// A new, empty store.
43    pub fn new() -> Self {
44        Self::default()
45    }
46
47    /// Number of stored objects.
48    pub fn len(&self) -> usize {
49        self.objects.len()
50    }
51
52    /// Whether the store is empty.
53    pub fn is_empty(&self) -> bool {
54        self.objects.is_empty()
55    }
56}
57
58#[async_trait]
59impl StorageBackend for InMemoryStorage {
60    async fn put(&self, key: &str, data: Bytes) -> Result<()> {
61        self.objects.insert(key.to_string(), data);
62        Ok(())
63    }
64    async fn get(&self, key: &str) -> Result<Bytes> {
65        self.objects
66            .get(key)
67            .map(|v| v.clone())
68            .ok_or_else(|| StreamError::StorageNotFound(key.to_string()))
69    }
70    async fn delete(&self, key: &str) -> Result<()> {
71        self.objects.remove(key);
72        Ok(())
73    }
74    async fn list(&self, prefix: &str) -> Result<Vec<String>> {
75        Ok(self
76            .objects
77            .iter()
78            .filter(|e| e.key().starts_with(prefix))
79            .map(|e| e.key().clone())
80            .collect())
81    }
82}
83
84/// A [`MediaSink`] that records every frame it receives. Useful for asserting
85/// what a pipeline produced.
86#[derive(Default, Clone)]
87pub struct CollectingSink {
88    frames: Arc<Mutex<Vec<MediaFrame>>>,
89}
90
91impl CollectingSink {
92    /// A new, empty collecting sink.
93    pub fn new() -> Self {
94        Self::default()
95    }
96
97    /// Snapshot the frames collected so far.
98    pub fn frames(&self) -> Vec<MediaFrame> {
99        self.frames
100            .lock()
101            .expect("collecting sink mutex poisoned")
102            .clone()
103    }
104
105    /// Number of frames collected so far.
106    pub fn len(&self) -> usize {
107        self.frames
108            .lock()
109            .expect("collecting sink mutex poisoned")
110            .len()
111    }
112
113    /// Whether no frames have been collected yet.
114    pub fn is_empty(&self) -> bool {
115        self.frames
116            .lock()
117            .expect("collecting sink mutex poisoned")
118            .is_empty()
119    }
120}
121
122#[async_trait]
123impl MediaSink for CollectingSink {
124    async fn send_frame(&mut self, frame: MediaFrame) -> Result<()> {
125        self.frames
126            .lock()
127            .expect("collecting sink mutex poisoned")
128            .push(frame);
129        Ok(())
130    }
131}
132
/// A [`MediaSink`] that throws away every frame it is given — the sink
/// equivalent of `/dev/null`.
#[derive(Clone, Copy, Debug, Default)]
pub struct NullSink;
136
137#[async_trait]
138impl MediaSink for NullSink {
139    async fn send_frame(&mut self, _frame: MediaFrame) -> Result<()> {
140        Ok(())
141    }
142}
143
/// An [`Observer`] that counts the events it sees.
///
/// Cloning is cheap: every clone shares the same counters, so one copy can be
/// handed to the engine while another is kept for assertions. `Debug` is
/// derived so the current counter values can be printed in assertion
/// messages.
#[derive(Debug, Default, Clone)]
pub struct CountingObserver {
    /// Counter storage shared by all clones.
    inner: Arc<CountingInner>,
}

/// The counters behind a [`CountingObserver`]; all start at zero.
#[derive(Debug, Default)]
struct CountingInner {
    /// Frames seen.
    frames: AtomicU64,
    /// Publish-start notifications seen.
    publish_started: AtomicU64,
    /// Publish-end notifications seen.
    publish_ended: AtomicU64,
    /// Bus events seen.
    events: AtomicU64,
    /// Running total of frames reported as skipped for lagging subscribers.
    lagged: AtomicU64,
}
159
160impl CountingObserver {
161    /// A new observer with all counters at zero.
162    pub fn new() -> Self {
163        Self::default()
164    }
165
166    /// Total frames observed via [`Observer::on_frame`].
167    pub fn frames(&self) -> u64 {
168        self.inner.frames.load(Ordering::Relaxed)
169    }
170    /// Count of [`Observer::on_publish_started`] calls.
171    pub fn publish_started(&self) -> u64 {
172        self.inner.publish_started.load(Ordering::Relaxed)
173    }
174    /// Count of [`Observer::on_publish_ended`] calls.
175    pub fn publish_ended(&self) -> u64 {
176        self.inner.publish_ended.load(Ordering::Relaxed)
177    }
178    /// Count of [`Observer::on_event`] calls.
179    pub fn events(&self) -> u64 {
180        self.inner.events.load(Ordering::Relaxed)
181    }
182    /// Total frames reported dropped via [`Observer::on_subscriber_lagged`].
183    pub fn lagged(&self) -> u64 {
184        self.inner.lagged.load(Ordering::Relaxed)
185    }
186}
187
188impl Observer for CountingObserver {
189    fn on_event(&self, _event: &StreamEvent) {
190        self.inner.events.fetch_add(1, Ordering::Relaxed);
191    }
192    fn on_publish_started(&self, _app: &str) {
193        self.inner.publish_started.fetch_add(1, Ordering::Relaxed);
194    }
195    fn on_publish_ended(&self, _app: &str) {
196        self.inner.publish_ended.fetch_add(1, Ordering::Relaxed);
197    }
198    fn on_frame(&self, _key: &StreamKey, _frame: &MediaFrame) {
199        self.inner.frames.fetch_add(1, Ordering::Relaxed);
200    }
201    fn on_subscriber_lagged(&self, _key: &StreamKey, skipped: u64) {
202        self.inner.lagged.fetch_add(skipped, Ordering::Relaxed);
203    }
204}