1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309
//! This endpoint acts as a server for RTMP clients that want to publish or watch RTMP live streams.
//! Workflow steps send a message requesting to allow RTMP publishers or watchers for specific
//! port, RTMP application and stream key combinations. The RTMP server endpoint will register the
//! specified port with the networking infrastructure for listening for connections, and any
//! networked traffic over that port will be forwarded to this endpoint.
//!
//! It will then perform handshaking and all other RTMP protocol actions, disconnecting clients if
//! they don't conform to the RTMP protocol correctly, or if they attempt to publish or watch an
//! application name and stream key combination that isn't actively registered.
//!
//! Incoming publish actions (such as new metadata, media packets, etc...) are passed to the workflow
//! steps that were registered for that application/stream key combination. Likewise, when the
//! endpoint receives media from workflow steps it will route that media to the correct RTMP watcher
//! clients.
mod actor;
use crate::codecs::{AudioCodec, VideoCodec};
use crate::net::tcp::TcpSocketRequest;
use crate::net::{ConnectionId, IpAddress};
use crate::reactors::ReactorWorkflowUpdate;
use crate::StreamId;
use actor::actor_types::RtmpServerEndpointActor;
use bytes::Bytes;
use futures::stream::FuturesUnordered;
use rml_rtmp::sessions::StreamMetadata;
use rml_rtmp::time::RtmpTimestamp;
use std::collections::HashMap;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot::Sender;
/// Spawns the RTMP server endpoint actor and hands back the sending half of its request channel.
///
/// A fresh `RtmpServerEndpointActor` is created with no pending futures and no registered ports.
/// Its `run` future is given the receiving half of a new unbounded request channel together with
/// `socket_request_sender`, and is then spawned as a tokio task.
///
/// # Arguments
///
/// * `socket_request_sender` - channel of `TcpSocketRequest`s that is passed through to the
///   actor's `run` method.
///
/// # Returns
///
/// The `UnboundedSender` callers use to submit `RtmpEndpointRequest`s to the spawned endpoint.
///
/// # Panics
///
/// `tokio::spawn` panics when it is called outside of a Tokio runtime, so this function must be
/// called from within one.
pub fn start_rtmp_server_endpoint(
    socket_request_sender: UnboundedSender<TcpSocketRequest>,
) -> UnboundedSender<RtmpEndpointRequest> {
    // The actor starts out empty: nothing in flight and no ports being tracked.
    let actor = RtmpServerEndpointActor {
        futures: FuturesUnordered::new(),
        ports: HashMap::new(),
    };

    let (request_tx, request_rx) = unbounded_channel::<RtmpEndpointRequest>();

    // The task is detached on purpose; the returned sender is the only handle callers get.
    let actor_task = actor.run(request_rx, socket_request_sender);
    tokio::spawn(actor_task);

    request_tx
}
/// Specifies how a stream key should be registered for playback or publishing.
///
/// Used as the `rtmp_stream_key` field of every `RtmpEndpointRequest` variant.
#[derive(Clone, Hash, Eq, PartialEq, Debug)]
pub enum StreamKeyRegistration {
/// All stream keys for the rtmp application should be registered
Any,
/// Only set up registration for the exact stream key carried by this variant
Exact(String),
}
/// Specifies if there are any IP address restrictions as part of an RTMP server registration.
///
/// Supplied as the `ip_restrictions` field of the `ListenForPublishers` and `ListenForWatchers`
/// requests.
#[derive(Debug, PartialEq)]
pub enum IpRestriction {
/// All IP addresses are allowed.
///
/// Not to be confused with `Option::None`; spell it `IpRestriction::None` at use sites.
None,
/// Only the specified IP addresses are allowed.
Allow(Vec<IpAddress>),
/// All IP addresses are allowed except for the ones specified.
Deny(Vec<IpAddress>),
}
/// Type of registration the request is related to.
///
/// Carried by `RtmpEndpointRequest::RemoveRegistration` to say which kind of registration is
/// being removed.
///
/// This is a field-less enum, so it derives the cheap value-type traits (`Clone`, `Copy`,
/// `PartialEq`, `Eq`, `Hash`) in addition to `Debug`. That lets callers compare it, pass it by
/// value, and use it as a map or set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RegistrationType {
    /// The registration is for RTMP publishers.
    Publisher,
    /// The registration is for RTMP watchers.
    Watcher,
}
/// Operations the rtmp server endpoint is being requested to make.
///
/// Values of this type are sent over the `UnboundedSender<RtmpEndpointRequest>` returned by
/// `start_rtmp_server_endpoint`.
#[derive(Debug)]
pub enum RtmpEndpointRequest {
/// Requests the RTMP server to allow publishers on the given port, app, and stream key
/// combinations.
ListenForPublishers {
/// Port to listen for RTMP publisher connections on
port: u16,
/// Name of the RTMP application publishers will connect to
rtmp_app: String,
/// What stream key publishers should be using
rtmp_stream_key: StreamKeyRegistration,
/// Channel the rtmp server endpoint uses to send `RtmpEndpointPublisherMessage`s back to
/// the registrant
message_channel: UnboundedSender<RtmpEndpointPublisherMessage>,
/// If specified, new media streams being published from this registration will be given
/// the stream id specified. If no id is given then one will be generated. This is useful
/// to correlate media streams that may have been pulled, processed externally, then brought
/// back in for later workflow steps (e.g. an external transcoding workflow).
stream_id: Option<StreamId>,
/// What IP restriction rules should be in place for this registration
ip_restrictions: IpRestriction,
/// If true, this port should be on a TLS socket (i.e. RTMPS)
use_tls: bool,
/// If true, then publishers will not be automatically accepted even if they connect to
/// the correct app/stream key combination and pass ip restrictions. Instead the registrant
/// should be asked for final verification if the publisher should be allowed or not.
requires_registrant_approval: bool,
},
/// Requests the RTMP server to allow clients to receive video on the given port, app,
/// and stream key combinations
ListenForWatchers {
/// Port to listen on
port: u16,
/// Name of the RTMP application playback clients will connect to
rtmp_app: String,
/// Stream keys clients can receive video on
rtmp_stream_key: StreamKeyRegistration,
/// The channel that the rtmp server endpoint will send notifications to
notification_channel: UnboundedSender<RtmpEndpointWatcherNotification>,
/// The channel that the registrant will send updated media data to the rtmp endpoint on.
/// This is the receiving half; the registrant keeps the matching sender.
media_channel: UnboundedReceiver<RtmpEndpointMediaMessage>,
/// What IP restriction rules should be in place for this registration
ip_restrictions: IpRestriction,
/// If true, this port should be on a TLS socket (i.e. RTMPS)
use_tls: bool,
/// If true, then watchers will not be automatically accepted even if they connect to
/// the correct app/stream key combination and pass ip restrictions. Instead the registrant
/// should be asked for final verification if the watcher should be allowed or not.
requires_registrant_approval: bool,
},
/// Requests the specified registration should be removed
RemoveRegistration {
/// The type of registration that is being removed
registration_type: RegistrationType,
/// Port the removed registrant was listening on
port: u16,
/// The RTMP application name that the registrant was listening on
rtmp_app: String,
/// The stream key the registrant had registered for
rtmp_stream_key: StreamKeyRegistration,
},
}
/// Response to approval/validation requests.
///
/// This is the value type of the `response_channel` carried by the `PublisherRequiringApproval`
/// and `WatcherRequiringApproval` notifications.
#[derive(Debug)]
pub enum ValidationResponse {
/// The connection is approved.
Approve {
/// Receiving half of a channel of `ReactorWorkflowUpdate` values.
/// NOTE(review): presumably passed on as the `reactor_update_channel` of the follow-up
/// notification to the registrant -- confirm against the actor implementation.
reactor_update_channel: UnboundedReceiver<ReactorWorkflowUpdate>,
},
/// The connection is rejected.
Reject,
}
/// Messages the rtmp server endpoint will send to publisher registrants.
///
/// These are delivered over the `message_channel` supplied in a `ListenForPublishers` request.
#[derive(Debug)]
pub enum RtmpEndpointPublisherMessage {
/// Notification that the publisher registration failed. No further messages will be sent
/// if this is sent.
PublisherRegistrationFailed,
/// Notification that the publisher registration succeeded.
PublisherRegistrationSuccessful,
/// Notification that a new RTMP connection has been made and they have requested to be a
/// publisher on a stream key, but they require validation before being approved.
PublisherRequiringApproval {
/// Unique identifier for the TCP connection that's requesting to be a publisher
connection_id: ConnectionId,
/// The stream key that the connection is requesting to be a publisher to
stream_key: String,
/// Channel to send the approval or rejection response to
response_channel: Sender<ValidationResponse>,
},
/// Notification that a new RTMP connection has been made and is publishing media
NewPublisherConnected {
/// Unique identifier for the TCP connection that's publishing
connection_id: ConnectionId,
/// Unique identifier for the stream.
stream_id: StreamId,
/// Actual stream key that this stream is coming in from. Mostly used if the registrant
/// specified that Any stream key would be allowed.
stream_key: String,
/// If provided, this is a channel which will receive workflow updates from a reactor
/// tied to this publisher
reactor_update_channel: Option<UnboundedReceiver<ReactorWorkflowUpdate>>,
},
/// Notification that a publisher has stopped publishing. It may still be connected to the
/// server, but it is no longer in a publisher state.
PublishingStopped {
/// Unique identifier for the TCP connection that stopped publishing
connection_id: ConnectionId,
},
/// An RTMP publisher has sent in new stream metadata information
StreamMetadataChanged {
/// Identifier of the connection that sent the metadata
publisher: ConnectionId,
/// The metadata that was sent
metadata: StreamMetadata,
},
/// An RTMP publisher has sent in new video data
NewVideoData {
/// Identifier of the connection that sent the video data
publisher: ConnectionId,
/// Codec of the video payload
codec: VideoCodec,
/// Whether this packet is flagged as a keyframe
is_keyframe: bool,
/// Whether this packet is flagged as a sequence header
is_sequence_header: bool,
/// The video payload bytes.
/// NOTE(review): the exact framing of the payload is not visible here -- confirm.
data: Bytes,
/// RTMP timestamp attached to this packet
timestamp: RtmpTimestamp,
/// NOTE(review): presumably the composition time offset relative to `timestamp`; the
/// units are not visible here -- confirm.
composition_time_offset: i32,
},
/// An RTMP publisher has sent in new audio data
NewAudioData {
/// Identifier of the connection that sent the audio data
publisher: ConnectionId,
/// Codec of the audio payload
codec: AudioCodec,
/// Whether this packet is flagged as a sequence header
is_sequence_header: bool,
/// The audio payload bytes.
/// NOTE(review): the exact framing of the payload is not visible here -- confirm.
data: Bytes,
/// RTMP timestamp attached to this packet
timestamp: RtmpTimestamp,
},
}
/// Messages the rtmp server endpoint will send to watcher registrants.
///
/// These are delivered over the `notification_channel` supplied in a `ListenForWatchers` request.
#[derive(Debug)]
pub enum RtmpEndpointWatcherNotification {
/// The request to register for watchers has failed. No further messages will be sent
/// afterwards.
WatcherRegistrationFailed,
/// The request to register for watchers was successful
WatcherRegistrationSuccessful,
/// Notification that a new RTMP connection has been made and they have requested to be a
/// watcher on a stream key, but they require validation before being approved.
WatcherRequiringApproval {
/// Unique identifier for the TCP connection that's requesting to be a watcher
connection_id: ConnectionId,
/// The stream key that the connection is requesting to be a watcher of
stream_key: String,
/// Channel to send the approval or rejection response to
response_channel: Sender<ValidationResponse>,
},
/// Notifies the registrant that at least one watcher is now watching on a particular
/// stream key.
StreamKeyBecameActive {
/// The stream key that now has at least one watcher
stream_key: String,
/// Optional channel of `ReactorWorkflowUpdate` values.
/// NOTE(review): presumably the channel from the `ValidationResponse::Approve` that
/// admitted the watcher -- confirm against the actor implementation.
reactor_update_channel: Option<UnboundedReceiver<ReactorWorkflowUpdate>>,
},
/// Notifies the registrant that the last watcher has disconnected on the stream key, and
/// there is no longer anyone watching. `stream_key` is the key that became inactive.
StreamKeyBecameInactive { stream_key: String },
}
/// Message watcher registrants send to announce new media data that should be sent to watchers.
///
/// The endpoint receives these on the `media_channel` supplied in a `ListenForWatchers` request.
#[derive(Debug)]
pub struct RtmpEndpointMediaMessage {
/// Stream key the media in `data` is associated with
pub stream_key: String,
/// The metadata, video, or audio payload to send
pub data: RtmpEndpointMediaData,
}
/// New media data that should be sent to watchers.
///
/// The video and audio variants carry the same fields as the matching
/// `RtmpEndpointPublisherMessage` variants, minus the `publisher` connection id.
#[derive(Debug, Clone, PartialEq)]
pub enum RtmpEndpointMediaData {
/// New stream metadata
NewStreamMetaData {
/// The metadata to send
metadata: StreamMetadata,
},
/// New video data
NewVideoData {
/// Codec of the video payload
codec: VideoCodec,
/// Whether this packet is flagged as a keyframe
is_keyframe: bool,
/// Whether this packet is flagged as a sequence header
is_sequence_header: bool,
/// The video payload bytes.
/// NOTE(review): the exact framing of the payload is not visible here -- confirm.
data: Bytes,
/// RTMP timestamp attached to this packet
timestamp: RtmpTimestamp,
/// NOTE(review): presumably the composition time offset relative to `timestamp`; the
/// units are not visible here -- confirm.
composition_time_offset: i32,
},
/// New audio data
NewAudioData {
/// Codec of the audio payload
codec: AudioCodec,
/// Whether this packet is flagged as a sequence header
is_sequence_header: bool,
/// The audio payload bytes.
/// NOTE(review): the exact framing of the payload is not visible here -- confirm.
data: Bytes,
/// RTMP timestamp attached to this packet
timestamp: RtmpTimestamp,
},
}