cometd_client/client/
builder.rs

1mod client_task;
2
3use crate::{
4    consts::*,
5    ext::CookieJarExt,
6    types::{AccessToken, CometdResult},
7    CometdClient, CometdClientInner,
8};
9use arc_swap::ArcSwapOption;
10use async_broadcast::broadcast;
11use cookie::{Cookie, CookieJar};
12use core::time::Duration;
13use hyper::Client;
14use serde::de::DeserializeOwned;
15use std::{borrow::Cow, sync::Mutex};
16use tokio::sync::mpsc;
17use url::Url;
18
19/// A builder to construct `CometdClient`.
20#[derive(Debug)]
21pub struct CometdClientBuilder<'a, 'b, 'c, 'd, 'e> {
22    endpoint: &'a Url,
23    handshake_base_path: &'b str,
24    subscribe_base_path: &'c str,
25    connect_base_path: &'d str,
26    disconnect_base_path: &'e str,
27    timeout: Option<Duration>,
28    interval: Option<Duration>,
29    access_token: Option<Box<dyn AccessToken>>,
30    cookies: Option<CookieJar>,
31    commands_channel_capacity: usize,
32    events_channel_capacity: usize,
33    number_of_retries: usize,
34    request_timeout: Duration,
35}
36
37impl<'a, 'b, 'c, 'd, 'e> CometdClientBuilder<'a, 'b, 'c, 'd, 'e> {
38    /// Construct a new `ClientBuilder`.
39    #[inline(always)]
40    pub fn new(endpoint: &'a Url) -> Self {
41        Self {
42            endpoint,
43            handshake_base_path: "",
44            subscribe_base_path: "",
45            connect_base_path: "",
46            disconnect_base_path: "",
47            timeout: None,
48            interval: None,
49            access_token: None,
50            cookies: None,
51            commands_channel_capacity: DEFAULT_COMMAND_CHANNEL_CAPACITY,
52            events_channel_capacity: DEFAULT_EVENT_CHANNEL_CAPACITY,
53            number_of_retries: DEFAULT_NUMBER_OF_RETRIES,
54            request_timeout: DEFAULT_CLIENT_TIMEOUT,
55        }
56    }
57
58    /// Return a `CometdClient`.
59    ///
60    /// # Example
61    /// ```rust,no_run
62    /// # use cometd_client::{CometdClient, CometdClientBuilder};
63    /// # let _ = || -> cometd_client::types::CometdResult<_> {
64    /// # #[derive(serde::Deserialize)]
65    /// # struct Data { msg: String, }
66    /// let client = CometdClientBuilder::new(&"http://[::1]:1025/notifications/".parse()?)
67    ///     .build()?;
68    /// # let client: CometdClient<Data> = client;
69    /// # Ok(()) };
70    /// ```
71    #[inline(always)]
72    pub fn build<Msg>(self) -> CometdResult<CometdClient<Msg>>
73    where
74        Msg: DeserializeOwned + Send + Sync + 'static,
75    {
76        let Self {
77            endpoint: base_url,
78            handshake_base_path,
79            subscribe_base_path,
80            connect_base_path,
81            disconnect_base_path,
82            timeout,
83            interval,
84            access_token,
85            cookies,
86            commands_channel_capacity,
87            events_channel_capacity,
88            number_of_retries,
89            request_timeout,
90        } = self;
91
92        let handshake_endpoint =
93            String::from(base_url.join(handshake_base_path)?.join("handshake")?).try_into()?;
94        let subscribe_endpoint = String::from(base_url.join(subscribe_base_path)?).try_into()?;
95        let connect_endpoint =
96            String::from(base_url.join(connect_base_path)?.join("connect")?).try_into()?;
97        let disconnect_endpoint =
98            String::from(base_url.join(disconnect_base_path)?.join("disconnect")?).try_into()?;
99        let timeout = timeout.unwrap_or(DEFAULT_TIMEOUT_MS);
100        let interval = interval.unwrap_or(DEFAULT_INTERVAL_MS);
101        let id = Default::default();
102        let access_token = access_token
103            .map(ArcSwapOption::from_pointee)
104            .unwrap_or_default();
105        let cookies_string_cache = cookies
106            .as_ref()
107            .map(CookieJarExt::make_string)
108            .map(ArcSwapOption::from_pointee)
109            .unwrap_or_default();
110        let cookies = cookies.unwrap_or_default();
111        let client_id = Default::default();
112        let http_client = Client::builder().build_http();
113
114        let (cmd_tx, cmd_rx) = mpsc::channel(commands_channel_capacity);
115        let (event_tx, mut event_rx) = broadcast(events_channel_capacity);
116        event_rx.set_await_active(false);
117
118        let inner = CometdClientInner {
119            handshake_endpoint,
120            subscribe_endpoint,
121            connect_endpoint,
122            disconnect_endpoint,
123            timeout,
124            interval,
125            number_of_retries,
126            id,
127            access_token,
128            cookies: Mutex::new(cookies),
129            cookies_string_cache,
130            client_id,
131            http_client,
132            request_timeout,
133        };
134
135        client_task::spawn(inner, cmd_rx, event_tx);
136
137        Ok(CometdClient {
138            cmd_tx,
139            inactive_event_rx: event_rx.deactivate(),
140        })
141    }
142
143    /// Set cometd server handshake url path.
144    ///
145    /// # Example
146    /// ```rust,no_run
147    /// # use cometd_client::{CometdClient, CometdClientBuilder};
148    /// # let _ = || -> cometd_client::types::CometdResult<_> {
149    /// # #[derive(serde::Deserialize)]
150    /// # struct Data { msg: String, }
151    ///
152    ///     let client = CometdClientBuilder::new(&"http://[::1]:1025/notifications/".parse()?)
153    ///         .handshake_base_path("hand/") // http://[::1]:1025/notifications/hand/handshake
154    ///         .build()?;
155    /// # let app: CometdClient<Data> = client;
156    /// # Ok(()) };
157    /// ```
158    #[inline(always)]
159    #[must_use]
160    pub const fn handshake_base_path(mut self, url: &'b str) -> Self {
161        self.handshake_base_path = url;
162        self
163    }
164
165    /// Set cometd server subscribe url path.
166    ///
167    /// # Example
168    /// ```rust,no_run
169    /// # use cometd_client::{CometdClient, CometdClientBuilder};
170    /// # let _ = || -> cometd_client::types::CometdResult<_> {
171    /// # #[derive(serde::Deserialize)]
172    /// # struct Data { msg: String, }
173    ///
174    ///     let client = CometdClientBuilder::new(&"http://[::1]:1025/notifications/".parse()?)
175    ///         .subscribe_base_path("sub/") // http://[::1]:1025/notifications/sub/
176    ///         .build()?;
177    /// # let app: CometdClient<Data> = client;
178    /// # Ok(()) };
179    /// ```
180    #[inline(always)]
181    #[must_use]
182    pub const fn subscribe_base_path(mut self, url: &'c str) -> Self {
183        self.subscribe_base_path = url;
184        self
185    }
186
187    /// Set cometd server connect url path.
188    ///
189    /// # Example
190    /// ```rust,no_run
191    /// # use cometd_client::{CometdClient, CometdClientBuilder};
192    /// # let _ = || -> cometd_client::types::CometdResult<_> {
193    /// # #[derive(serde::Deserialize)]
194    /// # struct Data { msg: String, }
195    ///     let client = CometdClientBuilder::new(&"http://[::1]:1025/notifications/".parse()?)
196    ///         .connect_base_path("con/") // http://[::1]:1025/notifications/con/connect
197    ///         .build()?;
198    /// # let app: CometdClient<Data> = client;
199    /// # Ok(()) };
200    /// ```
201    #[inline(always)]
202    #[must_use]
203    pub const fn connect_base_path(mut self, url: &'d str) -> Self {
204        self.connect_base_path = url;
205        self
206    }
207
208    /// Set cometd server disconnect url path.
209    ///
210    /// # Example
211    /// ```rust,no_run
212    /// # use cometd_client::{CometdClient, CometdClientBuilder};
213    /// # let _ = || -> cometd_client::types::CometdResult<_> {
214    /// # #[derive(serde::Deserialize)]
215    /// # struct Data { msg: String, }
216    ///     let client = CometdClientBuilder::new(&"http://[::1]:1025/notifications/".parse()?)
217    ///         .disconnect_base_path("con/") // http://[::1]:1025/notifications/discon/disconnect
218    ///         .build()?;
219    /// # let app: CometdClient<Data> = client;
220    /// # Ok(()) };
221    /// ```
222    #[inline(always)]
223    #[must_use]
224    pub const fn disconnect_base_path(mut self, url: &'e str) -> Self {
225        self.disconnect_base_path = url;
226        self
227    }
228
229    /// Set `timeout` option in handshake request.
230    #[inline(always)]
231    #[must_use]
232    pub const fn timeout(mut self, timeout: Duration) -> Self {
233        self.timeout = Some(timeout);
234        self
235    }
236
237    /// Set `interval` option in handshake request.
238    #[inline(always)]
239    #[must_use]
240    pub const fn interval(mut self, interval: Duration) -> Self {
241        self.interval = Some(interval);
242        self
243    }
244
245    /// Set `access token` option in handshake request.
246    #[inline(always)]
247    #[must_use]
248    pub fn access_token(self, access_token: impl AccessToken) -> Self {
249        Self {
250            access_token: Some(Box::new(access_token)),
251            ..self
252        }
253    }
254
255    /// Set `cookie`.
256    #[inline(always)]
257    #[must_use]
258    pub fn cookie<N, V>(self, name: N, value: V) -> Self
259    where
260        N: Into<Cow<'static, str>>,
261        V: Into<Cow<'static, str>>,
262    {
263        self.cookies([(name, value)])
264    }
265
266    /// Set `cookies`.
267    #[inline(always)]
268    #[must_use]
269    pub fn cookies<N, V>(self, cookies: impl IntoIterator<Item = (N, V)>) -> Self
270    where
271        N: Into<Cow<'static, str>>,
272        V: Into<Cow<'static, str>>,
273    {
274        let mut cookie_jar = CookieJar::new();
275
276        for (name, value) in cookies {
277            cookie_jar.add(Cookie::new(name, value))
278        }
279
280        Self {
281            cookies: Some(cookie_jar),
282            ..self
283        }
284    }
285
286    /// Set capacity of `Event` channel.
287    #[inline(always)]
288    #[must_use]
289    pub const fn events_channel_capacity(mut self, events_channel_capacity: usize) -> Self {
290        self.events_channel_capacity = events_channel_capacity;
291        self
292    }
293
294    /// Set capacity of internal commands channel.
295    #[inline(always)]
296    #[must_use]
297    pub const fn commands_channel_capacity(mut self, commands_channel_capacity: usize) -> Self {
298        self.commands_channel_capacity = commands_channel_capacity;
299        self
300    }
301
302    /// Set number of retries for requests.
303    #[inline(always)]
304    #[must_use]
305    pub const fn number_of_retries(mut self, number_of_retries: usize) -> Self {
306        self.number_of_retries = number_of_retries;
307        self
308    }
309
310    /// Set requests timeout.
311    #[inline(always)]
312    #[must_use]
313    pub const fn request_timeout(mut self, request_timeout: Duration) -> Self {
314        self.request_timeout = request_timeout;
315        self
316    }
317}