1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
use crate::{Id, Response};
use std::{future::Future, io, pin::Pin, sync::Arc};
use tokio::sync::{mpsc, Mutex};

/// Interface to send a reply to some request
///
/// Implementors must be `Send + Sync`. The trait exposes an asynchronous
/// send, a blocking send, and a way to duplicate the reply as a boxed
/// trait object carrying the same [`Reply::Data`] type.
pub trait Reply: Send + Sync {
    /// Type of the payload handed to [`Reply::send`] and
    /// [`Reply::blocking_send`]
    type Data;

    /// Sends a reply out from the server
    ///
    /// Returns a boxed future that borrows `self` and resolves to an
    /// [`io::Result`] describing whether the send succeeded.
    fn send(&self, data: Self::Data) -> Pin<Box<dyn Future<Output = io::Result<()>> + Send + '_>>;

    /// Blocking version of sending a reply out from the server
    ///
    /// Returns the outcome directly instead of a future.
    fn blocking_send(&self, data: Self::Data) -> io::Result<()>;

    /// Clones this reply
    ///
    /// The clone is returned as a boxed trait object with the same
    /// [`Reply::Data`] type as the original.
    fn clone_reply(&self) -> Box<dyn Reply<Data = Self::Data>>;
}

/// Lets a bare tokio channel sender act as a [`Reply`], forwarding each
/// payload unchanged and translating channel failures into [`io::Error`]
impl<T: Send + 'static> Reply for mpsc::Sender<T> {
    type Data = T;

    /// Asynchronously pushes `data` onto the channel; a channel failure is
    /// reported as an `io::ErrorKind::Other` error carrying the channel
    /// error's message
    fn send(&self, data: Self::Data) -> Pin<Box<dyn Future<Output = io::Result<()>> + Send + '_>> {
        let pending = async move {
            match mpsc::Sender::send(self, data).await {
                Ok(()) => Ok(()),
                Err(err) => Err(io::Error::new(io::ErrorKind::Other, err.to_string())),
            }
        };

        Box::pin(pending)
    }

    /// Blocking counterpart of `send`, with the same error translation
    fn blocking_send(&self, data: Self::Data) -> io::Result<()> {
        match mpsc::Sender::blocking_send(self, data) {
            Ok(()) => Ok(()),
            Err(err) => Err(io::Error::new(io::ErrorKind::Other, err.to_string())),
        }
    }

    /// Produces another handle to the same channel as a boxed [`Reply`]
    fn clone_reply(&self) -> Box<dyn Reply<Data = Self::Data>> {
        let duplicate: mpsc::Sender<T> = self.clone();
        Box::new(duplicate)
    }
}

/// Utility to send ad-hoc replies from the server back through the connection
///
/// Each payload sent through this type is wrapped in a [`Response`] built
/// from `origin_id` and the payload, then pushed onto `tx`.
pub struct ServerReply<T> {
    /// Id attached to every [`Response`] produced by this reply
    /// (presumably the id of the request being answered; confirm with the
    /// code that constructs this type)
    pub(crate) origin_id: Id,

    /// Channel sender that carries the wrapped responses
    pub(crate) tx: mpsc::Sender<Response<T>>,
}

/// Manual `Clone` so that cloning does not require `T: Clone`; only the id
/// and the channel handle are duplicated
impl<T> Clone for ServerReply<T> {
    fn clone(&self) -> Self {
        let Self { origin_id, tx } = self;

        Self {
            origin_id: origin_id.clone(),
            tx: tx.clone(),
        }
    }
}

impl<T> ServerReply<T> {
    /// Wraps `data` in a [`Response`] tagged with this reply's origin id and
    /// sends it over the channel
    ///
    /// # Errors
    ///
    /// Returns an `io::ErrorKind::BrokenPipe` error if the channel send fails.
    pub async fn send(&self, data: T) -> io::Result<()> {
        let response = Response::new(self.origin_id.clone(), data);

        match self.tx.send(response).await {
            Ok(()) => Ok(()),
            Err(_) => Err(io::Error::new(
                io::ErrorKind::BrokenPipe,
                "Connection reply closed",
            )),
        }
    }

