rtmp_rs/server/
handler.rs

1//! RTMP handler trait
2//!
3//! The main extension point for RTMP applications. Implement this trait
4//! to handle connection events, authentication, and media data.
5
6use std::collections::HashMap;
7
8use crate::amf::AmfValue;
9use crate::media::{AacData, FlvTag, H264Data};
10use crate::protocol::message::{ConnectParams, PlayParams, PublishParams};
11use crate::session::{SessionContext, StreamContext};
12
13/// Result of authentication/authorization checks
14#[derive(Debug, Clone)]
15pub enum AuthResult {
16    /// Accept the request
17    Accept,
18
19    /// Reject the request with a reason
20    Reject(String),
21
22    /// Redirect to another URL
23    Redirect { url: String },
24}
25
26impl AuthResult {
27    /// Check if the result is Accept
28    pub fn is_accept(&self) -> bool {
29        matches!(self, AuthResult::Accept)
30    }
31
32    /// Check if the result is Reject
33    pub fn is_reject(&self) -> bool {
34        matches!(self, AuthResult::Reject(_))
35    }
36}
37
38/// Media delivery mode configuration
39#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
40pub enum MediaDeliveryMode {
41    /// Deliver raw FLV tags (minimal parsing)
42    RawFlv,
43
44    /// Deliver parsed frames (H.264 NALUs, AAC frames)
45    ParsedFrames,
46
47    /// Deliver both raw tags and parsed frames
48    #[default]
49    Both,
50}
51
52/// Handler trait for RTMP applications
53///
54/// Implement this trait to customize RTMP server behavior. All methods
55/// have default implementations that accept/allow everything.
56///
57/// # Example
58///
59/// ```ignore
60/// use rtmp_rs::{RtmpHandler, AuthResult};
61/// use rtmp_rs::session::SessionContext;
62/// use rtmp_rs::protocol::message::{ConnectParams, PublishParams};
63///
64/// struct MyHandler;
65///
66/// impl RtmpHandler for MyHandler {
67///     async fn on_connect(&self, ctx: &SessionContext, params: &ConnectParams) -> AuthResult {
68///         // Validate application name
69///         if params.app == "live" {
70///             AuthResult::Accept
71///         } else {
72///             AuthResult::Reject("Unknown application".into())
73///         }
74///     }
75///
76///     async fn on_publish(&self, ctx: &SessionContext, params: &PublishParams) -> AuthResult {
77///         // Validate stream key (e.g., check against database)
78///         if params.stream_key.starts_with("valid_") {
79///             AuthResult::Accept
80///         } else {
81///             AuthResult::Reject("Invalid stream key".into())
82///         }
83///     }
84/// }
85/// ```
86pub trait RtmpHandler: Send + Sync + 'static {
87    /// Called when a new TCP connection is established
88    ///
89    /// Return false to immediately close the connection.
90    /// Use this for IP-based rate limiting or blocklists.
91    fn on_connection(
92        &self,
93        _ctx: &SessionContext,
94    ) -> impl std::future::Future<Output = bool> + Send {
95        async { true }
96    }
97
98    /// Called after successful handshake, before connect command
99    fn on_handshake_complete(
100        &self,
101        _ctx: &SessionContext,
102    ) -> impl std::future::Future<Output = ()> + Send {
103        async {}
104    }
105
106    /// Called on RTMP 'connect' command
107    ///
108    /// Validate the application name, auth tokens in tcUrl, etc.
109    fn on_connect(
110        &self,
111        _ctx: &SessionContext,
112        _params: &ConnectParams,
113    ) -> impl std::future::Future<Output = AuthResult> + Send {
114        async { AuthResult::Accept }
115    }
116
117    /// Called on FCPublish command (OBS/Twitch compatibility)
118    ///
119    /// This is called before 'publish' and can be used for early stream key validation.
120    fn on_fc_publish(
121        &self,
122        _ctx: &SessionContext,
123        _stream_key: &str,
124    ) -> impl std::future::Future<Output = AuthResult> + Send {
125        async { AuthResult::Accept }
126    }
127
128    /// Called on 'publish' command
129    ///
130    /// Validate the stream key. This is the main authentication point for publishers.
131    fn on_publish(
132        &self,
133        _ctx: &SessionContext,
134        _params: &PublishParams,
135    ) -> impl std::future::Future<Output = AuthResult> + Send {
136        async { AuthResult::Accept }
137    }
138
139    /// Called on 'play' command
140    ///
141    /// Validate play access. Return Reject to deny playback.
142    fn on_play(
143        &self,
144        _ctx: &SessionContext,
145        _params: &PlayParams,
146    ) -> impl std::future::Future<Output = AuthResult> + Send {
147        async { AuthResult::Accept }
148    }
149
150    /// Called when stream metadata is received (@setDataFrame/onMetaData)
151    fn on_metadata(
152        &self,
153        _ctx: &StreamContext,
154        _metadata: &HashMap<String, AmfValue>,
155    ) -> impl std::future::Future<Output = ()> + Send {
156        async {}
157    }
158
159    /// Called for each raw FLV tag (when MediaDeliveryMode includes RawFlv)
160    ///
161    /// Return true to continue processing, false to drop the tag.
162    fn on_media_tag(
163        &self,
164        _ctx: &StreamContext,
165        _tag: &FlvTag,
166    ) -> impl std::future::Future<Output = bool> + Send {
167        async { true }
168    }
169
170    /// Called for each video frame (when MediaDeliveryMode includes ParsedFrames)
171    fn on_video_frame(
172        &self,
173        _ctx: &StreamContext,
174        _frame: &H264Data,
175        _timestamp: u32,
176    ) -> impl std::future::Future<Output = ()> + Send {
177        async {}
178    }
179
180    /// Called for each audio frame (when MediaDeliveryMode includes ParsedFrames)
181    fn on_audio_frame(
182        &self,
183        _ctx: &StreamContext,
184        _frame: &AacData,
185        _timestamp: u32,
186    ) -> impl std::future::Future<Output = ()> + Send {
187        async {}
188    }
189
190    /// Called when a keyframe is received
191    fn on_keyframe(
192        &self,
193        _ctx: &StreamContext,
194        _timestamp: u32,
195    ) -> impl std::future::Future<Output = ()> + Send {
196        async {}
197    }
198
199    /// Called when the publish stream ends
200    #[deprecated(since = "0.3.0", note = "Use on_unpublish instead")]
201    fn on_publish_stop(
202        &self,
203        _ctx: &StreamContext,
204    ) -> impl std::future::Future<Output = ()> + Send {
205        async {}
206    }
207
208    /// Called when the publish stream ends
209    fn on_unpublish(&self, _ctx: &StreamContext) -> impl std::future::Future<Output = ()> + Send {
210        async {}
211    }
212
213    /// Called when the play stream ends
214    fn on_play_stop(&self, _ctx: &StreamContext) -> impl std::future::Future<Output = ()> + Send {
215        async {}
216    }
217
218    /// Called when a subscriber pauses playback
219    fn on_pause(&self, _ctx: &StreamContext) -> impl std::future::Future<Output = ()> + Send {
220        async {}
221    }
222
223    /// Called when a subscriber resumes playback
224    fn on_unpause(&self, _ctx: &StreamContext) -> impl std::future::Future<Output = ()> + Send {
225        async {}
226    }
227
228    /// Called when the connection closes
229    fn on_disconnect(&self, _ctx: &SessionContext) -> impl std::future::Future<Output = ()> + Send {
230        async {}
231    }
232
233    /// Get the media delivery mode for this handler
234    fn media_delivery_mode(&self) -> MediaDeliveryMode {
235        MediaDeliveryMode::Both
236    }
237
238    /// Called periodically with stats update
239    fn on_stats_update(
240        &self,
241        _ctx: &SessionContext,
242    ) -> impl std::future::Future<Output = ()> + Send {
243        async {}
244    }
245}
246
247/// A simple handler that accepts all connections and logs events
248pub struct LoggingHandler;
249
250impl RtmpHandler for LoggingHandler {
251    async fn on_connection(&self, ctx: &SessionContext) -> bool {
252        tracing::info!(
253            session_id = ctx.session_id,
254            peer = %ctx.peer_addr,
255            "New connection"
256        );
257        true
258    }
259
260    async fn on_connect(&self, ctx: &SessionContext, params: &ConnectParams) -> AuthResult {
261        tracing::info!(
262            session_id = ctx.session_id,
263            app = %params.app,
264            "Connect request"
265        );
266        AuthResult::Accept
267    }
268
269    async fn on_publish(&self, ctx: &SessionContext, params: &PublishParams) -> AuthResult {
270        tracing::info!(
271            session_id = ctx.session_id,
272            stream_key = %params.stream_key,
273            "Publish request"
274        );
275        AuthResult::Accept
276    }
277
278    async fn on_metadata(&self, ctx: &StreamContext, metadata: &HashMap<String, AmfValue>) {
279        tracing::debug!(
280            session_id = ctx.session.session_id,
281            stream_key = %ctx.stream_key,
282            keys = ?metadata.keys().collect::<Vec<_>>(),
283            "Received metadata"
284        );
285    }
286
287    async fn on_disconnect(&self, ctx: &SessionContext) {
288        tracing::info!(session_id = ctx.session_id, "Connection closed");
289    }
290}
291
292/// A handler wrapper that chains multiple handlers
293pub struct ChainedHandler<H1, H2> {
294    first: H1,
295    second: H2,
296}
297
298impl<H1, H2> ChainedHandler<H1, H2>
299where
300    H1: RtmpHandler,
301    H2: RtmpHandler,
302{
303    pub fn new(first: H1, second: H2) -> Self {
304        Self { first, second }
305    }
306}
307
308impl<H1, H2> RtmpHandler for ChainedHandler<H1, H2>
309where
310    H1: RtmpHandler,
311    H2: RtmpHandler,
312{
313    async fn on_connection(&self, ctx: &SessionContext) -> bool {
314        self.first.on_connection(ctx).await && self.second.on_connection(ctx).await
315    }
316
317    async fn on_connect(&self, ctx: &SessionContext, params: &ConnectParams) -> AuthResult {
318        let result = self.first.on_connect(ctx, params).await;
319        if result.is_accept() {
320            self.second.on_connect(ctx, params).await
321        } else {
322            result
323        }
324    }
325
326    async fn on_publish(&self, ctx: &SessionContext, params: &PublishParams) -> AuthResult {
327        let result = self.first.on_publish(ctx, params).await;
328        if result.is_accept() {
329            self.second.on_publish(ctx, params).await
330        } else {
331            result
332        }
333    }
334
335    async fn on_disconnect(&self, ctx: &SessionContext) {
336        self.first.on_disconnect(ctx).await;
337        self.second.on_disconnect(ctx).await;
338    }
339}