1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
use std::sync::Arc;

use futures::Future;

use crate::{bus::Bus, cell::MsgCell, error::Error, message::Message};

/// A handler for messages of type `M`.
///
/// Each operation returns a named associated future type (a generic
/// associated type parameterised by a lifetime), so every implementor picks
/// the concrete future type for each operation.
///
/// All returned futures are parameterised by the lifetime of `&self` only:
/// a future may borrow from the handler, but the signatures do not let it
/// hold on to the `msg` or `bus` references passed to the call.
///
/// Implementors must be `Send + Sync`, and every returned future must be
/// `Send`.
///
/// NOTE(review): when and in what order the bus invokes `init`, `handle`,
/// `flush` and `close` is not visible in this file — confirm against the
/// caller before relying on a particular lifecycle.
pub trait Handler<M: Message>: Send + Sync {
    /// Message type yielded by a successful [`handle`](Handler::handle) call.
    type Response: Message;

    /// Future returned by [`init`](Handler::init); its output is
    /// `Result<(), Error>`.
    type InitFuture<'a>: Future<Output = Result<(), Error>> + Send + 'a
    where
        Self: 'a;

    /// Future returned by [`handle`](Handler::handle); its output is
    /// `Result<Self::Response, Error>`.
    type HandleFuture<'a>: Future<Output = Result<Self::Response, Error>> + Send + 'a
    where
        Self: 'a;

    /// Future returned by [`flush`](Handler::flush); its output is
    /// `Result<(), Error>`.
    type FlushFuture<'a>: Future<Output = Result<(), Error>> + Send + 'a
    where
        Self: 'a;

    /// Future returned by [`close`](Handler::close); its output is
    /// `Result<(), Error>`.
    type CloseFuture<'a>: Future<Output = Result<(), Error>> + Send + 'a
    where
        Self: 'a;

    /// Starts initialisation of the handler, with access to the `bus`.
    fn init(&self, bus: &Bus) -> Self::InitFuture<'_>;

    /// Starts processing the message held in `msg`, with access to the `bus`.
    ///
    /// `msg` is passed as `&mut MsgCell<M>`, so the implementation may mutate
    /// the cell during the call; the returned future cannot keep the borrow.
    fn handle(&self, msg: &mut MsgCell<M>, bus: &Bus) -> Self::HandleFuture<'_>;

    /// Starts a flush of the handler, with access to the `bus`.
    fn flush(&self, bus: &Bus) -> Self::FlushFuture<'_>;

    /// Starts closing the handler. Unlike the other operations, no `bus`
    /// reference is provided.
    fn close(&self) -> Self::CloseFuture<'_>;
}

/// Forwards [`Handler`] through an [`Arc`]: an `Arc<H>` handles messages
/// exactly as the wrapped `H` does.
///
/// Every associated type is taken verbatim from `H`, and every method
/// delegates to the corresponding method of the inner value.
impl<M, H> Handler<M> for Arc<H>
where
    M: Message,
    H: Handler<M> + 'static,
{
    type Response = <H as Handler<M>>::Response;

    type InitFuture<'a> = <H as Handler<M>>::InitFuture<'a>;
    type HandleFuture<'a> = <H as Handler<M>>::HandleFuture<'a>;
    type FlushFuture<'a> = <H as Handler<M>>::FlushFuture<'a>;
    type CloseFuture<'a> = <H as Handler<M>>::CloseFuture<'a>;

    /// Delegates to `H::init` on the shared inner handler.
    #[inline]
    fn init(&self, bus: &Bus) -> Self::InitFuture<'_> {
        // The inner impl is named explicitly so the call can never resolve
        // back to this `Arc<H>` impl.
        <H as Handler<M>>::init(&**self, bus)
    }

    /// Delegates to `H::handle` on the shared inner handler.
    #[inline]
    fn handle(&self, msg: &mut MsgCell<M>, bus: &Bus) -> Self::HandleFuture<'_> {
        <H as Handler<M>>::handle(&**self, msg, bus)
    }

    /// Delegates to `H::flush` on the shared inner handler.
    #[inline]
    fn flush(&self, bus: &Bus) -> Self::FlushFuture<'_> {
        <H as Handler<M>>::flush(&**self, bus)
    }

    /// Delegates to `H::close` on the shared inner handler.
    #[inline]
    fn close(&self) -> Self::CloseFuture<'_> {
        <H as Handler<M>>::close(&**self)
    }
}

// pub trait StreamHandler<M: Message>: Send + Sync {
//     type Message: Message;

//     type HandleStream<'a>: Stream<Item = Result<Self::Message, Error>> + Send + 'a
//     where
//         Self: 'a;

//     type CloseFuture<'a>: Future<Output = Result<(), Error>> + Send + 'a
//     where
//         Self: 'a;

//     fn handle(&self, msg: &mut MsgCell<M>, bus: &Bus) -> Self::HandleStream<'_>;
//     fn close(&self, ctx: Self::Context) -> Self::CloseFuture<'_>;
// }

