1use std::collections::HashMap;
7
8use crate::amf::AmfValue;
9use crate::media::{AacData, FlvTag, H264Data};
10use crate::protocol::message::{ConnectParams, PlayParams, PublishParams};
11use crate::session::{SessionContext, StreamContext};
12
13#[derive(Debug, Clone)]
15pub enum AuthResult {
16 Accept,
18
19 Reject(String),
21
22 Redirect { url: String },
24}
25
26impl AuthResult {
27 pub fn is_accept(&self) -> bool {
29 matches!(self, AuthResult::Accept)
30 }
31
32 pub fn is_reject(&self) -> bool {
34 matches!(self, AuthResult::Reject(_))
35 }
36}
37
38#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
40pub enum MediaDeliveryMode {
41 RawFlv,
43
44 ParsedFrames,
46
47 #[default]
49 Both,
50}
51
52pub trait RtmpHandler: Send + Sync + 'static {
87 fn on_connection(
92 &self,
93 _ctx: &SessionContext,
94 ) -> impl std::future::Future<Output = bool> + Send {
95 async { true }
96 }
97
98 fn on_handshake_complete(
100 &self,
101 _ctx: &SessionContext,
102 ) -> impl std::future::Future<Output = ()> + Send {
103 async {}
104 }
105
106 fn on_connect(
110 &self,
111 _ctx: &SessionContext,
112 _params: &ConnectParams,
113 ) -> impl std::future::Future<Output = AuthResult> + Send {
114 async { AuthResult::Accept }
115 }
116
117 fn on_fc_publish(
121 &self,
122 _ctx: &SessionContext,
123 _stream_key: &str,
124 ) -> impl std::future::Future<Output = AuthResult> + Send {
125 async { AuthResult::Accept }
126 }
127
128 fn on_publish(
132 &self,
133 _ctx: &SessionContext,
134 _params: &PublishParams,
135 ) -> impl std::future::Future<Output = AuthResult> + Send {
136 async { AuthResult::Accept }
137 }
138
139 fn on_play(
143 &self,
144 _ctx: &SessionContext,
145 _params: &PlayParams,
146 ) -> impl std::future::Future<Output = AuthResult> + Send {
147 async { AuthResult::Accept }
148 }
149
150 fn on_metadata(
152 &self,
153 _ctx: &StreamContext,
154 _metadata: &HashMap<String, AmfValue>,
155 ) -> impl std::future::Future<Output = ()> + Send {
156 async {}
157 }
158
159 fn on_media_tag(
163 &self,
164 _ctx: &StreamContext,
165 _tag: &FlvTag,
166 ) -> impl std::future::Future<Output = bool> + Send {
167 async { true }
168 }
169
170 fn on_video_frame(
172 &self,
173 _ctx: &StreamContext,
174 _frame: &H264Data,
175 _timestamp: u32,
176 ) -> impl std::future::Future<Output = ()> + Send {
177 async {}
178 }
179
180 fn on_audio_frame(
182 &self,
183 _ctx: &StreamContext,
184 _frame: &AacData,
185 _timestamp: u32,
186 ) -> impl std::future::Future<Output = ()> + Send {
187 async {}
188 }
189
190 fn on_keyframe(
192 &self,
193 _ctx: &StreamContext,
194 _timestamp: u32,
195 ) -> impl std::future::Future<Output = ()> + Send {
196 async {}
197 }
198
199 #[deprecated(since = "0.3.0", note = "Use on_unpublish instead")]
201 fn on_publish_stop(
202 &self,
203 _ctx: &StreamContext,
204 ) -> impl std::future::Future<Output = ()> + Send {
205 async {}
206 }
207
208 fn on_unpublish(&self, _ctx: &StreamContext) -> impl std::future::Future<Output = ()> + Send {
210 async {}
211 }
212
213 fn on_play_stop(&self, _ctx: &StreamContext) -> impl std::future::Future<Output = ()> + Send {
215 async {}
216 }
217
218 fn on_pause(&self, _ctx: &StreamContext) -> impl std::future::Future<Output = ()> + Send {
220 async {}
221 }
222
223 fn on_unpause(&self, _ctx: &StreamContext) -> impl std::future::Future<Output = ()> + Send {
225 async {}
226 }
227
228 fn on_disconnect(&self, _ctx: &SessionContext) -> impl std::future::Future<Output = ()> + Send {
230 async {}
231 }
232
233 fn media_delivery_mode(&self) -> MediaDeliveryMode {
235 MediaDeliveryMode::Both
236 }
237
238 fn on_stats_update(
240 &self,
241 _ctx: &SessionContext,
242 ) -> impl std::future::Future<Output = ()> + Send {
243 async {}
244 }
245}
246
247pub struct LoggingHandler;
249
250impl RtmpHandler for LoggingHandler {
251 async fn on_connection(&self, ctx: &SessionContext) -> bool {
252 tracing::info!(
253 session_id = ctx.session_id,
254 peer = %ctx.peer_addr,
255 "New connection"
256 );
257 true
258 }
259
260 async fn on_connect(&self, ctx: &SessionContext, params: &ConnectParams) -> AuthResult {
261 tracing::info!(
262 session_id = ctx.session_id,
263 app = %params.app,
264 "Connect request"
265 );
266 AuthResult::Accept
267 }
268
269 async fn on_publish(&self, ctx: &SessionContext, params: &PublishParams) -> AuthResult {
270 tracing::info!(
271 session_id = ctx.session_id,
272 stream_key = %params.stream_key,
273 "Publish request"
274 );
275 AuthResult::Accept
276 }
277
278 async fn on_metadata(&self, ctx: &StreamContext, metadata: &HashMap<String, AmfValue>) {
279 tracing::debug!(
280 session_id = ctx.session.session_id,
281 stream_key = %ctx.stream_key,
282 keys = ?metadata.keys().collect::<Vec<_>>(),
283 "Received metadata"
284 );
285 }
286
287 async fn on_disconnect(&self, ctx: &SessionContext) {
288 tracing::info!(session_id = ctx.session_id, "Connection closed");
289 }
290}
291
292pub struct ChainedHandler<H1, H2> {
294 first: H1,
295 second: H2,
296}
297
298impl<H1, H2> ChainedHandler<H1, H2>
299where
300 H1: RtmpHandler,
301 H2: RtmpHandler,
302{
303 pub fn new(first: H1, second: H2) -> Self {
304 Self { first, second }
305 }
306}
307
308impl<H1, H2> RtmpHandler for ChainedHandler<H1, H2>
309where
310 H1: RtmpHandler,
311 H2: RtmpHandler,
312{
313 async fn on_connection(&self, ctx: &SessionContext) -> bool {
314 self.first.on_connection(ctx).await && self.second.on_connection(ctx).await
315 }
316
317 async fn on_connect(&self, ctx: &SessionContext, params: &ConnectParams) -> AuthResult {
318 let result = self.first.on_connect(ctx, params).await;
319 if result.is_accept() {
320 self.second.on_connect(ctx, params).await
321 } else {
322 result
323 }
324 }
325
326 async fn on_publish(&self, ctx: &SessionContext, params: &PublishParams) -> AuthResult {
327 let result = self.first.on_publish(ctx, params).await;
328 if result.is_accept() {
329 self.second.on_publish(ctx, params).await
330 } else {
331 result
332 }
333 }
334
335 async fn on_disconnect(&self, ctx: &SessionContext) {
336 self.first.on_disconnect(ctx).await;
337 self.second.on_disconnect(ctx).await;
338 }
339}