use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use bairelay_rtsp::provider::{StreamError, StreamProvider, SubscriptionHandle};
use bairelay_rtsp::url::StreamKind;
use crate::camera::CameraHandle;
use crate::wake_lock::WakeLockGuard;
const SDP_READY_TIMEOUT: Duration = Duration::from_secs(15);
pub(crate) fn classify_presence(
p: crate::audio_presence::AudioPresence,
) -> (bool, std::time::Duration) {
use crate::audio_presence::AudioPresence;
match p {
AudioPresence::Present { .. } => (true, std::time::Duration::ZERO),
AudioPresence::Absent => (false, std::time::Duration::ZERO),
AudioPresence::Unknown => (false, std::time::Duration::from_secs(2)),
}
}
pub struct CameraProvider {
cameras: Arc<HashMap<String, Arc<CameraHandle>>>,
}
impl CameraProvider {
pub fn new(cameras: Arc<HashMap<String, Arc<CameraHandle>>>) -> Self {
Self { cameras }
}
}
#[async_trait::async_trait]
impl StreamProvider for CameraProvider {
async fn subscribe(
&self,
camera: &str,
kind: StreamKind,
authenticated_user: Option<&str>,
) -> Result<SubscriptionHandle, StreamError> {
let handle = self.cameras.get(camera).ok_or(StreamError::UnknownCamera)?;
if let Some(user) = authenticated_user {
let permitted = &handle.config().permitted_users;
if !permitted.is_empty() && !permitted.iter().any(|u| u == user) {
return Err(StreamError::AccessDenied);
}
}
let guard: WakeLockGuard = handle.wake_lock().acquire();
let source = handle.stream_source(kind).await?;
let presence = *handle
.audio_presence()
.read()
.expect("presence lock poisoned");
let (wait_audio, bonus_window) = classify_presence(presence);
let sdp_params = if wait_audio {
source
.await_sdp_both_ready(SDP_READY_TIMEOUT)
.await
.map_err(|e| StreamError::Unavailable(e.to_string()))?
} else {
source
.await_sdp_ready(SDP_READY_TIMEOUT)
.await
.map_err(|e| StreamError::Unavailable(e.to_string()))?;
if !bonus_window.is_zero() {
let _ = source.await_audio(bonus_window).await;
}
source.sdp_params()
};
let frames = source.subscribe();
let last_frame = source.last_frame();
Ok(SubscriptionHandle {
frames,
sdp_params,
last_frame,
guard: Box::new(guard),
})
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn camera_provider_implements_stream_provider() {
fn assert_impl<T: StreamProvider>() {}
assert_impl::<CameraProvider>();
}
#[test]
fn classify_present_waits_for_audio_no_bonus() {
use bairelay_rtsp::codec::AudioCodec;
let (wait, bonus) = classify_presence(crate::audio_presence::AudioPresence::Present {
codec: AudioCodec::Aac,
});
assert!(wait);
assert_eq!(bonus, std::time::Duration::ZERO);
}
#[test]
fn classify_absent_no_wait_no_bonus() {
let (wait, bonus) = classify_presence(crate::audio_presence::AudioPresence::Absent);
assert!(!wait);
assert_eq!(bonus, std::time::Duration::ZERO);
}
#[test]
fn classify_unknown_bonus_window_two_seconds() {
let (wait, bonus) = classify_presence(crate::audio_presence::AudioPresence::Unknown);
assert!(!wait);
assert_eq!(bonus, std::time::Duration::from_secs(2));
}
#[tokio::test]
async fn camera_provider_subscribe_unknown_camera_rejects() {
use crate::config::test_helpers::minimal_camera_config;
use tokio_util::sync::CancellationToken;
let cancel = CancellationToken::new();
let handle = Arc::new(CameraHandle::new(
minimal_camera_config("registered"),
cancel,
None,
));
let mut map = HashMap::new();
map.insert("registered".to_string(), handle);
let provider = CameraProvider::new(Arc::new(map));
let err = provider
.subscribe("nope", StreamKind::Main, None)
.await
.err()
.expect("unknown camera");
assert!(matches!(err, StreamError::UnknownCamera));
}
#[tokio::test]
async fn camera_provider_subscribe_rejects_user_not_in_acl() {
use crate::config::test_helpers::minimal_camera_config;
use tokio_util::sync::CancellationToken;
let cancel = CancellationToken::new();
let mut cfg = minimal_camera_config("locked");
cfg.permitted_users = vec!["alice".to_string()];
let handle = Arc::new(CameraHandle::new(cfg, cancel, None));
let mut map = HashMap::new();
map.insert("locked".to_string(), handle);
let provider = CameraProvider::new(Arc::new(map));
let err = provider
.subscribe("locked", StreamKind::Main, Some("bob"))
.await
.err()
.expect("acl denies bob");
assert!(matches!(err, StreamError::AccessDenied));
}
}