/// A producer that is started from a message of type `M` and then yields
/// values of [`Self::Message`](MessageProducer::Message) one call at a time.
///
/// [`start`](MessageProducer::start) resolves to a
/// [`Self::Context`](MessageProducer::Context) value; that context is lent
/// mutably to each [`next`](MessageProducer::next) call and finally passed
/// by value to [`close`](MessageProducer::close).
///
/// Implementors must be `Send + Sync`, the context must be `Send`, and every
/// returned future must be `Send`.
///
/// NOTE(review): how the bus drives `next` (e.g. how the end of production is
/// signalled, or whether several contexts may be live at once) is not visible
/// in this file — confirm against the caller.
pub trait MessageProducer<M: Message>: Send + Sync {
    /// Message type yielded by a successful [`next`](MessageProducer::next)
    /// call.
    type Message: Message;
    /// Per-start state: created by [`start`](MessageProducer::start), borrowed
    /// mutably by [`next`](MessageProducer::next), consumed by
    /// [`close`](MessageProducer::close).
    type Context: Send;

    /// Future returned by [`init`](MessageProducer::init); its output is
    /// `Result<(), Error>`.
    type InitFuture<'a>: Future<Output = Result<(), Error>> + Send + 'a
    where
        Self: 'a;

    /// Future returned by [`start`](MessageProducer::start); its output is
    /// `Result<Self::Context, Error>`.
    type StartFuture<'a>: Future<Output = Result<Self::Context, Error>> + Send + 'a
    where
        Self: 'a;

    /// Future returned by [`next`](MessageProducer::next); its output is
    /// `Result<Self::Message, Error>`.
    type NextFuture<'a>: Future<Output = Result<Self::Message, Error>> + Send + 'a
    where
        Self: 'a;

    /// Future returned by [`close`](MessageProducer::close); its output is
    /// `Result<(), Error>`.
    type CloseFuture<'a>: Future<Output = Result<(), Error>> + Send + 'a
    where
        Self: 'a;

    /// Starts initialisation of the producer, with access to the `bus`.
    fn init(&self, bus: &Bus) -> Self::InitFuture<'_>;

    /// Starts production for the message held in `msg`; the returned future
    /// resolves to the context used by subsequent calls.
    ///
    /// The future is tied to `&self` only, so it cannot keep the `msg` or
    /// `bus` borrows.
    fn start(&self, msg: &mut MsgCell<M>, bus: &Bus) -> Self::StartFuture<'_>;

    /// Requests the next message for the given context.
    ///
    /// `self` and `ctx` share the lifetime `'a`, so the returned future may
    /// borrow from both; `bus` has its own lifetime and cannot be retained.
    fn next<'a>(&'a self, ctx: &'a mut Self::Context, bus: &Bus) -> Self::NextFuture<'a>;

    /// Starts closing the producer for the given context, taking ownership of
    /// `ctx`.
    fn close(&self, ctx: Self::Context) -> Self::CloseFuture<'_>;
}

/// Forwards [`MessageProducer`] through an [`Arc`]: an `Arc<H>` produces
/// messages exactly as the wrapped `H` does.
///
/// Every associated type is taken verbatim from `H`, and every method
/// delegates to the corresponding method of the inner value.
impl<M, H> MessageProducer<M> for Arc<H>
where
    M: Message,
    H: MessageProducer<M> + 'static,
{
    type Message = <H as MessageProducer<M>>::Message;
    type Context = <H as MessageProducer<M>>::Context;

    type InitFuture<'a> = <H as MessageProducer<M>>::InitFuture<'a>;
    type StartFuture<'a> = <H as MessageProducer<M>>::StartFuture<'a>;
    type NextFuture<'a> = <H as MessageProducer<M>>::NextFuture<'a>;
    type CloseFuture<'a> = <H as MessageProducer<M>>::CloseFuture<'a>;

    /// Delegates to `H::init` on the shared inner producer.
    #[inline]
    fn init(&self, bus: &Bus) -> Self::InitFuture<'_> {
        // The inner impl is named explicitly so the call can never resolve
        // back to this `Arc<H>` impl.
        <H as MessageProducer<M>>::init(&**self, bus)
    }

    /// Delegates to `H::start` on the shared inner producer.
    #[inline]
    fn start(&self, msg: &mut MsgCell<M>, bus: &Bus) -> Self::StartFuture<'_> {
        <H as MessageProducer<M>>::start(&**self, msg, bus)
    }

    /// Delegates to `H::next` on the shared inner producer, passing the
    /// context borrow straight through.
    #[inline]
    fn next<'a>(&'a self, ctx: &'a mut Self::Context, bus: &Bus) -> Self::NextFuture<'a> {
        <H as MessageProducer<M>>::next(&**self, ctx, bus)
    }

    /// Delegates to `H::close` on the shared inner producer, handing over
    /// ownership of the context.
    #[inline]
    fn close(&self, ctx: Self::Context) -> Self::CloseFuture<'_> {
        <H as MessageProducer<M>>::close(&**self, ctx)
    }
}