use crate::bus::StreamEvent;
use crate::observe::Observer;
use crate::traits::{MediaSink, StorageBackend};
use crate::{CodecId, MediaFrame, Result, StreamError, StreamKey};
use async_trait::async_trait;
use bytes::Bytes;
use dashmap::DashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
pub fn video_frame(pts: i64, keyframe: bool) -> MediaFrame {
MediaFrame::new_video(
pts,
pts,
Bytes::from_static(b"vframe"),
CodecId::H264,
keyframe,
)
}
pub fn audio_frame(pts: i64) -> MediaFrame {
MediaFrame::new_audio(pts, Bytes::from_static(b"aframe"), CodecId::AAC)
}
#[derive(Default, Clone)]
pub struct InMemoryStorage {
objects: Arc<DashMap<String, Bytes>>,
}
impl InMemoryStorage {
pub fn new() -> Self {
Self::default()
}
pub fn len(&self) -> usize {
self.objects.len()
}
pub fn is_empty(&self) -> bool {
self.objects.is_empty()
}
}
#[async_trait]
impl StorageBackend for InMemoryStorage {
async fn put(&self, key: &str, data: Bytes) -> Result<()> {
self.objects.insert(key.to_string(), data);
Ok(())
}
async fn get(&self, key: &str) -> Result<Bytes> {
self.objects
.get(key)
.map(|v| v.clone())
.ok_or_else(|| StreamError::StorageNotFound(key.to_string()))
}
async fn delete(&self, key: &str) -> Result<()> {
self.objects.remove(key);
Ok(())
}
async fn list(&self, prefix: &str) -> Result<Vec<String>> {
Ok(self
.objects
.iter()
.filter(|e| e.key().starts_with(prefix))
.map(|e| e.key().clone())
.collect())
}
}
#[derive(Default, Clone)]
pub struct CollectingSink {
frames: Arc<Mutex<Vec<MediaFrame>>>,
}
impl CollectingSink {
pub fn new() -> Self {
Self::default()
}
pub fn frames(&self) -> Vec<MediaFrame> {
self.frames
.lock()
.expect("collecting sink mutex poisoned")
.clone()
}
pub fn len(&self) -> usize {
self.frames
.lock()
.expect("collecting sink mutex poisoned")
.len()
}
pub fn is_empty(&self) -> bool {
self.frames
.lock()
.expect("collecting sink mutex poisoned")
.is_empty()
}
}
#[async_trait]
impl MediaSink for CollectingSink {
async fn send_frame(&mut self, frame: MediaFrame) -> Result<()> {
self.frames
.lock()
.expect("collecting sink mutex poisoned")
.push(frame);
Ok(())
}
}
#[derive(Debug, Default, Clone, Copy)]
pub struct NullSink;
#[async_trait]
impl MediaSink for NullSink {
async fn send_frame(&mut self, _frame: MediaFrame) -> Result<()> {
Ok(())
}
}
#[derive(Default, Clone)]
pub struct CountingObserver {
inner: Arc<CountingInner>,
}
#[derive(Default)]
struct CountingInner {
frames: AtomicU64,
publish_started: AtomicU64,
publish_ended: AtomicU64,
events: AtomicU64,
lagged: AtomicU64,
}
impl CountingObserver {
pub fn new() -> Self {
Self::default()
}
pub fn frames(&self) -> u64 {
self.inner.frames.load(Ordering::Relaxed)
}
pub fn publish_started(&self) -> u64 {
self.inner.publish_started.load(Ordering::Relaxed)
}
pub fn publish_ended(&self) -> u64 {
self.inner.publish_ended.load(Ordering::Relaxed)
}
pub fn events(&self) -> u64 {
self.inner.events.load(Ordering::Relaxed)
}
pub fn lagged(&self) -> u64 {
self.inner.lagged.load(Ordering::Relaxed)
}
}
impl Observer for CountingObserver {
fn on_event(&self, _event: &StreamEvent) {
self.inner.events.fetch_add(1, Ordering::Relaxed);
}
fn on_publish_started(&self, _app: &str) {
self.inner.publish_started.fetch_add(1, Ordering::Relaxed);
}
fn on_publish_ended(&self, _app: &str) {
self.inner.publish_ended.fetch_add(1, Ordering::Relaxed);
}
fn on_frame(&self, _key: &StreamKey, _frame: &MediaFrame) {
self.inner.frames.fetch_add(1, Ordering::Relaxed);
}
fn on_subscriber_lagged(&self, _key: &StreamKey, skipped: u64) {
self.inner.lagged.fetch_add(skipped, Ordering::Relaxed);
}
}