1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
//! `netmod-mem` is an in-memory `netmod` endpoint
//!
//! This aims to make testing any structure that binds against
//! `netmod` easier and reproducible.

use async_std::{
    future::{self, Future},
    pin::Pin,
    sync::{Arc, RwLock},
    task::{self, Poll},
};
use async_trait::async_trait;
use crossbeam_channel::TryRecvError;
use ratman_netmod::{Endpoint, Error as NetError, Frame, Result as NetResult, Target};
use std::time::Duration;

/// An input/output pair of channels.
///
/// This is the actual mechanism by which data is moved around between `MemMod`s in
/// different places.
// NOTE(review): this file matches `Io::inc.try_recv()` errors against
// `crossbeam_channel::TryRecvError`, so the channels look like crossbeam
// channels rather than `std::sync::mpsc` ones — confirm in `io.rs`.
pub(crate) mod io;
/// Simulated transmission media.
pub mod media;

/// Represents a single netmod endpoint that can connect to exactly one other, either
/// as a 1-to-1 link between libqaul instances or as a link into a transmission
/// medium of some kind.
///
/// The `Default` value is an unlinked endpoint, the same state that
/// `MemMod::new` produces.
#[derive(Default)]
pub struct MemMod {
    /// Internal memory access to send/receive.
    ///
    /// Holds `None` while the endpoint is unlinked and `Some(io)` once it
    /// has been linked; splitting the endpoint resets it to `None`.
    io: Arc<RwLock<Option<io::Io>>>,
}

impl MemMod {
    /// Construct a `MemMod` that is not yet linked to anything.
    pub fn new() -> Self {
        let io = Default::default();
        Self { io }
    }

    /// Construct two `MemMod`s that are already linked to each other.
    pub fn make_pair() -> (Self, Self) {
        let left = MemMod::new();
        let right = MemMod::new();
        left.link(&right);
        (left, right)
    }

    /// Report whether this `MemMod` currently holds an `Io` handle.
    ///
    /// Returns `true` after a successful link and `false` for a fresh or
    /// split `MemMod`. Blocks the calling thread while the read lock is
    /// acquired.
    pub fn linked(&self) -> bool {
        task::block_on(async {
            let guard = self.io.read().await;
            guard.is_some()
        })
    }

    /// Connect this `MemMod` and `pair` with a fresh 1-to-1 `Io` pair.
    ///
    /// # Panics
    ///
    /// Panics if either this `MemMod` or `pair` is already linked.
    pub fn link(&self, pair: &MemMod) {
        let either_linked = self.linked() || pair.linked();
        if either_linked {
            panic!("Attempted to link an already linked MemMod.");
        }

        // Each side receives one half of the freshly created pair.
        let (ours, theirs) = io::Io::make_pair();
        self.set_io_async(ours);
        pair.set_io_async(theirs);
    }

    /// Attach an externally created `Io` handle to this `MemMod`.
    ///
    /// # Panics
    ///
    /// Panics if this `MemMod` is already linked.
    pub(crate) fn link_raw(&mut self, io: io::Io) {
        let already_linked = self.linked();
        if already_linked {
            panic!("Attempted to link an already linked MemMod.");
        }

        self.set_io_async(io);
    }

    /// Disconnect this `MemMod` from whatever it was linked to.
    pub fn split(&self) {
        // Storing `None` drops the previously held `Io`, so messages
        // sent afterwards will fail.
        self.set_io_async(None::<io::Io>);
    }

    /// Replace the stored `Io` handle, blocking the calling thread until
    /// the write lock has been acquired.
    fn set_io_async<I: Into<Option<io::Io>>>(&self, val: I) {
        // Convert before taking the lock, as the original assignment did.
        let replacement: Option<io::Io> = val.into();
        task::block_on(async {
            let mut slot = self.io.write().await;
            *slot = replacement;
        });
    }
}

#[async_trait]
impl Endpoint for MemMod {
    /// Provides maximum frame-size information to `RATMAN`
    ///
    /// Always reports `u32::MAX` (converted to `usize`).
    fn size_hint(&self) -> usize {
        ::std::u32::MAX as usize
    }

    /// Send a message to a specific endpoint (client)
    ///
    /// The `Target` argument is ignored: a `MemMod` has at most one peer.
    ///
    /// # Errors
    ///
    /// Returns `NetError::NotSupported` if attempting to send through
    /// a `MemMod` that is not linked, and `NetError::ConnectionLost` if
    /// handing the frame to the outgoing channel fails.
    async fn send(&self, frame: Frame, _: Target) -> NetResult<()> {
        let mut guard = self.io.write().await;
        match &mut *guard {
            None => Err(NetError::NotSupported),
            Some(io) => match io.out.send(frame) {
                Ok(_) => Ok(()),
                Err(_) => Err(NetError::ConnectionLost),
            },
        }
    }

    /// Wait for the next incoming frame.
    ///
    /// The incoming channel is checked without blocking; while it is empty
    /// the task sleeps for 20 milliseconds and then checks again. The lock
    /// is released during the sleep so that `send`, `link` and `split` can
    /// make progress in the meantime.
    ///
    /// # Errors
    ///
    /// Returns `NetError::ConnectionLost` if this `MemMod` is not linked
    /// (including when it is split while waiting) or if receiving from the
    /// incoming channel fails for any reason other than it being empty.
    async fn next(&self) -> NetResult<(Frame, Target)> {
        let poll_interval = Duration::from_millis(20);

        loop {
            {
                // Await the lock instead of polling a throw-away lock future
                // by hand: a lock future that is dropped while pending leaves
                // no waker registered, which could stall this task forever.
                let mut guard = self.io.write().await;
                match &mut *guard {
                    None => return Err(NetError::ConnectionLost),
                    Some(io) => match io.inc.try_recv() {
                        Ok(frame) => return Ok((frame, Target::default())),
                        // Nothing queued yet: fall through to the sleep below.
                        Err(TryRecvError::Empty) => {}
                        Err(_) => return Err(NetError::ConnectionLost),
                    },
                }
            } // The guard is dropped here, before the task goes to sleep.

            task::sleep(poll_interval).await;
        }
    }
}