#![allow(dead_code)]
pub mod analytics;
pub mod auth;
pub mod cluster;
pub mod dash;
pub mod dvr;
pub mod hls;
pub mod ingest;
pub mod segment;
pub mod thumbnail;
use crate::error::{NetError, NetResult};
use bytes::Bytes;
use chrono::{DateTime, Utc};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, mpsc};
use uuid::Uuid;
pub use analytics::{Analytics, StreamMetrics, ViewerMetrics};
pub use auth::{AuthHandler, AuthResult, TokenValidator};
pub use cluster::{ClusterConfig, ClusterNode, NodeState};
pub use dash::server::DashServer;
pub use dvr::{DvrBuffer, DvrConfig};
pub use hls::server::HlsServer;
pub use ingest::{IngestConfig, IngestServer, IngestSource};
pub use segment::{MediaSegment, SegmentConfig, SegmentGenerator};
pub use thumbnail::ThumbnailGenerator;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LiveServerConfig {
pub bind_addr: SocketAddr,
pub segment_duration: Duration,
pub dvr_window: Duration,
pub max_streams: usize,
pub enable_hls: bool,
pub enable_dash: bool,
pub enable_ll_hls: bool,
pub enable_ll_dash: bool,
pub hls_segment_count: usize,
pub dash_availability_duration: Duration,
pub enable_dvr: bool,
pub enable_auth: bool,
pub enable_analytics: bool,
pub enable_thumbnails: bool,
pub thumbnail_interval: Duration,
pub cors_origins: Vec<String>,
pub enable_cluster: bool,
pub cluster_config: Option<ClusterConfig>,
}
impl Default for LiveServerConfig {
fn default() -> Self {
Self {
bind_addr: "0.0.0.0:8080".parse().expect("valid address"),
segment_duration: Duration::from_secs(2),
dvr_window: Duration::from_secs(3600),
max_streams: 1000,
enable_hls: true,
enable_dash: true,
enable_ll_hls: true,
enable_ll_dash: true,
hls_segment_count: 6,
dash_availability_duration: Duration::from_secs(60),
enable_dvr: true,
enable_auth: false,
enable_analytics: true,
enable_thumbnails: true,
thumbnail_interval: Duration::from_secs(10),
cors_origins: vec!["*".to_string()],
enable_cluster: false,
cluster_config: None,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct QualityVariant {
pub id: String,
pub bandwidth: u64,
pub width: u32,
pub height: u32,
pub framerate: f64,
pub video_codec: String,
pub audio_codec: String,
pub video_bitrate: u64,
pub audio_bitrate: u64,
}
impl QualityVariant {
#[must_use]
pub fn new(
id: impl Into<String>,
width: u32,
height: u32,
video_bitrate: u64,
audio_bitrate: u64,
) -> Self {
let bandwidth = video_bitrate + audio_bitrate;
Self {
id: id.into(),
bandwidth,
width,
height,
framerate: 30.0,
video_codec: "avc1.4d401f".to_string(),
audio_codec: "mp4a.40.2".to_string(),
video_bitrate,
audio_bitrate,
}
}
#[must_use]
pub fn standard_variants() -> Vec<Self> {
vec![
Self::new("1080p", 1920, 1080, 4_500_000, 128_000),
Self::new("720p", 1280, 720, 2_500_000, 128_000),
Self::new("480p", 854, 480, 1_400_000, 96_000),
Self::new("360p", 640, 360, 800_000, 96_000),
Self::new("240p", 426, 240, 400_000, 64_000),
]
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum MediaType {
Video,
Audio,
Metadata,
}
#[derive(Debug, Clone)]
pub struct MediaPacket {
pub media_type: MediaType,
pub timestamp: u64,
pub pts: u64,
pub dts: u64,
pub duration: u64,
pub keyframe: bool,
pub data: Bytes,
pub variant_id: Option<String>,
}
impl MediaPacket {
#[must_use]
pub fn new(media_type: MediaType, timestamp: u64, data: Bytes) -> Self {
Self {
media_type,
timestamp,
pts: timestamp,
dts: timestamp,
duration: 0,
keyframe: false,
data,
variant_id: None,
}
}
#[must_use]
pub const fn with_keyframe(mut self, keyframe: bool) -> Self {
self.keyframe = keyframe;
self
}
#[must_use]
pub fn with_variant(mut self, variant_id: impl Into<String>) -> Self {
self.variant_id = Some(variant_id.into());
self
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum StreamState {
Initializing,
Active,
Paused,
Stopping,
Stopped,
Error,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamInfo {
pub id: Uuid,
pub stream_key: String,
pub app_name: String,
pub state: StreamState,
pub variants: Vec<QualityVariant>,
pub start_time: DateTime<Utc>,
pub end_time: Option<DateTime<Utc>>,
pub viewer_count: u64,
pub peak_viewer_count: u64,
pub bytes_ingested: u64,
pub bytes_served: u64,
pub metadata: HashMap<String, String>,
}
impl StreamInfo {
#[must_use]
pub fn new(stream_key: impl Into<String>, app_name: impl Into<String>) -> Self {
Self {
id: Uuid::new_v4(),
stream_key: stream_key.into(),
app_name: app_name.into(),
state: StreamState::Initializing,
variants: QualityVariant::standard_variants(),
start_time: Utc::now(),
end_time: None,
viewer_count: 0,
peak_viewer_count: 0,
bytes_ingested: 0,
bytes_served: 0,
metadata: HashMap::new(),
}
}
#[must_use]
pub fn key_path(&self) -> String {
format!("{}/{}", self.app_name, self.stream_key)
}
}
pub struct LiveStream {
info: RwLock<StreamInfo>,
media_tx: broadcast::Sender<MediaPacket>,
dvr_buffer: Option<Arc<RwLock<DvrBuffer>>>,
segment_generator: Arc<segment::SegmentGenerator>,
analytics: Option<Arc<Analytics>>,
thumbnail_gen: Option<Arc<ThumbnailGenerator>>,
}
impl LiveStream {
pub fn new(
stream_key: impl Into<String>,
app_name: impl Into<String>,
config: &LiveServerConfig,
) -> Self {
let info = StreamInfo::new(stream_key, app_name);
let (media_tx, _) = broadcast::channel(1000);
let dvr_buffer = if config.enable_dvr {
Some(Arc::new(RwLock::new(DvrBuffer::new(DvrConfig {
window_duration: config.dvr_window,
segment_duration: config.segment_duration,
}))))
} else {
None
};
let segment_config = SegmentConfig {
duration: config.segment_duration,
keyframe_interval: 60,
};
let segment_generator = Arc::new(segment::SegmentGenerator::new(segment_config));
let analytics = if config.enable_analytics {
Some(Arc::new(Analytics::new(info.id)))
} else {
None
};
let thumbnail_gen = if config.enable_thumbnails {
Some(Arc::new(ThumbnailGenerator::new(config.thumbnail_interval)))
} else {
None
};
Self {
info: RwLock::new(info),
media_tx,
dvr_buffer,
segment_generator,
analytics,
thumbnail_gen,
}
}
#[must_use]
pub fn info(&self) -> StreamInfo {
self.info.read().clone()
}
#[must_use]
pub fn subscribe(&self) -> broadcast::Receiver<MediaPacket> {
self.media_tx.subscribe()
}
pub fn publish(&self, packet: MediaPacket) -> NetResult<()> {
if let Some(analytics) = &self.analytics {
analytics.record_packet(&packet);
}
if let Some(dvr) = &self.dvr_buffer {
dvr.write().add_packet(packet.clone());
}
self.segment_generator.add_packet(&packet)?;
if let Some(thumb_gen) = &self.thumbnail_gen {
if packet.media_type == MediaType::Video && packet.keyframe {
thumb_gen.generate_from_packet(&packet);
}
}
let _ = self.media_tx.send(packet);
let mut info = self.info.write();
info.bytes_ingested += 1;
Ok(())
}
pub fn set_state(&self, state: StreamState) {
let mut info = self.info.write();
info.state = state;
if state == StreamState::Stopped {
info.end_time = Some(Utc::now());
}
}
pub fn add_viewer(&self) {
let mut info = self.info.write();
info.viewer_count += 1;
if info.viewer_count > info.peak_viewer_count {
info.peak_viewer_count = info.viewer_count;
}
}
pub fn remove_viewer(&self) {
let mut info = self.info.write();
if info.viewer_count > 0 {
info.viewer_count -= 1;
}
}
#[must_use]
pub fn dvr_buffer(&self) -> Option<Arc<RwLock<DvrBuffer>>> {
self.dvr_buffer.clone()
}
#[must_use]
pub fn analytics(&self) -> Option<Arc<Analytics>> {
self.analytics.clone()
}
}
pub struct StreamRegistry {
streams: RwLock<HashMap<String, Arc<LiveStream>>>,
config: LiveServerConfig,
}
impl StreamRegistry {
#[must_use]
pub fn new(config: LiveServerConfig) -> Self {
Self {
streams: RwLock::new(HashMap::new()),
config,
}
}
pub fn register_stream(
&self,
stream_key: impl Into<String>,
app_name: impl Into<String>,
) -> NetResult<Arc<LiveStream>> {
let stream_key = stream_key.into();
let app_name = app_name.into();
let key_path = format!("{app_name}/{stream_key}");
let mut streams = self.streams.write();
if streams.len() >= self.config.max_streams {
return Err(NetError::invalid_state("Maximum stream limit reached"));
}
if streams.contains_key(&key_path) {
return Err(NetError::invalid_state(format!(
"Stream already exists: {key_path}"
)));
}
let stream = Arc::new(LiveStream::new(&stream_key, &app_name, &self.config));
streams.insert(key_path, stream.clone());
Ok(stream)
}
pub fn unregister_stream(&self, app_name: &str, stream_key: &str) {
let key_path = format!("{app_name}/{stream_key}");
let mut streams = self.streams.write();
if let Some(stream) = streams.remove(&key_path) {
stream.set_state(StreamState::Stopped);
}
}
#[must_use]
pub fn get_stream(&self, app_name: &str, stream_key: &str) -> Option<Arc<LiveStream>> {
let key_path = format!("{app_name}/{stream_key}");
let streams = self.streams.read();
streams.get(&key_path).cloned()
}
#[must_use]
pub fn list_streams(&self) -> Vec<StreamInfo> {
let streams = self.streams.read();
streams.values().map(|s| s.info()).collect()
}
#[must_use]
pub fn stream_count(&self) -> usize {
let streams = self.streams.read();
streams.len()
}
}
pub struct LiveServer {
config: LiveServerConfig,
registry: Arc<StreamRegistry>,
ingest_server: Option<Arc<IngestServer>>,
hls_server: Option<Arc<HlsServer>>,
dash_server: Option<Arc<DashServer>>,
auth_handler: Option<Arc<dyn AuthHandler>>,
cluster_node: Option<Arc<ClusterNode>>,
shutdown_tx: mpsc::Sender<()>,
shutdown_rx: mpsc::Receiver<()>,
}
impl LiveServer {
#[must_use]
pub fn new(config: LiveServerConfig) -> Self {
let registry = Arc::new(StreamRegistry::new(config.clone()));
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
Self {
config,
registry,
ingest_server: None,
hls_server: None,
dash_server: None,
auth_handler: None,
cluster_node: None,
shutdown_tx,
shutdown_rx,
}
}
pub fn with_auth_handler(mut self, handler: Arc<dyn AuthHandler>) -> Self {
self.auth_handler = Some(handler);
self
}
pub async fn start(mut self) -> NetResult<()> {
if self.config.enable_cluster {
if let Some(cluster_config) = &self.config.cluster_config {
let node = Arc::new(ClusterNode::new(cluster_config.clone()).await?);
node.start().await?;
self.cluster_node = Some(node);
}
}
if self.config.enable_hls {
let hls_server = Arc::new(HlsServer::new(
self.config.clone(),
Arc::clone(&self.registry),
));
self.hls_server = Some(hls_server);
}
if self.config.enable_dash {
let dash_server = Arc::new(DashServer::new(
self.config.clone(),
Arc::clone(&self.registry),
));
self.dash_server = Some(dash_server);
}
self.start_http_server().await?;
Ok(())
}
async fn start_http_server(&mut self) -> NetResult<()> {
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper_util::rt::TokioIo;
use tokio::net::TcpListener;
let listener = TcpListener::bind(self.config.bind_addr)
.await
.map_err(|e| NetError::connection(format!("Failed to bind: {e}")))?;
let registry = Arc::clone(&self.registry);
let hls_server = self.hls_server.clone();
let dash_server = self.dash_server.clone();
let config = self.config.clone();
tokio::spawn(async move {
loop {
match listener.accept().await {
Ok((stream, _addr)) => {
let io = TokioIo::new(stream);
let registry = Arc::clone(®istry);
let hls = hls_server.clone();
let dash = dash_server.clone();
let cfg = config.clone();
tokio::spawn(async move {
let service = service_fn(move |req| {
handle_request(
req,
Arc::clone(®istry),
hls.clone(),
dash.clone(),
cfg.clone(),
)
});
if let Err(e) =
http1::Builder::new().serve_connection(io, service).await
{
eprintln!("Error serving connection: {e}");
}
});
}
Err(e) => {
eprintln!("Accept error: {e}");
}
}
}
});
Ok(())
}
#[must_use]
pub fn registry(&self) -> &Arc<StreamRegistry> {
&self.registry
}
pub async fn shutdown(&self) -> NetResult<()> {
let _ = self.shutdown_tx.send(()).await;
Ok(())
}
}
async fn handle_request(
req: hyper::Request<hyper::body::Incoming>,
registry: Arc<StreamRegistry>,
hls_server: Option<Arc<HlsServer>>,
dash_server: Option<Arc<DashServer>>,
config: LiveServerConfig,
) -> Result<hyper::Response<http_body_util::Full<Bytes>>, hyper::Error> {
use http_body_util::Full;
let path = req.uri().path();
let mut response = hyper::Response::builder();
for origin in &config.cors_origins {
response = response.header("Access-Control-Allow-Origin", origin);
}
response = response.header("Access-Control-Allow-Methods", "GET, POST, OPTIONS");
response = response.header("Access-Control-Allow-Headers", "Content-Type");
if req.method() == hyper::Method::OPTIONS {
return Ok(response
.status(200)
.body(Full::new(Bytes::new()))
.expect("invariant: HTTP response builder is valid"));
}
if path.starts_with("/hls/") {
if let Some(hls) = hls_server {
return hls.handle_request(req, response).await;
}
}
if path.starts_with("/dash/") {
if let Some(dash) = dash_server {
return dash.handle_request(req, response).await;
}
}
if path.starts_with("/api/") {
return handle_api_request(req, registry, response).await;
}
Ok(response
.status(404)
.body(Full::new(Bytes::from("Not Found")))
.expect("invariant: HTTP response builder is valid"))
}
async fn handle_api_request(
req: hyper::Request<hyper::body::Incoming>,
registry: Arc<StreamRegistry>,
response: hyper::http::response::Builder,
) -> Result<hyper::Response<http_body_util::Full<Bytes>>, hyper::Error> {
use http_body_util::Full;
let path = req.uri().path();
if path == "/api/streams" && req.method() == hyper::Method::GET {
let streams = registry.list_streams();
let json = serde_json::to_string(&streams)
.expect("invariant: streams serialization always succeeds");
return Ok(response
.status(200)
.header("Content-Type", "application/json")
.body(Full::new(Bytes::from(json)))
.expect("invariant: HTTP response builder is valid"));
}
Ok(response
.status(404)
.body(Full::new(Bytes::from("Not Found")))
.expect("invariant: HTTP response builder is valid"))
}