1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
use core::fmt;
use std::any::Any;

use futures::Stream;

use crate::{async_iter::AsyncIterator, Error, Iter, StreamIter};

/// Marker trait for message payload types.
///
/// It declares no methods of its own; it only bundles the supertrait bounds
/// (`Any`, `Clone`, `Debug`, `Send`, `Sync`, `Unpin`) that a type must satisfy
/// to be used as the `M` parameter of [`IntoMessages`].
pub trait Message: Any + Clone + fmt::Debug + Send + Sync + Unpin {}
/// Marker trait for error types carried alongside messages.
///
/// It declares no methods of its own; it only bundles the supertrait bounds
/// (`Any`, `Debug`, `Display`, `Send`, `Sync`, `Unpin`) that a type must satisfy
/// to be used as the `E` parameter of [`IntoMessages`].
pub trait ErrorMessage: Any + fmt::Display + fmt::Debug + Send + Sync + Unpin {}
/// Conversion of a value into an asynchronous sequence of message results.
///
/// `M` is the message type and `E` the error type; every item of the produced
/// iterator is a `Result<M, E>`.
pub trait IntoMessages<M, E>
where
    M: Message,
    E: ErrorMessage,
{
    /// Consumes `self` and returns an [`AsyncIterator`] whose items are `Result<M, E>`.
    fn into_messages(self) -> impl AsyncIterator<Item = Result<M, E>>;
}

/// Newtype wrapper around an [`AsyncIterator`] `I`.
///
/// The wrapped value is private to this module; use [`async_iter`] to construct the wrapper.
pub struct AsyncIter<I>(I)
where
    I: AsyncIterator;
/// Wraps the asynchronous iterator `i` in an [`AsyncIter`].
///
/// The iterator is moved into the wrapper unchanged.
pub fn async_iter<I>(i: I) -> AsyncIter<I>
where
    I: AsyncIterator,
{
    AsyncIter(i)
}

/// The unit type counts as a message; it is the message type used by the
/// `IntoMessages<(), E>` implementation for `()`.
impl Message for () {}
/// Allows `anyhow::Error` to be used wherever an [`ErrorMessage`] is required.
impl ErrorMessage for anyhow::Error {}

/// Crate-internal record pairing an optional message result with an index and a stream id.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct Msg<M>
where
    M: Message,
{
    /// Either `None`, or `Some` holding a message `M` or a crate [`Error`].
    // NOTE(review): the meaning of `None` is not visible here — confirm against the code that builds `Msg`.
    pub(crate) inner: Option<Result<M, Error>>,
    /// Numeric index attached to this record.
    // NOTE(review): presumably the position of the item within its stream — confirm against the producer.
    pub(crate) index: u64,
    /// Numeric stream identifier attached to this record.
    // NOTE(review): presumably identifies the originating stream — confirm against the producer.
    pub(crate) stream_id: u32,
}

/// A [`StreamIter`] over a stream of `Result<M, E>` items is used as the message iterator as-is.
impl<M, E, S> IntoMessages<M, E> for StreamIter<S>
where
    M: Message,
    E: ErrorMessage,
    S: Stream<Item = Result<M, E>> + Send,
{
    /// Returns `self` unchanged; no adaptation is applied to the items.
    fn into_messages(self) -> impl AsyncIterator<Item = Result<M, E>> {
        self
    }
}

/// An [`Iter`] over plain message values becomes a message iterator by mapping every item
/// through `Ok`; the error type `E` is chosen by the caller.
impl<E, I> IntoMessages<I::Item, E> for Iter<I>
where
    E: ErrorMessage,
    I: Iterator + Send + Unpin,
    I::Item: Message,
{
    /// Maps each item of the wrapped iterator through `Ok`.
    fn into_messages(self) -> impl AsyncIterator<Item = Result<I::Item, E>> {
        self.map(Ok::<I::Item, E>)
    }
}

impl<M: Message, E: ErrorMessage, I: AsyncIterator<Item = Result<M, E>>> IntoMessages<M, E>
    for AsyncIter<I>
{
    fn into_messages(self) -> impl AsyncIterator<Item = Result<M, E>> {
        self.0
    }
}

/// The unit value converts into whatever `crate::empty()` returns, typed as an iterator of
/// `Result<(), E>`.
// NOTE(review): `crate::empty()` presumably yields no items — its definition is outside this view.
impl<E> IntoMessages<(), E> for ()
where
    E: ErrorMessage,
{
    /// Delegates to `crate::empty()`.
    fn into_messages(self) -> impl AsyncIterator<Item = Result<(), E>> {
        crate::empty()
    }
}

/// A fixed-size array of messages converts by wrapping each element in `Ok` and handing the
/// resulting iterator to `crate::iter`.
impl<const N: usize, M, E> IntoMessages<M, E> for [M; N]
where
    M: Message,
    E: ErrorMessage,
{
    /// Consumes the array by value; elements are passed on in array order, each as `Ok`.
    fn into_messages(self) -> impl AsyncIterator<Item = Result<M, E>> {
        let wrapped = self.into_iter().map(Ok::<M, E>);
        crate::iter(wrapped)
    }
}

/// A vector of messages converts by wrapping each element in `Ok` and handing the resulting
/// iterator to `crate::iter`.
impl<M, E> IntoMessages<M, E> for Vec<M>
where
    M: Message,
    E: ErrorMessage,
{
    /// Consumes the vector; elements are passed on in vector order, each as `Ok`.
    fn into_messages(self) -> impl AsyncIterator<Item = Result<M, E>> {
        let wrapped = self.into_iter().map(Ok::<M, E>);
        crate::iter(wrapped)
    }
}

/// An optional message converts by wrapping the contained value (if any) in `Ok` and handing
/// the resulting iterator to `crate::iter`.
impl<M, E> IntoMessages<M, E> for Option<M>
where
    M: Message,
    E: ErrorMessage,
{
    /// Consumes the option; `Some(m)` contributes the single item `Ok(m)`, `None` contributes none.
    fn into_messages(self) -> impl AsyncIterator<Item = Result<M, E>> {
        let wrapped = self.into_iter().map(Ok::<M, E>);
        crate::iter(wrapped)
    }
}