cometd_client/
client.rs

1mod builder;
2mod connect;
3mod disconnect;
4mod handshake;
5mod subscribe;
6
7pub use builder::*;
8
9use crate::{ext::CookieJarExt as _, types::*, ArcSwapOptionExt};
10use arc_swap::ArcSwapOption;
11use cookie::{Cookie, CookieJar};
12use core::{
13    sync::atomic::{AtomicUsize, Ordering},
14    time::Duration,
15};
16use hyper::{client::HttpConnector, header::SET_COOKIE, http::HeaderValue, Client, HeaderMap, Uri};
17use serde::Serialize;
18use serde_json::json;
19use std::sync::{Mutex, PoisonError};
20
21/// A cometd Client.
22#[derive(Debug)]
23pub struct CometdClient<Msg> {
24    cmd_tx: CmdSender,
25    inactive_event_rx: InactiveEventReceiver<Msg>,
26}
27
28#[derive(Debug)]
29pub(crate) struct CometdClientInner {
30    handshake_endpoint: Uri,
31    subscribe_endpoint: Uri,
32    connect_endpoint: Uri,
33    disconnect_endpoint: Uri,
34    timeout: Duration,
35    interval: Duration,
36    number_of_retries: usize,
37
38    id: AtomicUsize,
39    pub(crate) access_token: ArcSwapOption<Box<dyn AccessToken>>,
40    cookies: Mutex<CookieJar>,
41    pub(crate) cookies_string_cache: ArcSwapOption<Box<str>>,
42    client_id: ArcSwapOption<Box<str>>,
43    pub(crate) http_client: Client<HttpConnector>,
44    pub(crate) request_timeout: Duration,
45}
46
47impl<Msg> CometdClient<Msg> {
48    /// Return client event receiver channel.
49    ///
50    /// # Example
51    /// ```rust,no_run
52    /// # use cometd_client::{CometdClientBuilder, types::CometdResult};
53    /// # async fn _fun() {
54    /// #   let client = CometdClientBuilder::new(&"http://[::1]:1025/".parse().unwrap()).build::<()>().unwrap();
55    ///     let mut event_rx = client.rx();
56    ///     
57    ///     client.subscribe(&["/topic0"]).await;
58    ///
59    ///     while let Some(event) = event_rx.recv().await {
60    ///         println!("Got cometd client event: `{event:?}`.");
61    ///     }
62    /// # }
63    /// ```
64    #[inline(always)]
65    pub fn rx(&self) -> CometdEventReceiver<Msg> {
66        CometdEventReceiver(self.inactive_event_rx.activate_cloned())
67    }
68
69    /// Ask client command loop to send subscribe request.
70    ///
71    /// # Example
72    /// ```rust,no_run
73    /// # use cometd_client::{CometdClientBuilder, types::CometdResult};
74    /// # async fn _fun() {
75    /// #   let client = CometdClientBuilder::new(&"http://[::1]:1025/".parse().unwrap()).build::<()>().unwrap();
76    ///     client.subscribe(&["/topic0", "/topic1"]).await;
77    /// # }
78    /// ```
79    #[inline(always)]
80    pub async fn subscribe(&self, subscriptions: &[impl Serialize + Send + Sync]) {
81        let _ = self
82            .cmd_tx
83            .send(Command::Subscribe(json!(subscriptions)))
84            .await;
85    }
86}
87
88impl CometdClientInner {
89    #[inline(always)]
90    pub(crate) fn next_id(&self) -> String {
91        self.id.fetch_add(1, Ordering::Relaxed).to_string()
92    }
93
94    #[inline]
95    pub(crate) fn extract_and_store_cookie(&self, headers: &HeaderMap) {
96        let mut redo_cache = false;
97
98        let mut cookies = self.cookies.lock().unwrap_or_else(PoisonError::into_inner);
99        for cookie in headers
100            .get_all(SET_COOKIE)
101            .into_iter()
102            .map(HeaderValue::to_str)
103            .filter_map(Result::ok)
104            .map(str::to_owned)
105            .map(Cookie::parse)
106            .filter_map(Result::ok)
107        {
108            cookies.add(cookie);
109            redo_cache = true;
110        }
111
112        if redo_cache {
113            self.cookies_string_cache.store_value(cookies.make_string());
114        }
115    }
116}