graphql_ws_client/next/
mod.rs1use std::{
2 fmt,
3 sync::{
4 atomic::{AtomicUsize, Ordering},
5 Arc,
6 },
7};
8
9use futures_lite::StreamExt;
10use serde_json::Value;
11
12use crate::{
13 graphql::GraphqlOperation,
14 protocol::{self},
15 Error,
16};
17
18mod actor;
19mod builder;
20mod connection;
21mod keepalive;
22mod production_future;
23mod stream;
24
25pub use self::{
26 actor::ConnectionActor,
27 builder::ClientBuilder,
28 connection::{Connection, Message},
29 stream::Subscription,
30};
31
32#[derive(Clone)]
56pub struct Client {
57 actor: async_channel::Sender<ConnectionCommand>,
58 subscription_buffer_size: usize,
59 next_id: Arc<AtomicUsize>,
60}
61
62impl Client {
63 pub(super) fn new_internal(
64 actor: async_channel::Sender<ConnectionCommand>,
65 subscription_buffer_size: usize,
66 ) -> Self {
67 Client {
68 actor,
69 subscription_buffer_size,
70 next_id: Arc::new(AtomicUsize::new(0)),
71 }
72 }
73
74 pub async fn subscribe<'a, Operation>(
78 &self,
79 op: Operation,
80 ) -> Result<Subscription<Operation>, Error>
81 where
82 Operation: GraphqlOperation + Unpin + Send + 'static,
83 {
84 let (sender, receiver) = async_channel::bounded(self.subscription_buffer_size);
85
86 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
87
88 let message = protocol::Message::Subscribe {
89 id: id.to_string(),
90 payload: &op,
91 };
92
93 let request = serde_json::to_string(&message)
94 .map_err(|error| Error::Serializing(error.to_string()))?;
95
96 let actor = self.actor.clone();
97 actor
98 .send(ConnectionCommand::Subscribe {
99 request,
100 sender,
101 id,
102 })
103 .await
104 .map_err(|error| Error::Send(error.to_string()))?;
105
106 Ok(Subscription::<Operation> {
107 id,
108 stream: Box::pin(receiver.map(move |response| {
109 op.decode(response)
110 .map_err(|err| Error::Decode(err.to_string()))
111 })),
112 actor,
113 })
114 }
115
116 pub async fn close(self, code: u16, description: impl Into<String>) {
121 self.actor
122 .send(ConnectionCommand::Close(code, description.into()))
123 .await
124 .ok();
125 }
126}
127
128pub(super) enum ConnectionCommand {
129 Subscribe {
130 request: String,
132 sender: async_channel::Sender<Value>,
133 id: usize,
134 },
135 Ping,
136 Cancel(usize),
137 Close(u16, String),
138}
139
140impl fmt::Debug for Client {
141 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
142 f.debug_struct("Client")
143 .field("subscription_buffer_size", &self.subscription_buffer_size)
144 .finish_non_exhaustive()
145 }
146}