1use std::collections::HashMap;
7
8use crate::amf::AmfValue;
9use crate::media::{AacData, FlvTag, H264Data};
10use crate::protocol::message::{ConnectParams, PlayParams, PublishParams};
11use crate::session::{SessionContext, StreamContext};
12
/// Decision returned by the authorization-style handler callbacks
/// (`on_connect`, `on_fc_publish`, `on_publish`, `on_play`).
///
/// `PartialEq`/`Eq` are derived so a decision can be compared directly,
/// e.g. `assert_eq!(result, AuthResult::Accept)` in handler tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AuthResult {
    /// The request is accepted.
    Accept,

    /// The request is refused.
    ///
    /// NOTE(review): the string is presumably a human-readable reason that is
    /// reported back to the client — confirm against the session driver.
    Reject(String),

    /// The request is answered with a redirect to `url`.
    ///
    /// NOTE(review): how the redirect is signalled to the client is not
    /// visible in this file — confirm against the session driver.
    Redirect { url: String },
}
25
26impl AuthResult {
27 pub fn is_accept(&self) -> bool {
29 matches!(self, AuthResult::Accept)
30 }
31
32 pub fn is_reject(&self) -> bool {
34 matches!(self, AuthResult::Reject(_))
35 }
36}
37
/// Media delivery choice reported by `RtmpHandler::media_delivery_mode`.
///
/// NOTE(review): the variant meanings below are inferred from the variant
/// names and from the handler callbacks (`on_media_tag` takes an FLV tag,
/// `on_video_frame` / `on_audio_frame` take parsed frames) — confirm against
/// the code that consumes this value.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum MediaDeliveryMode {
    /// Presumably: deliver raw FLV tags only.
    RawFlv,

    /// Presumably: deliver parsed frames only.
    ParsedFrames,

    /// Presumably: deliver both forms. This is the `Default` variant.
    #[default]
    Both,
}
51
52pub trait RtmpHandler: Send + Sync + 'static {
87 fn on_connection(
92 &self,
93 _ctx: &SessionContext,
94 ) -> impl std::future::Future<Output = bool> + Send {
95 async { true }
96 }
97
98 fn on_handshake_complete(
100 &self,
101 _ctx: &SessionContext,
102 ) -> impl std::future::Future<Output = ()> + Send {
103 async {}
104 }
105
106 fn on_connect(
110 &self,
111 _ctx: &SessionContext,
112 _params: &ConnectParams,
113 ) -> impl std::future::Future<Output = AuthResult> + Send {
114 async { AuthResult::Accept }
115 }
116
117 fn on_fc_publish(
121 &self,
122 _ctx: &SessionContext,
123 _stream_key: &str,
124 ) -> impl std::future::Future<Output = AuthResult> + Send {
125 async { AuthResult::Accept }
126 }
127
128 fn on_publish(
132 &self,
133 _ctx: &SessionContext,
134 _params: &PublishParams,
135 ) -> impl std::future::Future<Output = AuthResult> + Send {
136 async { AuthResult::Accept }
137 }
138
139 fn on_play(
143 &self,
144 _ctx: &SessionContext,
145 _params: &PlayParams,
146 ) -> impl std::future::Future<Output = AuthResult> + Send {
147 async { AuthResult::Accept }
148 }
149
150 fn on_metadata(
152 &self,
153 _ctx: &StreamContext,
154 _metadata: &HashMap<String, AmfValue>,
155 ) -> impl std::future::Future<Output = ()> + Send {
156 async {}
157 }
158
159 fn on_media_tag(
163 &self,
164 _ctx: &StreamContext,
165 _tag: &FlvTag,
166 ) -> impl std::future::Future<Output = bool> + Send {
167 async { true }
168 }
169
170 fn on_video_frame(
172 &self,
173 _ctx: &StreamContext,
174 _frame: &H264Data,
175 _timestamp: u32,
176 ) -> impl std::future::Future<Output = ()> + Send {
177 async {}
178 }
179
180 fn on_audio_frame(
182 &self,
183 _ctx: &StreamContext,
184 _frame: &AacData,
185 _timestamp: u32,
186 ) -> impl std::future::Future<Output = ()> + Send {
187 async {}
188 }
189
190 fn on_keyframe(
192 &self,
193 _ctx: &StreamContext,
194 _timestamp: u32,
195 ) -> impl std::future::Future<Output = ()> + Send {
196 async {}
197 }
198
199 fn on_publish_stop(
201 &self,
202 _ctx: &StreamContext,
203 ) -> impl std::future::Future<Output = ()> + Send {
204 async {}
205 }
206
207 fn on_play_stop(&self, _ctx: &StreamContext) -> impl std::future::Future<Output = ()> + Send {
209 async {}
210 }
211
212 fn on_pause(&self, _ctx: &StreamContext) -> impl std::future::Future<Output = ()> + Send {
214 async {}
215 }
216
217 fn on_unpause(&self, _ctx: &StreamContext) -> impl std::future::Future<Output = ()> + Send {
219 async {}
220 }
221
222 fn on_disconnect(&self, _ctx: &SessionContext) -> impl std::future::Future<Output = ()> + Send {
224 async {}
225 }
226
227 fn media_delivery_mode(&self) -> MediaDeliveryMode {
229 MediaDeliveryMode::Both
230 }
231
232 fn on_stats_update(
234 &self,
235 _ctx: &SessionContext,
236 ) -> impl std::future::Future<Output = ()> + Send {
237 async {}
238 }
239}
240
/// Stateless handler whose `RtmpHandler` implementation emits `tracing`
/// events for the callbacks it overrides.
///
/// Being a unit struct, it derives `Debug`, `Clone`, `Copy` and `Default` so
/// it can be stored, duplicated and built generically without ceremony.
#[derive(Debug, Clone, Copy, Default)]
pub struct LoggingHandler;
243
244impl RtmpHandler for LoggingHandler {
245 async fn on_connection(&self, ctx: &SessionContext) -> bool {
246 tracing::info!(
247 session_id = ctx.session_id,
248 peer = %ctx.peer_addr,
249 "New connection"
250 );
251 true
252 }
253
254 async fn on_connect(&self, ctx: &SessionContext, params: &ConnectParams) -> AuthResult {
255 tracing::info!(
256 session_id = ctx.session_id,
257 app = %params.app,
258 "Connect request"
259 );
260 AuthResult::Accept
261 }
262
263 async fn on_publish(&self, ctx: &SessionContext, params: &PublishParams) -> AuthResult {
264 tracing::info!(
265 session_id = ctx.session_id,
266 stream_key = %params.stream_key,
267 "Publish request"
268 );
269 AuthResult::Accept
270 }
271
272 async fn on_metadata(&self, ctx: &StreamContext, metadata: &HashMap<String, AmfValue>) {
273 tracing::debug!(
274 session_id = ctx.session.session_id,
275 stream_key = %ctx.stream_key,
276 keys = ?metadata.keys().collect::<Vec<_>>(),
277 "Received metadata"
278 );
279 }
280
281 async fn on_disconnect(&self, ctx: &SessionContext) {
282 tracing::info!(session_id = ctx.session_id, "Connection closed");
283 }
284}
285
/// Pair of handlers combined into a single handler; `first` is consulted
/// before `second`.
///
/// `Debug` and `Clone` are derived; the derives only apply when both wrapped
/// handler types implement the respective trait, so they add no bound to the
/// struct itself.
#[derive(Debug, Clone)]
pub struct ChainedHandler<H1, H2> {
    /// Handler invoked first for every forwarded callback.
    first: H1,
    /// Handler invoked after `first`.
    second: H2,
}
291
292impl<H1, H2> ChainedHandler<H1, H2>
293where
294 H1: RtmpHandler,
295 H2: RtmpHandler,
296{
297 pub fn new(first: H1, second: H2) -> Self {
298 Self { first, second }
299 }
300}
301
302impl<H1, H2> RtmpHandler for ChainedHandler<H1, H2>
303where
304 H1: RtmpHandler,
305 H2: RtmpHandler,
306{
307 async fn on_connection(&self, ctx: &SessionContext) -> bool {
308 self.first.on_connection(ctx).await && self.second.on_connection(ctx).await
309 }
310
311 async fn on_connect(&self, ctx: &SessionContext, params: &ConnectParams) -> AuthResult {
312 let result = self.first.on_connect(ctx, params).await;
313 if result.is_accept() {
314 self.second.on_connect(ctx, params).await
315 } else {
316 result
317 }
318 }
319
320 async fn on_publish(&self, ctx: &SessionContext, params: &PublishParams) -> AuthResult {
321 let result = self.first.on_publish(ctx, params).await;
322 if result.is_accept() {
323 self.second.on_publish(ctx, params).await
324 } else {
325 result
326 }
327 }
328
329 async fn on_disconnect(&self, ctx: &SessionContext) {
330 self.first.on_disconnect(ctx).await;
331 self.second.on_disconnect(ctx).await;
332 }
333}