rtmp_rs/server/
handler.rs

//! RTMP handler trait
//!
//! The main extension point for RTMP applications. Implement [`RtmpHandler`]
//! to handle connection events, authentication, and media data.
5
6use async_trait::async_trait;
7use std::collections::HashMap;
8
9use crate::amf::AmfValue;
10use crate::media::{AacData, FlvTag, H264Data};
11use crate::protocol::message::{ConnectParams, PlayParams, PublishParams};
12use crate::session::{SessionContext, StreamContext};
13
/// Result of an authentication/authorization check.
///
/// Returned by the handler hooks that gate a request (`on_connect`,
/// `on_fc_publish`, `on_publish`, `on_play`).
#[derive(Debug, Clone)]
pub enum AuthResult {
    /// Accept the request.
    Accept,

    /// Reject the request; the payload carries the reason.
    Reject(String),

    /// Redirect the client to another URL.
    Redirect {
        /// Target of the redirect.
        url: String,
    },
}

impl AuthResult {
    /// Returns `true` if the result is [`AuthResult::Accept`].
    pub fn is_accept(&self) -> bool {
        matches!(self, Self::Accept)
    }

    /// Returns `true` if the result is [`AuthResult::Reject`].
    ///
    /// A redirect is *not* a reject: use [`AuthResult::is_redirect`] to
    /// detect it, or `!is_accept()` to catch every non-accepting outcome.
    pub fn is_reject(&self) -> bool {
        matches!(self, Self::Reject(_))
    }

