1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
//! This endpoint acts as a server for RTMP clients that want to publish or watch RTMP live streams.
//! Workflow steps send a message requesting to allow RTMP publishers or watchers for specific
//! port, RTMP application and stream key combinations.  The RTMP server endpoint will register the
//! specified port with the networking infrastructure for listening for connections, and any
//! networked traffic over that port will be forwarded to this endpoint.
//!
//! It will then perform handshaking and all other RTMP protocol actions, disconnecting clients if
//! they don't conform to the RTMP protocol correctly, or if they attempt to publish or watch an
//! application name and stream key combination that isn't actively registered.
//!
//! Incoming publish actions (such as new metadata, media packets, etc...) are passed to the workflow
//! steps that were registered for that application/stream key combination.  Likewise, when the
//! endpoint receives media from workflow steps it will route that media to the correct RTMP watcher
//! clients

mod actor;

use crate::codecs::{AudioCodec, VideoCodec};
use crate::net::tcp::TcpSocketRequest;
use crate::net::{ConnectionId, IpAddress};
use crate::reactors::ReactorWorkflowUpdate;
use crate::StreamId;
use actor::actor_types::RtmpServerEndpointActor;
use bytes::Bytes;
use futures::stream::FuturesUnordered;
use rml_rtmp::sessions::StreamMetadata;
use rml_rtmp::time::RtmpTimestamp;
use std::collections::HashMap;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot::Sender;

/// Starts a new RTMP server endpoint, returning a channel that can be used to send notifications
/// and requests to it.
pub fn start_rtmp_server_endpoint(
    socket_request_sender: UnboundedSender<TcpSocketRequest>,
) -> UnboundedSender<RtmpEndpointRequest> {
    let (endpoint_sender, endpoint_receiver) = unbounded_channel();

    let endpoint = RtmpServerEndpointActor {
        futures: FuturesUnordered::new(),
        ports: HashMap::new(),
    };

    tokio::spawn(endpoint.run(endpoint_receiver, socket_request_sender));

    endpoint_sender
}

/// Specifies how a stream key should be registered for playback or publishing
#[derive(Clone, Hash, Eq, PartialEq, Debug)]
pub enum StreamKeyRegistration {
    /// All stream keys for the the rtmp application should be registered
    Any,

    /// Only set up registration for the exact stream key
    Exact(String),
}

/// Specifies if there are any IP address restrictions as part of an RTMP server registration
#[derive(Debug, PartialEq)]
pub enum IpRestriction {
    /// All IP addresses are allowed
    None,

    /// Only the specified IP addresses are allowed.
    Allow(Vec<IpAddress>),

    /// All IP addresses are allowed except for the ones specified.
    Deny(Vec<IpAddress>),
}

/// Type of registration the request is related to
#[derive(Debug)]
pub enum RegistrationType {
    Publisher,
    Watcher,
}

/// Operations the rtmp server endpoint is being requested to make
#[derive(Debug)]
pub enum RtmpEndpointRequest {
    /// Requests the RTMP server to allow publishers on the given port, app, and stream key
    /// combinations.
    ListenForPublishers {
        /// Port to listen for RTMP publisher connections on
        port: u16,

        /// Name of the RTMP application publishers will connect to
        rtmp_app: String,

        /// What stream key publishers should be using
        rtmp_stream_key: StreamKeyRegistration,

        /// Channel that the rtmp server endpoint should respond with
        message_channel: UnboundedSender<RtmpEndpointPublisherMessage>,

        /// If specified, new media streams being published from this registration will be given
        /// the stream id specified.  If no id is given than one will be generated.  This is useful
        /// to correlate media streams that may have been pulled, processed externally, then brought
        /// back in for later workflow steps (e.g. an external transcoding workflow).
        stream_id: Option<StreamId>,

        /// What IP restriction rules should be in place for this registration
        ip_restrictions: IpRestriction,

        /// If true, this port should be on a TLS socket (i.e. RTMPS)
        use_tls: bool,

        /// If true, then publishers will not be automatically accepted even if they connect to
        /// the correct app/stream key combination and pass ip restrictions. Instead the registrant
        /// should be asked for final verification if the publisher should be allowed or not.
        requires_registrant_approval: bool,
    },

    /// Requests the RTMP server to allow clients to receive video on the given port, app,
    /// and stream key combinations
    ListenForWatchers {
        /// Port to listen on
        port: u16,

        /// Name of the RTMP application playback clients will connect to
        rtmp_app: String,

        /// Stream keys clients can receive video on
        rtmp_stream_key: StreamKeyRegistration,

        /// The channel that the rtmp server endpoint will send notifications to
        notification_channel: UnboundedSender<RtmpEndpointWatcherNotification>,

        /// The channel that the registrant will send updated media data to the rtmp endpoint on
        media_channel: UnboundedReceiver<RtmpEndpointMediaMessage>,

        /// What IP restriction rules should be in place for this registration
        ip_restrictions: IpRestriction,

        /// If true, this port should be on a TLS socket (i.e. RTMPS)
        use_tls: bool,

        /// If true, then watchers will not be automatically accepted even if they connect to
        /// the correct app/stream key combination and pass ip restrictions. Instead the registrant
        /// should be asked for final verification if the watcher should be allowed or not.
        requires_registrant_approval: bool,
    },

