1use async_trait::async_trait;
7use std::collections::HashMap;
8
9use crate::amf::AmfValue;
10use crate::media::{AacData, FlvTag, H264Data};
11use crate::protocol::message::{ConnectParams, PlayParams, PublishParams};
12use crate::session::{SessionContext, StreamContext};
13
14#[derive(Debug, Clone)]
16pub enum AuthResult {
17 Accept,
19
20 Reject(String),
22
23 Redirect { url: String },
25}
26
27impl AuthResult {
28 pub fn is_accept(&self) -> bool {
30 matches!(self, AuthResult::Accept)
31 }
32
33 pub fn is_reject(&self) -> bool {
35 matches!(self, AuthResult::Reject(_))
36 }
37}
38
39#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
41pub enum MediaDeliveryMode {
42 RawFlv,
44
45 ParsedFrames,
47
48 #[default]
50 Both,
51}
52
53#[async_trait]
89pub trait RtmpHandler: Send + Sync + 'static {
90 async fn on_connection(&self, _ctx: &SessionContext) -> bool {
95 true
96 }
97
98 async fn on_handshake_complete(&self, _ctx: &SessionContext) {}
100
101 async fn on_connect(&self, _ctx: &SessionContext, _params: &ConnectParams) -> AuthResult {
105 AuthResult::Accept
106 }
107
108 async fn on_fc_publish(&self, _ctx: &SessionContext, _stream_key: &str) -> AuthResult {
112 AuthResult::Accept
113 }
114
115 async fn on_publish(&self, _ctx: &SessionContext, _params: &PublishParams) -> AuthResult {
119 AuthResult::Accept
120 }
121
122 async fn on_play(&self, _ctx: &SessionContext, _params: &PlayParams) -> AuthResult {
126 AuthResult::Accept
127 }
128
129 async fn on_metadata(&self, _ctx: &StreamContext, _metadata: &HashMap<String, AmfValue>) {}
131
132 async fn on_media_tag(&self, _ctx: &StreamContext, _tag: &FlvTag) -> bool {
136 true
137 }
138
139 async fn on_video_frame(&self, _ctx: &StreamContext, _frame: &H264Data, _timestamp: u32) {}
141
142 async fn on_audio_frame(&self, _ctx: &StreamContext, _frame: &AacData, _timestamp: u32) {}
144
145 async fn on_keyframe(&self, _ctx: &StreamContext, _timestamp: u32) {}
147
148 async fn on_publish_stop(&self, _ctx: &StreamContext) {}
150
151 async fn on_play_stop(&self, _ctx: &StreamContext) {}
153
154 async fn on_pause(&self, _ctx: &StreamContext) {}
156
157 async fn on_unpause(&self, _ctx: &StreamContext) {}
159
160 async fn on_disconnect(&self, _ctx: &SessionContext) {}
162
163 fn media_delivery_mode(&self) -> MediaDeliveryMode {
165 MediaDeliveryMode::Both
166 }
167
168 async fn on_stats_update(&self, _ctx: &SessionContext) {}
170}
171
172pub struct LoggingHandler;
174
175#[async_trait]
176impl RtmpHandler for LoggingHandler {
177 async fn on_connection(&self, ctx: &SessionContext) -> bool {
178 tracing::info!(
179 session_id = ctx.session_id,
180 peer = %ctx.peer_addr,
181 "New connection"
182 );
183 true
184 }
185
186 async fn on_connect(&self, ctx: &SessionContext, params: &ConnectParams) -> AuthResult {
187 tracing::info!(
188 session_id = ctx.session_id,
189 app = %params.app,
190 "Connect request"
191 );
192 AuthResult::Accept
193 }
194
195 async fn on_publish(&self, ctx: &SessionContext, params: &PublishParams) -> AuthResult {
196 tracing::info!(
197 session_id = ctx.session_id,
198 stream_key = %params.stream_key,
199 "Publish request"
200 );
201 AuthResult::Accept
202 }
203
204 async fn on_metadata(&self, ctx: &StreamContext, metadata: &HashMap<String, AmfValue>) {
205 tracing::debug!(
206 session_id = ctx.session.session_id,
207 stream_key = %ctx.stream_key,
208 keys = ?metadata.keys().collect::<Vec<_>>(),
209 "Received metadata"
210 );
211 }
212
213 async fn on_disconnect(&self, ctx: &SessionContext) {
214 tracing::info!(session_id = ctx.session_id, "Connection closed");
215 }
216}
217
218pub struct ChainedHandler<H1, H2> {
220 first: H1,
221 second: H2,
222}
223
224impl<H1, H2> ChainedHandler<H1, H2>
225where
226 H1: RtmpHandler,
227 H2: RtmpHandler,
228{
229 pub fn new(first: H1, second: H2) -> Self {
230 Self { first, second }
231 }
232}
233
234#[async_trait]
235impl<H1, H2> RtmpHandler for ChainedHandler<H1, H2>
236where
237 H1: RtmpHandler,
238 H2: RtmpHandler,
239{
240 async fn on_connection(&self, ctx: &SessionContext) -> bool {
241 self.first.on_connection(ctx).await && self.second.on_connection(ctx).await
242 }
243
244 async fn on_connect(&self, ctx: &SessionContext, params: &ConnectParams) -> AuthResult {
245 let result = self.first.on_connect(ctx, params).await;
246 if result.is_accept() {
247 self.second.on_connect(ctx, params).await
248 } else {
249 result
250 }
251 }
252
253 async fn on_publish(&self, ctx: &SessionContext, params: &PublishParams) -> AuthResult {
254 let result = self.first.on_publish(ctx, params).await;
255 if result.is_accept() {
256 self.second.on_publish(ctx, params).await
257 } else {
258 result
259 }
260 }
261
262 async fn on_disconnect(&self, ctx: &SessionContext) {
263 self.first.on_disconnect(ctx).await;
264 self.second.on_disconnect(ctx).await;
265 }
266}