cord_client/
lib.rs

1//! Cord is a data streaming platform for composing, aggregating and distributing
2//! arbitrary streams. It uses a publish-subscribe model that allows multiple publishers
3//! to share their streams via a Cord Broker. Subscribers can then compose custom sinks
4//! using a regex-like pattern to access realtime data based on their individual
5//! requirements.
6//!
7//! To interact with a Broker, we use this library:
8//!
9//! # Examples
10//!
11//! ```no_run
12//! use cord_client::Client;
13//!# use cord_client::errors::Error;
14//!
15//!# async fn test() -> Result<(), Error> {
16//! let mut conn = Client::connect(("127.0.0.1", 7101)).await?;
17//!
18//! // Tell the broker we're going to provide the namespace /users
19//! conn.provide("/users".into()).await?;
20//!
21//! // Start publishing events...
22//! conn.event("/users/mark".into(), "Mark has joined").await?;
23//!
24//!# Ok(())
25//!# }
26//! ```
27//!
28//! # Cord CLI
29//! For one-off interactions with a Broker, there is also the Cord CLI, which is
30//! available via Cargo:
31//!
32//! ```text
33//! $ cargo install cord-client
34//! $ cord-client sub /namespaces
35//! ```
36//!
37//! For more usage, check the usage guidelines on [cord-proj.org](https://cord-proj.org).
38
39pub mod errors;
40
41use cord_message::{errors::Error as MessageError, Codec, Message, Pattern};
42use errors::{Error, ErrorKind, Result};
43use futures::{
44    future::{self, try_select},
45    stream::SplitSink,
46    Sink, SinkExt, Stream, StreamExt, TryStreamExt,
47};
48use futures_locks::Mutex;
49use tokio::{
50    net::{TcpStream, ToSocketAddrs},
51    sync::mpsc,
52    sync::oneshot,
53};
54use tokio_util::codec::Framed;
55
56use std::{
57    collections::HashMap,
58    convert::Into,
59    ops::Drop,
60    pin::Pin,
61    result,
62    sync::Arc,
63    task::{Context, Poll},
64};
65
/// The `Client` type alias defines the `Sink` type for communicating with a Broker.
///
/// It fixes the generic `Sink` parameter of [`ClientConn`](struct.ClientConn.html) to
/// the `SplitSink` half of a `TcpStream` framed with the message `Codec`.
///
/// This type alias should be used for normal operation in favour of consuming
/// `ClientConn` directly. This type is instantiated using
/// [`Client::connect`](struct.ClientConn.html#method.connect).
///
/// The reason for this alias' existence is so that the `Sink` can be overridden for
/// testing.
pub type Client = ClientConn<SplitSink<Framed<TcpStream, Codec>, Message>>;
75
/// The `ClientConn` manages the connection between you and a Cord Broker.
///
/// Using a generic `Sink` (`S`) allows us to build mocks for testing. However for normal
/// use, it is strongly recommended to use the type alias, [`Client`](type.Client.html).
pub struct ClientConn<S> {
    // Outbound half of the connection: every `Message` destined for the Broker is
    // written to this sink
    sink: S,
    // State shared with every `Subscriber` created from this connection
    inner: Arc<Inner>,
}
84
/// A `Subscriber` encapsulates a stream of events for a subscribed namespace. It is
/// created by [`Client::subscribe()`](struct.ClientConn.html#method.subscribe).
///
/// It implements `Stream`, yielding a `(Pattern, String)` tuple - the namespace and the
/// data - for every event it receives.
///
/// # Examples
///
/// ```
///# use cord_client::{errors::Result, Client};
///# use futures::{future, StreamExt, TryFutureExt};
///
///# async fn test() -> Result<()> {
/// let mut conn = Client::connect(("127.0.0.1", 7101)).await?;
/// conn.subscribe("/users/".into())
///     .and_then(|sub| async {
///         sub.for_each(|(namespace, data)| {
///             // Handle the message...
///             println!("Received the namespace '{:?}' with data: {:?}", namespace, data);
///             future::ready(())
///         })
///         .await;
///         Ok(())
///     })
///     .await
///# }
/// ```
pub struct Subscriber {
    // Receiving half of the channel that the router pushes matching messages into
    receiver: mpsc::Receiver<Message>,
    // Never read. Holding this reference keeps the shared `Inner` alive, so its `Drop`
    // (which signals the stream listener to shut down) cannot run while this
    // subscriber still exists.
    _inner: Arc<Inner>,
}
113
/// State shared between a `ClientConn` and the `Subscriber`s it has created.
///
/// Dropping the last reference to `Inner` fires the `detonator`, which shuts down the
/// stream listener task spawned by `connect`.
struct Inner {
    // Routing table: maps each subscribed namespace to the senders that feed its
    // `Subscriber`s
    receivers: Mutex<HashMap<Pattern, Vec<mpsc::Sender<Message>>>>,
    // Shutdown signal for the stream listener. It is an `Option` so that the sender can
    // be moved out of `&mut self` inside `Drop`.
    detonator: Option<oneshot::Sender<()>>,
}
118
119impl<S> ClientConn<S>
120where
121    S: Sink<Message, Error = MessageError> + Unpin,
122{
123    /// Connect to a broker
124    pub async fn connect<A>(addr: A) -> Result<Client>
125    where
126        A: ToSocketAddrs,
127    {
128        // This channel is used to shutdown the stream listener when the Client is dropped
129        let (det_tx, det_rx) = oneshot::channel();
130
131        // Connect to the broker
132        let sock = TcpStream::connect(addr).await?;
133
134        // Wrap socket in message codec
135        let framed = Framed::new(sock, Codec::default());
136        let (sink, stream) = framed.split();
137
138        // Setup the receivers map
139        let receivers = Mutex::new(HashMap::new());
140        let receivers_c = receivers.clone();
141
142        // Route the codec's stream to receivers
143        let router = Box::pin(
144            stream
145                .map_err(|e| Error::from_kind(ErrorKind::Message(e)))
146                .try_fold(receivers_c, |recv, message| async move {
147                    route(&recv, message).await;
148                    Ok(recv)
149                }),
150        );
151
152        tokio::spawn(try_select(router, det_rx));
153
154        Ok(ClientConn {
155            sink,
156            inner: Arc::new(Inner {
157                receivers,
158                detonator: Some(det_tx),
159            }),
160        })
161    }
162
163    /// Inform the broker that you will be providing a new namespace
164    pub async fn provide(&mut self, namespace: Pattern) -> Result<()> {
165        self.sink
166            .send(Message::Provide(namespace))
167            .await
168            .map_err(|e| ErrorKind::Message(e).into())
169    }
170
171    /// Inform the broker that you will no longer be providing a namespace
172    pub async fn revoke(&mut self, namespace: Pattern) -> Result<()> {
173        self.sink
174            .send(Message::Revoke(namespace))
175            .await
176            .map_err(|e| ErrorKind::Message(e).into())
177    }
178
179    /// Subscribe to another provider's namespace
180    ///
181    /// # Examples
182    ///
183    /// ```
184    ///# use cord_client::{errors::Result, Client};
185    ///# use futures::{future, StreamExt, TryFutureExt};
186    ///
187    ///# async fn test() -> Result<()> {
188    /// let mut conn = Client::connect(("127.0.0.1", 7101)).await?;
189    /// conn.subscribe("/users/".into())
190    ///     .and_then(|sub| async {
191    ///         sub.for_each(|(namespace, data)| {
192    ///             // Handle the message...
193    ///             dbg!("Received the namespace '{}' with data: {}", namespace, data);
194    ///             future::ready(())
195    ///         })
196    ///         .await;
197    ///         Ok(())
198    ///     })
199    ///     .await
200    ///# }
201    /// ```
202    pub async fn subscribe(&mut self, namespace: Pattern) -> Result<Subscriber> {
203        let namespace_c = namespace.clone();
204        self.sink.send(Message::Subscribe(namespace)).await?;
205
206        let (tx, rx) = mpsc::channel(10);
207        self.inner
208            .receivers
209            .with(move |mut guard| {
210                (*guard)
211                    .entry(namespace_c)
212                    .or_insert_with(Vec::new)
213                    .push(tx);
214                let ok: result::Result<_, ()> = Ok(());
215                future::ready(ok)
216            })
217            .await
218            .unwrap();
219        Ok(Subscriber {
220            receiver: rx,
221            _inner: self.inner.clone(),
222        })
223    }
224
225    /// Unsubscribe from another provider's namespace
226    pub async fn unsubscribe(&mut self, namespace: Pattern) -> Result<()> {
227        let namespace_c = namespace.clone();
228        self.sink.send(Message::Unsubscribe(namespace)).await?;
229
230        self.inner
231            .receivers
232            .with(move |mut guard| {
233                (*guard).remove(&namespace_c);
234                future::ready(())
235            })
236            .await;
237        Ok(())
238    }
239
240    /// Publish an event to your subscribers
241    pub async fn event<Str: Into<String>>(&mut self, namespace: Pattern, data: Str) -> Result<()> {
242        self.sink
243            .send(Message::Event(namespace, data.into()))
244            .await
245            .map_err(|e| ErrorKind::Message(e).into())
246    }
247}
248
249impl<E, S, T> Sink<T> for ClientConn<S>
250where
251    S: Sink<T, Error = E>,
252    E: Into<Error>,
253{
254    type Error = Error;
255
256    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<result::Result<(), Self::Error>> {
257        unsafe { Pin::map_unchecked_mut(self, |x| &mut x.sink) }
258            .poll_ready(cx)
259            .map_err(|e| e.into())
260    }
261
262    fn start_send(self: Pin<&mut Self>, item: T) -> result::Result<(), Self::Error> {
263        unsafe { Pin::map_unchecked_mut(self, |x| &mut x.sink) }
264            .start_send(item)
265            .map_err(|e| e.into())
266    }
267
268    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<result::Result<(), Self::Error>> {
269        unsafe { Pin::map_unchecked_mut(self, |x| &mut x.sink) }
270            .poll_flush(cx)
271            .map_err(|e| e.into())
272    }
273
274    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<result::Result<(), Self::Error>> {
275        unsafe { Pin::map_unchecked_mut(self, |x| &mut x.sink) }
276            .poll_close(cx)
277            .map_err(|e| e.into())
278    }
279}
280
281impl Stream for Subscriber {
282    type Item = (Pattern, String);
283
284    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
285        unsafe { Pin::map_unchecked_mut(self, |x| &mut x.receiver) }
286            .poll_recv(cx)
287            .map(|opt_msg| match opt_msg {
288                Some(Message::Event(pattern, data)) => Some((pattern, data)),
289                None => None,
290                _ => unreachable!(),
291            })
292    }
293}
294
295impl Drop for Inner {
296    fn drop(&mut self) {
297        // Ignore any error from the channel as an error indicates that the other side
298        // has already terminated.
299        let _ = self
300            .detonator
301            .take()
302            .expect("Inner has already been terminated")
303            .send(());
304    }
305}
306
307async fn route(receivers: &Mutex<HashMap<Pattern, Vec<mpsc::Sender<Message>>>>, message: Message) {
308    receivers
309        .with(move |mut guard| {
310            // Remove any subscribers that have no senders left
311            (*guard).retain(|namespace, senders| {
312                // We assume that all messages will be Events. If this changes, we will
313                // need to store a Message, not a pattern.
314                if namespace.contains(message.namespace()) {
315                    // Remove any senders that give errors when attempting to send
316                    senders.retain_mut(|tx| tx.try_send(message.clone()).is_ok());
317                }
318
319                // So long as we have senders, keep the subscriber
320                !senders.is_empty()
321            });
322
323            future::ready(())
324        })
325        .await
326}
327
#[cfg(test)]
mod tests {
    use super::*;