    /// Requests the specified registration should be removed
    RemoveRegistration {
        /// The type of registration that is being removed
        registration_type: RegistrationType,

        /// Port the removed registrant was listening on
        port: u16,

        /// The RTMP application name that the registrant was listening on
        rtmp_app: String,

        /// The stream key the registrant had registered for
        rtmp_stream_key: StreamKeyRegistration,
    },
}

/// Response to approval/validation requests
#[derive(Debug)]
pub enum ValidationResponse {
    Approve {
        reactor_update_channel: UnboundedReceiver<ReactorWorkflowUpdate>,
    },

    Reject,
}

/// Messages the rtmp server endpoint will send to publisher registrants.
#[derive(Debug)]
pub enum RtmpEndpointPublisherMessage {
    /// Notification that the publisher registration failed.  No further messages will be sent
    /// if this is sent.
    PublisherRegistrationFailed,

    /// Notification that the publisher registration succeeded.
    PublisherRegistrationSuccessful,

    /// Notification that a new RTMP connection has been made and they have requested to be a
    /// publisher on a stream key, but they require validation before being approved.
    PublisherRequiringApproval {
        /// Unique identifier for the TCP connection that's requesting to be a publisher
        connection_id: ConnectionId,

        /// The stream key that the connection is requesting to be a publisher to
        stream_key: String,

        /// Channel to send the approval or rejection response to
        response_channel: Sender<ValidationResponse>,
    },

    /// Notification that a new RTMP connection has been made and is publishing media
    NewPublisherConnected {
        /// Unique identifier for the TCP connection that's publishing
        connection_id: ConnectionId,

        /// Unique identifier for the stream.
        stream_id: StreamId,

        /// Actual stream key that this stream is coming in from.  Mostly used if the registrant
        /// specified that Any stream key would be allowed.
        stream_key: String,

        /// If provided, this is a channel which will receive workflow updates from a reactor
        /// tied to this publisher
        reactor_update_channel: Option<UnboundedReceiver<ReactorWorkflowUpdate>>,
    },

    /// Notification that a publisher has stopped publishing.  It may still be connected to the
    /// server, but it is no longer in a publisher state.
    PublishingStopped {
        /// Unique identifier for the TCP connection that stopped publishing
        connection_id: ConnectionId,
    },

    /// An RTMP publisher has sent in new stream metadata information
    StreamMetadataChanged {
        publisher: ConnectionId,
        metadata: StreamMetadata,
    },

    /// An RTMP publisher has sent in new video data
    NewVideoData {
        publisher: ConnectionId,
        codec: VideoCodec,
        is_keyframe: bool,
        is_sequence_header: bool,
        data: Bytes,
        timestamp: RtmpTimestamp,
        composition_time_offset: i32,
    },

    /// An RTMP publisher has sent in new audio data
    NewAudioData {
        publisher: ConnectionId,
        codec: AudioCodec,
        is_sequence_header: bool,
        data: Bytes,
        timestamp: RtmpTimestamp,
    },
}

/// Messages the rtmp server endpoint will send to watcher registrants
#[derive(Debug)]
pub enum RtmpEndpointWatcherNotification {
    /// The request to register for watchers has failed.  No further messages will be sent
    /// afterwards.
    WatcherRegistrationFailed,

    /// The request to register for watchers was successful
    WatcherRegistrationSuccessful,

    /// Notification that a new RTMP connection has been made and they have requested to be a
    /// watcher on a stream key, but they require validation before being approved.
    WatcherRequiringApproval {
        /// Unique identifier for the TCP connection that's requesting to be a watcher
        connection_id: ConnectionId,

        /// The stream key that the connection is requesting to be a watcher of
        stream_key: String,

        /// Channel to send the approval or rejection response to
        response_channel: Sender<ValidationResponse>,
    },

    /// Notifies the registrant that at least one watcher is now watching on a particular
    /// stream key,
    StreamKeyBecameActive {
        stream_key: String,
        reactor_update_channel: Option<UnboundedReceiver<ReactorWorkflowUpdate>>,
    },

    /// Notifies the registrant that the last watcher has disconnected on the stream key, and
    /// there are no longer anyone watching
    StreamKeyBecameInactive { stream_key: String },
}

/// Message watcher registrants send to announce new media data that should be sent to watchers
#[derive(Debug)]
pub struct RtmpEndpointMediaMessage {
    pub stream_key: String,
    pub data: RtmpEndpointMediaData,
}

/// New media data that should be sent to watchers
#[derive(Debug, Clone, PartialEq)]
pub enum RtmpEndpointMediaData {
    NewStreamMetaData {
        metadata: StreamMetadata,
    },

    NewVideoData {
        codec: VideoCodec,
        is_keyframe: bool,
        is_sequence_header: bool,
        data: Bytes,
        timestamp: RtmpTimestamp,
        composition_time_offset: i32,
    },

    NewAudioData {
        codec: AudioCodec,
        is_sequence_header: bool,
        data: Bytes,
        timestamp: RtmpTimestamp,
    },
}