1use crate::bus::StreamEvent;
8use crate::observe::Observer;
9use crate::traits::{MediaSink, StorageBackend};
10use crate::{CodecId, MediaFrame, Result, StreamError, StreamKey};
11use async_trait::async_trait;
12use bytes::Bytes;
13use dashmap::DashMap;
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::sync::{Arc, Mutex};
16
17pub fn video_frame(pts: i64, keyframe: bool) -> MediaFrame {
20 MediaFrame::new_video(
21 pts,
22 pts,
23 Bytes::from_static(b"vframe"),
24 CodecId::H264,
25 keyframe,
26 )
27}
28
29pub fn audio_frame(pts: i64) -> MediaFrame {
31 MediaFrame::new_audio(pts, Bytes::from_static(b"aframe"), CodecId::AAC)
32}
33
34#[derive(Default, Clone)]
37pub struct InMemoryStorage {
38 objects: Arc<DashMap<String, Bytes>>,
39}
40
41impl InMemoryStorage {
42 pub fn new() -> Self {
44 Self::default()
45 }
46
47 pub fn len(&self) -> usize {
49 self.objects.len()
50 }
51
52 pub fn is_empty(&self) -> bool {
54 self.objects.is_empty()
55 }
56}
57
58#[async_trait]
59impl StorageBackend for InMemoryStorage {
60 async fn put(&self, key: &str, data: Bytes) -> Result<()> {
61 self.objects.insert(key.to_string(), data);
62 Ok(())
63 }
64 async fn get(&self, key: &str) -> Result<Bytes> {
65 self.objects
66 .get(key)
67 .map(|v| v.clone())
68 .ok_or_else(|| StreamError::StorageNotFound(key.to_string()))
69 }
70 async fn delete(&self, key: &str) -> Result<()> {
71 self.objects.remove(key);
72 Ok(())
73 }
74 async fn list(&self, prefix: &str) -> Result<Vec<String>> {
75 Ok(self
76 .objects
77 .iter()
78 .filter(|e| e.key().starts_with(prefix))
79 .map(|e| e.key().clone())
80 .collect())
81 }
82}
83
84#[derive(Default, Clone)]
87pub struct CollectingSink {
88 frames: Arc<Mutex<Vec<MediaFrame>>>,
89}
90
91impl CollectingSink {
92 pub fn new() -> Self {
93 Self::default()
94 }
95
96 pub fn frames(&self) -> Vec<MediaFrame> {
98 self.frames
99 .lock()
100 .expect("collecting sink mutex poisoned")
101 .clone()
102 }
103
104 pub fn len(&self) -> usize {
105 self.frames
106 .lock()
107 .expect("collecting sink mutex poisoned")
108 .len()
109 }
110
111 pub fn is_empty(&self) -> bool {
112 self.frames
113 .lock()
114 .expect("collecting sink mutex poisoned")
115 .is_empty()
116 }
117}
118
119#[async_trait]
120impl MediaSink for CollectingSink {
121 async fn send_frame(&mut self, frame: MediaFrame) -> Result<()> {
122 self.frames
123 .lock()
124 .expect("collecting sink mutex poisoned")
125 .push(frame);
126 Ok(())
127 }
128}
129
130#[derive(Debug, Default, Clone, Copy)]
132pub struct NullSink;
133
134#[async_trait]
135impl MediaSink for NullSink {
136 async fn send_frame(&mut self, _frame: MediaFrame) -> Result<()> {
137 Ok(())
138 }
139}
140
/// Observer that only counts the callbacks it receives.
///
/// The counters live behind an [`Arc`], so every clone of a
/// `CountingObserver` reads the same totals.
#[derive(Default, Clone)]
pub struct CountingObserver {
    /// Counter state shared between all clones.
    inner: Arc<CountingInner>,
}

/// Atomic counters backing [`CountingObserver`]; every counter starts at zero.
#[derive(Default)]
struct CountingInner {
    /// Total read by [`CountingObserver::frames`].
    frames: AtomicU64,
    /// Total read by [`CountingObserver::publish_started`].
    publish_started: AtomicU64,
    /// Total read by [`CountingObserver::publish_ended`].
    publish_ended: AtomicU64,
    /// Total read by [`CountingObserver::events`].
    events: AtomicU64,
    /// Total read by [`CountingObserver::lagged`].
    lagged: AtomicU64,
}

impl CountingObserver {
    /// Creates an observer whose counters are all zero.
    pub fn new() -> Self {
        Default::default()
    }

    /// Current value of the frame counter.
    pub fn frames(&self) -> u64 {
        Self::read(&self.inner.frames)
    }

    /// Current value of the publish-started counter.
    pub fn publish_started(&self) -> u64 {
        Self::read(&self.inner.publish_started)
    }

    /// Current value of the publish-ended counter.
    pub fn publish_ended(&self) -> u64 {
        Self::read(&self.inner.publish_ended)
    }

    /// Current value of the event counter.
    pub fn events(&self) -> u64 {
        Self::read(&self.inner.events)
    }

    /// Current value of the lagged counter.
    pub fn lagged(&self) -> u64 {
        Self::read(&self.inner.lagged)
    }

    /// Loads one counter with relaxed ordering.
    fn read(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }
}
179
180impl Observer for CountingObserver {
181 fn on_event(&self, _event: &StreamEvent) {
182 self.inner.events.fetch_add(1, Ordering::Relaxed);
183 }
184 fn on_publish_started(&self, _app: &str) {
185 self.inner.publish_started.fetch_add(1, Ordering::Relaxed);
186 }
187 fn on_publish_ended(&self, _app: &str) {
188 self.inner.publish_ended.fetch_add(1, Ordering::Relaxed);
189 }
190 fn on_frame(&self, _key: &StreamKey, _frame: &MediaFrame) {
191 self.inner.frames.fetch_add(1, Ordering::Relaxed);
192 }
193 fn on_subscriber_lagged(&self, _key: &StreamKey, skipped: u64) {
194 self.inner.lagged.fetch_add(skipped, Ordering::Relaxed);
195 }
196}