canary/
type_iter.rs

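//! Type-level iterators that encode the order of sends and receives in a pipeline,
//! so that misuse of a channel is rejected at compile time. Here be dragons.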
2
3use std::marker::PhantomData;
4
5use serde::{de::DeserializeOwned, Serialize};
6
7use crate::Channel;
8
/// Builds the type-level list that describes a pipeline. Used for internals.
///
/// Steps are written as `send <type>` or `receive <type>` and separated by commas.
/// An empty list and a trailing comma are both accepted.
///
/// `pipe!(send i32, receive u32)` -> `TypeIter<Tx<i32>, TypeIter<Rx<u32>>>`
///
/// `pipe!()` -> `()`
#[macro_export]
macro_rules! pipe {
    // End of the list: `()` is what `TypeIter` already uses as its default tail.
    () => {
        ()
    };

    // Exactly one step left.
    (send $t: ty) => {
        $crate::type_iter::TypeIter<$crate::type_iter::Tx<$t>>
    };
    (receive $t: ty) => {
        $crate::type_iter::TypeIter<$crate::type_iter::Rx<$t>>
    };

    // One step followed by the rest of the list; the rest is expanded recursively.
    (send $t: ty, $($lit: ident $s: ty),* $(,)?) => {
        $crate::type_iter::TypeIter<$crate::type_iter::Tx<$t>, $crate::pipe!($($lit $s),*)>
    };
    (receive $t: ty, $($lit: ident $s: ty),* $(,)?) => {
        $crate::type_iter::TypeIter<$crate::type_iter::Rx<$t>, $crate::pipe!($($lit $s),*)>
    };
}
39
/// Shortens send calls: sends a value and rebinds the channel to its next state.
///
/// The expansion uses `.await?`, so the macro can only be used inside an `async`
/// context whose return type works with `?`.
/// ```ignore
/// send!(pipe, 2);
/// // expands to:
/// let pipe = pipe.send(2).await?;
/// ```
#[macro_export]
macro_rules! send {
    ($pipe: ident, $value: expr) => {
        // The rebound channel is allowed to go unused, e.g. after the last step.
        #[allow(unused_variables)]
        let $pipe = $pipe.send($value).await?;
    };
}
52
/// Shortens receive calls: binds the received value and rebinds the channel to its next state.
///
/// The expansion uses `.await?`, so the macro can only be used inside an `async`
/// context whose return type works with `?`.
/// ```ignore
/// receive!(res, pipe);
/// // expands to:
/// let (res, pipe) = pipe.receive().await?;
/// ```
#[macro_export]
macro_rules! receive {
    ($value: ident, $pipe: ident) => {
        let ($value, $pipe) = $pipe.receive().await?;
        // Rebinding lets only the channel go unused without a warning;
        // an unused received value is still reported.
        #[allow(unused_variables)]
        let $pipe = $pipe;
    };
}
66
/// Declares pipelines.
///
/// Pipelines are used to guarantee that communication is correct at compile-time.
/// Each declaration produces a unit struct with the given visibility and name, and an
/// implementation of `Pipeline` whose `Pipe` is built by `pipe!` from the listed steps.
///
/// Attributes and doc comments written in front of a declaration are forwarded to the
/// generated struct, and several pipelines may be declared in one invocation.
/// ```ignore
/// pipeline! {
///     /// Sends a string and gets one back.
///     pub pipeline MyPipeline {
///         send String,
///         receive String,
///     }
/// }
/// ```
#[macro_export]
macro_rules! pipeline {
    // Nothing to declare.
    () => {};
    (
        $(
            $(#[$meta: meta])*
            $v: vis pipeline $i: ident {
                $($lit: ident $s: ty),*
                $(,)?
            }
        )+
    ) => {
        $(
            $(#[$meta])*
            $v struct $i;
            impl $crate::type_iter::Pipeline for $i {
                type Pipe = $crate::pipe!($($lit $s),*);
            }
        )+
    };
}
93
/// Type-level counterpart of an iterator, used for iterating over types.
pub trait TypeIterT {
    /// the type held by the current node
    type Type;
    /// the type iterator that follows this node
    type Next;
}

/// The unit type reports `()` both as its current type and as its successor.
impl TypeIterT for () {
    type Type = ();
    type Next = ();
}
106
107#[derive(Default)]
108/// type iterator which allows compile-time magic
109pub struct TypeIter<T, L = ()>(PhantomData<T>, PhantomData<L>);
110impl<T, L: TypeIterT> TypeIterT for TypeIter<T, L> {
111    type Next = L;
112    type Type = T;
113}
114
/// trait that represents a `receive` step in pipelines
pub trait Receive {
    /// the type that is received at this step
    type Type;
}

/// trait that represents a `send` step in pipelines
pub trait Transmit {
    /// the type that is transmitted at this step
    type Type;
}
125
126impl<T> Transmit for Tx<T> {
127    type Type = T;
128}
129impl<T> Receive for Rx<T> {
130    type Type = T;
131}
132
133/// type iterator that represents a type to be sent
134pub struct Tx<T>(T);
135/// type iterator that represents a type to be received
136pub struct Rx<T>(T);
137
138/// used for constructing pipelines
139pub trait Pipeline {
140    /// inner pipeline
141    type Pipe: TypeIterT;
142}
143
144impl Pipeline for () {
145    type Pipe = ();
146}
147
148/// optimization to allow &str to be sent whenever a String needs to be received
149pub trait Str {}
150impl Str for Tx<String> {}
151impl Str for Tx<&str> {}
152
153/// optimization to allow &str to be sent whenever a Vec needs to be received
154pub trait Slice<T> {}
155impl<T> Slice<T> for Tx<&[T]> {}
156impl<T> Slice<T> for Tx<Vec<T>> {}
157
158/// Used for writing services, peer services should use PeerChannel.
159pub struct MainChannel<T: TypeIterT>(pub(crate) PhantomData<T>, pub(crate) Channel);
160
161impl<T: TypeIterT> MainChannel<T> {
162    /// construct a new main channel
163    pub fn new<P: Pipeline>(chan: Channel) -> MainChannel<P::Pipe> {
164        MainChannel(PhantomData, chan)
165    }
166    /// send an object through the stream and iterate to the next type
167    pub async fn send(
168        mut self,
169        obj: <T::Type as Transmit>::Type,
170    ) -> crate::Result<MainChannel<T::Next>>
171    where
172        T::Type: Transmit,
173        <T as TypeIterT>::Next: TypeIterT,
174        <<T as TypeIterT>::Type as Transmit>::Type: Serialize + Send,
175    {
176        self.1.send(obj).await?;
177        Ok(MainChannel(PhantomData, self.1))
178    }
179    /// receive an object from the stream and iterate to the next type
180    pub async fn receive(
181        mut self,
182    ) -> crate::Result<(<T::Type as Receive>::Type, MainChannel<T::Next>)>
183    where
184        T::Type: Receive,
185        <T as TypeIterT>::Next: TypeIterT,
186        <T::Type as Receive>::Type: DeserializeOwned,
187    {
188        let res = self.1.receive::<<T::Type as Receive>::Type>().await?;
189        let chan = MainChannel(PhantomData, self.1);
190        Ok((res, chan))
191    }
192    /// coerce into a different kind of channel:
193    pub fn coerce(self) -> Channel {
194        self.1
195    }
196    /// send a str through the stream, this is an optimization done for pipelines receiving String
197    /// to make sure an unnecessary allocation is not made
198    pub async fn send_str(mut self, obj: &str) -> crate::Result<MainChannel<T::Next>>
199    where
200        T::Type: Transmit + Str,
201        <T as TypeIterT>::Next: TypeIterT,
202        <<T as TypeIterT>::Type as Transmit>::Type: Serialize + Send,
203    {
204        self.1.send(obj).await?;
205        Ok(MainChannel(PhantomData, self.1))
206    }
207
208    /// send a slice through the stream, this is an optimization done for pipelines receiving Vec<T>
209    /// to make sure an unnecessary allocation is not made
210    pub async fn send_slice(mut self, obj: &[T::Type]) -> crate::Result<MainChannel<T::Next>>
211    where
212        T::Type: Transmit + Slice<T::Type> + Serialize,
213        <T as TypeIterT>::Next: TypeIterT,
214        <<T as TypeIterT>::Type as Transmit>::Type: Serialize + Send,
215    {
216        self.1.send(obj).await?;
217        Ok(MainChannel(PhantomData, self.1))
218    }
219}
220
221/// Used for consuming services. Services should use MainChannel.
222pub struct PeerChannel<T: TypeIterT>(pub(crate) PhantomData<T>, pub(crate) Channel);
223
224impl<T: TypeIterT> PeerChannel<T> {
225    /// construct a new peer channel
226    pub fn new<P: Pipeline>(chan: Channel) -> PeerChannel<P::Pipe>
227    where
228        <P as Pipeline>::Pipe: TypeIterT,
229    {
230        PeerChannel(PhantomData, chan)
231    }
232    /// send an object through the stream and iterate to the next type
233    pub async fn send(
234        mut self,
235        obj: <T::Type as Receive>::Type,
236    ) -> crate::Result<PeerChannel<T::Next>>
237    where
238        T::Type: Receive,
239        <T as TypeIterT>::Next: TypeIterT,
240        <<T as TypeIterT>::Type as Receive>::Type: Serialize + Send,
241    {
242        self.1.send(obj).await?;
243        Ok(PeerChannel(PhantomData, self.1))
244    }
245
246    /// receive an object from the stream and iterate to the next type
247    pub async fn receive(
248        mut self,
249    ) -> crate::Result<(<T::Type as Transmit>::Type, PeerChannel<T::Next>)>
250    where
251        T::Type: Transmit,
252        <T as TypeIterT>::Next: TypeIterT,
253        <T::Type as Transmit>::Type: DeserializeOwned + 'static,
254    {
255        let res = self.1.receive::<<T::Type as Transmit>::Type>().await?;
256        let chan = PeerChannel(PhantomData, self.1);
257        Ok((res, chan))
258    }
259    /// coerce into a different kind of channel:
260    pub fn channel(self) -> Channel {
261        self.1
262    }
263    /// send a str through the stream, this is an optimization done for pipelines receiving String
264    /// to make sure an unnecessary allocation is not made
265    pub async fn send_str(mut self, obj: &str) -> crate::Result<PeerChannel<T::Next>>
266    where
267        T::Type: Transmit + Str,
268        <T as TypeIterT>::Next: TypeIterT,
269        <<T as TypeIterT>::Type as Transmit>::Type: Serialize + Send,
270    {
271        self.1.send(obj).await?;
272        Ok(PeerChannel(PhantomData, self.1))
273    }
274    /// send a slice through the stream, this is an optimization done for pipelines receiving Vec<T>
275    /// to make sure an unnecessary allocation is not made
276    pub async fn send_slice(mut self, obj: &[T::Type]) -> crate::Result<PeerChannel<T::Next>>
277    where
278        T::Type: Transmit + Slice<T::Type> + Serialize,
279        <T as TypeIterT>::Next: TypeIterT,
280        <<T as TypeIterT>::Type as Transmit>::Type: Serialize + Send,
281    {
282        self.1.send(obj).await?;
283        Ok(PeerChannel(PhantomData, self.1))
284    }
285}