graphql_ws_client/next/
builder.rs1use std::{
2 future::{Future, IntoFuture},
3 time::Duration,
4};
5
6use futures_lite::future;
7use serde::Serialize;
8
9use crate::{graphql::GraphqlOperation, logging::trace, protocol::Event, Error};
10
11use super::{
12 actor::ConnectionActor,
13 connection::{Connection, Message, ObjectSafeConnection},
14 keepalive::KeepAliveSettings,
15 production_future::read_from_producer,
16 Client, Subscription,
17};
18
19#[must_use]
35pub struct ClientBuilder {
36 payload: Option<serde_json::Value>,
37 subscription_buffer_size: Option<usize>,
38 connection: Box<dyn ObjectSafeConnection>,
39 keep_alive: KeepAliveSettings,
40}
41
42impl super::Client {
43 pub fn build<Conn>(connection: Conn) -> ClientBuilder
55 where
56 Conn: Connection + Send + 'static,
57 {
58 ClientBuilder {
59 payload: None,
60 subscription_buffer_size: None,
61 connection: Box::new(connection),
62 keep_alive: KeepAliveSettings::default(),
63 }
64 }
65}
66
67impl ClientBuilder {
68 pub fn payload<NewPayload>(self, payload: NewPayload) -> Result<ClientBuilder, Error>
74 where
75 NewPayload: Serialize,
76 {
77 Ok(ClientBuilder {
78 payload: Some(
79 serde_json::to_value(payload)
80 .map_err(|error| Error::Serializing(error.to_string()))?,
81 ),
82 ..self
83 })
84 }
85
86 pub fn subscription_buffer_size(self, new: usize) -> Self {
89 ClientBuilder {
90 subscription_buffer_size: Some(new),
91 ..self
92 }
93 }
94
95 pub fn keep_alive_interval(mut self, new: Duration) -> Self {
100 self.keep_alive.interval = Some(new);
101 self
102 }
103
104 pub fn keep_alive_retries(mut self, count: usize) -> Self {
108 self.keep_alive.retries = count;
109 self
110 }
111
112 pub async fn subscribe<'a, Operation>(
131 self,
132 op: Operation,
133 ) -> Result<Subscription<Operation>, Error>
134 where
135 Operation: GraphqlOperation + Unpin + Send + 'static,
136 {
137 let (client, actor) = self.await?;
138
139 let actor_future = actor.into_future();
140 let subscribe_future = client.subscribe(op);
141
142 let (stream, actor_future) = run_startup(subscribe_future, actor_future).await?;
143
144 Ok(stream.join(actor_future))
145 }
146}
147
148impl IntoFuture for ClientBuilder {
149 type Output = Result<(Client, ConnectionActor), Error>;
150
151 type IntoFuture = future::Boxed<Self::Output>;
152
153 fn into_future(self) -> Self::IntoFuture {
154 Box::pin(self.build())
155 }
156}
157
158impl ClientBuilder {
159 pub async fn build(self) -> Result<(Client, ConnectionActor), Error> {
165 let Self {
166 payload,
167 subscription_buffer_size,
168 mut connection,
169 keep_alive,
170 } = self;
171
172 connection.send(Message::init(payload)).await?;
173
174 loop {
176 match connection.receive().await {
177 None => return Err(Error::Unknown("connection dropped".into())),
178 Some(Message::Close { code, reason }) => {
179 return Err(Error::Close(
180 code.unwrap_or_default(),
181 reason.unwrap_or_default(),
182 ))
183 }
184 Some(Message::Ping | Message::Pong) => {}
185 Some(message @ Message::Text(_)) => {
186 let event = message.deserialize::<Event>()?;
187 match event {
188 Event::Ping { .. } => {
190 connection.send(Message::graphql_pong()).await?;
191 }
192 Event::Pong { .. } => {}
193 Event::ConnectionAck { .. } => {
194 trace!("connection_ack received, handshake completed");
196 break;
197 }
198 event => {
199 connection
200 .send(Message::Close {
201 code: Some(4950),
202 reason: Some("Unexpected message while waiting for ack".into()),
203 })
204 .await
205 .ok();
206 return Err(Error::Decode(format!(
207 "expected a connection_ack or ping, got {}",
208 event.r#type()
209 )));
210 }
211 }
212 }
213 }
214 }
215
216 let (command_sender, command_receiver) = async_channel::bounded(5);
217
218 let actor = ConnectionActor::new(connection, command_receiver, keep_alive);
219
220 let client = Client::new_internal(command_sender, subscription_buffer_size.unwrap_or(5));
221
222 Ok((client, actor))
223 }
224}
225
226async fn run_startup<SubscribeFut, Operation>(
227 subscribe: SubscribeFut,
228 actor: future::Boxed<()>,
229) -> Result<(Subscription<Operation>, future::Boxed<()>), Error>
230where
231 SubscribeFut: Future<Output = Result<Subscription<Operation>, Error>>,
232 Operation: GraphqlOperation,
233{
234 match read_from_producer(subscribe, actor).await {
235 Some((Ok(subscription), actor)) => Ok((subscription, actor)),
236 Some((Err(err), _)) => Err(err),
237 None => Err(Error::Unknown(
238 "actor ended before subscription started".into(),
239 )),
240 }
241}