1mod builder;
2mod connect;
3mod disconnect;
4mod handshake;
5mod subscribe;
6
7pub use builder::*;
8
9use crate::{ext::CookieJarExt as _, types::*, ArcSwapOptionExt};
10use arc_swap::ArcSwapOption;
11use cookie::{Cookie, CookieJar};
12use core::{
13 sync::atomic::{AtomicUsize, Ordering},
14 time::Duration,
15};
16use hyper::{client::HttpConnector, header::SET_COOKIE, http::HeaderValue, Client, HeaderMap, Uri};
17use serde::Serialize;
18use serde_json::json;
19use std::sync::{Mutex, PoisonError};
20
21#[derive(Debug)]
23pub struct CometdClient<Msg> {
24 cmd_tx: CmdSender,
25 inactive_event_rx: InactiveEventReceiver<Msg>,
26}
27
28#[derive(Debug)]
29pub(crate) struct CometdClientInner {
30 handshake_endpoint: Uri,
31 subscribe_endpoint: Uri,
32 connect_endpoint: Uri,
33 disconnect_endpoint: Uri,
34 timeout: Duration,
35 interval: Duration,
36 number_of_retries: usize,
37
38 id: AtomicUsize,
39 pub(crate) access_token: ArcSwapOption<Box<dyn AccessToken>>,
40 cookies: Mutex<CookieJar>,
41 pub(crate) cookies_string_cache: ArcSwapOption<Box<str>>,
42 client_id: ArcSwapOption<Box<str>>,
43 pub(crate) http_client: Client<HttpConnector>,
44 pub(crate) request_timeout: Duration,
45}
46
47impl<Msg> CometdClient<Msg> {
48 #[inline(always)]
65 pub fn rx(&self) -> CometdEventReceiver<Msg> {
66 CometdEventReceiver(self.inactive_event_rx.activate_cloned())
67 }
68
69 #[inline(always)]
80 pub async fn subscribe(&self, subscriptions: &[impl Serialize + Send + Sync]) {
81 let _ = self
82 .cmd_tx
83 .send(Command::Subscribe(json!(subscriptions)))
84 .await;
85 }
86}
87
88impl CometdClientInner {
89 #[inline(always)]
90 pub(crate) fn next_id(&self) -> String {
91 self.id.fetch_add(1, Ordering::Relaxed).to_string()
92 }
93
94 #[inline]
95 pub(crate) fn extract_and_store_cookie(&self, headers: &HeaderMap) {
96 let mut redo_cache = false;
97
98 let mut cookies = self.cookies.lock().unwrap_or_else(PoisonError::into_inner);
99 for cookie in headers
100 .get_all(SET_COOKIE)
101 .into_iter()
102 .map(HeaderValue::to_str)
103 .filter_map(Result::ok)
104 .map(str::to_owned)
105 .map(Cookie::parse)
106 .filter_map(Result::ok)
107 {
108 cookies.add(cookie);
109 redo_cache = true;
110 }
111
112 if redo_cache {
113 self.cookies_string_cache.store_value(cookies.make_string());
114 }
115 }
116}