graphql_ws_client/client/
mod.rs1use std::{
2 fmt,
3 sync::{
4 Arc,
5 atomic::{AtomicUsize, Ordering},
6 },
7};
8
9use futures_lite::StreamExt;
10use serde_json::Value;
11
12use crate::{
13 Error,
14 graphql::GraphqlOperation,
15 protocol::{self},
16};
17
18mod actor;
19mod builder;
20mod conection_id;
21mod connection;
22mod keepalive;
23mod production_future;
24mod subscription;
25
26pub use self::{
27 actor::ConnectionActor,
28 builder::ClientBuilder,
29 conection_id::SubscriptionId,
30 connection::{Connection, Message},
31 subscription::Subscription,
32};
33
34#[derive(Clone)]
58pub struct Client {
59 actor: async_channel::Sender<ConnectionCommand>,
60 drop_sender: async_channel::Sender<SubscriptionId>,
61 subscription_buffer_size: usize,
62 next_id: Arc<AtomicUsize>,
63}
64
65impl Client {
66 pub(super) fn new_internal(
67 actor: async_channel::Sender<ConnectionCommand>,
68 drop_sender: async_channel::Sender<SubscriptionId>,
69 subscription_buffer_size: usize,
70 ) -> Self {
71 Client {
72 actor,
73 drop_sender,
74 subscription_buffer_size,
75 next_id: Arc::new(AtomicUsize::new(1)),
76 }
77 }
78
79 pub async fn subscribe<Operation>(
83 &self,
84 op: Operation,
85 ) -> Result<Subscription<Operation>, Error>
86 where
87 Operation: GraphqlOperation + Unpin + Send + 'static,
88 {
89 let (sender, receiver) = async_channel::bounded(self.subscription_buffer_size);
90
91 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
92
93 let message = protocol::Message::Subscribe {
94 id: id.to_string(),
95 payload: &op,
96 };
97
98 let request = serde_json::to_string(&message)
99 .map_err(|error| Error::Serializing(error.to_string()))?;
100
101 let id = SubscriptionId::new(id).ok_or(Error::ConnectionIdsExhausted)?;
102
103 let actor = self.actor.clone();
104 actor
105 .send(ConnectionCommand::Subscribe {
106 request,
107 sender,
108 id,
109 })
110 .await
111 .map_err(|error| Error::Send(error.to_string()))?;
112
113 Ok(Subscription::<Operation> {
114 id,
115 stream: Some(Box::pin(receiver.map(move |response| {
116 op.decode(response)
117 .map_err(|err| Error::Decode(err.to_string()))
118 }))),
119 actor,
120 drop_sender: Some(self.drop_sender.clone()),
121 })
122 }
123
124 pub async fn stop(&self, subscription_id: SubscriptionId) -> Result<(), Error> {
130 self.actor
131 .send(ConnectionCommand::Cancel(subscription_id))
132 .await
133 .map_err(|error| Error::Send(error.to_string()))
134 }
135
136 pub async fn close(self, code: u16, description: impl Into<String>) {
141 self.actor
142 .send(ConnectionCommand::Close(code, description.into()))
143 .await
144 .ok();
145 }
146}
147
148pub(super) enum ConnectionCommand {
149 Subscribe {
150 request: String,
152 sender: async_channel::Sender<Value>,
153 id: SubscriptionId,
154 },
155 Ping,
156 Cancel(SubscriptionId),
157 Close(u16, String),
158}
159
160impl fmt::Debug for Client {
161 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
162 f.debug_struct("Client")
163 .field("subscription_buffer_size", &self.subscription_buffer_size)
164 .finish_non_exhaustive()
165 }
166}