axum_cometd/handlers/
subscribe.rs

1use crate::{
2    error::HandlerResult, messages::Message, types::Event, CheckExt, CookieJarExt,
3    LongPollingServiceContext, ZERO_CLIENT_ID,
4};
5use axum::{
6    extract::State,
7    http::{HeaderMap, StatusCode},
8    Extension, Json,
9};
10use axum_extra::extract::CookieJar;
11use std::sync::Arc;
12
13pub(crate) async fn subscribe<AdditionalData, CustomData>(
14    State(context): State<Arc<LongPollingServiceContext<AdditionalData, CustomData>>>,
15    Extension(data): Extension<AdditionalData>,
16    headers: HeaderMap,
17    jar: CookieJar,
18    Json([message]): Json<[Message; 1]>,
19) -> HandlerResult<Json<[Message; 1]>>
20where
21    AdditionalData: Send + Sync + 'static,
22    CustomData: Send + Sync + 'static,
23{
24    tracing::info!(
25        channel = "/meta/subscribe",
26        request_id = message.id.as_deref().unwrap_or("empty"),
27        client_id = %message.client_id.unwrap_or(ZERO_CLIENT_ID),
28        "Got subscribe request: `{message:?}`."
29    );
30
31    let Message {
32        id,
33        channel,
34        subscription,
35        client_id,
36        ..
37    } = message;
38
39    let session_unknown = || Message::session_unknown(id.clone(), channel.clone(), None);
40
41    channel.check_or("/meta/subscribe", session_unknown)?;
42
43    let cookie_id = jar.get_cookie_id().ok_or_else(session_unknown)?;
44    let client_id = client_id.ok_or_else(session_unknown)?;
45    context
46        .check_client(cookie_id, &client_id)
47        .await
48        .ok_or_else(session_unknown)?;
49
50    let subscription = subscription.ok_or_else(|| Message::subscription_missing(id.clone()))?;
51    subscription
52        .is_empty()
53        .check_or(&false, || Message::subscription_missing(id.clone()))?;
54
55    subscription.iter().try_for_each(|name| {
56        context
57            .channel_name_validator
58            .validate_subscribe_channel_name(name)
59            .check(&true, StatusCode::BAD_REQUEST)
60    })?;
61
62    context.subscribe(client_id, &subscription).await;
63
64    let _ = context
65        .tx
66        .broadcast(Arc::new(Event::Subscribe {
67            client_id,
68            headers,
69            channels: subscription.clone(),
70            data,
71        }))
72        .await;
73
74    Ok(Json([Message {
75        subscription: Some(subscription),
76        ..Message::ok(id, channel)
77    }]))
78}