1mod client_task;
2
3use crate::{
4 consts::*,
5 ext::CookieJarExt,
6 types::{AccessToken, CometdResult},
7 CometdClient, CometdClientInner,
8};
9use arc_swap::ArcSwapOption;
10use async_broadcast::broadcast;
11use cookie::{Cookie, CookieJar};
12use core::time::Duration;
13use hyper::Client;
14use serde::de::DeserializeOwned;
15use std::{borrow::Cow, sync::Mutex};
16use tokio::sync::mpsc;
17use url::Url;
18
19#[derive(Debug)]
21pub struct CometdClientBuilder<'a, 'b, 'c, 'd, 'e> {
22 endpoint: &'a Url,
23 handshake_base_path: &'b str,
24 subscribe_base_path: &'c str,
25 connect_base_path: &'d str,
26 disconnect_base_path: &'e str,
27 timeout: Option<Duration>,
28 interval: Option<Duration>,
29 access_token: Option<Box<dyn AccessToken>>,
30 cookies: Option<CookieJar>,
31 commands_channel_capacity: usize,
32 events_channel_capacity: usize,
33 number_of_retries: usize,
34 request_timeout: Duration,
35}
36
37impl<'a, 'b, 'c, 'd, 'e> CometdClientBuilder<'a, 'b, 'c, 'd, 'e> {
38 #[inline(always)]
40 pub fn new(endpoint: &'a Url) -> Self {
41 Self {
42 endpoint,
43 handshake_base_path: "",
44 subscribe_base_path: "",
45 connect_base_path: "",
46 disconnect_base_path: "",
47 timeout: None,
48 interval: None,
49 access_token: None,
50 cookies: None,
51 commands_channel_capacity: DEFAULT_COMMAND_CHANNEL_CAPACITY,
52 events_channel_capacity: DEFAULT_EVENT_CHANNEL_CAPACITY,
53 number_of_retries: DEFAULT_NUMBER_OF_RETRIES,
54 request_timeout: DEFAULT_CLIENT_TIMEOUT,
55 }
56 }
57
58 #[inline(always)]
72 pub fn build<Msg>(self) -> CometdResult<CometdClient<Msg>>
73 where
74 Msg: DeserializeOwned + Send + Sync + 'static,
75 {
76 let Self {
77 endpoint: base_url,
78 handshake_base_path,
79 subscribe_base_path,
80 connect_base_path,
81 disconnect_base_path,
82 timeout,
83 interval,
84 access_token,
85 cookies,
86 commands_channel_capacity,
87 events_channel_capacity,
88 number_of_retries,
89 request_timeout,
90 } = self;
91
92 let handshake_endpoint =
93 String::from(base_url.join(handshake_base_path)?.join("handshake")?).try_into()?;
94 let subscribe_endpoint = String::from(base_url.join(subscribe_base_path)?).try_into()?;
95 let connect_endpoint =
96 String::from(base_url.join(connect_base_path)?.join("connect")?).try_into()?;
97 let disconnect_endpoint =
98 String::from(base_url.join(disconnect_base_path)?.join("disconnect")?).try_into()?;
99 let timeout = timeout.unwrap_or(DEFAULT_TIMEOUT_MS);
100 let interval = interval.unwrap_or(DEFAULT_INTERVAL_MS);
101 let id = Default::default();
102 let access_token = access_token
103 .map(ArcSwapOption::from_pointee)
104 .unwrap_or_default();
105 let cookies_string_cache = cookies
106 .as_ref()
107 .map(CookieJarExt::make_string)
108 .map(ArcSwapOption::from_pointee)
109 .unwrap_or_default();
110 let cookies = cookies.unwrap_or_default();
111 let client_id = Default::default();
112 let http_client = Client::builder().build_http();
113
114 let (cmd_tx, cmd_rx) = mpsc::channel(commands_channel_capacity);
115 let (event_tx, mut event_rx) = broadcast(events_channel_capacity);
116 event_rx.set_await_active(false);
117
118 let inner = CometdClientInner {
119 handshake_endpoint,
120 subscribe_endpoint,
121 connect_endpoint,
122 disconnect_endpoint,
123 timeout,
124 interval,
125 number_of_retries,
126 id,
127 access_token,
128 cookies: Mutex::new(cookies),
129 cookies_string_cache,
130 client_id,
131 http_client,
132 request_timeout,
133 };
134
135 client_task::spawn(inner, cmd_rx, event_tx);
136
137 Ok(CometdClient {
138 cmd_tx,
139 inactive_event_rx: event_rx.deactivate(),
140 })
141 }
142
143 #[inline(always)]
159 #[must_use]
160 pub const fn handshake_base_path(mut self, url: &'b str) -> Self {
161 self.handshake_base_path = url;
162 self
163 }
164
165 #[inline(always)]
181 #[must_use]
182 pub const fn subscribe_base_path(mut self, url: &'c str) -> Self {
183 self.subscribe_base_path = url;
184 self
185 }
186
187 #[inline(always)]
202 #[must_use]
203 pub const fn connect_base_path(mut self, url: &'d str) -> Self {
204 self.connect_base_path = url;
205 self
206 }
207
208 #[inline(always)]
223 #[must_use]
224 pub const fn disconnect_base_path(mut self, url: &'e str) -> Self {
225 self.disconnect_base_path = url;
226 self
227 }
228
229 #[inline(always)]
231 #[must_use]
232 pub const fn timeout(mut self, timeout: Duration) -> Self {
233 self.timeout = Some(timeout);
234 self
235 }
236
237 #[inline(always)]
239 #[must_use]
240 pub const fn interval(mut self, interval: Duration) -> Self {
241 self.interval = Some(interval);
242 self
243 }
244
245 #[inline(always)]
247 #[must_use]
248 pub fn access_token(self, access_token: impl AccessToken) -> Self {
249 Self {
250 access_token: Some(Box::new(access_token)),
251 ..self
252 }
253 }
254
255 #[inline(always)]
257 #[must_use]
258 pub fn cookie<N, V>(self, name: N, value: V) -> Self
259 where
260 N: Into<Cow<'static, str>>,
261 V: Into<Cow<'static, str>>,
262 {
263 self.cookies([(name, value)])
264 }
265
266 #[inline(always)]
268 #[must_use]
269 pub fn cookies<N, V>(self, cookies: impl IntoIterator<Item = (N, V)>) -> Self
270 where
271 N: Into<Cow<'static, str>>,
272 V: Into<Cow<'static, str>>,
273 {
274 let mut cookie_jar = CookieJar::new();
275
276 for (name, value) in cookies {
277 cookie_jar.add(Cookie::new(name, value))
278 }
279
280 Self {
281 cookies: Some(cookie_jar),
282 ..self
283 }
284 }
285
286 #[inline(always)]
288 #[must_use]
289 pub const fn events_channel_capacity(mut self, events_channel_capacity: usize) -> Self {
290 self.events_channel_capacity = events_channel_capacity;
291 self
292 }
293
294 #[inline(always)]
296 #[must_use]
297 pub const fn commands_channel_capacity(mut self, commands_channel_capacity: usize) -> Self {
298 self.commands_channel_capacity = commands_channel_capacity;
299 self
300 }
301
302 #[inline(always)]
304 #[must_use]
305 pub const fn number_of_retries(mut self, number_of_retries: usize) -> Self {
306 self.number_of_retries = number_of_retries;
307 self
308 }
309
310 #[inline(always)]
312 #[must_use]
313 pub const fn request_timeout(mut self, request_timeout: Duration) -> Self {
314 self.request_timeout = request_timeout;
315 self
316 }
317}