    /// Returns `true` if the result is [`AuthResult::Redirect`].
    pub fn is_redirect(&self) -> bool {
        matches!(self, Self::Redirect { .. })
    }
}
38
/// Selects which representation(s) of incoming media a handler receives.
///
/// The default is [`MediaDeliveryMode::Both`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum MediaDeliveryMode {
    /// Raw FLV tags are delivered (minimal parsing).
    RawFlv,

    /// Parsed frames are delivered (H.264 NALUs, AAC frames).
    ParsedFrames,

    /// Raw tags and parsed frames are both delivered.
    #[default]
    Both,
}
52
53/// Handler trait for RTMP applications
54///
55/// Implement this trait to customize RTMP server behavior. All methods
56/// have default implementations that accept/allow everything.
57///
58/// # Example
59///
60/// ```ignore
61/// use rtmp_rs::{RtmpHandler, AuthResult};
62/// use rtmp_rs::session::SessionContext;
63/// use rtmp_rs::protocol::message::{ConnectParams, PublishParams};
64///
65/// struct MyHandler;
66///
67/// #[async_trait::async_trait]
68/// impl RtmpHandler for MyHandler {
69///     async fn on_connect(&self, ctx: &SessionContext, params: &ConnectParams) -> AuthResult {
70///         // Validate application name
71///         if params.app == "live" {
72///             AuthResult::Accept
73///         } else {
74///             AuthResult::Reject("Unknown application".into())
75///         }
76///     }
77///
78///     async fn on_publish(&self, ctx: &SessionContext, params: &PublishParams) -> AuthResult {
79///         // Validate stream key (e.g., check against database)
80///         if params.stream_key.starts_with("valid_") {
81///             AuthResult::Accept
82///         } else {
83///             AuthResult::Reject("Invalid stream key".into())
84///         }
85///     }
86/// }
87/// ```
88#[async_trait]
89pub trait RtmpHandler: Send + Sync + 'static {
90    /// Called when a new TCP connection is established
91    ///
92    /// Return false to immediately close the connection.
93    /// Use this for IP-based rate limiting or blocklists.
94    async fn on_connection(&self, _ctx: &SessionContext) -> bool {
95        true
96    }
97
98    /// Called after successful handshake, before connect command
99    async fn on_handshake_complete(&self, _ctx: &SessionContext) {}
100
101    /// Called on RTMP 'connect' command
102    ///
103    /// Validate the application name, auth tokens in tcUrl, etc.
104    async fn on_connect(&self, _ctx: &SessionContext, _params: &ConnectParams) -> AuthResult {
105        AuthResult::Accept
106    }
107
108    /// Called on FCPublish command (OBS/Twitch compatibility)
109    ///
110    /// This is called before 'publish' and can be used for early stream key validation.
111    async fn on_fc_publish(&self, _ctx: &SessionContext, _stream_key: &str) -> AuthResult {
112        AuthResult::Accept
113    }
114
115    /// Called on 'publish' command
116    ///
117    /// Validate the stream key. This is the main authentication point for publishers.
118    async fn on_publish(&self, _ctx: &SessionContext, _params: &PublishParams) -> AuthResult {
119        AuthResult::Accept
120    }
121
122    /// Called on 'play' command
123    ///
124    /// Validate play access. Return Reject to deny playback.
125    async fn on_play(&self, _ctx: &SessionContext, _params: &PlayParams) -> AuthResult {
126        AuthResult::Accept
127    }
128
129    /// Called when stream metadata is received (@setDataFrame/onMetaData)
130    async fn on_metadata(&self, _ctx: &StreamContext, _metadata: &HashMap<String, AmfValue>) {}
131
132    /// Called for each raw FLV tag (when MediaDeliveryMode includes RawFlv)
133    ///
134    /// Return true to continue processing, false to drop the tag.
135    async fn on_media_tag(&self, _ctx: &StreamContext, _tag: &FlvTag) -> bool {
136        true
137    }
138
139    /// Called for each video frame (when MediaDeliveryMode includes ParsedFrames)
140    async fn on_video_frame(&self, _ctx: &StreamContext, _frame: &H264Data, _timestamp: u32) {}
141
142    /// Called for each audio frame (when MediaDeliveryMode includes ParsedFrames)
143    async fn on_audio_frame(&self, _ctx: &StreamContext, _frame: &AacData, _timestamp: u32) {}
144
145    /// Called when a keyframe is received
146    async fn on_keyframe(&self, _ctx: &StreamContext, _timestamp: u32) {}
147
148    /// Called when the publish stream ends
149    async fn on_publish_stop(&self, _ctx: &StreamContext) {}
150
151    /// Called when the play stream ends
152    async fn on_play_stop(&self, _ctx: &StreamContext) {}
153
154    /// Called when a subscriber pauses playback
155    async fn on_pause(&self, _ctx: &StreamContext) {}
156
157    /// Called when a subscriber resumes playback
158    async fn on_unpause(&self, _ctx: &StreamContext) {}
159
160    /// Called when the connection closes
161    async fn on_disconnect(&self, _ctx: &SessionContext) {}
162
163    /// Get the media delivery mode for this handler
164    fn media_delivery_mode(&self) -> MediaDeliveryMode {
165        MediaDeliveryMode::Both
166    }
167
168    /// Called periodically with stats update
169    async fn on_stats_update(&self, _ctx: &SessionContext) {}
170}
171
/// A simple handler that accepts all connections and logs events.
///
/// The type holds no state, so it derives the common value traits: it can be
/// debug-printed, copied freely, and created through `Default`.
#[derive(Debug, Clone, Copy, Default)]
pub struct LoggingHandler;
174
175#[async_trait]
176impl RtmpHandler for LoggingHandler {
177    async fn on_connection(&self, ctx: &SessionContext) -> bool {
178        tracing::info!(
179            session_id = ctx.session_id,
180            peer = %ctx.peer_addr,
181            "New connection"
182        );
183        true
184    }
185
186    async fn on_connect(&self, ctx: &SessionContext, params: &ConnectParams) -> AuthResult {
187        tracing::info!(
188            session_id = ctx.session_id,
189            app = %params.app,
190            "Connect request"
191        );
192        AuthResult::Accept
193    }
194
195    async fn on_publish(&self, ctx: &SessionContext, params: &PublishParams) -> AuthResult {
196        tracing::info!(
197            session_id = ctx.session_id,
198            stream_key = %params.stream_key,
199            "Publish request"
200        );
201        AuthResult::Accept
202    }
203
204    async fn on_metadata(&self, ctx: &StreamContext, metadata: &HashMap<String, AmfValue>) {
205        tracing::debug!(
206            session_id = ctx.session.session_id,
207            stream_key = %ctx.stream_key,
208            keys = ?metadata.keys().collect::<Vec<_>>(),
209            "Received metadata"
210        );
211    }
212
213    async fn on_disconnect(&self, ctx: &SessionContext) {
214        tracing::info!(session_id = ctx.session_id, "Connection closed");
215    }
216}
217
/// A handler wrapper that chains two handlers.
///
/// Build one with `ChainedHandler::new`. Chains nest, so more than two
/// handlers can be combined by wrapping a chain in another chain.
///
/// `Debug` and `Clone` are available whenever both wrapped handlers provide
/// them.
#[derive(Debug, Clone)]
pub struct ChainedHandler<H1, H2> {
    /// Handler consulted first.
    first: H1,
    /// Handler consulted after `first`.
    second: H2,
}
223
224impl<H1, H2> ChainedHandler<H1, H2>
225where
226    H1: RtmpHandler,
227    H2: RtmpHandler,
228{
229    pub fn new(first: H1, second: H2) -> Self {
230        Self { first, second }
231    }
232}
233
234#[async_trait]
235impl<H1, H2> RtmpHandler for ChainedHandler<H1, H2>
236where
237    H1: RtmpHandler,
238    H2: RtmpHandler,
239{
240    async fn on_connection(&self, ctx: &SessionContext) -> bool {
241        self.first.on_connection(ctx).await && self.second.on_connection(ctx).await
242    }
243
244    async fn on_connect(&self, ctx: &SessionContext, params: &ConnectParams) -> AuthResult {
245        let result = self.first.on_connect(ctx, params).await;
246        if result.is_accept() {
247            self.second.on_connect(ctx, params).await
248        } else {
249            result
250        }
251    }
252
253    async fn on_publish(&self, ctx: &SessionContext, params: &PublishParams) -> AuthResult {
254        let result = self.first.on_publish(ctx, params).await;
255        if result.is_accept() {
256            self.second.on_publish(ctx, params).await
257        } else {
258            result
259        }
260    }
261
262    async fn on_disconnect(&self, ctx: &SessionContext) {
263        self.first.on_disconnect(ctx).await;
264        self.second.on_disconnect(ctx).await;
265    }
266}