    /// Blocking counterpart of [`ServerReply::send`]
    ///
    /// # Errors
    ///
    /// Returns an `io::ErrorKind::BrokenPipe` error if the channel send fails.
    pub fn blocking_send(&self, data: T) -> io::Result<()> {
        let response = Response::new(self.origin_id.clone(), data);

        match self.tx.blocking_send(response) {
            Ok(()) => Ok(()),
            Err(_) => Err(io::Error::new(
                io::ErrorKind::BrokenPipe,
                "Connection reply closed",
            )),
        }
    }

    /// Reports whether the underlying channel is closed
    pub fn is_closed(&self) -> bool {
        let Self { tx, .. } = self;
        tx.is_closed()
    }

    /// Converts this reply into a [`QueuedServerReply`] that starts with an
    /// empty queue and with holding enabled
    pub fn queue(self) -> QueuedServerReply<T> {
        let queue = Arc::new(Mutex::new(Vec::new()));
        let hold = Arc::new(Mutex::new(true));

        QueuedServerReply {
            inner: self,
            queue,
            hold,
        }
    }
}

/// Exposes [`ServerReply`] through the [`Reply`] trait by delegating to its
/// inherent methods
impl<T: Send + 'static> Reply for ServerReply<T> {
    type Data = T;

    /// Boxes the future produced by the inherent `ServerReply::send`
    fn send(&self, data: Self::Data) -> Pin<Box<dyn Future<Output = io::Result<()>> + Send + '_>> {
        Box::pin(async move { ServerReply::send(self, data).await })
    }

    /// Delegates to the inherent `ServerReply::blocking_send`
    fn blocking_send(&self, data: Self::Data) -> io::Result<()> {
        let outcome = ServerReply::blocking_send(self, data);
        outcome
    }

    /// Clones this reply and returns the copy as a boxed [`Reply`]
    fn clone_reply(&self) -> Box<dyn Reply<Data = Self::Data>> {
        let duplicate: ServerReply<T> = self.clone();
        Box::new(duplicate)
    }
}

/// Represents a reply where all sends are queued up but not sent until
/// after the flush method is called. This reply supports injecting
/// at the front of the queue in order to support sending messages
/// but ensuring that some specific message is sent out first
///
/// Queueing only happens while the hold flag is set; when it is cleared,
/// sends go straight to the wrapped [`ServerReply`]. Clones share the same
/// queue and hold flag.
pub struct QueuedServerReply<T> {
    /// Reply used to actually deliver messages
    inner: ServerReply<T>,

    /// Messages waiting to be flushed, in the order they will be sent
    queue: Arc<Mutex<Vec<T>>>,

    /// When true, sends are appended to `queue` instead of being delivered
    hold: Arc<Mutex<bool>>,
}

/// Manual `Clone` so that cloning does not require `T: Clone`; the clone
/// shares the queue and hold flag with the original
impl<T> Clone for QueuedServerReply<T> {
    fn clone(&self) -> Self {
        let Self { inner, queue, hold } = self;

        Self {
            inner: inner.clone(),
            queue: Arc::clone(queue),
            hold: Arc::clone(hold),
        }
    }
}

impl<T> QueuedServerReply<T> {
    /// Updates the hold status for the queue
    ///
    /// * If true, all messages are held until the queue is flushed
    /// * If false, messages are sent directly as they come in
    ///
    /// Changing the status neither sends nor discards messages that are
    /// already queued; use [`QueuedServerReply::flush`] to deliver them.
    pub async fn hold(&self, hold: bool) {
        *self.hold.lock().await = hold;
    }

