mmids_core/workflows/
mod.rs

1//! A workflow represents a single media pipeline.  Each workflow contains one or more steps that
2//! can either receive video, transform video, or send video to other sources.  Media data
3//! transitions from one step to the next in a linear fashion based on the order in which they
4//! were defined.
5
6pub mod definitions;
7pub mod manager;
8mod runner;
9pub mod steps;
10
11pub use runner::{start_workflow, WorkflowRequest, WorkflowRequestOperation, WorkflowStatus};
12
13use crate::codecs::{AudioCodec, VideoCodec};
14use crate::endpoints::rtmp_server::RtmpEndpointMediaData;
15use crate::utils::hash_map_to_stream_metadata;
16use crate::{StreamId, VideoTimestamp};
17use bytes::Bytes;
18use rml_rtmp::time::RtmpTimestamp;
19use std::collections::HashMap;
20use std::time::Duration;
21
22pub use runner::{WorkflowState, WorkflowStepState};
23
24/// Notification about media coming across a specific stream
25#[derive(Clone, Debug, PartialEq)]
26pub struct MediaNotification {
27    /// The identifier for the stream that this notification pertains to
28    pub stream_id: StreamId,
29
30    /// The content of the notification message
31    pub content: MediaNotificationContent,
32}
33
34/// The detailed information contained within a media notification
35#[derive(Clone, Debug, PartialEq)]
36pub enum MediaNotificationContent {
37    /// Announces that this stream has now connected, and steps that receive this notification
38    /// should prepare for media data to start coming through
39    NewIncomingStream {
40        /// The name for the stream that's being published
41        stream_name: String,
42    },
43
44    /// Announces that this stream's source has disconnected and will no longer be sending any
45    /// new notifications down.  Steps that receive this message can use this to clean up any
46    /// information they are tracking about this stream, as no new media will arrive without
47    /// a new `NewIncomingStream` announcement.
48    StreamDisconnected,
49
50    /// Video content
51    Video {
52        codec: VideoCodec,
53        is_sequence_header: bool,
54        is_keyframe: bool,
55        data: Bytes,
56        timestamp: VideoTimestamp,
57    },
58
59    /// Audio content
60    Audio {
61        codec: AudioCodec,
62        is_sequence_header: bool,
63        data: Bytes,
64        timestamp: Duration,
65    },
66
67    /// New stream metadata
68    Metadata { data: HashMap<String, String> },
69}
70
71impl MediaNotificationContent {
72    /// Creates an RTMP representation of the media data from the specified media content
73    pub fn to_rtmp_media_data(&self) -> Option<RtmpEndpointMediaData> {
74        match self {
75            MediaNotificationContent::StreamDisconnected => return None,
76            MediaNotificationContent::NewIncomingStream { stream_name: _ } => return None,
77            MediaNotificationContent::Metadata { data } => {
78                Some(RtmpEndpointMediaData::NewStreamMetaData {
79                    metadata: hash_map_to_stream_metadata(&data),
80                })
81            }
82
83            MediaNotificationContent::Video {
84                codec,
85                is_keyframe,
86                is_sequence_header,
87                data,
88                timestamp,
89            } => Some(RtmpEndpointMediaData::NewVideoData {
90                data: data.clone(),
91                codec: codec.clone(),
92                is_keyframe: *is_keyframe,
93                is_sequence_header: *is_sequence_header,
94                timestamp: RtmpTimestamp::new(timestamp.dts.as_millis() as u32),
95                composition_time_offset: timestamp.pts_offset,
96            }),
97
98            MediaNotificationContent::Audio {
99                codec,
100                is_sequence_header,
101                timestamp,
102                data,
103            } => Some(RtmpEndpointMediaData::NewAudioData {
104                data: data.clone(),
105                codec: codec.clone(),
106                timestamp: RtmpTimestamp::new(timestamp.as_millis() as u32),
107                is_sequence_header: *is_sequence_header,
108            }),
109        }
110    }
111}