// mmids_core/endpoints/rtmp_server/mod.rs

//! This endpoint acts as a server for RTMP clients that want to publish or watch RTMP live streams.
//! Workflow steps send a message requesting to allow RTMP publishers or watchers for specific
//! port, RTMP application and stream key combinations.  The RTMP server endpoint will register the
//! specified port with the networking infrastructure for listening for connections, and any
//! network traffic over that port will be forwarded to this endpoint.
//!
//! It will then perform handshaking and all other RTMP protocol actions, disconnecting clients if
//! they don't conform to the RTMP protocol correctly, or if they attempt to publish or watch an
//! application name and stream key combination that isn't actively registered.
//!
//! Incoming publish actions (such as new metadata, media packets, etc.) are passed to the workflow
//! steps that were registered for that application/stream key combination.  Likewise, when the
//! endpoint receives media from workflow steps it will route that media to the correct RTMP watcher
//! clients.
15
16mod actor;
17
18use crate::codecs::{AudioCodec, VideoCodec};
19use crate::net::tcp::TcpSocketRequest;
20use crate::net::{ConnectionId, IpAddress};
21use crate::reactors::ReactorWorkflowUpdate;
22use crate::StreamId;
23use actor::actor_types::RtmpServerEndpointActor;
24use bytes::Bytes;
25use futures::stream::FuturesUnordered;
26use rml_rtmp::sessions::StreamMetadata;
27use rml_rtmp::time::RtmpTimestamp;
28use std::collections::HashMap;
29use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
30use tokio::sync::oneshot::Sender;
31
32/// Starts a new RTMP server endpoint, returning a channel that can be used to send notifications
33/// and requests to it.
34pub fn start_rtmp_server_endpoint(
35    socket_request_sender: UnboundedSender<TcpSocketRequest>,
36) -> UnboundedSender<RtmpEndpointRequest> {
37    let (endpoint_sender, endpoint_receiver) = unbounded_channel();
38
39    let endpoint = RtmpServerEndpointActor {
40        futures: FuturesUnordered::new(),
41        ports: HashMap::new(),
42    };
43
44    tokio::spawn(endpoint.run(endpoint_receiver, socket_request_sender));
45
46    endpoint_sender
47}
48
/// Specifies how a stream key should be registered for playback or publishing.
#[derive(Clone, Hash, Eq, PartialEq, Debug)]
pub enum StreamKeyRegistration {
    /// All stream keys for the rtmp application should be registered.
    Any,