    /// Send this message, adding it to a queue if holding messages
    ///
    /// # Errors
    ///
    /// Queueing never fails. When not holding, returns whatever error the
    /// underlying reply reports.
    pub async fn send(&self, data: T) -> io::Result<()> {
        {
            // Keep the hold guard alive while enqueueing. If it were released
            // right after reading the flag, a concurrent flush could drain the
            // queue and clear the flag before this message is pushed, leaving
            // it stranded in the queue. Lock order (hold, then queue) matches
            // flush.
            let holding = self.hold.lock().await;
            if *holding {
                self.queue.lock().await.push(data);
                return Ok(());
            }
        }

        // Not holding: the guard has been released, so the direct send does
        // not keep other callers waiting on the flag
        self.inner.send(data).await
    }

    /// Send this message, adding it to a queue if holding messages, blocking
    /// for access to locks and other internals
    ///
    /// # Errors
    ///
    /// Queueing never fails. When not holding, returns whatever error the
    /// underlying reply reports.
    ///
    /// # Panics
    ///
    /// The blocking tokio primitives used here panic when called from within
    /// an asynchronous execution context.
    pub fn blocking_send(&self, data: T) -> io::Result<()> {
        {
            // Same reasoning as the async variant: hold the flag's guard until
            // the message is in the queue so a concurrent flush cannot slip
            // in between the check and the push
            let holding = self.hold.blocking_lock();
            if *holding {
                self.queue.blocking_lock().push(data);
                return Ok(());
            }
        }

        self.inner.blocking_send(data)
    }

    /// Send this message before anything else in the queue
    ///
    /// When not holding, there is no queue to jump, so the message is sent
    /// directly.
    ///
    /// # Errors
    ///
    /// Queueing never fails. When not holding, returns whatever error the
    /// underlying reply reports.
    pub async fn send_before(&self, data: T) -> io::Result<()> {
        {
            // Hold the flag's guard until the message is in the queue so a
            // concurrent flush cannot slip in between the check and the insert
            let holding = self.hold.lock().await;
            if *holding {
                self.queue.lock().await.insert(0, data);
                return Ok(());
            }
        }

        self.inner.send(data).await
    }

    /// Sends all pending msgs queued up and clears the queue
    ///
    /// Additionally, takes `hold` to indicate whether or not new msgs
    /// after the flush should continue to be held within the queue
    /// or if all future msgs will be sent immediately
    ///
    /// # Errors
    ///
    /// Stops at the first message that fails to send and returns that error.
    /// In that case the hold status is left unchanged, and the messages that
    /// were still queued are dropped along with the drain iterator.
    pub async fn flush(&self, hold: bool) -> io::Result<()> {
        // Lock hold so we can ensure that nothing gets sent
        // to the queue after we clear it
        let mut hold_lock = self.hold.lock().await;

        // Clear the queue by sending everything; the queue stays locked for
        // the whole loop
        for data in self.queue.lock().await.drain(..) {
            self.inner.send(data).await?;
        }

        // Update hold to the caller-requested status now that the queue is
        // empty
        *hold_lock = hold;

        Ok(())
    }

    /// Reports whether the underlying reply's channel is closed
    pub fn is_closed(&self) -> bool {
        self.inner.is_closed()
    }
}

/// Exposes [`QueuedServerReply`] through the [`Reply`] trait by delegating
/// to its inherent methods
impl<T: Send + 'static> Reply for QueuedServerReply<T> {
    type Data = T;

    /// Boxes the future produced by the inherent `QueuedServerReply::send`
    fn send(&self, data: Self::Data) -> Pin<Box<dyn Future<Output = io::Result<()>> + Send + '_>> {
        Box::pin(async move { QueuedServerReply::send(self, data).await })
    }

    /// Delegates to the inherent `QueuedServerReply::blocking_send`
    fn blocking_send(&self, data: Self::Data) -> io::Result<()> {
        let outcome = QueuedServerReply::blocking_send(self, data);
        outcome
    }

    /// Clones this reply and returns the copy as a boxed [`Reply`]
    fn clone_reply(&self) -> Box<dyn Reply<Data = Self::Data>> {
        let duplicate: QueuedServerReply<T> = self.clone();
        Box::new(duplicate)
    }
}