misskey_core/
streaming.rs

1//! Streaming API.
2
3use std::pin::Pin;
4
5use futures_core::{
6    future::BoxFuture,
7    stream::{BoxStream, Stream},
8};
9use futures_sink::Sink;
10use serde::de::DeserializeOwned;
11use serde::Serialize;
12
13/// Trait for [`Stream`] + [`Sink`].
14///
15/// We need this for [`BoxStreamSink`] because trait objects can only have a single base trait. ([reference])
16///
17/// [reference]: https://doc.rust-lang.org/reference/types/trait-object.html
18pub trait StreamSink<I, O, E>: Stream<Item = Result<I, E>> + Sink<O, Error = E> {}
19impl<I, O, E, S: ?Sized> StreamSink<I, O, E> for S where
20    S: Sink<O, Error = E> + Stream<Item = Result<I, E>>
21{
22}
23
24/// An owned dynamically typed [`Stream`] + [`Sink`] for use in cases where we can't statically
25/// type the result.
26pub type BoxStreamSink<'a, I, O, E> = Pin<Box<dyn StreamSink<I, O, E> + 'a + Send>>;
27
28/// Stream for the [`StreamingClient::subnote`] method.
29pub type SubNoteStream<'a, T, E> = BoxStream<'a, Result<T, E>>;
30/// Stream for the [`StreamingClient::channel`] method.
31pub type ChannelStream<'a, T, E> = BoxStreamSink<
32    'a,
33    <T as ConnectChannelRequest>::Incoming,
34    <T as ConnectChannelRequest>::Outgoing,
35    E,
36>;
37/// Stream for the [`StreamingClient::broadcast`] method.
38pub type BroadcastStream<'a, T, E> = BoxStream<'a, Result<T, E>>;
39
40/// Abstraction over API clients with streaming connections.
41pub trait StreamingClient {
42    /// The error type produced by the client when an error occurs.
43    type Error: std::error::Error;
44
45    /// Captures the note specified by `note_id`.
46    fn subnote<E: SubNoteEvent>(
47        &self,
48        note_id: String,
49    ) -> BoxFuture<Result<SubNoteStream<E, Self::Error>, Self::Error>>;
50
51    /// Connects to the channel using `request`.
52    fn channel<R: ConnectChannelRequest>(
53        &self,
54        request: R,
55    ) -> BoxFuture<Result<ChannelStream<R, Self::Error>, Self::Error>>;
56
57    /// Receive messages from the broadcast stream.
58    fn broadcast<E: BroadcastEvent>(
59        &self,
60    ) -> BoxFuture<Result<BroadcastStream<E, Self::Error>, Self::Error>>;
61}
62
63impl<C: ?Sized> StreamingClient for &C
64where
65    C: StreamingClient,
66{
67    type Error = C::Error;
68
69    fn subnote<E: SubNoteEvent>(
70        &self,
71        note_id: String,
72    ) -> BoxFuture<Result<SubNoteStream<E, Self::Error>, Self::Error>> {
73        C::subnote(self, note_id)
74    }
75
76    fn channel<R: ConnectChannelRequest>(
77        &self,
78        request: R,
79    ) -> BoxFuture<Result<ChannelStream<R, Self::Error>, Self::Error>> {
80        C::channel(self, request)
81    }
82
83    fn broadcast<E: BroadcastEvent>(
84        &self,
85    ) -> BoxFuture<Result<BroadcastStream<E, Self::Error>, Self::Error>> {
86        C::broadcast(self)
87    }
88}
89
90impl<C: ?Sized> StreamingClient for &mut C
91where
92    C: StreamingClient,
93{
94    type Error = C::Error;
95
96    fn subnote<E: SubNoteEvent>(
97        &self,
98        note_id: String,
99    ) -> BoxFuture<Result<SubNoteStream<E, Self::Error>, Self::Error>> {
100        C::subnote(self, note_id)
101    }
102
103    fn channel<R: ConnectChannelRequest>(
104        &self,
105        request: R,
106    ) -> BoxFuture<Result<ChannelStream<R, Self::Error>, Self::Error>> {
107        C::channel(self, request)
108    }
109
110    fn broadcast<E: BroadcastEvent>(
111        &self,
112    ) -> BoxFuture<Result<BroadcastStream<E, Self::Error>, Self::Error>> {
113        C::broadcast(self)
114    }
115}
116
117impl<C: ?Sized> StreamingClient for Box<C>
118where
119    C: StreamingClient,
120{
121    type Error = C::Error;
122
123    fn subnote<E: SubNoteEvent>(
124        &self,
125        note_id: String,
126    ) -> BoxFuture<Result<SubNoteStream<E, Self::Error>, Self::Error>> {
127        C::subnote(self, note_id)
128    }
129
130    fn channel<R: ConnectChannelRequest>(
131        &self,
132        request: R,
133    ) -> BoxFuture<Result<ChannelStream<R, Self::Error>, Self::Error>> {
134        C::channel(self, request)
135    }
136
137    fn broadcast<E: BroadcastEvent>(
138        &self,
139    ) -> BoxFuture<Result<BroadcastStream<E, Self::Error>, Self::Error>> {
140        C::broadcast(self)
141    }
142}
143
144/// Request to connect to the channel.
145///
146/// It's similar to `Request` but for connecting to a channel.
147pub trait ConnectChannelRequest: Serialize {
148    /// Type of the data we receive from the channel.
149    type Incoming: DeserializeOwned + 'static;
150    /// Type of the data we send to the channel.
151    type Outgoing: Serialize + 'static;
152
153    /// The name of the channel to be connected by this request.
154    const NAME: &'static str;
155}
156
157impl<R: ?Sized> ConnectChannelRequest for &'_ R
158where
159    R: ConnectChannelRequest,
160{
161    type Incoming = R::Incoming;
162    type Outgoing = R::Outgoing;
163
164    const NAME: &'static str = R::NAME;
165}
166
167impl<R: ?Sized> ConnectChannelRequest for &'_ mut R
168where
169    R: ConnectChannelRequest,
170{
171    type Incoming = R::Incoming;
172    type Outgoing = R::Outgoing;
173
174    const NAME: &'static str = R::NAME;
175}
176
177impl<R: ?Sized> ConnectChannelRequest for Box<R>
178where
179    R: ConnectChannelRequest,
180{
181    type Incoming = R::Incoming;
182    type Outgoing = R::Outgoing;
183
184    const NAME: &'static str = R::NAME;
185}
186
187/// Events you receive with a subscription to the note.
188///
189/// This unquestionably corresponds to `NoteUpdateEvent` in [misskey-api](https://docs.rs/misskey-api).
190/// We treat it abstractly here since [misskey-core](https://docs.rs/misskey-core) cannot depend on [misskey-api](https://docs.rs/misskey-api).
191pub trait SubNoteEvent: DeserializeOwned + 'static {}
192
193/// Events you receive from broadcast stream.
194pub trait BroadcastEvent: DeserializeOwned + 'static {
195    /// Name of this event in the broadcast stream.
196    const TYPE: &'static str;
197}