1use serde::{Deserialize, Serialize};
32use std::path::PathBuf;
33#[cfg(not(target_arch = "wasm32"))]
34use std::sync::Arc;
35#[cfg(not(target_arch = "wasm32"))]
36use std::time::Duration;
37use thiserror::Error;
38#[cfg(not(target_arch = "wasm32"))]
39use tokio::sync::{mpsc, RwLock};
40
41pub mod ad_insertion;
43#[cfg(not(target_arch = "wasm32"))]
44pub mod api;
45#[cfg(not(target_arch = "wasm32"))]
46pub mod asrun;
47#[cfg(not(target_arch = "wasm32"))]
48pub mod automation;
49pub mod branding;
50#[cfg(not(target_arch = "wasm32"))]
51pub mod bxf;
52pub mod catchup;
53pub mod cg;
54#[cfg(not(target_arch = "wasm32"))]
55pub mod channel;
56pub mod channel_config;
58pub mod clip_store;
59pub mod compliance_ingest;
60#[cfg(not(target_arch = "wasm32"))]
61pub mod content;
62#[cfg(not(target_arch = "wasm32"))]
63pub mod device;
64pub mod event_log;
65#[cfg(not(target_arch = "wasm32"))]
66pub mod failover;
67pub mod frame_buffer;
69pub mod frame_trim;
71pub mod gap_filler;
73pub mod graphics;
74pub mod highlight_automation;
75pub mod ingest;
76pub mod media_router_playout;
78pub mod monitoring;
79#[cfg(not(target_arch = "wasm32"))]
80pub mod output;
81pub mod output_router;
82#[cfg(not(target_arch = "wasm32"))]
83pub mod playback;
84pub mod playlist;
85pub mod playlist_ingest;
87pub mod playout_log;
89pub mod playout_schedule;
91pub mod predecode;
93pub mod preflight;
94pub mod ptp_clock;
96pub mod rundown;
97pub mod schedule_block;
99pub mod schedule_slot;
101pub mod scheduler;
102pub mod secondary_events;
103pub mod signal_chain;
105pub mod simulcast;
106pub mod subtitle_inserter;
107pub mod tally_system;
108pub mod timecode_overlay;
110pub mod transitions;
111
112pub type Result<T> = std::result::Result<T, PlayoutError>;
114
115#[derive(Error, Debug)]
117pub enum PlayoutError {
118 #[error("Configuration error: {0}")]
119 Config(String),
120
121 #[error("Scheduler error: {0}")]
122 Scheduler(String),
123
124 #[error("Playlist error: {0}")]
125 Playlist(String),
126
127 #[error("Playback error: {0}")]
128 Playback(String),
129
130 #[error("Output error: {0}")]
131 Output(String),
132
133 #[error("Graphics error: {0}")]
134 Graphics(String),
135
136 #[error("Monitoring error: {0}")]
137 Monitoring(String),
138
139 #[error("IO error: {0}")]
140 Io(#[from] std::io::Error),
141
142 #[error("Synchronization error: {0}")]
143 Sync(String),
144
145 #[error("Timing error: {0}")]
146 Timing(String),
147
148 #[error("Resource not found: {0}")]
149 NotFound(String),
150
151 #[error("Emergency fallback activated: {0}")]
152 EmergencyFallback(String),
153
154 #[error("Checksum error: {0}")]
155 Checksum(String),
156
157 #[error("PTP error: {0}")]
158 Ptp(String),
159
160 #[error("Integrity error: {0}")]
161 Integrity(String),
162}
163
164#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
166pub enum VideoFormat {
167 HD1080p2398,
169 HD1080p24,
171 HD1080p25,
173 HD1080p2997,
175 HD1080p30,
177 HD1080p50,
179 HD1080p5994,
181 HD1080p60,
183 HD1080i50,
185 HD1080i5994,
187 UHD2160p25,
189 UHD2160p2997,
191 UHD2160p50,
193 UHD2160p5994,
195}
196
197impl VideoFormat {
198 pub fn fps(&self) -> f64 {
200 match self {
201 Self::HD1080p2398 => 23.976,
202 Self::HD1080p24 => 24.0,
203 Self::HD1080p25 | Self::UHD2160p25 => 25.0,
204 Self::HD1080p2997 | Self::UHD2160p2997 => 29.97,
205 Self::HD1080p30 => 30.0,
206 Self::HD1080p50 | Self::HD1080i50 | Self::UHD2160p50 => 50.0,
207 Self::HD1080p5994 | Self::HD1080i5994 | Self::UHD2160p5994 => 59.94,
208 Self::HD1080p60 => 60.0,
209 }
210 }
211
212 pub fn width(&self) -> u32 {
214 match self {
215 Self::HD1080p2398
216 | Self::HD1080p24
217 | Self::HD1080p25
218 | Self::HD1080p2997
219 | Self::HD1080p30
220 | Self::HD1080p50
221 | Self::HD1080p5994
222 | Self::HD1080p60
223 | Self::HD1080i50
224 | Self::HD1080i5994 => 1920,
225 Self::UHD2160p25 | Self::UHD2160p2997 | Self::UHD2160p50 | Self::UHD2160p5994 => 3840,
226 }
227 }
228
229 pub fn height(&self) -> u32 {
231 match self {
232 Self::HD1080p2398
233 | Self::HD1080p24
234 | Self::HD1080p25
235 | Self::HD1080p2997
236 | Self::HD1080p30
237 | Self::HD1080p50
238 | Self::HD1080p5994
239 | Self::HD1080p60
240 | Self::HD1080i50
241 | Self::HD1080i5994 => 1080,
242 Self::UHD2160p25 | Self::UHD2160p2997 | Self::UHD2160p50 | Self::UHD2160p5994 => 2160,
243 }
244 }
245}
246
247#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
249pub struct AudioFormat {
250 pub sample_rate: u32,
251 pub channels: u16,
252 pub bit_depth: u16,
253}
254
255impl Default for AudioFormat {
256 fn default() -> Self {
257 Self {
258 sample_rate: 48000,
259 channels: 2,
260 bit_depth: 24,
261 }
262 }
263}
264
265#[derive(Debug, Clone, Serialize, Deserialize)]
267pub struct PlayoutConfig {
268 pub video_format: VideoFormat,
270
271 pub audio_format: AudioFormat,
273
274 pub genlock_enabled: bool,
276
277 pub clock_source: String,
279
280 pub buffer_size: usize,
282
283 pub fallback_content: PathBuf,
285
286 pub max_latency_ms: u64,
288
289 pub detect_frame_drops: bool,
291
292 pub playlist_dir: PathBuf,
294
295 pub content_root: PathBuf,
297
298 pub monitoring_enabled: bool,
300
301 pub monitoring_port: u16,
303}
304
305impl Default for PlayoutConfig {
306 fn default() -> Self {
307 Self {
308 video_format: VideoFormat::HD1080p25,
309 audio_format: AudioFormat::default(),
310 genlock_enabled: false,
311 clock_source: "internal".to_string(),
312 buffer_size: 10,
313 fallback_content: PathBuf::from("/var/oximedia/fallback.mxf"),
314 max_latency_ms: 100,
315 detect_frame_drops: true,
316 playlist_dir: PathBuf::from("/var/oximedia/playlists"),
317 content_root: PathBuf::from("/var/oximedia/content"),
318 monitoring_enabled: true,
319 monitoring_port: 8080,
320 }
321 }
322}
323
324#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
326pub enum PlayoutState {
327 Stopped,
329 Starting,
331 Running,
333 Paused,
335 Fallback,
337 Stopping,
339}
340
341#[cfg(not(target_arch = "wasm32"))]
343struct ServerState {
344 state: PlayoutState,
345 scheduler: Option<Arc<scheduler::Scheduler>>,
346 playback: Option<Arc<playback::PlaybackEngine>>,
347 outputs: Vec<Arc<output::Output>>,
348 graphics: Option<Arc<graphics::GraphicsEngine>>,
349 monitor: Option<Arc<monitoring::Monitor>>,
350}
351
352#[derive(Debug, Clone, Serialize, Deserialize)]
354pub struct ShutdownConfig {
355 pub drain_timeout_ms: u64,
357 pub flush_buffers: bool,
359 pub wait_for_current_item: bool,
361}
362
363impl Default for ShutdownConfig {
364 fn default() -> Self {
365 Self {
366 drain_timeout_ms: 5000,
367 flush_buffers: true,
368 wait_for_current_item: false,
369 }
370 }
371}
372
373#[cfg(not(target_arch = "wasm32"))]
375pub struct PlayoutServer {
376 config: Arc<RwLock<PlayoutConfig>>,
377 state: Arc<RwLock<ServerState>>,
378 shutdown_config: ShutdownConfig,
379 #[allow(dead_code)]
380 control_tx: mpsc::Sender<ControlCommand>,
381 #[allow(dead_code)]
382 control_rx: Arc<RwLock<mpsc::Receiver<ControlCommand>>>,
383}
384
385#[cfg(not(target_arch = "wasm32"))]
387#[derive(Debug, Clone)]
388#[allow(dead_code)]
389enum ControlCommand {
390 Start,
391 Stop,
392 Pause,
393 Resume,
394 LoadPlaylist(PathBuf),
395 EmergencyFallback,
396 Shutdown,
397 Reconfigure(PlayoutConfig),
399}
400
401#[cfg(not(target_arch = "wasm32"))]
402impl PlayoutServer {
403 pub async fn new(config: PlayoutConfig) -> Result<Self> {
405 let (control_tx, control_rx) = mpsc::channel(100);
406
407 let state = ServerState {
408 state: PlayoutState::Stopped,
409 scheduler: None,
410 playback: None,
411 outputs: Vec::new(),
412 graphics: None,
413 monitor: None,
414 };
415
416 Ok(Self {
417 config: Arc::new(RwLock::new(config)),
418 state: Arc::new(RwLock::new(state)),
419 shutdown_config: ShutdownConfig::default(),
420 control_tx,
421 control_rx: Arc::new(RwLock::new(control_rx)),
422 })
423 }
424
425 pub async fn with_shutdown_config(
427 config: PlayoutConfig,
428 shutdown_config: ShutdownConfig,
429 ) -> Result<Self> {
430 let (control_tx, control_rx) = mpsc::channel(100);
431
432 let state = ServerState {
433 state: PlayoutState::Stopped,
434 scheduler: None,
435 playback: None,
436 outputs: Vec::new(),
437 graphics: None,
438 monitor: None,
439 };
440
441 Ok(Self {
442 config: Arc::new(RwLock::new(config)),
443 state: Arc::new(RwLock::new(state)),
444 shutdown_config,
445 control_tx,
446 control_rx: Arc::new(RwLock::new(control_rx)),
447 })
448 }
449
450 pub async fn start(&self) -> Result<()> {
452 let config = self.config.read().await.clone();
453 let mut state = self.state.write().await;
454
455 if state.state != PlayoutState::Stopped {
456 return Err(PlayoutError::Config(
457 "Server is already running".to_string(),
458 ));
459 }
460
461 state.state = PlayoutState::Starting;
462
463 let scheduler_config = scheduler::SchedulerConfig::default();
465 state.scheduler = Some(Arc::new(scheduler::Scheduler::new(scheduler_config)));
466
467 let playback_config = playback::PlaybackConfig::from_playout_config(&config);
469 state.playback = Some(Arc::new(playback::PlaybackEngine::new(playback_config)?));
470
471 let graphics_config = graphics::GraphicsConfig::default();
473 state.graphics = Some(Arc::new(graphics::GraphicsEngine::new(graphics_config)?));
474
475 if config.monitoring_enabled {
477 let monitor_config = monitoring::MonitorConfig {
478 port: config.monitoring_port,
479 audio_meters: true,
480 waveform: false,
481 vectorscope: false,
482 alert_history_size: 100,
483 metrics_retention_seconds: 3600,
484 };
485 state.monitor = Some(Arc::new(monitoring::Monitor::new(monitor_config)?));
486 }
487
488 state.state = PlayoutState::Running;
489
490 Ok(())
491 }
492
493 pub async fn stop(&self) -> Result<()> {
498 let mut state = self.state.write().await;
499
500 if state.state == PlayoutState::Stopped {
501 return Ok(());
502 }
503
504 state.state = PlayoutState::Stopping;
505
506 if self.shutdown_config.flush_buffers {
508 if let Some(playback) = &state.playback {
509 let deadline = tokio::time::Instant::now()
510 + Duration::from_millis(self.shutdown_config.drain_timeout_ms);
511
512 loop {
514 let level = playback.buffer_level();
515 if level == 0 {
516 break;
517 }
518 if tokio::time::Instant::now() >= deadline {
519 break;
521 }
522 let _ = playback.get_next_frame();
524 tokio::task::yield_now().await;
526 }
527 }
528 }
529
530 state.monitor = None;
532 state.graphics = None;
533
534 if let Some(playback) = state.playback.take() {
536 let _ = playback.stop().await;
537 }
538
539 state.scheduler = None;
540 state.outputs.clear();
541
542 state.state = PlayoutState::Stopped;
543
544 Ok(())
545 }
546
547 pub async fn pause(&self) -> Result<()> {
549 let mut state = self.state.write().await;
550 if state.state == PlayoutState::Running {
551 state.state = PlayoutState::Paused;
552 }
553 Ok(())
554 }
555
556 pub async fn resume(&self) -> Result<()> {
558 let mut state = self.state.write().await;
559 if state.state == PlayoutState::Paused {
560 state.state = PlayoutState::Running;
561 }
562 Ok(())
563 }
564
565 pub async fn state(&self) -> PlayoutState {
567 self.state.read().await.state
568 }
569
570 pub async fn load_playlist(&self, path: PathBuf) -> Result<()> {
572 let state = self.state.read().await;
573 if let Some(scheduler) = &state.scheduler {
574 scheduler.load_playlist(path).await?;
575 }
576 Ok(())
577 }
578
579 pub async fn emergency_fallback(&self) -> Result<()> {
581 let mut state = self.state.write().await;
582 state.state = PlayoutState::Fallback;
583 Ok(())
584 }
585
586 pub async fn config(&self) -> PlayoutConfig {
588 self.config.read().await.clone()
589 }
590
591 pub async fn reconfigure(&self, new_config: PlayoutConfig) -> Result<()> {
605 let old_config = self.config.read().await.clone();
606
607 if new_config.buffer_size == 0 {
609 return Err(PlayoutError::Config("buffer_size must be > 0".to_string()));
610 }
611
612 let monitoring_changed = old_config.monitoring_enabled != new_config.monitoring_enabled
614 || old_config.monitoring_port != new_config.monitoring_port;
615
616 if monitoring_changed {
618 let mut state = self.state.write().await;
619 if new_config.monitoring_enabled && state.monitor.is_none() {
620 let monitor_config = monitoring::MonitorConfig {
621 port: new_config.monitoring_port,
622 audio_meters: true,
623 waveform: false,
624 vectorscope: false,
625 alert_history_size: 100,
626 metrics_retention_seconds: 3600,
627 };
628 state.monitor = Some(Arc::new(monitoring::Monitor::new(monitor_config)?));
629 } else if !new_config.monitoring_enabled {
630 state.monitor = None;
631 }
632 }
633
634 *self.config.write().await = new_config;
636
637 Ok(())
638 }
639
640 pub async fn wait(&self) -> Result<()> {
642 loop {
643 let state = self.state().await;
644 if state == PlayoutState::Stopped {
645 break;
646 }
647 tokio::time::sleep(Duration::from_millis(100)).await;
648 }
649 Ok(())
650 }
651}
652
653#[cfg(test)]
654mod tests {
655 use super::*;
656
657 #[test]
658 fn test_video_format_properties() {
659 let format = VideoFormat::HD1080p25;
660 assert_eq!(format.fps(), 25.0);
661 assert_eq!(format.width(), 1920);
662 assert_eq!(format.height(), 1080);
663 }
664
665 #[test]
666 fn test_default_config() {
667 let config = PlayoutConfig::default();
668 assert_eq!(config.video_format, VideoFormat::HD1080p25);
669 assert_eq!(config.audio_format.sample_rate, 48000);
670 assert_eq!(config.buffer_size, 10);
671 }
672
673 #[tokio::test]
674 async fn test_server_lifecycle() {
675 let config = PlayoutConfig::default();
676 let server = PlayoutServer::new(config)
677 .await
678 .expect("should succeed in test");
679 assert_eq!(server.state().await, PlayoutState::Stopped);
680 }
681
682 #[tokio::test]
685 async fn test_graceful_shutdown_stopped_server() {
686 let config = PlayoutConfig::default();
687 let server = PlayoutServer::new(config)
688 .await
689 .expect("should succeed in test");
690 server.stop().await.expect("should succeed");
692 assert_eq!(server.state().await, PlayoutState::Stopped);
693 }
694
695 #[tokio::test]
696 async fn test_graceful_shutdown_running_server() {
697 let config = PlayoutConfig::default();
698 let server = PlayoutServer::new(config)
699 .await
700 .expect("should succeed in test");
701 server.start().await.expect("should start");
702 assert_eq!(server.state().await, PlayoutState::Running);
703
704 server.stop().await.expect("should stop gracefully");
705 assert_eq!(server.state().await, PlayoutState::Stopped);
706 }
707
708 #[tokio::test]
709 async fn test_graceful_shutdown_with_custom_config() {
710 let config = PlayoutConfig::default();
711 let shutdown_cfg = ShutdownConfig {
712 drain_timeout_ms: 100,
713 flush_buffers: true,
714 wait_for_current_item: false,
715 };
716 let server = PlayoutServer::with_shutdown_config(config, shutdown_cfg)
717 .await
718 .expect("should succeed in test");
719 server.start().await.expect("should start");
720 server.stop().await.expect("should stop");
721 assert_eq!(server.state().await, PlayoutState::Stopped);
722 }
723
724 #[tokio::test]
725 async fn test_graceful_shutdown_no_flush() {
726 let config = PlayoutConfig::default();
727 let shutdown_cfg = ShutdownConfig {
728 drain_timeout_ms: 100,
729 flush_buffers: false,
730 wait_for_current_item: false,
731 };
732 let server = PlayoutServer::with_shutdown_config(config, shutdown_cfg)
733 .await
734 .expect("should succeed in test");
735 server.start().await.expect("should start");
736 server.stop().await.expect("should stop immediately");
737 assert_eq!(server.state().await, PlayoutState::Stopped);
738 }
739
740 #[test]
741 fn test_shutdown_config_default() {
742 let cfg = ShutdownConfig::default();
743 assert_eq!(cfg.drain_timeout_ms, 5000);
744 assert!(cfg.flush_buffers);
745 assert!(!cfg.wait_for_current_item);
746 }
747
748 #[tokio::test]
751 async fn test_hot_swap_config_while_stopped() {
752 let config = PlayoutConfig::default();
753 let server = PlayoutServer::new(config)
754 .await
755 .expect("should succeed in test");
756
757 let mut new_config = PlayoutConfig::default();
758 new_config.max_latency_ms = 200;
759 new_config.buffer_size = 20;
760
761 server
762 .reconfigure(new_config)
763 .await
764 .expect("should reconfigure");
765
766 let current = server.config().await;
767 assert_eq!(current.max_latency_ms, 200);
768 assert_eq!(current.buffer_size, 20);
769 }
770
771 #[tokio::test]
772 async fn test_hot_swap_config_while_running() {
773 let config = PlayoutConfig::default();
774 let server = PlayoutServer::new(config)
775 .await
776 .expect("should succeed in test");
777 server.start().await.expect("should start");
778
779 let mut new_config = PlayoutConfig::default();
780 new_config.max_latency_ms = 50;
781 new_config.detect_frame_drops = false;
782
783 server
784 .reconfigure(new_config)
785 .await
786 .expect("should reconfigure while running");
787
788 let current = server.config().await;
789 assert_eq!(current.max_latency_ms, 50);
790 assert!(!current.detect_frame_drops);
791
792 server.stop().await.expect("should stop");
793 }
794
795 #[tokio::test]
796 async fn test_hot_swap_invalid_config() {
797 let config = PlayoutConfig::default();
798 let server = PlayoutServer::new(config)
799 .await
800 .expect("should succeed in test");
801
802 let mut bad_config = PlayoutConfig::default();
803 bad_config.buffer_size = 0;
804
805 let result = server.reconfigure(bad_config).await;
806 assert!(result.is_err());
807 }
808
809 #[tokio::test]
810 async fn test_hot_swap_enable_monitoring() {
811 let mut config = PlayoutConfig::default();
812 config.monitoring_enabled = false;
813 let server = PlayoutServer::new(config)
814 .await
815 .expect("should succeed in test");
816 server.start().await.expect("should start");
817
818 let mut new_config = server.config().await;
820 new_config.monitoring_enabled = true;
821 new_config.monitoring_port = 19090;
822 server
823 .reconfigure(new_config)
824 .await
825 .expect("should enable monitoring");
826
827 let current = server.config().await;
828 assert!(current.monitoring_enabled);
829 assert_eq!(current.monitoring_port, 19090);
830
831 server.stop().await.expect("should stop");
832 }
833
834 #[tokio::test]
835 async fn test_hot_swap_disable_monitoring() {
836 let config = PlayoutConfig::default();
837 let server = PlayoutServer::new(config)
838 .await
839 .expect("should succeed in test");
840 server.start().await.expect("should start");
841
842 let mut new_config = server.config().await;
844 new_config.monitoring_enabled = false;
845 server
846 .reconfigure(new_config)
847 .await
848 .expect("should disable monitoring");
849
850 let current = server.config().await;
851 assert!(!current.monitoring_enabled);
852
853 server.stop().await.expect("should stop");
854 }
855
856 #[tokio::test]
857 async fn test_hot_swap_video_format_stored() {
858 let config = PlayoutConfig::default();
859 let server = PlayoutServer::new(config)
860 .await
861 .expect("should succeed in test");
862
863 let mut new_config = PlayoutConfig::default();
864 new_config.video_format = VideoFormat::UHD2160p50;
865 server
866 .reconfigure(new_config)
867 .await
868 .expect("should store format change");
869
870 let current = server.config().await;
871 assert_eq!(current.video_format, VideoFormat::UHD2160p50);
872 }
873}