1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
//! Module containing types for managing [`Stream`]s.
mod cmd;
pub use cmd::StreamWorkerCmd;

mod consumer;
use consumer::StreamConsumer;

mod producer;
use producer::StreamProducer;

use crate::{
    protocols::stream::responses::*, Address, Context, Message, Result, Route, Routed,
    TransportMessage,
};
use core::{ops::Deref, time::Duration};
use ockam_core::compat::rand::{self, Rng};
use ockam_core::compat::string::String;
use ockam_core::{Decodable, RouteBuilder, TransportType};

/// Stream controller transport type.
///
/// [`Stream::new`] uses this transport type (numeric value `16`) when
/// generating the random address of the controller's own context.
pub const STREAM: TransportType = TransportType::new(16);

/// Ockam stream protocol controller
///
/// Each stream has a sending and consuming worker (publisher and
/// consumer) that are created and managed on the fly by this
/// abstraction.
///
/// A controller is created with [`Stream::new`], optionally customized
/// through its builder-style methods, and then used to open stream
/// pairs via [`Stream::connect`].
pub struct Stream {
    /// Context owned by the controller; used to start the stream
    /// workers and to create the receiving context in `connect`.
    ctx: Context,
    /// Polling interval handed to the stream consumer (250ms by default).
    interval: Duration,
    /// Optional address handed to the stream consumer as the
    /// recipient of incoming messages (see `with_recipient`).
    forwarding_address: Option<Address>,
    /// Name of the stream service on the remote (`"stream"` by default).
    stream_service: String,
    /// Name of the index service on the remote (`"stream_index"` by default).
    index_service: String,
    /// Client id for the stream consumer; when `None`, `connect`
    /// generates a random one.
    client_id: Option<String>,
}

/// A simple address wrapper for stream workers
///
/// Wraps the address of the stream producer worker started by
/// [`Stream::connect`]. This type can be used as any other address: it
/// dereferences to [`Address`] and can be converted into a [`Route`].
///
/// NOTE(review): earlier documentation stated that this type also
/// exposes the name of the stream it is associated with, but no such
/// accessor is visible in this module -- confirm before relying on it.
pub struct SenderAddress {
    /// Address of the stream producer worker.
    inner: Address,
}

impl SenderAddress {
    /// Start building a route whose first hop is this sender address
    ///
    /// The wrapped address is cloned, so `self` remains usable afterwards.
    pub fn to_route(&self) -> RouteBuilder {
        let first_hop = self.inner.clone();
        Route::new().append(first_hop)
    }
}

impl Deref for SenderAddress {
    type Target = Address;

    /// Borrow the wrapped [`Address`]
    fn deref(&self) -> &Address {
        let Self { inner } = self;
        inner
    }
}

impl From<SenderAddress> for Route {
    fn from(addr: SenderAddress) -> Self {
        Route::new().append(addr.inner).into()
    }
}

/// The receiver half of [`SenderAddress`].
///
/// Returned by [`Stream::connect`]; messages delivered to it are read
/// with [`ReceiverAddress::next`].
pub struct ReceiverAddress {
    /// Context registered at the receive address that was handed to
    /// the stream consumer in `connect`; `next` reads from it.
    ctx: Context,
    /// Address of the stream consumer worker. Stored but not read in
    /// this module (hence the leading underscore).
    _inner: Address,
}

impl ReceiverAddress {
    /// Wait for the next message received by the stream consumer
    ///
    /// Waits on the receiver context until a `StreamMessage` arrives,
    /// decodes its `data` as a [`TransportMessage`] and then decodes
    /// that message's payload as `T`. The returned [`Routed`] wrapper
    /// carries the address and local message of the received stream
    /// message.
    ///
    /// # Errors
    ///
    /// Returns an error if receiving from the context fails, or if
    /// either the transport message or its payload cannot be decoded.
    pub async fn next<T: Message>(&mut self) -> Result<Routed<T>> {
        let routed = self.ctx.receive_block::<StreamMessage>().await?.take();

        // Stream data originates from a remote service: report malformed
        // input as an error instead of panicking. Decoding happens before
        // `dissolve` so the borrow of the body ends first.
        let transport = TransportMessage::decode(&routed.as_body().data)?;
        let (addr, local_msg) = routed.dissolve();

        T::decode(&transport.payload).map(|t| Routed::new(t, addr, local_msg))
    }
}

