use crate::{
pubsub::{
shared::{ConnectionManager, ListenerTask, DEFAULT_NOTIFICATION_BUFFER_PER_CLIENT},
ServerShutdown,
},
TaskSet,
};
use bytes::Bytes;
use serde_json::value::RawValue;
use std::future::Future;
use tokio::runtime::Handle;
use tokio_stream::Stream;
/// Shorthand for the [`Listener::RespSink`] associated type of the listener
/// type `T`.
pub type Out<T> = <T as Listener>::RespSink;
/// Shorthand for the [`Listener::ReqStream`] associated type of the listener
/// type `T`.
pub type In<T> = <T as Listener>::ReqStream;
/// Configuration that can be consumed to produce a [`Listener`] and start
/// serving a router over it.
///
/// Implementors only need to provide [`Connect::make_listener`]; the serving
/// methods have default implementations built on top of it.
pub trait Connect: Send + Sync + Sized {
    /// The listener type produced by [`Connect::make_listener`].
    type Listener: Listener;

    /// The error returned when the listener cannot be created.
    type Error: core::error::Error + 'static;

    /// Consume this configuration and create the listener.
    fn make_listener(self) -> impl Future<Output = Result<Self::Listener, Self::Error>> + Send;

    /// The notification buffer size passed to the connection manager as its
    /// `notification_buffer_per_task` value.
    ///
    /// The default implementation returns
    /// `DEFAULT_NOTIFICATION_BUFFER_PER_CLIENT`.
    fn notification_buffer_size(&self) -> usize {
        DEFAULT_NOTIFICATION_BUFFER_PER_CLIENT
    }

    /// Create the listener, spawn a `ListenerTask` for it on a task set built
    /// from `handle`, and return the resulting [`ServerShutdown`].
    ///
    /// # Errors
    ///
    /// Returns the error from [`Connect::make_listener`]. In that case no
    /// listener task is spawned.
    fn serve_with_handle(
        self,
        router: crate::Router<()>,
        handle: Handle,
    ) -> impl Future<Output = Result<ServerShutdown, Self::Error>> + Send {
        async move {
            let tasks: TaskSet = handle.into();

            // `make_listener` consumes `self`, so the buffer size has to be
            // read out first.
            let buffer_size = self.notification_buffer_size();
            let listener = self.make_listener().await?;

            // The manager gets its own clone of the task set; the original is
            // converted into the value handed back to the caller.
            let manager = ConnectionManager {
                next_id: 0,
                router,
                notification_buffer_per_task: buffer_size,
                root_tasks: tasks.clone(),
            };
            ListenerTask { listener, manager }.spawn();

            let shutdown: ServerShutdown = tasks.into();
            Ok(shutdown)
        }
    }

    /// Same as [`Connect::serve_with_handle`], using the handle of the
    /// current tokio runtime.
    ///
    /// The handle is captured when this method is called, not when the
    /// returned future is first polled.
    ///
    /// # Panics
    ///
    /// Per tokio's documentation, `Handle::current` panics when called
    /// outside the context of a tokio runtime.
    fn serve(
        self,
        router: crate::Router<()>,
    ) -> impl Future<Output = Result<ServerShutdown, Self::Error>> + Send {
        let current = Handle::current();
        self.serve_with_handle(router, current)
    }
}
/// A listener whose [`Listener::accept`] yields, per call, a response sink
/// paired with a request stream.
pub trait Listener: Send + 'static {
/// The sink half produced by [`Listener::accept`]; it takes raw JSON values
/// (see [`JsonSink`]).
type RespSink: JsonSink;
/// The stream half produced by [`Listener::accept`]; it yields `Bytes` items
/// (see [`JsonReqStream`]).
type ReqStream: JsonReqStream;
/// The error returned when accepting fails.
type Error: core::error::Error;
/// Accept once, resolving to a `(sink, stream)` pair on success.
///
/// # Errors
///
/// Resolves to `Self::Error` when accepting fails.
fn accept(
&self,
) -> impl Future<Output = Result<(Self::RespSink, Self::ReqStream), Self::Error>> + Send;
}
/// A sink that accepts owned raw JSON values (`Box<RawValue>`), one at a time.
pub trait JsonSink: Send + 'static {
/// The error returned when a value cannot be sent.
type Error: core::error::Error + 'static;
/// Send a single raw JSON value through the sink.
///
/// # Errors
///
/// The returned future resolves to `Self::Error` when the send fails.
fn send_json(
&mut self,
json: Box<RawValue>,
) -> impl Future<Output = Result<(), Self::Error>> + Send;
}
/// Lets a tokio mpsc sender of raw JSON values be used directly as a
/// [`JsonSink`].
impl JsonSink for tokio::sync::mpsc::Sender<Box<RawValue>> {
    /// The channel's own send error, which carries the value that could not
    /// be sent.
    type Error = tokio::sync::mpsc::error::SendError<Box<RawValue>>;

    /// Forward `json` to the channel via the sender's inherent `send` method.
    fn send_json(
        &mut self,
        json: Box<RawValue>,
    ) -> impl Future<Output = Result<(), Self::Error>> + Send {
        // Explicit path to the inherent method; `&mut Self` coerces to the
        // shared reference that `send` takes.
        tokio::sync::mpsc::Sender::send(self, json)
    }
}
/// Marker trait for request streams: any `Stream` of `Bytes` items that is
/// `Send + Unpin + 'static`.
///
/// It has no methods of its own and is implemented automatically for every
/// type that satisfies those bounds.
// NOTE(review): presumably each `Bytes` item is one serialized JSON request;
// confirm against the code that consumes the stream.
pub trait JsonReqStream: Stream<Item = Bytes> + Send + Unpin + 'static {}

impl<S: Stream<Item = Bytes> + Send + Unpin + 'static> JsonReqStream for S {}