rtmp_rs/server/
handler.rs

1//! RTMP handler trait
2//!
3//! The main extension point for RTMP applications. Implement this trait
4//! to handle connection events, authentication, and media data.
5
6use std::collections::HashMap;
7
8use crate::amf::AmfValue;
9use crate::media::{AacData, FlvTag, H264Data};
10use crate::protocol::message::{ConnectParams, PlayParams, PublishParams};
11use crate::session::{SessionContext, StreamContext};
12
/// Result of an authentication/authorization check.
///
/// Returned by the handler callbacks that decide whether a request may
/// proceed (`on_connect`, `on_fc_publish`, `on_publish`, `on_play`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AuthResult {
    /// Accept the request.
    Accept,

    /// Reject the request; the payload is the reason for the rejection.
    Reject(String),

    /// Redirect the client to another URL.
    Redirect { url: String },
}

impl AuthResult {
    /// Returns `true` if the result is [`AuthResult::Accept`].
    #[must_use]
    pub fn is_accept(&self) -> bool {
        matches!(self, Self::Accept)
    }

    /// Returns `true` if the result is [`AuthResult::Reject`].
    #[must_use]
    pub fn is_reject(&self) -> bool {
        matches!(self, Self::Reject(_))
    }

    /// Returns `true` if the result is [`AuthResult::Redirect`].
    ///
    /// Completes the predicate set so callers do not have to treat
    /// "neither accept nor reject" as an implicit redirect.
    #[must_use]
    pub fn is_redirect(&self) -> bool {
        matches!(self, Self::Redirect { .. })
    }
}
37
/// Selects which representation(s) of incoming media a handler receives.
///
/// The default is [`MediaDeliveryMode::Both`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum MediaDeliveryMode {
    /// Raw FLV tags only (minimal parsing).
    RawFlv,

    /// Parsed frames only (H.264 NALUs, AAC frames).
    ParsedFrames,

    /// Raw FLV tags as well as parsed frames.
    #[default]
    Both,
}
51
52/// Handler trait for RTMP applications
53///
54/// Implement this trait to customize RTMP server behavior. All methods
55/// have default implementations that accept/allow everything.
56///
57/// # Example
58///
59/// ```ignore
60/// use rtmp_rs::{RtmpHandler, AuthResult};
61/// use rtmp_rs::session::SessionContext;
62/// use rtmp_rs::protocol::message::{ConnectParams, PublishParams};
63///
64/// struct MyHandler;
65///
66/// impl RtmpHandler for MyHandler {
67///     async fn on_connect(&self, ctx: &SessionContext, params: &ConnectParams) -> AuthResult {
68///         // Validate application name
69///         if params.app == "live" {
70///             AuthResult::Accept
71///         } else {
72///             AuthResult::Reject("Unknown application".into())
73///         }
74///     }
75///
76///     async fn on_publish(&self, ctx: &SessionContext, params: &PublishParams) -> AuthResult {
77///         // Validate stream key (e.g., check against database)
78///         if params.stream_key.starts_with("valid_") {
79///             AuthResult::Accept
80///         } else {
81///             AuthResult::Reject("Invalid stream key".into())
82///         }
83///     }
84/// }
85/// ```
86pub trait RtmpHandler: Send + Sync + 'static {
87    /// Called when a new TCP connection is established
88    ///
89    /// Return false to immediately close the connection.
90    /// Use this for IP-based rate limiting or blocklists.
91    fn on_connection(
92        &self,
93        _ctx: &SessionContext,
94    ) -> impl std::future::Future<Output = bool> + Send {
95        async { true }
96    }
97
98    /// Called after successful handshake, before connect command
99    fn on_handshake_complete(
100        &self,
101        _ctx: &SessionContext,
102    ) -> impl std::future::Future<Output = ()> + Send {
103        async {}
104    }
105
106    /// Called on RTMP 'connect' command
107    ///
108    /// Validate the application name, auth tokens in tcUrl, etc.
109    fn on_connect(
110        &self,
111        _ctx: &SessionContext,
112        _params: &ConnectParams,
113    ) -> impl std::future::Future<Output = AuthResult> + Send {
114        async { AuthResult::Accept }
115    }
116
117    /// Called on FCPublish command (OBS/Twitch compatibility)
118    ///
119    /// This is called before 'publish' and can be used for early stream key validation.
120    fn on_fc_publish(
121        &self,
122        _ctx: &SessionContext,
123        _stream_key: &str,
124    ) -> impl std::future::Future<Output = AuthResult> + Send {
125        async { AuthResult::Accept }
126    }
127
128    /// Called on 'publish' command
129    ///
130    /// Validate the stream key. This is the main authentication point for publishers.
131    fn on_publish(
132        &self,
133        _ctx: &SessionContext,
134        _params: &PublishParams,
135    ) -> impl std::future::Future<Output = AuthResult> + Send {
136        async { AuthResult::Accept }
137    }
138
139    /// Called on 'play' command
140    ///
141    /// Validate play access. Return Reject to deny playback.
142    fn on_play(
143        &self,
144        _ctx: &SessionContext,
145        _params: &PlayParams,
146    ) -> impl std::future::Future<Output = AuthResult> + Send {
147        async { AuthResult::Accept }
148    }
149
150    /// Called when stream metadata is received (@setDataFrame/onMetaData)
151    fn on_metadata(
152        &self,
153        _ctx: &StreamContext,
154        _metadata: &HashMap<String, AmfValue>,
155    ) -> impl std::future::Future<Output = ()> + Send {
156        async {}
157    }
158
159    /// Called for each raw FLV tag (when MediaDeliveryMode includes RawFlv)
160    ///
161    /// Return true to continue processing, false to drop the tag.
162    fn on_media_tag(
163        &self,
164        _ctx: &StreamContext,
165        _tag: &FlvTag,
166    ) -> impl std::future::Future<Output = bool> + Send {
167        async { true }
168    }
169
170    /// Called for each video frame (when MediaDeliveryMode includes ParsedFrames)
171    fn on_video_frame(
172        &self,
173        _ctx: &StreamContext,
174        _frame: &H264Data,
175        _timestamp: u32,
176    ) -> impl std::future::Future<Output = ()> + Send {
177        async {}
178    }
179
180    /// Called for each audio frame (when MediaDeliveryMode includes ParsedFrames)
181    fn on_audio_frame(
182        &self,
183        _ctx: &StreamContext,
184        _frame: &AacData,
185        _timestamp: u32,
186    ) -> impl std::future::Future<Output = ()> + Send {
187        async {}
188    }
189
190    /// Called when a keyframe is received
191    fn on_keyframe(
192        &self,
193        _ctx: &StreamContext,
194        _timestamp: u32,
195    ) -> impl std::future::Future<Output = ()> + Send {
196        async {}
197    }
198
199    /// Called when the publish stream ends
200    fn on_publish_stop(
201        &self,
202        _ctx: &StreamContext,
203    ) -> impl std::future::Future<Output = ()> + Send {
204        async {}
205    }
206
207    /// Called when the play stream ends
208    fn on_play_stop(&self, _ctx: &StreamContext) -> impl std::future::Future<Output = ()> + Send {
209        async {}
210    }
211
212    /// Called when a subscriber pauses playback
213    fn on_pause(&self, _ctx: &StreamContext) -> impl std::future::Future<Output = ()> + Send {
214        async {}
215    }
216
217    /// Called when a subscriber resumes playback
218    fn on_unpause(&self, _ctx: &StreamContext) -> impl std::future::Future<Output = ()> + Send {
219        async {}
220    }
221
222    /// Called when the connection closes
223    fn on_disconnect(&self, _ctx: &SessionContext) -> impl std::future::Future<Output = ()> + Send {
224        async {}
225    }
226
227    /// Get the media delivery mode for this handler
228    fn media_delivery_mode(&self) -> MediaDeliveryMode {
229        MediaDeliveryMode::Both
230    }
231
232    /// Called periodically with stats update
233    fn on_stats_update(
234        &self,
235        _ctx: &SessionContext,
236    ) -> impl std::future::Future<Output = ()> + Send {
237        async {}
238    }
239}
240
/// A simple handler that accepts all connections and logs events.
///
/// The type carries no state, so it is cheap to copy and can be built with
/// `LoggingHandler` or `LoggingHandler::default()`.
#[derive(Debug, Clone, Copy, Default)]
pub struct LoggingHandler;
243
244impl RtmpHandler for LoggingHandler {
245    async fn on_connection(&self, ctx: &SessionContext) -> bool {
246        tracing::info!(
247            session_id = ctx.session_id,
248            peer = %ctx.peer_addr,
249            "New connection"
250        );
251        true
252    }
253
254    async fn on_connect(&self, ctx: &SessionContext, params: &ConnectParams) -> AuthResult {
255        tracing::info!(
256            session_id = ctx.session_id,
257            app = %params.app,
258            "Connect request"
259        );
260        AuthResult::Accept
261    }
262
263    async fn on_publish(&self, ctx: &SessionContext, params: &PublishParams) -> AuthResult {
264        tracing::info!(
265            session_id = ctx.session_id,
266            stream_key = %params.stream_key,
267            "Publish request"
268        );
269        AuthResult::Accept
270    }
271
272    async fn on_metadata(&self, ctx: &StreamContext, metadata: &HashMap<String, AmfValue>) {
273        tracing::debug!(
274            session_id = ctx.session.session_id,
275            stream_key = %ctx.stream_key,
276            keys = ?metadata.keys().collect::<Vec<_>>(),
277            "Received metadata"
278        );
279    }
280
281    async fn on_disconnect(&self, ctx: &SessionContext) {
282        tracing::info!(session_id = ctx.session_id, "Connection closed");
283    }
284}
285
/// A handler wrapper that chains two handlers so they can be used as one.
///
/// Chains nest: a `ChainedHandler` can itself be one of the two members,
/// which allows more than two handlers to be combined.
#[derive(Debug, Clone)]
pub struct ChainedHandler<H1, H2> {
    /// Handler consulted first.
    first: H1,
    /// Handler consulted after `first`.
    second: H2,
}
291
292impl<H1, H2> ChainedHandler<H1, H2>
293where
294    H1: RtmpHandler,
295    H2: RtmpHandler,
296{
297    pub fn new(first: H1, second: H2) -> Self {
298        Self { first, second }
299    }
300}
301
302impl<H1, H2> RtmpHandler for ChainedHandler<H1, H2>
303where
304    H1: RtmpHandler,
305    H2: RtmpHandler,
306{
307    async fn on_connection(&self, ctx: &SessionContext) -> bool {
308        self.first.on_connection(ctx).await && self.second.on_connection(ctx).await
309    }
310
311    async fn on_connect(&self, ctx: &SessionContext, params: &ConnectParams) -> AuthResult {
312        let result = self.first.on_connect(ctx, params).await;
313        if result.is_accept() {
314            self.second.on_connect(ctx, params).await
315        } else {
316            result
317        }
318    }
319
320    async fn on_publish(&self, ctx: &SessionContext, params: &PublishParams) -> AuthResult {
321        let result = self.first.on_publish(ctx, params).await;
322        if result.is_accept() {
323            self.second.on_publish(ctx, params).await
324        } else {
325            result
326        }
327    }
328
329    async fn on_disconnect(&self, ctx: &SessionContext) {
330        self.first.on_disconnect(ctx).await;
331        self.second.on_disconnect(ctx).await;
332    }
333}