1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
use std::{
fmt,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
use futures_lite::StreamExt;
use serde_json::Value;
use crate::{
graphql::GraphqlOperation,
protocol::{self},
Error,
};
mod actor;
mod builder;
mod connection;
mod keepalive;
mod production_future;
mod stream;
pub use self::{
actor::ConnectionActor,
builder::ClientBuilder,
connection::{Connection, Message},
stream::Subscription,
};
/// A GraphQL over Websocket client
///
/// ```rust,no_run
/// use graphql_ws_client::Client;
/// use std::future::IntoFuture;
/// use futures_lite::StreamExt;
/// # use graphql_ws_client::__doc_utils::spawn;
/// # async fn example() -> Result<(), graphql_ws_client::Error> {
/// # let connection = graphql_ws_client::__doc_utils::Conn;
/// # let subscription = graphql_ws_client::__doc_utils::Subscription;
///
/// let (mut client, actor) = Client::build(connection).await?;
///
/// // Spawn the actor onto an async runtime
/// spawn(actor.into_future());
///
/// let mut subscription = client.subscribe(subscription).await?;
///
/// while let Some(response) = subscription.next().await {
/// // Do something with response
/// }
/// # Ok(())
/// # }
#[derive(Clone)]
pub struct Client {
actor: async_channel::Sender<ConnectionCommand>,
subscription_buffer_size: usize,
next_id: Arc<AtomicUsize>,
}
impl Client {
pub(super) fn new_internal(
actor: async_channel::Sender<ConnectionCommand>,
subscription_buffer_size: usize,
) -> Self {
Client {
actor,
subscription_buffer_size,
next_id: Arc::new(AtomicUsize::new(0)),
}
}
// Starts a streaming operation on this client.
///
/// Returns a `Stream` of responses.
pub async fn subscribe<'a, Operation>(
&self,
op: Operation,
) -> Result<Subscription<Operation>, Error>
where
Operation: GraphqlOperation + Unpin + Send + 'static,
{
let (sender, receiver) = async_channel::bounded(self.subscription_buffer_size);
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
let message = protocol::Message::Subscribe {
id: id.to_string(),
payload: &op,
};
let request = serde_json::to_string(&message)
.map_err(|error| Error::Serializing(error.to_string()))?;
let actor = self.actor.clone();
actor
.send(ConnectionCommand::Subscribe {
request,
sender,
id,
})
.await
.map_err(|error| Error::Send(error.to_string()))?;
Ok(Subscription::<Operation> {
id,
stream: Box::pin(receiver.map(move |response| {
op.decode(response)
.map_err(|err| Error::Decode(err.to_string()))
})),
actor,
})
}
/// Gracefully closes the connection
///
/// This will stop all running subscriptions and shut down the ConnectionActor wherever
/// it is running.
pub async fn close(self, code: u16, description: impl Into<String>) {
self.actor
.send(ConnectionCommand::Close(code, description.into()))
.await
.ok();
}
}
pub(super) enum ConnectionCommand {
Subscribe {
/// The full subscribe request as a JSON encoded string.
request: String,
sender: async_channel::Sender<Value>,
id: usize,
},
Ping,
Cancel(usize),
Close(u16, String),
}
impl fmt::Debug for Client {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Client")
.field("subscription_buffer_size", &self.subscription_buffer_size)
.finish_non_exhaustive()
}
}