1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
use crate::Listener;
use async_trait::async_trait;
use derive_more::From;
use std::io;
use tokio::sync::oneshot;

/// Represents a [`Listener`] that only has a single connection
#[derive(From)]
pub struct OneshotListener<T: Send> {
    inner: Option<oneshot::Receiver<T>>,
}

impl<T: Send> OneshotListener<T> {
    pub fn from_value(value: T) -> Self {
        let (tx, listener) = Self::channel();

        // NOTE: Impossible to fail as the receiver has not been dropped at this point
        let _ = tx.send(value);

        listener
    }

    pub fn channel() -> (oneshot::Sender<T>, Self) {
        let (tx, rx) = oneshot::channel();
        (tx, Self { inner: Some(rx) })
    }
}

#[async_trait]
impl<T: Send> Listener for OneshotListener<T> {
    type Output = T;

    /// First call to accept will return listener tied to [`OneshotListener`] while future
    /// calls will yield an error of `io::ErrorKind::ConnectionAborted`
    async fn accept(&mut self) -> io::Result<Self::Output> {
        match self.inner.take() {
            Some(rx) => rx
                .await
                .map_err(|x| io::Error::new(io::ErrorKind::BrokenPipe, x)),
            None => Err(io::Error::new(
                io::ErrorKind::ConnectionAborted,
                "Oneshot listener has concluded",
            )),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tokio::task::JoinHandle;

    #[tokio::test]
    async fn from_value_should_return_value_on_first_call_to_accept() {
        let mut listener = OneshotListener::from_value("hello world");
        assert_eq!(listener.accept().await.unwrap(), "hello world");
        assert_eq!(
            listener.accept().await.unwrap_err().kind(),
            io::ErrorKind::ConnectionAborted
        );
    }

    #[tokio::test]
    async fn channel_should_return_a_oneshot_sender_to_feed_first_call_to_accept() {
        let (tx, mut listener) = OneshotListener::channel();
        let accept_task: JoinHandle<(io::Result<&str>, io::Result<&str>)> =
            tokio::spawn(async move {
                let result_1 = listener.accept().await;
                let result_2 = listener.accept().await;
                (result_1, result_2)
            });
        tokio::spawn(async move {
            tx.send("hello world").unwrap();
        });

        let (result_1, result_2) = accept_task.await.unwrap();

        assert_eq!(result_1.unwrap(), "hello world");
        assert_eq!(
            result_2.unwrap_err().kind(),
            io::ErrorKind::ConnectionAborted
        );
    }
}