    use cord_message::errors::ErrorKind as MessageErrorKind;

    // Using Futures channel instead of Tokio as Tokio's channel implementation is
    // missing a Sink implementation
    use futures::channel::mpsc::{unbounded, UnboundedReceiver};

    /// A minimal stream that yields its messages from the back of the vector, i.e. in
    /// reverse order of insertion.
    struct ForwardStream(Vec<Message>);

    impl Stream for ForwardStream {
        type Item = Result<Message>;

        fn poll_next(mut self: Pin<&mut Self>, _: &mut Context) -> Poll<Option<Self::Item>> {
            let next = self.0.pop();
            Poll::Ready(next.map(Ok))
        }
    }

    /// Builds a client whose sink is an in-memory channel standing in for the broker.
    ///
    /// Returns the client, the receiving end of the mock broker channel and a handle to
    /// the client's routing table.
    // The tuple return type is only used by these tests, so it is not worth an alias
    #[allow(clippy::type_complexity)]
    fn setup_client() -> (
        ClientConn<impl Sink<Message, Error = MessageError>>,
        UnboundedReceiver<Message>,
        Mutex<HashMap<Pattern, Vec<mpsc::Sender<Message>>>>,
    ) {
        let receivers = Mutex::new(HashMap::new());
        let (broker_tx, broker_rx) = unbounded();
        // No stream listener runs in these tests, so the receiving half of the
        // detonator is discarded immediately
        let (det_tx, _) = oneshot::channel();

        let inner = Inner {
            receivers: receivers.clone(),
            detonator: Some(det_tx),
        };
        let client = ClientConn {
            sink: broker_tx.sink_map_err(|e| MessageErrorKind::Msg(format!("{}", e)).into()),
            inner: Arc::new(inner),
        };

        (client, broker_rx, receivers)
    }

