mmids_core/workflows/
mod.rs1pub mod definitions;
7pub mod manager;
8mod runner;
9pub mod steps;
10
11pub use runner::{start_workflow, WorkflowRequest, WorkflowRequestOperation, WorkflowStatus};
12
13use crate::codecs::{AudioCodec, VideoCodec};
14use crate::endpoints::rtmp_server::RtmpEndpointMediaData;
15use crate::utils::hash_map_to_stream_metadata;
16use crate::{StreamId, VideoTimestamp};
17use bytes::Bytes;
18use rml_rtmp::time::RtmpTimestamp;
19use std::collections::HashMap;
20use std::time::Duration;
21
22pub use runner::{WorkflowState, WorkflowStepState};
23
24#[derive(Clone, Debug, PartialEq)]
26pub struct MediaNotification {
27 pub stream_id: StreamId,
29
30 pub content: MediaNotificationContent,
32}
33
34#[derive(Clone, Debug, PartialEq)]
36pub enum MediaNotificationContent {
37 NewIncomingStream {
40 stream_name: String,
42 },
43
44 StreamDisconnected,
49
50 Video {
52 codec: VideoCodec,
53 is_sequence_header: bool,
54 is_keyframe: bool,
55 data: Bytes,
56 timestamp: VideoTimestamp,
57 },
58
59 Audio {
61 codec: AudioCodec,
62 is_sequence_header: bool,
63 data: Bytes,
64 timestamp: Duration,
65 },
66
67 Metadata { data: HashMap<String, String> },
69}
70
71impl MediaNotificationContent {
72 pub fn to_rtmp_media_data(&self) -> Option<RtmpEndpointMediaData> {
74 match self {
75 MediaNotificationContent::StreamDisconnected => return None,
76 MediaNotificationContent::NewIncomingStream { stream_name: _ } => return None,
77 MediaNotificationContent::Metadata { data } => {
78 Some(RtmpEndpointMediaData::NewStreamMetaData {
79 metadata: hash_map_to_stream_metadata(&data),
80 })
81 }
82
83 MediaNotificationContent::Video {
84 codec,
85 is_keyframe,
86 is_sequence_header,
87 data,
88 timestamp,
89 } => Some(RtmpEndpointMediaData::NewVideoData {
90 data: data.clone(),
91 codec: codec.clone(),
92 is_keyframe: *is_keyframe,
93 is_sequence_header: *is_sequence_header,
94 timestamp: RtmpTimestamp::new(timestamp.dts.as_millis() as u32),
95 composition_time_offset: timestamp.pts_offset,
96 }),
97
98 MediaNotificationContent::Audio {
99 codec,
100 is_sequence_header,
101 timestamp,
102 data,
103 } => Some(RtmpEndpointMediaData::NewAudioData {
104 data: data.clone(),
105 codec: codec.clone(),
106 timestamp: RtmpTimestamp::new(timestamp.as_millis() as u32),
107 is_sequence_header: *is_sequence_header,
108 }),
109 }
110 }
111}