axum_cometd/handlers/
subscribe.rs1use crate::{
2 error::HandlerResult, messages::Message, types::Event, CheckExt, CookieJarExt,
3 LongPollingServiceContext, ZERO_CLIENT_ID,
4};
5use axum::{
6 extract::State,
7 http::{HeaderMap, StatusCode},
8 Extension, Json,
9};
10use axum_extra::extract::CookieJar;
11use std::sync::Arc;
12
13pub(crate) async fn subscribe<AdditionalData, CustomData>(
14 State(context): State<Arc<LongPollingServiceContext<AdditionalData, CustomData>>>,
15 Extension(data): Extension<AdditionalData>,
16 headers: HeaderMap,
17 jar: CookieJar,
18 Json([message]): Json<[Message; 1]>,
19) -> HandlerResult<Json<[Message; 1]>>
20where
21 AdditionalData: Send + Sync + 'static,
22 CustomData: Send + Sync + 'static,
23{
24 tracing::info!(
25 channel = "/meta/subscribe",
26 request_id = message.id.as_deref().unwrap_or("empty"),
27 client_id = %message.client_id.unwrap_or(ZERO_CLIENT_ID),
28 "Got subscribe request: `{message:?}`."
29 );
30
31 let Message {
32 id,
33 channel,
34 subscription,
35 client_id,
36 ..
37 } = message;
38
39 let session_unknown = || Message::session_unknown(id.clone(), channel.clone(), None);
40
41 channel.check_or("/meta/subscribe", session_unknown)?;
42
43 let cookie_id = jar.get_cookie_id().ok_or_else(session_unknown)?;
44 let client_id = client_id.ok_or_else(session_unknown)?;
45 context
46 .check_client(cookie_id, &client_id)
47 .await
48 .ok_or_else(session_unknown)?;
49
50 let subscription = subscription.ok_or_else(|| Message::subscription_missing(id.clone()))?;
51 subscription
52 .is_empty()
53 .check_or(&false, || Message::subscription_missing(id.clone()))?;
54
55 subscription.iter().try_for_each(|name| {
56 context
57 .channel_name_validator
58 .validate_subscribe_channel_name(name)
59 .check(&true, StatusCode::BAD_REQUEST)
60 })?;
61
62 context.subscribe(client_id, &subscription).await;
63
64 let _ = context
65 .tx
66 .broadcast(Arc::new(Event::Subscribe {
67 client_id,
68 headers,
69 channels: subscription.clone(),
70 data,
71 }))
72 .await;
73
74 Ok(Json([Message {
75 subscription: Some(subscription),
76 ..Message::ok(id, channel)
77 }]))
78}