    #[tokio::test]
    async fn test_forward() {
        let (client, rx, _) = setup_client();

        let event = Message::Event("/a".into(), "b".into());
        let provide = Message::Provide("/a".into());

        ForwardStream(vec![event.clone(), provide.clone()])
            .forward(client)
            .await
            .unwrap();

        // We check these messages in reverse order (i.e. Provide, then Event), because
        // our budget DIY stream sends them in reverse order.
        let (first, rx) = rx.into_future().await;
        assert_eq!(first, Some(provide));

        let (second, _) = rx.into_future().await;
        assert_eq!(second, Some(event));
    }

    #[tokio::test]
    async fn test_provide() {
        let (mut client, rx, _) = setup_client();

        client.provide("/a/b".into()).await.unwrap();

        let (sent, _) = rx.into_future().await;
        assert_eq!(sent.unwrap(), Message::Provide("/a/b".into()));
    }

    #[tokio::test]
    async fn test_revoke() {
        let (mut client, rx, _) = setup_client();

        client.revoke("/a/b".into()).await.unwrap();

        let (sent, _) = rx.into_future().await;
        assert_eq!(sent.unwrap(), Message::Revoke("/a/b".into()));
    }

    #[tokio::test]
    async fn test_subscribe() {
        let (mut client, rx, receivers) = setup_client();

        client.subscribe("/a/b".into()).await.unwrap();

        // Check that the mock broker (`rx`) has received our message
        let (sent, _) = rx.into_future().await;
        assert_eq!(sent.unwrap(), Message::Subscribe("/a/b".into()));

        // Check that the `receivers` routing table has been updated
        let key: Pattern = "/a/b".into();
        let table = receivers.lock().await;
        assert!(table.contains_key(&key));
    }

