1use crate::bus::StreamEvent;
8use crate::observe::Observer;
9use crate::traits::{MediaSink, StorageBackend};
10use crate::{CodecId, MediaFrame, Result, StreamError, StreamKey};
11use async_trait::async_trait;
12use bytes::Bytes;
13use dashmap::DashMap;
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::sync::{Arc, Mutex};
16
17pub fn video_frame(pts: i64, keyframe: bool) -> MediaFrame {
20 MediaFrame::new_video(
21 pts,
22 pts,
23 Bytes::from_static(b"vframe"),
24 CodecId::H264,
25 keyframe,
26 )
27}
28
29pub fn audio_frame(pts: i64) -> MediaFrame {
31 MediaFrame::new_audio(pts, Bytes::from_static(b"aframe"), CodecId::AAC)
32}
33
34#[derive(Default, Clone)]
37pub struct InMemoryStorage {
38 objects: Arc<DashMap<String, Bytes>>,
39}
40
41impl InMemoryStorage {
42 pub fn new() -> Self {
44 Self::default()
45 }
46
47 pub fn len(&self) -> usize {
49 self.objects.len()
50 }
51
52 pub fn is_empty(&self) -> bool {
54 self.objects.is_empty()
55 }
56}
57
58#[async_trait]
59impl StorageBackend for InMemoryStorage {
60 async fn put(&self, key: &str, data: Bytes) -> Result<()> {
61 self.objects.insert(key.to_string(), data);
62 Ok(())
63 }
64 async fn get(&self, key: &str) -> Result<Bytes> {
65 self.objects
66 .get(key)
67 .map(|v| v.clone())
68 .ok_or_else(|| StreamError::StorageNotFound(key.to_string()))
69 }
70 async fn delete(&self, key: &str) -> Result<()> {
71 self.objects.remove(key);
72 Ok(())
73 }
74 async fn list(&self, prefix: &str) -> Result<Vec<String>> {
75 Ok(self
76 .objects
77 .iter()
78 .filter(|e| e.key().starts_with(prefix))
79 .map(|e| e.key().clone())
80 .collect())
81 }
82}
83
84#[derive(Default, Clone)]
87pub struct CollectingSink {
88 frames: Arc<Mutex<Vec<MediaFrame>>>,
89}
90
91impl CollectingSink {
92 pub fn new() -> Self {
94 Self::default()
95 }
96
97 pub fn frames(&self) -> Vec<MediaFrame> {
99 self.frames
100 .lock()
101 .expect("collecting sink mutex poisoned")
102 .clone()
103 }
104
105 pub fn len(&self) -> usize {
107 self.frames
108 .lock()
109 .expect("collecting sink mutex poisoned")
110 .len()
111 }
112
113 pub fn is_empty(&self) -> bool {
115 self.frames
116 .lock()
117 .expect("collecting sink mutex poisoned")
118 .is_empty()
119 }
120}
121
122#[async_trait]
123impl MediaSink for CollectingSink {
124 async fn send_frame(&mut self, frame: MediaFrame) -> Result<()> {
125 self.frames
126 .lock()
127 .expect("collecting sink mutex poisoned")
128 .push(frame);
129 Ok(())
130 }
131}
132
/// Stateless sink that discards every frame it is given.
#[derive(Clone, Copy, Debug, Default)]
pub struct NullSink;
136
137#[async_trait]
138impl MediaSink for NullSink {
139 async fn send_frame(&mut self, _frame: MediaFrame) -> Result<()> {
140 Ok(())
141 }
142}
143
/// Handle to a shared set of atomic counters.
///
/// Clones share the same counters through an [`Arc`], so a clone can be
/// handed elsewhere while the original is used to read the totals.
///
/// `Debug` is derived so the current counter values can be printed with
/// `{:?}` and the type can be embedded in other `Debug` types.
#[derive(Debug, Default, Clone)]
pub struct CountingObserver {
    /// Counter storage shared by every clone of this observer.
    inner: Arc<CountingInner>,
}

/// Counter storage behind [`CountingObserver`]; every counter starts at zero.
#[derive(Debug, Default)]
struct CountingInner {
    /// Value reported by [`CountingObserver::frames`].
    frames: AtomicU64,
    /// Value reported by [`CountingObserver::publish_started`].
    publish_started: AtomicU64,
    /// Value reported by [`CountingObserver::publish_ended`].
    publish_ended: AtomicU64,
    /// Value reported by [`CountingObserver::events`].
    events: AtomicU64,
    /// Value reported by [`CountingObserver::lagged`].
    lagged: AtomicU64,
}

impl CountingObserver {
    /// Creates an observer with every counter at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the current value of the frame counter.
    ///
    /// Like every getter here, this is a relaxed atomic load.
    pub fn frames(&self) -> u64 {
        self.inner.frames.load(Ordering::Relaxed)
    }

    /// Returns the current value of the publish-started counter.
    pub fn publish_started(&self) -> u64 {
        self.inner.publish_started.load(Ordering::Relaxed)
    }

    /// Returns the current value of the publish-ended counter.
    pub fn publish_ended(&self) -> u64 {
        self.inner.publish_ended.load(Ordering::Relaxed)
    }

    /// Returns the current value of the event counter.
    pub fn events(&self) -> u64 {
        self.inner.events.load(Ordering::Relaxed)
    }

    /// Returns the current value of the lagged counter.
    pub fn lagged(&self) -> u64 {
        self.inner.lagged.load(Ordering::Relaxed)
    }
}
187
188impl Observer for CountingObserver {
189 fn on_event(&self, _event: &StreamEvent) {
190 self.inner.events.fetch_add(1, Ordering::Relaxed);
191 }
192 fn on_publish_started(&self, _app: &str) {
193 self.inner.publish_started.fetch_add(1, Ordering::Relaxed);
194 }
195 fn on_publish_ended(&self, _app: &str) {
196 self.inner.publish_ended.fetch_add(1, Ordering::Relaxed);
197 }
198 fn on_frame(&self, _key: &StreamKey, _frame: &MediaFrame) {
199 self.inner.frames.fetch_add(1, Ordering::Relaxed);
200 }
201 fn on_subscriber_lagged(&self, _key: &StreamKey, skipped: u64) {
202 self.inner.lagged.fetch_add(skipped, Ordering::Relaxed);
203 }
204}