1use std::collections::HashMap;
7
8use crate::amf::AmfValue;
9use crate::media::{AacData, EnhancedAudioData, EnhancedVideoData, FlvTag, H264Data};
10use crate::protocol::message::{ConnectParams, PlayParams, PublishParams};
11use crate::session::{SessionContext, StreamContext};
12
/// Decision returned by the authorization-style hooks of `RtmpHandler`
/// (`on_connect`, `on_fc_publish`, `on_publish`, `on_play`).
#[derive(Clone, Debug)]
pub enum AuthResult {
    /// The request is allowed.
    Accept,

    /// The request is refused; the string carries the handler-supplied text.
    // NOTE(review): presumably a human-readable reason relayed to the peer — confirm.
    Reject(String),

    /// The request is answered with a redirect to `url`.
    Redirect { url: String },
}

impl AuthResult {
    /// Returns `true` only for [`AuthResult::Accept`].
    pub fn is_accept(&self) -> bool {
        match self {
            Self::Accept => true,
            Self::Reject(_) | Self::Redirect { .. } => false,
        }
    }

    /// Returns `true` only for [`AuthResult::Reject`]; a redirect is not a rejection.
    pub fn is_reject(&self) -> bool {
        match self {
            Self::Reject(_) => true,
            Self::Accept | Self::Redirect { .. } => false,
        }
    }
}
37
/// Which representation(s) of incoming media a handler asks for.
///
/// This is the return type of `RtmpHandler::media_delivery_mode`.
///
/// NOTE(review): how each variant maps onto the `on_media_tag` and
/// `on_*_frame` hooks is inferred from the names only — confirm against the
/// code that dispatches media.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum MediaDeliveryMode {
    /// Raw FLV representation only.
    RawFlv,

    /// Parsed frame representation only.
    ParsedFrames,

    /// Both representations; this is the `Default` value.
    #[default]
    Both,
}
51
52pub trait RtmpHandler: Send + Sync + 'static {
87 fn on_connection(
92 &self,
93 _ctx: &SessionContext,
94 ) -> impl std::future::Future<Output = bool> + Send {
95 async { true }
96 }
97
98 fn on_handshake_complete(
100 &self,
101 _ctx: &SessionContext,
102 ) -> impl std::future::Future<Output = ()> + Send {
103 async {}
104 }
105
106 fn on_connect(
110 &self,
111 _ctx: &SessionContext,
112 _params: &ConnectParams,
113 ) -> impl std::future::Future<Output = AuthResult> + Send {
114 async { AuthResult::Accept }
115 }
116
117 fn on_fc_publish(
121 &self,
122 _ctx: &SessionContext,
123 _stream_key: &str,
124 ) -> impl std::future::Future<Output = AuthResult> + Send {
125 async { AuthResult::Accept }
126 }
127
128 fn on_publish(
132 &self,
133 _ctx: &SessionContext,
134 _params: &PublishParams,
135 ) -> impl std::future::Future<Output = AuthResult> + Send {
136 async { AuthResult::Accept }
137 }
138
139 fn on_play(
143 &self,
144 _ctx: &SessionContext,
145 _params: &PlayParams,
146 ) -> impl std::future::Future<Output = AuthResult> + Send {
147 async { AuthResult::Accept }
148 }
149
150 fn on_metadata(
152 &self,
153 _ctx: &StreamContext,
154 _metadata: &HashMap<String, AmfValue>,
155 ) -> impl std::future::Future<Output = ()> + Send {
156 async {}
157 }
158
159 fn on_media_tag(
163 &self,
164 _ctx: &StreamContext,
165 _tag: &FlvTag,
166 ) -> impl std::future::Future<Output = bool> + Send {
167 async { true }
168 }
169
170 fn on_video_frame(
172 &self,
173 _ctx: &StreamContext,
174 _frame: &H264Data,
175 _timestamp: u32,
176 ) -> impl std::future::Future<Output = ()> + Send {
177 async {}
178 }
179
180 fn on_audio_frame(
182 &self,
183 _ctx: &StreamContext,
184 _frame: &AacData,
185 _timestamp: u32,
186 ) -> impl std::future::Future<Output = ()> + Send {
187 async {}
188 }
189
190 fn on_keyframe(
192 &self,
193 _ctx: &StreamContext,
194 _timestamp: u32,
195 ) -> impl std::future::Future<Output = ()> + Send {
196 async {}
197 }
198
199 #[deprecated(since = "0.3.0", note = "Use on_unpublish instead")]
201 fn on_publish_stop(
202 &self,
203 _ctx: &StreamContext,
204 ) -> impl std::future::Future<Output = ()> + Send {
205 async {}
206 }
207
208 fn on_unpublish(&self, _ctx: &StreamContext) -> impl std::future::Future<Output = ()> + Send {
210 async {}
211 }
212
213 fn on_play_stop(&self, _ctx: &StreamContext) -> impl std::future::Future<Output = ()> + Send {
215 async {}
216 }
217
218 fn on_pause(&self, _ctx: &StreamContext) -> impl std::future::Future<Output = ()> + Send {
220 async {}
221 }
222
223 fn on_unpause(&self, _ctx: &StreamContext) -> impl std::future::Future<Output = ()> + Send {
225 async {}
226 }
227
228 fn on_disconnect(&self, _ctx: &SessionContext) -> impl std::future::Future<Output = ()> + Send {
230 async {}
231 }
232
233 fn media_delivery_mode(&self) -> MediaDeliveryMode {
235 MediaDeliveryMode::Both
236 }
237
238 fn on_stats_update(
240 &self,
241 _ctx: &SessionContext,
242 ) -> impl std::future::Future<Output = ()> + Send {
243 async {}
244 }
245
246 fn on_enhanced_video_frame(
258 &self,
259 _ctx: &StreamContext,
260 _frame: &EnhancedVideoData,
261 _timestamp: u32,
262 ) -> impl std::future::Future<Output = ()> + Send {
263 async {}
264 }
265
266 fn on_enhanced_audio_frame(
274 &self,
275 _ctx: &StreamContext,
276 _frame: &EnhancedAudioData,
277 _timestamp: u32,
278 ) -> impl std::future::Future<Output = ()> + Send {
279 async {}
280 }
281}
282
283pub struct LoggingHandler;
285
286impl RtmpHandler for LoggingHandler {
287 async fn on_connection(&self, ctx: &SessionContext) -> bool {
288 tracing::info!(
289 session_id = ctx.session_id,
290 peer = %ctx.peer_addr,
291 "New connection"
292 );
293 true
294 }
295
296 async fn on_connect(&self, ctx: &SessionContext, params: &ConnectParams) -> AuthResult {
297 tracing::info!(
298 session_id = ctx.session_id,
299 app = %params.app,
300 "Connect request"
301 );
302 AuthResult::Accept
303 }
304
305 async fn on_publish(&self, ctx: &SessionContext, params: &PublishParams) -> AuthResult {
306 tracing::info!(
307 session_id = ctx.session_id,
308 stream_key = %params.stream_key,
309 "Publish request"
310 );
311 AuthResult::Accept
312 }
313
314 async fn on_metadata(&self, ctx: &StreamContext, metadata: &HashMap<String, AmfValue>) {
315 tracing::debug!(
316 session_id = ctx.session.session_id,
317 stream_key = %ctx.stream_key,
318 keys = ?metadata.keys().collect::<Vec<_>>(),
319 "Received metadata"
320 );
321 }
322
323 async fn on_disconnect(&self, ctx: &SessionContext) {
324 tracing::info!(session_id = ctx.session_id, "Connection closed");
325 }
326}
327
328pub struct ChainedHandler<H1, H2> {
330 first: H1,
331 second: H2,
332}
333
334impl<H1, H2> ChainedHandler<H1, H2>
335where
336 H1: RtmpHandler,
337 H2: RtmpHandler,
338{
339 pub fn new(first: H1, second: H2) -> Self {
340 Self { first, second }
341 }
342}
343
344impl<H1, H2> RtmpHandler for ChainedHandler<H1, H2>
345where
346 H1: RtmpHandler,
347 H2: RtmpHandler,
348{
349 async fn on_connection(&self, ctx: &SessionContext) -> bool {
350 self.first.on_connection(ctx).await && self.second.on_connection(ctx).await
351 }
352
353 async fn on_connect(&self, ctx: &SessionContext, params: &ConnectParams) -> AuthResult {
354 let result = self.first.on_connect(ctx, params).await;
355 if result.is_accept() {
356 self.second.on_connect(ctx, params).await
357 } else {
358 result
359 }
360 }
361
362 async fn on_publish(&self, ctx: &SessionContext, params: &PublishParams) -> AuthResult {
363 let result = self.first.on_publish(ctx, params).await;
364 if result.is_accept() {
365 self.second.on_publish(ctx, params).await
366 } else {
367 result
368 }
369 }
370
371 async fn on_disconnect(&self, ctx: &SessionContext) {
372 self.first.on_disconnect(ctx).await;
373 self.second.on_disconnect(ctx).await;
374 }
375}