spirit_tokio/
either.rs

1//! Support for alternative choices of configuration.
2
3use std::future::Future;
4use std::io::{Error as IoError, SeekFrom};
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8#[cfg(feature = "either")]
9use either::Either as OtherEither;
10use err_context::AnyError;
11#[cfg(feature = "futures")]
12use futures_util::future::Either as FutEither;
13use pin_project::pin_project;
14use serde::de::DeserializeOwned;
15use serde::{Deserialize, Serialize};
16use spirit::extension::Extensible;
17use spirit::fragment::driver::{Comparable, Comparison, Driver, Instruction};
18use spirit::fragment::{Fragment, Installer, Stackable, Transformation};
19#[cfg(feature = "cfg-help")]
20use structdoc::StructDoc;
21use structopt::StructOpt;
22use tokio::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
23#[cfg(feature = "stream")]
24use tokio_stream::Stream;
25
26#[cfg(feature = "net")]
27use crate::net::Accept;
28
29/// The [`Either`] type allows to wrap two similar [`Fragment`]s and let the user choose
30/// which one will be used.
31///
32/// For example, if your server could run both on common TCP and unix domain stream sockets, you
33/// could use the `Either<TcpListen, UnixListen>`. This fragment would then create resources of
34/// type `Either<TcpListener, UnixListener>`.
35///
36/// Many traits are delegated through to one or the other instance inside (in case both implement
37/// it). So, the above resource will implement the [`Accept`] trait that will accept
38/// instances of `Either<TcpStream, UnixStream>`. These'll in turn implement [`AsyncRead`] and
39/// [`AsyncWrite`], therefore can be handled uniformly just as connections.
40///
41/// # Deserialization
42///
43/// This uses the [untagged] serde attribute. This means there are no additional configuration
44/// options present and the choice is made by trying to first deserialize the [`A`] variant and
45/// if that fails, trying the [`B`] one. Therefore, the inner resource configs need to have some
46/// distinct fields. In our example, this would parse as [`TcpListen`]:
47///
48/// ```toml
49/// [[listen]]
50/// port = 1234
51/// ```
52///
/// While this would parse as a [`UnixListen`]:
54///
55/// ```toml
56/// [[listen]]
57/// path = "/tmp/socket"
58/// ```
59///
60/// If you need different parsing, you can use either a newtype or [remote derive].
61///
62/// # Other similar types
63///
64/// This is not the only `Either` type around. Unfortunately, none of the available ones was just
65/// right for the use case here, so this crate rolls its own. But it provides [`From`]/[`Into`]
66/// conversions between them, if the corresponding feature on this crate is enabled.
67///
68/// # More than two options
69///
70/// This allows only two variants. However, if you need more, it is possible to nest them and form
71/// a tree.
72///
73/// # Drawbacks
74///
75/// Due to the complexity of implementation, the [`Fragment`] is implemented for either only if
76/// both variants are [`Fragment`]s with simple enough [`Driver`]s (drivers that don't sub-divide
77/// their [`Fragment`]s). Therefore, `Vec<Either<TcpListen, UnixListen>>` will work, but
78/// `Either<Vec<TcpListen>, Vec<UnixListen>>` will not.
79///
80/// This is an implementation limitation and may be lifted in the future (PRs are welcome).
81///
82/// # Examples
83///
84/// ```rust
85/// use std::sync::Arc;
86///
87/// use serde::Deserialize;
88/// use spirit::{AnyError, Empty, Pipeline, Spirit};
89/// use spirit::prelude::*;
90/// #[cfg(unix)]
91/// use spirit_tokio::either::Either;
92/// use spirit_tokio::handlers::PerConnection;
93/// use spirit_tokio::net::TcpListen;
94/// #[cfg(unix)]
95/// use spirit_tokio::net::unix::UnixListen;
96/// use tokio::io::{AsyncWrite, AsyncWriteExt};
97/// use tokio::pin;
98///
99/// // If we want to work on systems that don't have unix domain sockets...
100///
101/// #[cfg(unix)]
102/// type Listener = Either<TcpListen, UnixListen>;
103/// #[cfg(not(unix))]
104/// type Listener = TcpListen;
105///
106/// const DEFAULT_CONFIG: &str = r#"
107/// [[listening_socket]]
108/// port = 1235
109/// max-conn = 20
110/// error-sleep = "100ms"
111/// "#;
112/// #[derive(Default, Deserialize)]
113/// struct Config {
114///     listening_socket: Vec<Listener>,
115/// }
116///
117/// impl Config {
118///     fn listen(&self) -> Vec<Listener> {
119///         self.listening_socket.clone()
120///     }
121/// }
122///
123/// async fn handle_connection<C: AsyncWrite>(conn: C) -> Result<(), AnyError> {
124///     pin!(conn);
125///     conn.write_all(b"hello world").await?;
126///     conn.shutdown().await?;
127///     Ok(())
128/// }
129///
130/// fn main() {
131///     let handler = PerConnection(|conn, _cfg: &_| async {
132///         if let Err(e) = handle_connection(conn).await {
133///             eprintln!("Error: {}", e);
134///         }
135///     });
136///     Spirit::<Empty, Config>::new()
137///         .config_defaults(DEFAULT_CONFIG)
138///         .with(Pipeline::new("listen").extract_cfg(Config::listen).transform(handler))
139///         .run(|spirit| {
140/// #           let spirit = Arc::clone(spirit);
141/// #           std::thread::spawn(move || spirit.terminate());
142///             Ok(())
143///         });
144/// }
145/// ```
146///
147/// [untagged]: https://serde.rs/container-attrs.html#untagged
148/// [remote derive]: https://serde.rs/remote-derive.html
149/// [`TcpListen`]: crate::TcpListen
150/// [`UnixListen`]: crate::net::unix::UnixListen
// `pin_project` generates the `EitherProj` projection enum. The trait implementations in this
// module match on it to reach the active variant through a `Pin<&mut Self>`.
#[pin_project(project = EitherProj)]
#[derive(Copy, Clone, Debug, Deserialize, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize)]
#[cfg_attr(feature = "cfg-help", derive(StructDoc))]
// Untagged representation: the serialized form carries no marker saying which variant it is.
#[serde(untagged)]
pub enum Either<A, B> {
    // The first alternative. The field is marked `#[pin]` so it is projected as pinned.
    #[allow(missing_docs)]
    A(#[pin] A),
    // The second alternative. The field is marked `#[pin]` so it is projected as pinned.
    #[allow(missing_docs)]
    B(#[pin] B),
}
161
162use self::Either::{A, B};
163
// Evaluates the same expression against whichever variant is active.
//
// * `$kind` is the enum to match on (`Either` itself or its `EitherProj` projection); it must
//   have tuple variants named `A` and `B`.
// * `$value` is the value being matched.
// * `$binding => $body` is the pattern applied to the payload of either variant and the
//   expression evaluated with that binding in scope. The expression is expanded once per arm,
//   so it has to type-check for both payload types.
macro_rules! either_unwrap {
    ($kind:ident, $value:expr, $binding:pat => $body:expr) => {
        match $value {
            $kind::A($binding) => $body,
            $kind::B($binding) => $body,
        }
    };
}
172
173impl<T> Either<T, T> {
174    /// Extracts the inner value in case both have the same type.
175    ///
176    /// Sometimes, a series of operations produces an `Either` with both types the same. In such
177    /// case, `Either` plays no role anymore and this method can be used to get to the inner value.
178    pub fn into_inner(self) -> T {
179        either_unwrap!(Either, self, v => v)
180    }
181}
182
#[cfg(feature = "futures")]
impl<A, B> From<FutEither<A, B>> for Either<A, B> {
    /// Converts from the `futures` flavour: `Left` becomes `A`, `Right` becomes `B`.
    fn from(e: FutEither<A, B>) -> Self {
        match e {
            FutEither::Left(left) => Either::A(left),
            FutEither::Right(right) => Either::B(right),
        }
    }
}
192
#[cfg(feature = "futures")]
impl<A, B> From<Either<A, B>> for FutEither<A, B> {
    /// Converts into the `futures` flavour: `A` becomes `Left`, `B` becomes `Right`.
    fn from(either: Either<A, B>) -> FutEither<A, B> {
        match either {
            Either::A(first) => FutEither::Left(first),
            Either::B(second) => FutEither::Right(second),
        }
    }
}
202
#[cfg(feature = "either")]
impl<A, B> From<OtherEither<A, B>> for Either<A, B> {
    /// Converts from the `either` crate's type: `Left` becomes `A`, `Right` becomes `B`.
    fn from(e: OtherEither<A, B>) -> Self {
        match e {
            OtherEither::Left(left) => Either::A(left),
            OtherEither::Right(right) => Either::B(right),
        }
    }
}
212
#[cfg(feature = "either")]
impl<A, B> From<Either<A, B>> for OtherEither<A, B> {
    /// Converts into the `either` crate's type: `A` becomes `Left`, `B` becomes `Right`.
    fn from(either: Either<A, B>) -> OtherEither<A, B> {
        match either {
            Either::A(first) => OtherEither::Left(first),
            Either::B(second) => OtherEither::Right(second),
        }
    }
}
222
#[cfg(feature = "net")]
impl<A, B> Accept for Either<A, B>
where
    A: Accept,
    B: Accept,
{
    /// A connection accepted by either listener, tagged with the variant it came from.
    type Connection = Either<A::Connection, B::Connection>;

    /// Polls the active listener and wraps a successfully accepted connection in the matching
    /// variant. Errors and `Pending` are passed through unchanged.
    fn poll_accept(
        self: Pin<&mut Self>,
        ctx: &mut Context,
    ) -> Poll<Result<Self::Connection, IoError>> {
        match self.project() {
            EitherProj::A(listener) => listener.poll_accept(ctx).map_ok(Either::A),
            EitherProj::B(listener) => listener.poll_accept(ctx).map_ok(Either::B),
        }
    }
}
240
241impl<A, B> Future for Either<A, B>
242where
243    A: Future,
244    B: Future,
245{
246    type Output = Either<A::Output, B::Output>;
247    fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
248        match self.project() {
249            EitherProj::A(a) => a.poll(ctx).map(A),
250            EitherProj::B(b) => b.poll(ctx).map(B),
251        }
252    }
253}
254
#[cfg(feature = "stream")]
impl<A, B> Stream for Either<A, B>
where
    A: Stream,
    B: Stream,
{
    /// An item of whichever stream is active, wrapped in the same variant.
    type Item = Either<A::Item, B::Item>;

    /// Polls the active stream; a yielded item is wrapped in the variant it originated from,
    /// while end-of-stream (`None`) and `Pending` pass through unchanged.
    fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
        match self.project() {
            EitherProj::A(stream) => stream.poll_next(ctx).map(|item| item.map(Either::A)),
            EitherProj::B(stream) => stream.poll_next(ctx).map(|item| item.map(Either::B)),
        }
    }
}
269
270impl<A, B> AsyncBufRead for Either<A, B>
271where
272    A: AsyncBufRead,
273    B: AsyncBufRead,
274{
275    fn poll_fill_buf(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Result<&[u8], IoError>> {
276        either_unwrap!(EitherProj, self.project(), v => v.poll_fill_buf(ctx))
277    }
278    fn consume(self: Pin<&mut Self>, amt: usize) {
279        either_unwrap!(EitherProj, self.project(), v => v.consume(amt))
280    }
281}
282
283impl<A, B> AsyncRead for Either<A, B>
284where
285    A: AsyncRead,
286    B: AsyncRead,
287{
288    fn poll_read(
289        self: Pin<&mut Self>,
290        ctx: &mut Context,
291        buf: &mut ReadBuf,
292    ) -> Poll<Result<(), IoError>> {
293        either_unwrap!(EitherProj, self.project(), v => v.poll_read(ctx, buf))
294    }
295}
296
297impl<A, B> AsyncSeek for Either<A, B>
298where
299    A: AsyncSeek,
300    B: AsyncSeek,
301{
302    fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> Result<(), IoError> {
303        either_unwrap!(EitherProj, self.project(), v => v.start_seek(position))
304    }
305    fn poll_complete(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Result<u64, IoError>> {
306        either_unwrap!(EitherProj, self.project(), v => v.poll_complete(ctx))
307    }
308}
309
310impl<A, B> AsyncWrite for Either<A, B>
311where
312    A: AsyncWrite,
313    B: AsyncWrite,
314{
315    fn poll_write(
316        self: Pin<&mut Self>,
317        ctx: &mut Context,
318        buf: &[u8],
319    ) -> Poll<Result<usize, IoError>> {
320        either_unwrap!(EitherProj, self.project(), v => v.poll_write(ctx, buf))
321    }
322
323    fn poll_flush(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Result<(), IoError>> {
324        either_unwrap!(EitherProj, self.project(), v => v.poll_flush(ctx))
325    }
326
327    fn poll_shutdown(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Result<(), IoError>> {
328        either_unwrap!(EitherProj, self.project(), v => v.poll_shutdown(ctx))
329    }
330
331    fn poll_write_vectored(
332        self: Pin<&mut Self>,
333        ctx: &mut Context,
334        bufs: &[std::io::IoSlice],
335    ) -> Poll<Result<usize, IoError>> {
336        either_unwrap!(EitherProj, self.project(), v => v.poll_write_vectored(ctx, bufs))
337    }
338
339    fn is_write_vectored(&self) -> bool {
340        either_unwrap!(Either, self, v => v.is_write_vectored())
341    }
342}
343
344impl<A, B> Stackable for Either<A, B>
345where
346    A: Stackable,
347    B: Stackable,
348{
349}
350
351impl<A, B, AR, BR> Comparable<Either<AR, BR>> for Either<A, B>
352where
353    A: Comparable<AR>,
354    B: Comparable<BR>,
355{
356    fn compare(&self, rhs: &Either<AR, BR>) -> Comparison {
357        match (self, rhs) {
358            (Either::A(s), Either::A(r)) => s.compare(r),
359            (Either::B(s), Either::B(r)) => s.compare(r),
360            _ => Comparison::Dissimilar,
361        }
362    }
363}
364
365impl<A, B> Fragment for Either<A, B>
366where
367    A: Fragment,
368    A::Driver: Driver<A, SubFragment = A>,
369    B: Fragment,
370    B::Driver: Driver<B, SubFragment = B>,
371{
372    type Driver = EitherDriver<A, B>;
373    type Installer = EitherInstaller<A::Installer, B::Installer>;
374    type Seed = Either<A::Seed, B::Seed>;
375    type Resource = Either<A::Resource, B::Resource>;
376    fn make_seed(&self, name: &'static str) -> Result<Self::Seed, AnyError> {
377        match self {
378            Either::A(a) => Ok(Either::A(a.make_seed(name)?)),
379            Either::B(b) => Ok(Either::B(b.make_seed(name)?)),
380        }
381    }
382    fn make_resource(
383        &self,
384        seed: &mut Self::Seed,
385        name: &'static str,
386    ) -> Result<Self::Resource, AnyError> {
387        match (self, seed) {
388            (Either::A(a), Either::A(sa)) => Ok(Either::A(a.make_resource(sa, name)?)),
389            (Either::B(b), Either::B(sb)) => Ok(Either::B(b.make_resource(sb, name)?)),
390            _ => unreachable!("Seed vs. fragment mismatch"),
391        }
392    }
393}
394
/// An [`Installer`] for [`Either`] [`Resource`]s.
///
/// This wraps two distinct installers so it can install resources that are installable by one or
/// the other.
///
/// Note that for this to work, *both* installers need to exist at the same time (as opposed to
/// the resource, where only one or the other is in existence).
///
/// [`Resource`]: Fragment::Resource
// Field `0` installs the `A` resources, field `1` installs the `B` resources.
#[derive(Debug, Default)]
pub struct EitherInstaller<A, B>(A, B);
406
407impl<A, B, RA, RB, O, C> Installer<Either<RA, RB>, O, C> for EitherInstaller<A, B>
408where
409    A: Installer<RA, O, C>,
410    B: Installer<RB, O, C>,
411{
412    type UninstallHandle = Either<A::UninstallHandle, B::UninstallHandle>;
413    fn install(&mut self, resource: Either<RA, RB>, name: &'static str) -> Self::UninstallHandle {
414        match resource {
415            Either::A(ra) => Either::A(self.0.install(ra, name)),
416            Either::B(rb) => Either::B(self.1.install(rb, name)),
417        }
418    }
419    fn init<E: Extensible<Opts = O, Config = C, Ok = E>>(
420        &mut self,
421        builder: E,
422        name: &'static str,
423    ) -> Result<E, AnyError>
424    where
425        E::Config: DeserializeOwned + Send + Sync + 'static,
426        E::Opts: StructOpt + Send + Sync + 'static,
427    {
428        let builder = self.0.init(builder, name)?;
429        let builder = self.1.init(builder, name)?;
430        Ok(builder)
431    }
432}
433
/// A [`Driver`] used for [`Either`] [`Fragment`]s.
///
/// This switches between driving the variants ‒ if the fragment changes from one variant to
/// another, the old driver is dropped and a new one is created for the other one. If the variant
/// stays the same, driving is delegated to the existing driver.
///
/// Note that there are limitations to what this driver implementation supports ‒ see the
/// [`Either` drawbacks](Either#drawbacks).
#[derive(Debug)]
pub struct EitherDriver<A, B>
where
    A: Fragment,
    B: Fragment,
{
    // The driver currently in charge.
    driver: Either<A::Driver, B::Driver>,
    // A driver for the other variant, staged while a variant switch is in progress. It replaces
    // `driver` on `confirm` and is thrown away on `abort`.
    new_driver: Option<Either<A::Driver, B::Driver>>,
}
451
452impl<A, B> Default for EitherDriver<A, B>
453where
454    A: Fragment,
455    A::Driver: Default,
456    B: Fragment,
457{
458    fn default() -> Self {
459        EitherDriver {
460            driver: Either::A(Default::default()),
461            new_driver: None,
462        }
463    }
464}
465
466// TODO: This is a bit limiting
467impl<A, B> Driver<Either<A, B>> for EitherDriver<A, B>
468where
469    A: Fragment,
470    A::Driver: Driver<A, SubFragment = A> + Default,
471    B: Fragment,
472    B::Driver: Driver<B, SubFragment = B> + Default,
473{
474    type SubFragment = Either<A, B>;
475    fn instructions<T, I>(
476        &mut self,
477        fragment: &Either<A, B>,
478        transform: &mut T,
479        name: &'static str,
480    ) -> Result<Vec<Instruction<T::OutputResource>>, Vec<AnyError>>
481    where
482        T: Transformation<<Self::SubFragment as Fragment>::Resource, I, Self::SubFragment>,
483    {
484        assert!(self.new_driver.is_none(), "Unclosed transaction");
485
486        // Shape adaptor for the transformation ‒ we need to first wrap in A or B before feeding it
487        // into the either-transformation.
488        //
489        // Note that due to the lifetimes, we cache the outer fragment, not the inner part that the
490        // transformation gets. It should be the same one.
491        //
492        // T: Transformation on the either
493        // F: The original configuration fragment (eg Either<A, B>)
494        // W: Wrapping function (Either::A or Either::B)
495        struct Wrap<'a, T, F, W>(&'a mut T, &'a F, W);
496
497        impl<'a, T, I, Fi, Fo, W> Transformation<Fi::Resource, I, Fi> for Wrap<'a, T, Fo, W>
498        where
499            Fi: Fragment,
500            Fi::Driver: Driver<Fi, SubFragment = Fi>,
501            Fo: Fragment,
502            W: Fn(Fi::Resource) -> Fo::Resource,
503            T: Transformation<Fo::Resource, I, Fo>,
504        {
505            type OutputResource = T::OutputResource;
506            type OutputInstaller = T::OutputInstaller;
507            fn installer(&mut self, in_installer: I, name: &'static str) -> T::OutputInstaller {
508                self.0.installer(in_installer, name)
509            }
510            fn transform(
511                &mut self,
512                resource: Fi::Resource,
513                _fragment: &Fi,
514                name: &'static str,
515            ) -> Result<Self::OutputResource, AnyError> {
516                self.0.transform((self.2)(resource), self.1, name)
517            }
518        }
519
520        match (&mut self.driver, fragment) {
521            (Either::A(da), Either::A(a)) => {
522                da.instructions(a, &mut Wrap(transform, fragment, Either::A), name)
523            }
524            (Either::B(db), Either::B(b)) => {
525                db.instructions(b, &mut Wrap(transform, fragment, Either::B), name)
526            }
527            (Either::B(_), Either::A(a)) => {
528                let mut da = A::Driver::default();
529                let result = da.instructions(a, &mut Wrap(transform, fragment, Either::A), name);
530                self.new_driver = Some(Either::A(da));
531                result
532            }
533            (Either::A(_), Either::B(b)) => {
534                let mut db = B::Driver::default();
535                let result = db.instructions(b, &mut Wrap(transform, fragment, Either::B), name);
536                self.new_driver = Some(Either::B(db));
537                result
538            }
539        }
540    }
541    fn confirm(&mut self, name: &'static str) {
542        if let Some(new) = self.new_driver.take() {
543            self.driver = new;
544        }
545        either_unwrap!(Either, &mut self.driver, v => v.confirm(name))
546    }
547    fn abort(&mut self, name: &'static str) {
548        if self.new_driver.is_some() {
549            self.new_driver.take();
550        } else {
551            either_unwrap!(Either, &mut self.driver, v => v.abort(name))
552        }
553    }
554    fn maybe_cached(&self, fragment: &Either<A, B>, name: &'static str) -> bool {
555        match (&self.driver, fragment) {
556            (Either::A(da), Either::A(a)) => da.maybe_cached(a, name),
557            (Either::B(db), Either::B(b)) => db.maybe_cached(b, name),
558            _ => false,
559        }
560    }
561}