impl Stream {
    /// Create a new Ockam stream controller
    ///
    /// A dedicated context is created from `ctx` at a random address
    /// of the [`STREAM`] transport type.
    ///
    /// By default, the created stream will poll for new messages
    /// every 250 milliseconds, talk to the `"stream"` and
    /// `"stream_index"` services, and has neither a forwarding address
    /// nor a fixed client id.
    ///
    /// # Errors
    ///
    /// Returns an error if the new context cannot be created.
    pub async fn new(ctx: &Context) -> Result<Self> {
        let ctx = ctx.new_context(Address::random(STREAM)).await?;

        Ok(Self {
            ctx,
            interval: Duration::from_millis(250),
            forwarding_address: None,
            stream_service: "stream".into(),
            index_service: "stream_index".into(),
            client_id: None,
        })
    }

    /// Customize the polling interval for the stream consumer
    pub fn with_interval<D: Into<Duration>>(mut self, duration: D) -> Self {
        self.interval = duration.into();
        self
    }

    /// Specify the stream service running on the remote
    pub fn stream_service<S: Into<String>>(mut self, serv: S) -> Self {
        self.stream_service = serv.into();
        self
    }

    /// Specify the index service running on the remote
    pub fn index_service<S: Into<String>>(mut self, serv: S) -> Self {
        self.index_service = serv.into();
        self
    }

    /// Specify the client_id for the stream consumer
    ///
    /// When setting up a stream without calling this function
    /// a random client id will be assigned.
    pub fn client_id<S: Into<String>>(mut self, client_id: S) -> Self {
        self.client_id = Some(client_id.into());
        self
    }

    /// Specify an address to forward incoming messages to
    ///
    /// When setting up a stream without calling this function
    /// messages will be buffered by the StreamConsumer and must be
    /// polled via the [`StreamWorkerCmd`].
    pub fn with_recipient<A: Into<Address>>(mut self, addr: A) -> Self {
        self.forwarding_address = Some(addr.into());
        self
    }

    /// Connect to a bi-directional stream by remote and stream pair
    ///
    /// When using the stream protocol for bi-directional
    /// communication a sending and receiving stream name is required.
    /// These two identifiers MUST be known between nodes that wish to
    /// exchange messages.
    ///
    /// The `route` parameter is the route to a remote which hosts a
    /// `stream_service` and `stream_index_service`, such as
    /// hub.ockam.io.
    ///
    /// Streams that do not already exist will be created, and
    /// existing stream identifiers will automatically be re-used.
    ///
    /// On success the returned pair holds the address of the newly
    /// started stream producer and a [`ReceiverAddress`] for reading
    /// incoming messages.
    ///
    /// # Errors
    ///
    /// Returns an error if either stream worker cannot be started or
    /// if the receiving context cannot be created.
    pub async fn connect<R, S>(
        &self,
        route: R,
        sender_name: S,
        receiver_name: S,
    ) -> Result<(SenderAddress, ReceiverAddress)>
    where
        R: Into<Route>,
        S: Into<String>,
    {
        let route: Route = route.into();
        let sender_name: String = sender_name.into();
        let receiver_name: String = receiver_name.into();

        // Generate random local addresses for the consumer worker, the
        // producer worker, and the context that messages are received on
        let receiver_address = Address::random_local();
        let sender_address = Address::random_local();
        let receiver_rx = Address::random_local();

        // Generate a random client_id if one has not been provided
        let client_id = self.client_id.clone().unwrap_or_else(|| {
            let random: [u8; 16] = rand::thread_rng().gen();
            hex::encode(random)
        });

        // Create and start a new stream consumer
        let consumer = StreamConsumer::new(
            client_id,
            route.clone(),
            Some(sender_address.clone()),
            receiver_name,
            self.interval,
            self.forwarding_address.clone(),
            receiver_rx.clone(),
            self.stream_service.clone(),
            self.index_service.clone(),
        );
        self.ctx
            .start_worker(receiver_address.clone(), consumer)
            .await?;

        // Create and start a new stream producer
        let producer = StreamProducer::new(sender_name, route, self.stream_service.clone());
        self.ctx
            .start_worker(sender_address.clone(), producer)
            .await?;

        // NOTE(review): the receiving context is only registered after
        // both workers are running -- confirm the consumer cannot deliver
        // to `receiver_rx` before this point.
        let receiver_ctx = self.ctx.new_context(receiver_rx).await?;

        // Return a sender and receiver address
        Ok((
            SenderAddress {
                inner: sender_address,
            },
            ReceiverAddress {
                _inner: receiver_address,
                ctx: receiver_ctx,
            },
        ))
    }
}