    /// Only set up registration for the exact stream key contained in this variant.
    Exact(String),
}
58
59/// Specifies if there are any IP address restrictions as part of an RTMP server registration
60#[derive(Debug, PartialEq)]
61pub enum IpRestriction {
62    /// All IP addresses are allowed
63    None,
64
65    /// Only the specified IP addresses are allowed.
66    Allow(Vec<IpAddress>),
67
68    /// All IP addresses are allowed except for the ones specified.
69    Deny(Vec<IpAddress>),
70}
71
/// Type of registration the request is related to.
///
/// This is a plain, fieldless tag, so it is cheap to copy and can be compared directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegistrationType {
    /// The registration is for RTMP publishers.
    Publisher,

    /// The registration is for RTMP watchers.
    Watcher,
}
78
79/// Operations the rtmp server endpoint is being requested to make
80#[derive(Debug)]
81pub enum RtmpEndpointRequest {
82    /// Requests the RTMP server to allow publishers on the given port, app, and stream key
83    /// combinations.
84    ListenForPublishers {
85        /// Port to listen for RTMP publisher connections on
86        port: u16,
87
88        /// Name of the RTMP application publishers will connect to
89        rtmp_app: String,
90
91        /// What stream key publishers should be using
92        rtmp_stream_key: StreamKeyRegistration,
93
94        /// Channel that the rtmp server endpoint should respond with
95        message_channel: UnboundedSender<RtmpEndpointPublisherMessage>,
96
97        /// If specified, new media streams being published from this registration will be given
98        /// the stream id specified.  If no id is given than one will be generated.  This is useful
99        /// to correlate media streams that may have been pulled, processed externally, then brought
100        /// back in for later workflow steps (e.g. an external transcoding workflow).
101        stream_id: Option<StreamId>,
102
103        /// What IP restriction rules should be in place for this registration
104        ip_restrictions: IpRestriction,
105
106        /// If true, this port should be on a TLS socket (i.e. RTMPS)
107        use_tls: bool,
108
109        /// If true, then publishers will not be automatically accepted even if they connect to
110        /// the correct app/stream key combination and pass ip restrictions. Instead the registrant
111        /// should be asked for final verification if the publisher should be allowed or not.
112        requires_registrant_approval: bool,
113    },
114
115    /// Requests the RTMP server to allow clients to receive video on the given port, app,
116    /// and stream key combinations
117    ListenForWatchers {
118        /// Port to listen on
119        port: u16,
120
121        /// Name of the RTMP application playback clients will connect to
122        rtmp_app: String,
123
124        /// Stream keys clients can receive video on
125        rtmp_stream_key: StreamKeyRegistration,
126
127        /// The channel that the rtmp server endpoint will send notifications to
128        notification_channel: UnboundedSender<RtmpEndpointWatcherNotification>,
129
130        /// The channel that the registrant will send updated media data to the rtmp endpoint on
131        media_channel: UnboundedReceiver<RtmpEndpointMediaMessage>,
132
133        /// What IP restriction rules should be in place for this registration
134        ip_restrictions: IpRestriction,
135
136        /// If true, this port should be on a TLS socket (i.e. RTMPS)
137        use_tls: bool,
138
139        /// If true, then watchers will not be automatically accepted even if they connect to
140        /// the correct app/stream key combination and pass ip restrictions. Instead the registrant
141        /// should be asked for final verification if the watcher should be allowed or not.
142        requires_registrant_approval: bool,
143    },
144
145    /// Requests the specified registration should be removed
146    RemoveRegistration {
147        /// The type of registration that is being removed
148        registration_type: RegistrationType,
149
150        /// Port the removed registrant was listening on
151        port: u16,
152
153        /// The RTMP application name that the registrant was listening on
154        rtmp_app: String,
155
156        /// The stream key the registrant had registered for
157        rtmp_stream_key: StreamKeyRegistration,
158    },
159}
160
161/// Response to approval/validation requests
162#[derive(Debug)]
163pub enum ValidationResponse {
164    Approve {
165        reactor_update_channel: UnboundedReceiver<ReactorWorkflowUpdate>,
166    },
167
168    Reject,
169}
170
171/// Messages the rtmp server endpoint will send to publisher registrants.
172#[derive(Debug)]
173pub enum RtmpEndpointPublisherMessage {
174    /// Notification that the publisher registration failed.  No further messages will be sent
175    /// if this is sent.
176    PublisherRegistrationFailed,
177
178    /// Notification that the publisher registration succeeded.
179    PublisherRegistrationSuccessful,
180
181    /// Notification that a new RTMP connection has been made and they have requested to be a
182    /// publisher on a stream key, but they require validation before being approved.
183    PublisherRequiringApproval {
184        /// Unique identifier for the TCP connection that's requesting to be a publisher
185        connection_id: ConnectionId,
186
187        /// The stream key that the connection is requesting to be a publisher to
188        stream_key: String,
189
190        /// Channel to send the approval or rejection response to
191        response_channel: Sender<ValidationResponse>,
192    },
193
194    /// Notification that a new RTMP connection has been made and is publishing media
195    NewPublisherConnected {
196        /// Unique identifier for the TCP connection that's publishing
197        connection_id: ConnectionId,
198
199        /// Unique identifier for the stream.
200        stream_id: StreamId,
201
202        /// Actual stream key that this stream is coming in from.  Mostly used if the registrant
203        /// specified that Any stream key would be allowed.
204        stream_key: String,
205
206        /// If provided, this is a channel which will receive workflow updates from a reactor
207        /// tied to this publisher
208        reactor_update_channel: Option<UnboundedReceiver<ReactorWorkflowUpdate>>,
209    },
210
211    /// Notification that a publisher has stopped publishing.  It may still be connected to the
212    /// server, but it is no longer in a publisher state.
213    PublishingStopped {
214        /// Unique identifier for the TCP connection that stopped publishing
215        connection_id: ConnectionId,
216    },
217
218    /// An RTMP publisher has sent in new stream metadata information
219    StreamMetadataChanged {
220        publisher: ConnectionId,
221        metadata: StreamMetadata,
222    },
223
224    /// An RTMP publisher has sent in new video data
225    NewVideoData {
226        publisher: ConnectionId,
227        codec: VideoCodec,
228        is_keyframe: bool,
229        is_sequence_header: bool,
230        data: Bytes,
231        timestamp: RtmpTimestamp,
232        composition_time_offset: i32,
233    },
234
235    /// An RTMP publisher has sent in new audio data
236    NewAudioData {
237        publisher: ConnectionId,
238        codec: AudioCodec,
239        is_sequence_header: bool,
240        data: Bytes,
241        timestamp: RtmpTimestamp,
242    },
243}
244
245/// Messages the rtmp server endpoint will send to watcher registrants
246#[derive(Debug)]
247pub enum RtmpEndpointWatcherNotification {
248    /// The request to register for watchers has failed.  No further messages will be sent
249    /// afterwards.
250    WatcherRegistrationFailed,
251
252    /// The request to register for watchers was successful
253    WatcherRegistrationSuccessful,
254
255    /// Notification that a new RTMP connection has been made and they have requested to be a
256    /// watcher on a stream key, but they require validation before being approved.
257    WatcherRequiringApproval {
258        /// Unique identifier for the TCP connection that's requesting to be a watcher
259        connection_id: ConnectionId,
260
261        /// The stream key that the connection is requesting to be a watcher of
262        stream_key: String,
263
264        /// Channel to send the approval or rejection response to
265        response_channel: Sender<ValidationResponse>,
266    },
267
268    /// Notifies the registrant that at least one watcher is now watching on a particular
269    /// stream key,
270    StreamKeyBecameActive {
271        stream_key: String,
272        reactor_update_channel: Option<UnboundedReceiver<ReactorWorkflowUpdate>>,
273    },
274
275    /// Notifies the registrant that the last watcher has disconnected on the stream key, and
276    /// there are no longer anyone watching
277    StreamKeyBecameInactive { stream_key: String },
278}
279
280/// Message watcher registrants send to announce new media data that should be sent to watchers
281#[derive(Debug)]
282pub struct RtmpEndpointMediaMessage {
283    pub stream_key: String,
284    pub data: RtmpEndpointMediaData,
285}
286
287/// New media data that should be sent to watchers
288#[derive(Debug, Clone, PartialEq)]
289pub enum RtmpEndpointMediaData {
290    NewStreamMetaData {
291        metadata: StreamMetadata,
292    },
293
294    NewVideoData {
295        codec: VideoCodec,
296        is_keyframe: bool,
297        is_sequence_header: bool,
298        data: Bytes,
299        timestamp: RtmpTimestamp,
300        composition_time_offset: i32,
301    },
302
303    NewAudioData {
304        codec: AudioCodec,
305        is_sequence_header: bool,
306        data: Bytes,
307        timestamp: RtmpTimestamp,
308    },
309}