// graphql_ws_client/client/builder.rs

1use std::{
2    future::{Future, IntoFuture},
3    time::Duration,
4};
5
6use futures_lite::future;
7use serde::Serialize;
8
9use crate::{Error, graphql::GraphqlOperation, logging::trace, protocol::Event};
10
11use super::{
12    Client, Subscription,
13    actor::ConnectionActor,
14    connection::{Connection, Message, ObjectSafeConnection},
15    keepalive::KeepAliveSettings,
16    production_future::read_from_producer,
17};
18
19/// Builder for Graphql over Websocket clients
20///
21/// This can be used to configure the client prior to construction, but can also create
22/// subscriptions directly in the case where users only need to run one per connection.
23///
24/// ```rust
25/// use graphql_ws_client::Client;
26/// use std::future::IntoFuture;
27/// #
28/// # async fn example() -> Result<(), graphql_ws_client::Error> {
29/// # let connection = graphql_ws_client::__doc_utils::Conn;
30/// let (client, actor) = Client::build(connection).await?;
31/// # Ok(())
32/// # }
33/// ```
34#[must_use]
35pub struct ClientBuilder {
36    payload: Option<serde_json::Value>,
37    subscription_buffer_size: Option<usize>,
38    connection: Box<dyn ObjectSafeConnection>,
39    keep_alive: KeepAliveSettings,
40}
41
42impl super::Client {
43    /// Creates a `ClientBuilder` with the given connection.
44    ///
45    /// ```rust
46    /// use graphql_ws_client::Client;
47    /// use std::future::IntoFuture;
48    /// # async fn example() -> Result<(), graphql_ws_client::Error> {
49    /// # let connection = graphql_ws_client::__doc_utils::Conn;
50    /// let (client, actor) = Client::build(connection).await?;
51    /// # Ok(())
52    /// # }
53    /// ```
54    pub fn build<Conn>(connection: Conn) -> ClientBuilder
55    where
56        Conn: Connection + Send + 'static,
57    {
58        ClientBuilder {
59            payload: None,
60            subscription_buffer_size: None,
61            connection: Box::new(connection),
62            keep_alive: KeepAliveSettings::default(),
63        }
64    }
65}
66
67impl ClientBuilder {
68    /// Add payload to `connection_init`
69    ///
70    /// # Errors
71    ///
72    /// Will return `Err` if `payload` serialization fails.
73    pub fn payload<NewPayload>(self, payload: NewPayload) -> Result<ClientBuilder, Error>
74    where
75        NewPayload: Serialize,
76    {
77        Ok(ClientBuilder {
78            payload: Some(
79                serde_json::to_value(payload)
80                    .map_err(|error| Error::Serializing(error.to_string()))?,
81            ),
82            ..self
83        })
84    }
85
86    /// Sets the size of the incoming message buffer that subscriptions created by this client will
87    /// use
88    pub fn subscription_buffer_size(self, new: usize) -> Self {
89        ClientBuilder {
90            subscription_buffer_size: Some(new),
91            ..self
92        }
93    }
94
95    /// Sets the interval between keep alives.
96    ///
97    /// Any incoming messages automatically reset this interval so keep alives may not be sent
98    /// on busy connections even if this is set.
99    pub fn keep_alive_interval(mut self, new: Duration) -> Self {
100        self.keep_alive.interval = Some(new);
101        self
102    }
103
104    /// The number of keepalive retries before a connection is considered broken.
105    ///
106    /// This defaults to 3, but has no effect if `keep_alive_interval` is not called.
107    pub fn keep_alive_retries(mut self, count: usize) -> Self {
108        self.keep_alive.retries = count;
109        self
110    }
111
112    /// Initialise a Client and use it to run a single subscription
113    ///
114    /// ```rust
115    /// use graphql_ws_client::Client;
116    /// use std::future::IntoFuture;
117    /// # async fn example() -> Result<(), graphql_ws_client::Error> {
118    /// # let connection = graphql_ws_client::__doc_utils::Conn;
119    /// # let subscription = graphql_ws_client::__doc_utils::Subscription;
120    /// let stream = Client::build(connection).subscribe(subscription).await?;
121    /// # Ok(())
122    /// # }
123    /// ```
124    ///
125    /// Note that this takes ownership of the client, so it cannot be
126    /// used to run any more operations.
127    ///
128    /// If users want to run multiple operations on a connection they
129    /// should use the `IntoFuture` impl to construct a `Client`
130    pub async fn subscribe<Operation>(self, op: Operation) -> Result<Subscription<Operation>, Error>
131    where
132        Operation: GraphqlOperation + Unpin + Send + 'static,
133    {
134        let (client, actor) = self.await?;
135
136        let actor_future = actor.into_future();
137        let subscribe_future = client.subscribe(op);
138
139        let (stream, actor_future) = run_startup(subscribe_future, actor_future).await?;
140
141        Ok(stream.join(actor_future))
142    }
143}
144
145impl IntoFuture for ClientBuilder {
146    type Output = Result<(Client, ConnectionActor), Error>;
147
148    type IntoFuture = future::Boxed<Self::Output>;
149
150    fn into_future(self) -> Self::IntoFuture {
151        Box::pin(self.build())
152    }
153}
154
155impl ClientBuilder {
156    /// Constructs a Client
157    ///
158    /// Accepts an already built websocket connection, and returns the connection
159    /// and a future that must be awaited somewhere - if the future is dropped the
160    /// connection will also drop.
161    pub async fn build(self) -> Result<(Client, ConnectionActor), Error> {
162        let Self {
163            payload,
164            subscription_buffer_size,
165            mut connection,
166            keep_alive,
167        } = self;
168
169        connection.send(Message::init(payload)).await?;
170
171        // wait for ack before entering receiver loop:
172        loop {
173            match connection.receive().await {
174                None => return Err(Error::Unknown("connection dropped".into())),
175                Some(Message::Close { code, reason }) => {
176                    return Err(Error::Close(
177                        code.unwrap_or_default(),
178                        reason.unwrap_or_default(),
179                    ));
180                }
181                Some(Message::Ping | Message::Pong) => {}
182                Some(message @ Message::Text(_)) => {
183                    let event = message.deserialize::<Event>()?;
184                    match event {
185                        // pings can be sent at any time
186                        Event::Ping { .. } => {
187                            connection.send(Message::graphql_pong()).await?;
188                        }
189                        Event::Pong { .. } => {}
190                        Event::ConnectionAck { .. } => {
191                            // handshake completed, ready to enter main receiver loop
192                            trace!("connection_ack received, handshake completed");
193                            break;
194                        }
195                        event => {
196                            connection
197                                .send(Message::Close {
198                                    code: Some(4950),
199                                    reason: Some("Unexpected message while waiting for ack".into()),
200                                })
201                                .await
202                                .ok();
203                            return Err(Error::Decode(format!(
204                                "expected a connection_ack or ping, got {}",
205                                event.r#type()
206                            )));
207                        }
208                    }
209                }
210            }
211        }
212
213        let (command_sender, command_receiver) = async_channel::bounded(5);
214        let (drop_sender, drop_receiver) = async_channel::unbounded();
215
216        let actor = ConnectionActor::new(connection, command_receiver, drop_receiver, keep_alive);
217
218        let client = Client::new_internal(
219            command_sender,
220            drop_sender,
221            subscription_buffer_size.unwrap_or(5),
222        );
223
224        Ok((client, actor))
225    }
226}
227
228async fn run_startup<SubscribeFut, Operation>(
229    subscribe: SubscribeFut,
230    actor: future::Boxed<()>,
231) -> Result<(Subscription<Operation>, future::Boxed<()>), Error>
232where
233    SubscribeFut: Future<Output = Result<Subscription<Operation>, Error>>,
234    Operation: GraphqlOperation,
235{
236    match read_from_producer(subscribe, actor).await {
237        Some((Ok(subscription), actor)) => Ok((subscription, actor)),
238        Some((Err(err), _)) => Err(err),
239        None => Err(Error::Unknown(
240            "actor ended before subscription started".into(),
241        )),
242    }
243}