mmids_core/endpoints/rtmp_server/mod.rs
1//! This endpoint acts as a server for RTMP clients that want to publish or watch RTMP live streams.
2//! Workflow steps send a message requesting to allow RTMP publishers or watchers for specific
3//! port, RTMP application and stream key combinations. The RTMP server endpoint will register the
4//! specified port with the networking infrastructure for listening for connections, and any
5//! networked traffic over that port will be forwarded to this endpoint.
6//!
7//! It will then perform handshaking and all other RTMP protocol actions, disconnecting clients if
8//! they don't conform to the RTMP protocol correctly, or if they attempt to publish or watch an
9//! application name and stream key combination that isn't actively registered.
10//!
//! Incoming publish actions (such as new metadata, media packets, etc.) are passed to the workflow
//! steps that were registered for that application/stream key combination. Likewise, when the
//! endpoint receives media from workflow steps it will route that media to the correct RTMP watcher
//! clients.
15
16mod actor;
17
18use crate::codecs::{AudioCodec, VideoCodec};
19use crate::net::tcp::TcpSocketRequest;
20use crate::net::{ConnectionId, IpAddress};
21use crate::reactors::ReactorWorkflowUpdate;
22use crate::StreamId;
23use actor::actor_types::RtmpServerEndpointActor;
24use bytes::Bytes;
25use futures::stream::FuturesUnordered;
26use rml_rtmp::sessions::StreamMetadata;
27use rml_rtmp::time::RtmpTimestamp;
28use std::collections::HashMap;
29use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
30use tokio::sync::oneshot::Sender;
31
32/// Starts a new RTMP server endpoint, returning a channel that can be used to send notifications
33/// and requests to it.
34pub fn start_rtmp_server_endpoint(
35 socket_request_sender: UnboundedSender<TcpSocketRequest>,
36) -> UnboundedSender<RtmpEndpointRequest> {
37 let (endpoint_sender, endpoint_receiver) = unbounded_channel();
38
39 let endpoint = RtmpServerEndpointActor {
40 futures: FuturesUnordered::new(),
41 ports: HashMap::new(),
42 };
43
44 tokio::spawn(endpoint.run(endpoint_receiver, socket_request_sender));
45
46 endpoint_sender
47}
48
/// Describes which stream keys a playback or publishing registration applies to.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum StreamKeyRegistration {
    /// The registration covers every stream key of the RTMP application.
    Any,

    /// The registration covers only this exact stream key.
    Exact(String),
}
58
59/// Specifies if there are any IP address restrictions as part of an RTMP server registration
60#[derive(Debug, PartialEq)]
61pub enum IpRestriction {
62 /// All IP addresses are allowed
63 None,
64
65 /// Only the specified IP addresses are allowed.
66 Allow(Vec<IpAddress>),
67
68 /// All IP addresses are allowed except for the ones specified.
69 Deny(Vec<IpAddress>),
70}
71
/// Type of registration a request is related to.
///
/// This is a plain field-less enum, so it is cheap to copy and can be compared directly
/// (e.g. `registration_type == RegistrationType::Publisher`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegistrationType {
    /// The registration is for clients publishing media to the server.
    Publisher,

    /// The registration is for clients watching (playing back) media from the server.
    Watcher,
}
78
79/// Operations the rtmp server endpoint is being requested to make
80#[derive(Debug)]
81pub enum RtmpEndpointRequest {
82 /// Requests the RTMP server to allow publishers on the given port, app, and stream key
83 /// combinations.
84 ListenForPublishers {
85 /// Port to listen for RTMP publisher connections on
86 port: u16,
87
88 /// Name of the RTMP application publishers will connect to
89 rtmp_app: String,
90
91 /// What stream key publishers should be using
92 rtmp_stream_key: StreamKeyRegistration,
93
94 /// Channel that the rtmp server endpoint should respond with
95 message_channel: UnboundedSender<RtmpEndpointPublisherMessage>,
96
97 /// If specified, new media streams being published from this registration will be given
98 /// the stream id specified. If no id is given than one will be generated. This is useful
99 /// to correlate media streams that may have been pulled, processed externally, then brought
100 /// back in for later workflow steps (e.g. an external transcoding workflow).
101 stream_id: Option<StreamId>,
102
103 /// What IP restriction rules should be in place for this registration
104 ip_restrictions: IpRestriction,
105
106 /// If true, this port should be on a TLS socket (i.e. RTMPS)
107 use_tls: bool,
108
109 /// If true, then publishers will not be automatically accepted even if they connect to
110 /// the correct app/stream key combination and pass ip restrictions. Instead the registrant
111 /// should be asked for final verification if the publisher should be allowed or not.
112 requires_registrant_approval: bool,
113 },
114
115 /// Requests the RTMP server to allow clients to receive video on the given port, app,
116 /// and stream key combinations
117 ListenForWatchers {
118 /// Port to listen on
119 port: u16,
120
121 /// Name of the RTMP application playback clients will connect to
122 rtmp_app: String,
123
124 /// Stream keys clients can receive video on
125 rtmp_stream_key: StreamKeyRegistration,
126
127 /// The channel that the rtmp server endpoint will send notifications to
128 notification_channel: UnboundedSender<RtmpEndpointWatcherNotification>,
129
130 /// The channel that the registrant will send updated media data to the rtmp endpoint on
131 media_channel: UnboundedReceiver<RtmpEndpointMediaMessage>,
132
133 /// What IP restriction rules should be in place for this registration
134 ip_restrictions: IpRestriction,
135
136 /// If true, this port should be on a TLS socket (i.e. RTMPS)
137 use_tls: bool,
138
139 /// If true, then watchers will not be automatically accepted even if they connect to
140 /// the correct app/stream key combination and pass ip restrictions. Instead the registrant
141 /// should be asked for final verification if the watcher should be allowed or not.
142 requires_registrant_approval: bool,
143 },
144
145 /// Requests the specified registration should be removed
146 RemoveRegistration {
147 /// The type of registration that is being removed
148 registration_type: RegistrationType,
149
150 /// Port the removed registrant was listening on
151 port: u16,
152
153 /// The RTMP application name that the registrant was listening on
154 rtmp_app: String,
155
156 /// The stream key the registrant had registered for
157 rtmp_stream_key: StreamKeyRegistration,
158 },
159}
160
161/// Response to approval/validation requests
162#[derive(Debug)]
163pub enum ValidationResponse {
164 Approve {
165 reactor_update_channel: UnboundedReceiver<ReactorWorkflowUpdate>,
166 },
167
168 Reject,
169}
170
171/// Messages the rtmp server endpoint will send to publisher registrants.
172#[derive(Debug)]
173pub enum RtmpEndpointPublisherMessage {
174 /// Notification that the publisher registration failed. No further messages will be sent
175 /// if this is sent.
176 PublisherRegistrationFailed,
177
178 /// Notification that the publisher registration succeeded.
179 PublisherRegistrationSuccessful,
180
181 /// Notification that a new RTMP connection has been made and they have requested to be a
182 /// publisher on a stream key, but they require validation before being approved.
183 PublisherRequiringApproval {
184 /// Unique identifier for the TCP connection that's requesting to be a publisher
185 connection_id: ConnectionId,
186
187 /// The stream key that the connection is requesting to be a publisher to
188 stream_key: String,
189
190 /// Channel to send the approval or rejection response to
191 response_channel: Sender<ValidationResponse>,
192 },
193
194 /// Notification that a new RTMP connection has been made and is publishing media
195 NewPublisherConnected {
196 /// Unique identifier for the TCP connection that's publishing
197 connection_id: ConnectionId,
198
199 /// Unique identifier for the stream.
200 stream_id: StreamId,
201
202 /// Actual stream key that this stream is coming in from. Mostly used if the registrant
203 /// specified that Any stream key would be allowed.
204 stream_key: String,
205
206 /// If provided, this is a channel which will receive workflow updates from a reactor
207 /// tied to this publisher
208 reactor_update_channel: Option<UnboundedReceiver<ReactorWorkflowUpdate>>,
209 },
210
211 /// Notification that a publisher has stopped publishing. It may still be connected to the
212 /// server, but it is no longer in a publisher state.
213 PublishingStopped {
214 /// Unique identifier for the TCP connection that stopped publishing
215 connection_id: ConnectionId,
216 },
217
218 /// An RTMP publisher has sent in new stream metadata information
219 StreamMetadataChanged {
220 publisher: ConnectionId,
221 metadata: StreamMetadata,
222 },
223
224 /// An RTMP publisher has sent in new video data
225 NewVideoData {
226 publisher: ConnectionId,
227 codec: VideoCodec,
228 is_keyframe: bool,
229 is_sequence_header: bool,
230 data: Bytes,
231 timestamp: RtmpTimestamp,
232 composition_time_offset: i32,
233 },
234
235 /// An RTMP publisher has sent in new audio data
236 NewAudioData {
237 publisher: ConnectionId,
238 codec: AudioCodec,
239 is_sequence_header: bool,
240 data: Bytes,
241 timestamp: RtmpTimestamp,
242 },
243}
244
245/// Messages the rtmp server endpoint will send to watcher registrants
246#[derive(Debug)]
247pub enum RtmpEndpointWatcherNotification {
248 /// The request to register for watchers has failed. No further messages will be sent
249 /// afterwards.
250 WatcherRegistrationFailed,
251
252 /// The request to register for watchers was successful
253 WatcherRegistrationSuccessful,
254
255 /// Notification that a new RTMP connection has been made and they have requested to be a
256 /// watcher on a stream key, but they require validation before being approved.
257 WatcherRequiringApproval {
258 /// Unique identifier for the TCP connection that's requesting to be a watcher
259 connection_id: ConnectionId,
260
261 /// The stream key that the connection is requesting to be a watcher of
262 stream_key: String,
263
264 /// Channel to send the approval or rejection response to
265 response_channel: Sender<ValidationResponse>,
266 },
267
268 /// Notifies the registrant that at least one watcher is now watching on a particular
269 /// stream key,
270 StreamKeyBecameActive {
271 stream_key: String,
272 reactor_update_channel: Option<UnboundedReceiver<ReactorWorkflowUpdate>>,
273 },
274
275 /// Notifies the registrant that the last watcher has disconnected on the stream key, and
276 /// there are no longer anyone watching
277 StreamKeyBecameInactive { stream_key: String },
278}
279
280/// Message watcher registrants send to announce new media data that should be sent to watchers
281#[derive(Debug)]
282pub struct RtmpEndpointMediaMessage {
283 pub stream_key: String,
284 pub data: RtmpEndpointMediaData,
285}
286
287/// New media data that should be sent to watchers
288#[derive(Debug, Clone, PartialEq)]
289pub enum RtmpEndpointMediaData {
290 NewStreamMetaData {
291 metadata: StreamMetadata,
292 },
293
294 NewVideoData {
295 codec: VideoCodec,
296 is_keyframe: bool,
297 is_sequence_header: bool,
298 data: Bytes,
299 timestamp: RtmpTimestamp,
300 composition_time_offset: i32,
301 },
302
303 NewAudioData {
304 codec: AudioCodec,
305 is_sequence_header: bool,
306 data: Bytes,
307 timestamp: RtmpTimestamp,
308 },
309}