1use std::pin::Pin;
4
5use futures_core::{
6 future::BoxFuture,
7 stream::{BoxStream, Stream},
8};
9use futures_sink::Sink;
10use serde::de::DeserializeOwned;
11use serde::Serialize;
12
13pub trait StreamSink<I, O, E>: Stream<Item = Result<I, E>> + Sink<O, Error = E> {}
19impl<I, O, E, S: ?Sized> StreamSink<I, O, E> for S where
20 S: Sink<O, Error = E> + Stream<Item = Result<I, E>>
21{
22}
23
24pub type BoxStreamSink<'a, I, O, E> = Pin<Box<dyn StreamSink<I, O, E> + 'a + Send>>;
27
28pub type SubNoteStream<'a, T, E> = BoxStream<'a, Result<T, E>>;
30pub type ChannelStream<'a, T, E> = BoxStreamSink<
32 'a,
33 <T as ConnectChannelRequest>::Incoming,
34 <T as ConnectChannelRequest>::Outgoing,
35 E,
36>;
37pub type BroadcastStream<'a, T, E> = BoxStream<'a, Result<T, E>>;
39
40pub trait StreamingClient {
42 type Error: std::error::Error;
44
45 fn subnote<E: SubNoteEvent>(
47 &self,
48 note_id: String,
49 ) -> BoxFuture<Result<SubNoteStream<E, Self::Error>, Self::Error>>;
50
51 fn channel<R: ConnectChannelRequest>(
53 &self,
54 request: R,
55 ) -> BoxFuture<Result<ChannelStream<R, Self::Error>, Self::Error>>;
56
57 fn broadcast<E: BroadcastEvent>(
59 &self,
60 ) -> BoxFuture<Result<BroadcastStream<E, Self::Error>, Self::Error>>;
61}
62
63impl<C: ?Sized> StreamingClient for &C
64where
65 C: StreamingClient,
66{
67 type Error = C::Error;
68
69 fn subnote<E: SubNoteEvent>(
70 &self,
71 note_id: String,
72 ) -> BoxFuture<Result<SubNoteStream<E, Self::Error>, Self::Error>> {
73 C::subnote(self, note_id)
74 }
75
76 fn channel<R: ConnectChannelRequest>(
77 &self,
78 request: R,
79 ) -> BoxFuture<Result<ChannelStream<R, Self::Error>, Self::Error>> {
80 C::channel(self, request)
81 }
82
83 fn broadcast<E: BroadcastEvent>(
84 &self,
85 ) -> BoxFuture<Result<BroadcastStream<E, Self::Error>, Self::Error>> {
86 C::broadcast(self)
87 }
88}
89
90impl<C: ?Sized> StreamingClient for &mut C
91where
92 C: StreamingClient,
93{
94 type Error = C::Error;
95
96 fn subnote<E: SubNoteEvent>(
97 &self,
98 note_id: String,
99 ) -> BoxFuture<Result<SubNoteStream<E, Self::Error>, Self::Error>> {
100 C::subnote(self, note_id)
101 }
102
103 fn channel<R: ConnectChannelRequest>(
104 &self,
105 request: R,
106 ) -> BoxFuture<Result<ChannelStream<R, Self::Error>, Self::Error>> {
107 C::channel(self, request)
108 }
109
110 fn broadcast<E: BroadcastEvent>(
111 &self,
112 ) -> BoxFuture<Result<BroadcastStream<E, Self::Error>, Self::Error>> {
113 C::broadcast(self)
114 }
115}
116
117impl<C: ?Sized> StreamingClient for Box<C>
118where
119 C: StreamingClient,
120{
121 type Error = C::Error;
122
123 fn subnote<E: SubNoteEvent>(
124 &self,
125 note_id: String,
126 ) -> BoxFuture<Result<SubNoteStream<E, Self::Error>, Self::Error>> {
127 C::subnote(self, note_id)
128 }
129
130 fn channel<R: ConnectChannelRequest>(
131 &self,
132 request: R,
133 ) -> BoxFuture<Result<ChannelStream<R, Self::Error>, Self::Error>> {
134 C::channel(self, request)
135 }
136
137 fn broadcast<E: BroadcastEvent>(
138 &self,
139 ) -> BoxFuture<Result<BroadcastStream<E, Self::Error>, Self::Error>> {
140 C::broadcast(self)
141 }
142}
143
144pub trait ConnectChannelRequest: Serialize {
148 type Incoming: DeserializeOwned + 'static;
150 type Outgoing: Serialize + 'static;
152
153 const NAME: &'static str;
155}
156
157impl<R: ?Sized> ConnectChannelRequest for &'_ R
158where
159 R: ConnectChannelRequest,
160{
161 type Incoming = R::Incoming;
162 type Outgoing = R::Outgoing;
163
164 const NAME: &'static str = R::NAME;
165}
166
167impl<R: ?Sized> ConnectChannelRequest for &'_ mut R
168where
169 R: ConnectChannelRequest,
170{
171 type Incoming = R::Incoming;
172 type Outgoing = R::Outgoing;
173
174 const NAME: &'static str = R::NAME;
175}
176
177impl<R: ?Sized> ConnectChannelRequest for Box<R>
178where
179 R: ConnectChannelRequest,
180{
181 type Incoming = R::Incoming;
182 type Outgoing = R::Outgoing;
183
184 const NAME: &'static str = R::NAME;
185}
186
187pub trait SubNoteEvent: DeserializeOwned + 'static {}
192
193pub trait BroadcastEvent: DeserializeOwned + 'static {
195 const TYPE: &'static str;
197}