graphql_ws_client/next/
mod.rs

1use std::{
2    fmt,
3    sync::{
4        atomic::{AtomicUsize, Ordering},
5        Arc,
6    },
7};
8
9use futures_lite::StreamExt;
10use serde_json::Value;
11
12use crate::{
13    graphql::GraphqlOperation,
14    protocol::{self},
15    Error,
16};
17
18mod actor;
19mod builder;
20mod connection;
21mod keepalive;
22mod production_future;
23mod stream;
24
25pub use self::{
26    actor::ConnectionActor,
27    builder::ClientBuilder,
28    connection::{Connection, Message},
29    stream::Subscription,
30};
31
32/// A GraphQL over Websocket client
33///
34/// ```rust,no_run
35/// use graphql_ws_client::Client;
36/// use std::future::IntoFuture;
37/// use futures_lite::StreamExt;
38/// # use graphql_ws_client::__doc_utils::spawn;
39/// # async fn example() -> Result<(), graphql_ws_client::Error> {
40/// # let connection = graphql_ws_client::__doc_utils::Conn;
41/// # let subscription = graphql_ws_client::__doc_utils::Subscription;
42///
43/// let (mut client, actor) = Client::build(connection).await?;
44///
45/// // Spawn the actor onto an async runtime
46/// spawn(actor.into_future());
47///
48/// let mut subscription = client.subscribe(subscription).await?;
49///
50/// while let Some(response) = subscription.next().await {
51///     // Do something with response
52/// }
53/// # Ok(())
54/// # }
55#[derive(Clone)]
56pub struct Client {
57    actor: async_channel::Sender<ConnectionCommand>,
58    subscription_buffer_size: usize,
59    next_id: Arc<AtomicUsize>,
60}
61
62impl Client {
63    pub(super) fn new_internal(
64        actor: async_channel::Sender<ConnectionCommand>,
65        subscription_buffer_size: usize,
66    ) -> Self {
67        Client {
68            actor,
69            subscription_buffer_size,
70            next_id: Arc::new(AtomicUsize::new(0)),
71        }
72    }
73
74    /// Starts a streaming operation on this client.
75    ///
76    /// Returns a `Stream` of responses.
77    pub async fn subscribe<'a, Operation>(
78        &self,
79        op: Operation,
80    ) -> Result<Subscription<Operation>, Error>
81    where
82        Operation: GraphqlOperation + Unpin + Send + 'static,
83    {
84        let (sender, receiver) = async_channel::bounded(self.subscription_buffer_size);
85
86        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
87
88        let message = protocol::Message::Subscribe {
89            id: id.to_string(),
90            payload: &op,
91        };
92
93        let request = serde_json::to_string(&message)
94            .map_err(|error| Error::Serializing(error.to_string()))?;
95
96        let actor = self.actor.clone();
97        actor
98            .send(ConnectionCommand::Subscribe {
99                request,
100                sender,
101                id,
102            })
103            .await
104            .map_err(|error| Error::Send(error.to_string()))?;
105
106        Ok(Subscription::<Operation> {
107            id,
108            stream: Box::pin(receiver.map(move |response| {
109                op.decode(response)
110                    .map_err(|err| Error::Decode(err.to_string()))
111            })),
112            actor,
113        })
114    }
115
116    /// Gracefully closes the connection
117    ///
118    /// This will stop all running subscriptions and shut down the [`ConnectionActor`] wherever
119    /// it is running.
120    pub async fn close(self, code: u16, description: impl Into<String>) {
121        self.actor
122            .send(ConnectionCommand::Close(code, description.into()))
123            .await
124            .ok();
125    }
126}
127
128pub(super) enum ConnectionCommand {
129    Subscribe {
130        /// The full subscribe request as a JSON encoded string.
131        request: String,
132        sender: async_channel::Sender<Value>,
133        id: usize,
134    },
135    Ping,
136    Cancel(usize),
137    Close(u16, String),
138}
139
140impl fmt::Debug for Client {
141    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
142        f.debug_struct("Client")
143            .field("subscription_buffer_size", &self.subscription_buffer_size)
144            .finish_non_exhaustive()
145    }
146}