Skip to main content

graphql_ws_client/client/
mod.rs

1use std::{
2    fmt,
3    sync::{
4        Arc,
5        atomic::{AtomicUsize, Ordering},
6    },
7};
8
9use futures_lite::StreamExt;
10use serde_json::Value;
11
12use crate::{
13    Error,
14    graphql::GraphqlOperation,
15    protocol::{self},
16};
17
18mod actor;
19mod builder;
20mod conection_id;
21mod connection;
22mod keepalive;
23mod production_future;
24mod subscription;
25
26pub use self::{
27    actor::ConnectionActor,
28    builder::ClientBuilder,
29    conection_id::SubscriptionId,
30    connection::{Connection, Message},
31    subscription::Subscription,
32};
33
34/// A GraphQL over Websocket client
35///
36/// ```rust,no_run
37/// use graphql_ws_client::Client;
38/// use std::future::IntoFuture;
39/// use futures_lite::StreamExt;
40/// # use graphql_ws_client::__doc_utils::spawn;
41/// # async fn example() -> Result<(), graphql_ws_client::Error> {
42/// # let connection = graphql_ws_client::__doc_utils::Conn;
43/// # let subscription = graphql_ws_client::__doc_utils::Subscription;
44///
45/// let (mut client, actor) = Client::build(connection).await?;
46///
47/// // Spawn the actor onto an async runtime
48/// spawn(actor.into_future());
49///
50/// let mut subscription = client.subscribe(subscription).await?;
51///
52/// while let Some(response) = subscription.next().await {
53///     // Do something with response
54/// }
55/// # Ok(())
56/// # }
57#[derive(Clone)]
58pub struct Client {
59    actor: async_channel::Sender<ConnectionCommand>,
60    drop_sender: async_channel::Sender<SubscriptionId>,
61    subscription_buffer_size: usize,
62    next_id: Arc<AtomicUsize>,
63}
64
65impl Client {
66    pub(super) fn new_internal(
67        actor: async_channel::Sender<ConnectionCommand>,
68        drop_sender: async_channel::Sender<SubscriptionId>,
69        subscription_buffer_size: usize,
70    ) -> Self {
71        Client {
72            actor,
73            drop_sender,
74            subscription_buffer_size,
75            next_id: Arc::new(AtomicUsize::new(1)),
76        }
77    }
78
79    /// Starts a streaming operation on this client.
80    ///
81    /// Returns a `Stream` of responses.
82    pub async fn subscribe<Operation>(
83        &self,
84        op: Operation,
85    ) -> Result<Subscription<Operation>, Error>
86    where
87        Operation: GraphqlOperation + Unpin + Send + 'static,
88    {
89        let (sender, receiver) = async_channel::bounded(self.subscription_buffer_size);
90
91        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
92
93        let message = protocol::Message::Subscribe {
94            id: id.to_string(),
95            payload: &op,
96        };
97
98        let request = serde_json::to_string(&message)
99            .map_err(|error| Error::Serializing(error.to_string()))?;
100
101        let id = SubscriptionId::new(id).ok_or(Error::ConnectionIdsExhausted)?;
102
103        let actor = self.actor.clone();
104        actor
105            .send(ConnectionCommand::Subscribe {
106                request,
107                sender,
108                id,
109            })
110            .await
111            .map_err(|error| Error::Send(error.to_string()))?;
112
113        Ok(Subscription::<Operation> {
114            id,
115            stream: Some(Box::pin(receiver.map(move |response| {
116                op.decode(response)
117                    .map_err(|err| Error::Decode(err.to_string()))
118            }))),
119            actor,
120            drop_sender: Some(self.drop_sender.clone()),
121        })
122    }
123
124    /// Stops a subscription by id
125    ///
126    /// # Errors
127    ///
128    /// Will return `Err` if the connection actor has already been shut down.
129    pub async fn stop(&self, subscription_id: SubscriptionId) -> Result<(), Error> {
130        self.actor
131            .send(ConnectionCommand::Cancel(subscription_id))
132            .await
133            .map_err(|error| Error::Send(error.to_string()))
134    }
135
136    /// Gracefully closes the connection
137    ///
138    /// This will stop all running subscriptions and shut down the [`ConnectionActor`] wherever
139    /// it is running.
140    pub async fn close(self, code: u16, description: impl Into<String>) {
141        self.actor
142            .send(ConnectionCommand::Close(code, description.into()))
143            .await
144            .ok();
145    }
146}
147
148pub(super) enum ConnectionCommand {
149    Subscribe {
150        /// The full subscribe request as a JSON encoded string.
151        request: String,
152        sender: async_channel::Sender<Value>,
153        id: SubscriptionId,
154    },
155    Ping,
156    Cancel(SubscriptionId),
157    Close(u16, String),
158}
159
160impl fmt::Debug for Client {
161    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
162        f.debug_struct("Client")
163            .field("subscription_buffer_size", &self.subscription_buffer_size)
164            .finish_non_exhaustive()
165    }
166}