use std::{
fmt,
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
};
use futures_lite::StreamExt;
use serde_json::Value;
use crate::{
Error,
graphql::GraphqlOperation,
protocol::{self},
};
mod actor;
mod builder;
mod conection_id;
mod connection;
mod keepalive;
mod production_future;
mod subscription;
pub use self::{
actor::ConnectionActor,
builder::ClientBuilder,
conection_id::SubscriptionId,
connection::{Connection, Message},
subscription::Subscription,
};
/// Handle used to issue commands over a channel of `ConnectionCommand`s.
///
/// Deriving `Clone` copies the channel senders and the `Arc`, so every clone
/// of a client draws subscription ids from the same shared counter.
#[derive(Clone)]
pub struct Client {
/// Sender used to deliver `ConnectionCommand`s (subscribe, cancel, close).
actor: async_channel::Sender<ConnectionCommand>,
/// Sender of `SubscriptionId`s; a clone is handed to every `Subscription`
/// created by `subscribe`.
/// NOTE(review): presumably used to report dropped subscriptions — confirm
/// against the `subscription` module.
drop_sender: async_channel::Sender<SubscriptionId>,
/// Capacity of the bounded response channel created for each subscription.
subscription_buffer_size: usize,
/// Counter from which numeric subscription ids are drawn; initialised to 1
/// by `new_internal`.
next_id: Arc<AtomicUsize>,
}
impl Client {
pub(super) fn new_internal(
actor: async_channel::Sender<ConnectionCommand>,
drop_sender: async_channel::Sender<SubscriptionId>,
subscription_buffer_size: usize,
) -> Self {
Client {
actor,
drop_sender,
subscription_buffer_size,
next_id: Arc::new(AtomicUsize::new(1)),
}
}
pub async fn subscribe<Operation>(
&self,
op: Operation,
) -> Result<Subscription<Operation>, Error>
where
Operation: GraphqlOperation + Unpin + Send + 'static,
{
let (sender, receiver) = async_channel::bounded(self.subscription_buffer_size);
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
let message = protocol::Message::Subscribe {
id: id.to_string(),
payload: &op,
};
let request = serde_json::to_string(&message)
.map_err(|error| Error::Serializing(error.to_string()))?;
let id = SubscriptionId::new(id).ok_or(Error::ConnectionIdsExhausted)?;
let actor = self.actor.clone();
actor
.send(ConnectionCommand::Subscribe {
request,
sender,
id,
})
.await
.map_err(|error| Error::Send(error.to_string()))?;
Ok(Subscription::<Operation> {
id,
stream: Some(Box::pin(receiver.map(move |response| {
op.decode(response)
.map_err(|err| Error::Decode(err.to_string()))
}))),
actor,
drop_sender: Some(self.drop_sender.clone()),
})
}
pub async fn stop(&self, subscription_id: SubscriptionId) -> Result<(), Error> {
self.actor
.send(ConnectionCommand::Cancel(subscription_id))
.await
.map_err(|error| Error::Send(error.to_string()))
}
pub async fn close(self, code: u16, description: impl Into<String>) {
self.actor
.send(ConnectionCommand::Close(code, description.into()))
.await
.ok();
}
}
/// Commands sent over the channel whose sender `Client` stores in its
/// `actor` field.
pub(super) enum ConnectionCommand {
/// Start a subscription (sent by `Client::subscribe`).
Subscribe {
/// The JSON-serialized `protocol::Message::Subscribe` request.
request: String,
/// Sending half of the per-subscription channel of `Value` responses;
/// the receiving half backs the stream of the returned `Subscription`.
sender: async_channel::Sender<Value>,
/// Identifier assigned to the subscription.
id: SubscriptionId,
},
/// NOTE(review): never constructed in this file — presumably issued by the
/// keepalive logic; confirm against the `keepalive` module.
Ping,
/// Cancel request for the subscription with the given id (sent by
/// `Client::stop`).
Cancel(SubscriptionId),
/// Close request carrying a numeric code and a description (sent by
/// `Client::close`).
Close(u16, String),
}
impl fmt::Debug for Client {
    /// Formats the client as a `Client` struct showing only
    /// `subscription_buffer_size`; the remaining fields are elided through
    /// `finish_non_exhaustive`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("Client");
        builder.field("subscription_buffer_size", &self.subscription_buffer_size);
        builder.finish_non_exhaustive()
    }
}