cometd_client/client/
connect.rs1use crate::{
2 types::{Advice, CometdError, CometdResult, Data, ErrorKind, Message, Reconnect},
3 CometdClientInner,
4};
5use serde::de::DeserializeOwned;
6use serde_json::json;
7use std::sync::Arc;
8
9impl CometdClientInner {
10 pub(crate) async fn connect<Msg>(&self) -> CometdResult<Arc<[Data<Msg>]>>
11 where
12 Msg: DeserializeOwned,
13 {
14 const KIND: ErrorKind = ErrorKind::Connect;
15
16 let client_id = self
17 .client_id
18 .load_full()
19 .ok_or(CometdError::MissingClientId(KIND))?;
20 let id = self.next_id();
21 let body = json!([{
22 "id": id,
23 "channel": "/meta/connect",
24 "connectionType": "long-polling",
25 "clientId": *client_id
26 }])
27 .to_string();
28
29 let request_builder = self.create_request_builder(&self.connect_endpoint);
30
31 let mut messages = self
32 .send_request_and_parse_json_body::<Vec<Message>>(request_builder, body, KIND)
33 .await?;
34
35 if let Some(position) = messages
36 .iter()
37 .position(|message| message.id.as_ref() == Some(&id))
38 {
39 let Message {
40 successful,
41 error,
42 advice,
43 ..
44 } = messages.remove(position);
45
46 if successful == Some(false) {
47 Err(CometdError::wrong_response(
48 KIND,
49 Advice::reconnect(advice),
50 error.unwrap_or_default(),
51 ))
52 } else {
53 let data = messages
54 .into_iter()
55 .map(|message| {
56 let Message { channel, data, .. } = message;
57 let message = data
58 .map(serde_json::from_value::<Msg>)
59 .transpose()
60 .map_err(|error| CometdError::ParseBody(KIND, error))?;
61
62 Ok::<_, CometdError>(Data { channel, message })
63 })
64 .collect::<CometdResult<_>>()?;
65
66 Ok(data)
67 }
68 } else {
69 Err(CometdError::wrong_response(
70 KIND,
71 Reconnect::None,
72 "The response corresponding request id cannot be found.",
73 ))
74 }
75 }
76}