use std::pin::Pin;
use futures_core::{
future::BoxFuture,
stream::{BoxStream, Stream},
};
use futures_sink::Sink;
use serde::de::DeserializeOwned;
use serde::Serialize;
pub trait StreamSink<I, O, E>: Stream<Item = Result<I, E>> + Sink<O, Error = E> {}
impl<I, O, E, S: ?Sized> StreamSink<I, O, E> for S where
S: Sink<O, Error = E> + Stream<Item = Result<I, E>>
{
}
/// Pinned, heap-allocated, `Send` trait object for any [`StreamSink`] that is
/// valid for the lifetime `'a`.
pub type BoxStreamSink<'a, I, O, E> = Pin<Box<dyn StreamSink<I, O, E> + Send + 'a>>;
/// Boxed stream of `Result<Event, E>` items.
///
/// This is the stream type that [`StreamingClient::subnote`] resolves to.
pub type SubNoteStream<'a, Event, E> = BoxStream<'a, Result<Event, E>>;
/// Boxed duplex connection for the channel request type `Req`.
///
/// The stream half yields `Result<Req::Incoming, E>` and the sink half accepts
/// `Req::Outgoing`, where both associated types come from
/// [`ConnectChannelRequest`]. This is the type that
/// [`StreamingClient::channel`] resolves to.
pub type ChannelStream<'a, Req, E> = BoxStreamSink<
    'a,
    <Req as ConnectChannelRequest>::Incoming,
    <Req as ConnectChannelRequest>::Outgoing,
    E,
>;
/// Boxed stream of `Result<Event, E>` items.
///
/// This is the stream type that [`StreamingClient::broadcast`] resolves to.
pub type BroadcastStream<'a, Event, E> = BoxStream<'a, Result<Event, E>>;
pub trait StreamingClient {
type Error: std::error::Error;
fn subnote<E: SubNoteEvent>(
&self,
note_id: String,
) -> BoxFuture<Result<SubNoteStream<E, Self::Error>, Self::Error>>;
fn channel<R: ConnectChannelRequest>(
&self,
request: R,
) -> BoxFuture<Result<ChannelStream<R, Self::Error>, Self::Error>>;
fn broadcast<E: BroadcastEvent>(
&self,
) -> BoxFuture<Result<BroadcastStream<E, Self::Error>, Self::Error>>;
}
impl<C: ?Sized> StreamingClient for &C
where
C: StreamingClient,
{
type Error = C::Error;
fn subnote<E: SubNoteEvent>(
&self,
note_id: String,
) -> BoxFuture<Result<SubNoteStream<E, Self::Error>, Self::Error>> {
C::subnote(self, note_id)
}
fn channel<R: ConnectChannelRequest>(
&self,
request: R,
) -> BoxFuture<Result<ChannelStream<R, Self::Error>, Self::Error>> {
C::channel(self, request)
}
fn broadcast<E: BroadcastEvent>(
&self,
) -> BoxFuture<Result<BroadcastStream<E, Self::Error>, Self::Error>> {
C::broadcast(self)
}
}
impl<C: ?Sized> StreamingClient for &mut C
where
C: StreamingClient,
{
type Error = C::Error;
fn subnote<E: SubNoteEvent>(
&self,
note_id: String,
) -> BoxFuture<Result<SubNoteStream<E, Self::Error>, Self::Error>> {
C::subnote(self, note_id)
}
fn channel<R: ConnectChannelRequest>(
&self,
request: R,
) -> BoxFuture<Result<ChannelStream<R, Self::Error>, Self::Error>> {
C::channel(self, request)
}
fn broadcast<E: BroadcastEvent>(
&self,
) -> BoxFuture<Result<BroadcastStream<E, Self::Error>, Self::Error>> {
C::broadcast(self)
}
}
impl<C: ?Sized> StreamingClient for Box<C>
where
C: StreamingClient,
{
type Error = C::Error;
fn subnote<E: SubNoteEvent>(
&self,
note_id: String,
) -> BoxFuture<Result<SubNoteStream<E, Self::Error>, Self::Error>> {
C::subnote(self, note_id)
}
fn channel<R: ConnectChannelRequest>(
&self,
request: R,
) -> BoxFuture<Result<ChannelStream<R, Self::Error>, Self::Error>> {
C::channel(self, request)
}
fn broadcast<E: BroadcastEvent>(
&self,
) -> BoxFuture<Result<BroadcastStream<E, Self::Error>, Self::Error>> {
C::broadcast(self)
}
}
/// Serializable request used to open a channel via
/// [`StreamingClient::channel`].
///
/// The associated types determine the message types of the resulting
/// [`ChannelStream`].
pub trait ConnectChannelRequest: Serialize {
    /// String constant associated with the request type.
    ///
    /// NOTE(review): presumably the channel name sent to the server; it is
    /// not used within this file.
    const NAME: &'static str;

    /// Item type yielded (inside `Result`) by the stream half of the channel.
    type Incoming: 'static + DeserializeOwned;

    /// Item type accepted by the sink half of the channel.
    type Outgoing: 'static + Serialize;
}
/// A shared reference to a request is itself a request with the same
/// associated types and the same `NAME` as `R`.
impl<R> ConnectChannelRequest for &R
where
    R: ?Sized + ConnectChannelRequest,
{
    const NAME: &'static str = <R as ConnectChannelRequest>::NAME;
    type Incoming = <R as ConnectChannelRequest>::Incoming;
    type Outgoing = <R as ConnectChannelRequest>::Outgoing;
}
/// A mutable reference to a request is itself a request with the same
/// associated types and the same `NAME` as `R`.
impl<R> ConnectChannelRequest for &mut R
where
    R: ?Sized + ConnectChannelRequest,
{
    const NAME: &'static str = <R as ConnectChannelRequest>::NAME;
    type Incoming = <R as ConnectChannelRequest>::Incoming;
    type Outgoing = <R as ConnectChannelRequest>::Outgoing;
}
/// A boxed request is itself a request with the same associated types and the
/// same `NAME` as `R`.
impl<R> ConnectChannelRequest for Box<R>
where
    R: ?Sized + ConnectChannelRequest,
{
    const NAME: &'static str = <R as ConnectChannelRequest>::NAME;
    type Incoming = <R as ConnectChannelRequest>::Incoming;
    type Outgoing = <R as ConnectChannelRequest>::Outgoing;
}
/// Marker trait for event types that can be requested through
/// [`StreamingClient::subnote`].
///
/// It adds no items; implementors only need to be deserializable without
/// borrowing from the input and to be `'static`.
pub trait SubNoteEvent: 'static + DeserializeOwned {}
/// Event types that can be requested through
/// [`StreamingClient::broadcast`].
///
/// Implementors must be deserializable without borrowing from the input and
/// be `'static`.
pub trait BroadcastEvent: 'static + DeserializeOwned {
    /// String constant associated with the event type.
    ///
    /// NOTE(review): presumably the event-type tag used to identify this
    /// broadcast on the wire; it is not used within this file.
    const TYPE: &'static str;
}