graphql_ws_client/client/
builder.rs1use std::{
2 future::{Future, IntoFuture},
3 time::Duration,
4};
5
6use futures_lite::future;
7use serde::Serialize;
8
9use crate::{Error, graphql::GraphqlOperation, logging::trace, protocol::Event};
10
11use super::{
12 Client, Subscription,
13 actor::ConnectionActor,
14 connection::{Connection, Message, ObjectSafeConnection},
15 keepalive::KeepAliveSettings,
16 production_future::read_from_producer,
17};
18
19#[must_use]
35pub struct ClientBuilder {
36 payload: Option<serde_json::Value>,
37 subscription_buffer_size: Option<usize>,
38 connection: Box<dyn ObjectSafeConnection>,
39 keep_alive: KeepAliveSettings,
40}
41
42impl super::Client {
43 pub fn build<Conn>(connection: Conn) -> ClientBuilder
55 where
56 Conn: Connection + Send + 'static,
57 {
58 ClientBuilder {
59 payload: None,
60 subscription_buffer_size: None,
61 connection: Box::new(connection),
62 keep_alive: KeepAliveSettings::default(),
63 }
64 }
65}
66
67impl ClientBuilder {
68 pub fn payload<NewPayload>(self, payload: NewPayload) -> Result<ClientBuilder, Error>
74 where
75 NewPayload: Serialize,
76 {
77 Ok(ClientBuilder {
78 payload: Some(
79 serde_json::to_value(payload)
80 .map_err(|error| Error::Serializing(error.to_string()))?,
81 ),
82 ..self
83 })
84 }
85
86 pub fn subscription_buffer_size(self, new: usize) -> Self {
89 ClientBuilder {
90 subscription_buffer_size: Some(new),
91 ..self
92 }
93 }
94
95 pub fn keep_alive_interval(mut self, new: Duration) -> Self {
100 self.keep_alive.interval = Some(new);
101 self
102 }
103
104 pub fn keep_alive_retries(mut self, count: usize) -> Self {
108 self.keep_alive.retries = count;
109 self
110 }
111
112 pub async fn subscribe<Operation>(self, op: Operation) -> Result<Subscription<Operation>, Error>
131 where
132 Operation: GraphqlOperation + Unpin + Send + 'static,
133 {
134 let (client, actor) = self.await?;
135
136 let actor_future = actor.into_future();
137 let subscribe_future = client.subscribe(op);
138
139 let (stream, actor_future) = run_startup(subscribe_future, actor_future).await?;
140
141 Ok(stream.join(actor_future))
142 }
143}
144
145impl IntoFuture for ClientBuilder {
146 type Output = Result<(Client, ConnectionActor), Error>;
147
148 type IntoFuture = future::Boxed<Self::Output>;
149
150 fn into_future(self) -> Self::IntoFuture {
151 Box::pin(self.build())
152 }
153}
154
155impl ClientBuilder {
156 pub async fn build(self) -> Result<(Client, ConnectionActor), Error> {
162 let Self {
163 payload,
164 subscription_buffer_size,
165 mut connection,
166 keep_alive,
167 } = self;
168
169 connection.send(Message::init(payload)).await?;
170
171 loop {
173 match connection.receive().await {
174 None => return Err(Error::Unknown("connection dropped".into())),
175 Some(Message::Close { code, reason }) => {
176 return Err(Error::Close(
177 code.unwrap_or_default(),
178 reason.unwrap_or_default(),
179 ));
180 }
181 Some(Message::Ping | Message::Pong) => {}
182 Some(message @ Message::Text(_)) => {
183 let event = message.deserialize::<Event>()?;
184 match event {
185 Event::Ping { .. } => {
187 connection.send(Message::graphql_pong()).await?;
188 }
189 Event::Pong { .. } => {}
190 Event::ConnectionAck { .. } => {
191 trace!("connection_ack received, handshake completed");
193 break;
194 }
195 event => {
196 connection
197 .send(Message::Close {
198 code: Some(4950),
199 reason: Some("Unexpected message while waiting for ack".into()),
200 })
201 .await
202 .ok();
203 return Err(Error::Decode(format!(
204 "expected a connection_ack or ping, got {}",
205 event.r#type()
206 )));
207 }
208 }
209 }
210 }
211 }
212
213 let (command_sender, command_receiver) = async_channel::bounded(5);
214 let (drop_sender, drop_receiver) = async_channel::unbounded();
215
216 let actor = ConnectionActor::new(connection, command_receiver, drop_receiver, keep_alive);
217
218 let client = Client::new_internal(
219 command_sender,
220 drop_sender,
221 subscription_buffer_size.unwrap_or(5),
222 );
223
224 Ok((client, actor))
225 }
226}
227
228async fn run_startup<SubscribeFut, Operation>(
229 subscribe: SubscribeFut,
230 actor: future::Boxed<()>,
231) -> Result<(Subscription<Operation>, future::Boxed<()>), Error>
232where
233 SubscribeFut: Future<Output = Result<Subscription<Operation>, Error>>,
234 Operation: GraphqlOperation,
235{
236 match read_from_producer(subscribe, actor).await {
237 Some((Ok(subscription), actor)) => Ok((subscription, actor)),
238 Some((Err(err), _)) => Err(err),
239 None => Err(Error::Unknown(
240 "actor ended before subscription started".into(),
241 )),
242 }
243}