graphql_ws_client/next/
builder.rs

1use std::{
2    future::{Future, IntoFuture},
3    time::Duration,
4};
5
6use futures_lite::future;
7use serde::Serialize;
8
9use crate::{graphql::GraphqlOperation, logging::trace, protocol::Event, Error};
10
11use super::{
12    actor::ConnectionActor,
13    connection::{Connection, Message, ObjectSafeConnection},
14    keepalive::KeepAliveSettings,
15    production_future::read_from_producer,
16    Client, Subscription,
17};
18
19/// Builder for Graphql over Websocket clients
20///
21/// This can be used to configure the client prior to construction, but can also create
22/// subscriptions directly in the case where users only need to run one per connection.
23///
24/// ```rust
25/// use graphql_ws_client::Client;
26/// use std::future::IntoFuture;
27/// #
28/// # async fn example() -> Result<(), graphql_ws_client::Error> {
29/// # let connection = graphql_ws_client::__doc_utils::Conn;
30/// let (client, actor) = Client::build(connection).await?;
31/// # Ok(())
32/// # }
33/// ```
34#[must_use]
35pub struct ClientBuilder {
36    payload: Option<serde_json::Value>,
37    subscription_buffer_size: Option<usize>,
38    connection: Box<dyn ObjectSafeConnection>,
39    keep_alive: KeepAliveSettings,
40}
41
42impl super::Client {
43    /// Creates a `ClientBuilder` with the given connection.
44    ///
45    /// ```rust
46    /// use graphql_ws_client::Client;
47    /// use std::future::IntoFuture;
48    /// # async fn example() -> Result<(), graphql_ws_client::Error> {
49    /// # let connection = graphql_ws_client::__doc_utils::Conn;
50    /// let (client, actor) = Client::build(connection).await?;
51    /// # Ok(())
52    /// # }
53    /// ```
54    pub fn build<Conn>(connection: Conn) -> ClientBuilder
55    where
56        Conn: Connection + Send + 'static,
57    {
58        ClientBuilder {
59            payload: None,
60            subscription_buffer_size: None,
61            connection: Box::new(connection),
62            keep_alive: KeepAliveSettings::default(),
63        }
64    }
65}
66
67impl ClientBuilder {
68    /// Add payload to `connection_init`
69    ///
70    /// # Errors
71    ///
72    /// Will return `Err` if `payload` serialization fails.
73    pub fn payload<NewPayload>(self, payload: NewPayload) -> Result<ClientBuilder, Error>
74    where
75        NewPayload: Serialize,
76    {
77        Ok(ClientBuilder {
78            payload: Some(
79                serde_json::to_value(payload)
80                    .map_err(|error| Error::Serializing(error.to_string()))?,
81            ),
82            ..self
83        })
84    }
85
86    /// Sets the size of the incoming message buffer that subscriptions created by this client will
87    /// use
88    pub fn subscription_buffer_size(self, new: usize) -> Self {
89        ClientBuilder {
90            subscription_buffer_size: Some(new),
91            ..self
92        }
93    }
94
95    /// Sets the interval between keep alives.
96    ///
97    /// Any incoming messages automatically reset this interval so keep alives may not be sent
98    /// on busy connections even if this is set.
99    pub fn keep_alive_interval(mut self, new: Duration) -> Self {
100        self.keep_alive.interval = Some(new);
101        self
102    }
103
104    /// The number of keepalive retries before a connection is considered broken.
105    ///
106    /// This defaults to 3, but has no effect if `keep_alive_interval` is not called.
107    pub fn keep_alive_retries(mut self, count: usize) -> Self {
108        self.keep_alive.retries = count;
109        self
110    }
111
112    /// Initialise a Client and use it to run a single subscription
113    ///
114    /// ```rust
115    /// use graphql_ws_client::Client;
116    /// use std::future::IntoFuture;
117    /// # async fn example() -> Result<(), graphql_ws_client::Error> {
118    /// # let connection = graphql_ws_client::__doc_utils::Conn;
119    /// # let subscription = graphql_ws_client::__doc_utils::Subscription;
120    /// let stream = Client::build(connection).subscribe(subscription).await?;
121    /// # Ok(())
122    /// # }
123    /// ```
124    ///
125    /// Note that this takes ownership of the client, so it cannot be
126    /// used to run any more operations.
127    ///
128    /// If users want to run multiple operations on a connection they
129    /// should use the `IntoFuture` impl to construct a `Client`
130    pub async fn subscribe<'a, Operation>(
131        self,
132        op: Operation,
133    ) -> Result<Subscription<Operation>, Error>
134    where
135        Operation: GraphqlOperation + Unpin + Send + 'static,
136    {
137        let (client, actor) = self.await?;
138
139        let actor_future = actor.into_future();
140        let subscribe_future = client.subscribe(op);
141
142        let (stream, actor_future) = run_startup(subscribe_future, actor_future).await?;
143
144        Ok(stream.join(actor_future))
145    }
146}
147
148impl IntoFuture for ClientBuilder {
149    type Output = Result<(Client, ConnectionActor), Error>;
150
151    type IntoFuture = future::Boxed<Self::Output>;
152
153    fn into_future(self) -> Self::IntoFuture {
154        Box::pin(self.build())
155    }
156}
157
158impl ClientBuilder {
159    /// Constructs a Client
160    ///
161    /// Accepts an already built websocket connection, and returns the connection
162    /// and a future that must be awaited somewhere - if the future is dropped the
163    /// connection will also drop.
164    pub async fn build(self) -> Result<(Client, ConnectionActor), Error> {
165        let Self {
166            payload,
167            subscription_buffer_size,
168            mut connection,
169            keep_alive,
170        } = self;
171
172        connection.send(Message::init(payload)).await?;
173
174        // wait for ack before entering receiver loop:
175        loop {
176            match connection.receive().await {
177                None => return Err(Error::Unknown("connection dropped".into())),
178                Some(Message::Close { code, reason }) => {
179                    return Err(Error::Close(
180                        code.unwrap_or_default(),
181                        reason.unwrap_or_default(),
182                    ))
183                }
184                Some(Message::Ping | Message::Pong) => {}
185                Some(message @ Message::Text(_)) => {
186                    let event = message.deserialize::<Event>()?;
187                    match event {
188                        // pings can be sent at any time
189                        Event::Ping { .. } => {
190                            connection.send(Message::graphql_pong()).await?;
191                        }
192                        Event::Pong { .. } => {}
193                        Event::ConnectionAck { .. } => {
194                            // handshake completed, ready to enter main receiver loop
195                            trace!("connection_ack received, handshake completed");
196                            break;
197                        }
198                        event => {
199                            connection
200                                .send(Message::Close {
201                                    code: Some(4950),
202                                    reason: Some("Unexpected message while waiting for ack".into()),
203                                })
204                                .await
205                                .ok();
206                            return Err(Error::Decode(format!(
207                                "expected a connection_ack or ping, got {}",
208                                event.r#type()
209                            )));
210                        }
211                    }
212                }
213            }
214        }
215
216        let (command_sender, command_receiver) = async_channel::bounded(5);
217
218        let actor = ConnectionActor::new(connection, command_receiver, keep_alive);
219
220        let client = Client::new_internal(command_sender, subscription_buffer_size.unwrap_or(5));
221
222        Ok((client, actor))
223    }
224}
225
226async fn run_startup<SubscribeFut, Operation>(
227    subscribe: SubscribeFut,
228    actor: future::Boxed<()>,
229) -> Result<(Subscription<Operation>, future::Boxed<()>), Error>
230where
231    SubscribeFut: Future<Output = Result<Subscription<Operation>, Error>>,
232    Operation: GraphqlOperation,
233{
234    match read_from_producer(subscribe, actor).await {
235        Some((Ok(subscription), actor)) => Ok((subscription, actor)),
236        Some((Err(err), _)) => Err(err),
237        None => Err(Error::Unknown(
238            "actor ended before subscription started".into(),
239        )),
240    }
241}