    #[tokio::test]
    async fn test_unsubscribe() {
        let (mut client, rx, receivers) = setup_client();

        // Seed the routing table with the namespace we are about to unsubscribe from
        receivers.lock().await.insert("/a/b".into(), Vec::new());

        client.unsubscribe("/a/b".into()).await.unwrap();

        // Check that the mock broker (`rx`) has received our message
        let (sent, _) = rx.into_future().await;
        assert_eq!(sent.unwrap(), Message::Unsubscribe("/a/b".into()));

        // Check that the `receivers` routing table has been updated
        let table = receivers.lock().await;
        assert!(table.is_empty());
    }

    #[tokio::test]
    async fn test_event() {
        let (mut client, rx, _) = setup_client();

        client.event("/a/b".into(), "moo").await.unwrap();

        let (sent, _) = rx.into_future().await;
        assert_eq!(sent.unwrap(), Message::Event("/a/b".into(), "moo".into()));
    }

    #[tokio::test]
    async fn test_route() {
        let (tx, mut rx) = mpsc::channel(10);

        // Start with a routing table that already contains our subscriber
        let mut table = HashMap::new();
        table.insert("/a/b".into(), vec![tx]);
        let receivers = Mutex::new(table);

        let expected = Message::Event("/a/b".into(), "Moo!".into());

        route(&receivers, expected.clone()).await;

        // Check that the subscriber has received our message
        assert_eq!(rx.recv().await.unwrap(), expected);

        let key: Pattern = "/a/b".into();
        let table = receivers.lock().await;
        assert!(table.contains_key(&key));
    }

    #[tokio::test]
    async fn test_route_norecv() {
        // The receiving half is dropped straight away, leaving a dead sender
        let (tx, _) = mpsc::channel(10);

        let mut table = HashMap::new();
        table.insert("/a/b".into(), vec![tx]);
        let receivers = Mutex::new(table);

        route(&receivers, Message::Event("/a/b".into(), "Moo!".into())).await;

        // Check that the unused receiver has been removed
        let table = receivers.lock().await;
        assert!(table.is_empty());
    }
}
506}