cometd_client/client/
connect.rs

1use crate::{
2    types::{Advice, CometdError, CometdResult, Data, ErrorKind, Message, Reconnect},
3    CometdClientInner,
4};
5use serde::de::DeserializeOwned;
6use serde_json::json;
7use std::sync::Arc;
8
9impl CometdClientInner {
10    pub(crate) async fn connect<Msg>(&self) -> CometdResult<Arc<[Data<Msg>]>>
11    where
12        Msg: DeserializeOwned,
13    {
14        const KIND: ErrorKind = ErrorKind::Connect;
15
16        let client_id = self
17            .client_id
18            .load_full()
19            .ok_or(CometdError::MissingClientId(KIND))?;
20        let id = self.next_id();
21        let body = json!([{
22          "id": id,
23          "channel": "/meta/connect",
24          "connectionType": "long-polling",
25          "clientId": *client_id
26        }])
27        .to_string();
28
29        let request_builder = self.create_request_builder(&self.connect_endpoint);
30
31        let mut messages = self
32            .send_request_and_parse_json_body::<Vec<Message>>(request_builder, body, KIND)
33            .await?;
34
35        if let Some(position) = messages
36            .iter()
37            .position(|message| message.id.as_ref() == Some(&id))
38        {
39            let Message {
40                successful,
41                error,
42                advice,
43                ..
44            } = messages.remove(position);
45
46            if successful == Some(false) {
47                Err(CometdError::wrong_response(
48                    KIND,
49                    Advice::reconnect(advice),
50                    error.unwrap_or_default(),
51                ))
52            } else {
53                let data = messages
54                    .into_iter()
55                    .map(|message| {
56                        let Message { channel, data, .. } = message;
57                        let message = data
58                            .map(serde_json::from_value::<Msg>)
59                            .transpose()
60                            .map_err(|error| CometdError::ParseBody(KIND, error))?;
61
62                        Ok::<_, CometdError>(Data { channel, message })
63                    })
64                    .collect::<CometdResult<_>>()?;
65
66                Ok(data)
67            }
68        } else {
69            Err(CometdError::wrong_response(
70                KIND,
71                Reconnect::None,
72                "The response corresponding request id cannot be found.",
73            ))
74        }
75    }
76}