distant_net/server/
reply.rs

1use std::io;
2use std::sync::{Arc, Mutex};
3
4use tokio::sync::mpsc;
5
6use crate::common::{Id, Response};
7
8/// Interface to send a reply to some request
9pub trait Reply: Send + Sync {
10    type Data;
11
12    /// Sends a reply out from the server.
13    fn send(&self, data: Self::Data) -> io::Result<()>;
14
15    /// Clones this reply.
16    fn clone_reply(&self) -> Box<dyn Reply<Data = Self::Data>>;
17}
18
19impl<T: Send + 'static> Reply for mpsc::UnboundedSender<T> {
20    type Data = T;
21
22    fn send(&self, data: Self::Data) -> io::Result<()> {
23        mpsc::UnboundedSender::send(self, data)
24            .map_err(|x| io::Error::new(io::ErrorKind::Other, x.to_string()))
25    }
26
27    fn clone_reply(&self) -> Box<dyn Reply<Data = Self::Data>> {
28        Box::new(self.clone())
29    }
30}
31
32/// Utility to send ad-hoc replies from the server back through the connection
33pub struct ServerReply<T> {
34    pub(crate) origin_id: Id,
35    pub(crate) tx: mpsc::UnboundedSender<Response<T>>,
36}
37
38impl<T> Clone for ServerReply<T> {
39    fn clone(&self) -> Self {
40        Self {
41            origin_id: self.origin_id.clone(),
42            tx: self.tx.clone(),
43        }
44    }
45}
46
47impl<T> ServerReply<T> {
48    pub fn send(&self, data: T) -> io::Result<()> {
49        self.tx
50            .send(Response::new(self.origin_id.clone(), data))
51            .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "Connection reply closed"))
52    }
53
54    pub fn is_closed(&self) -> bool {
55        self.tx.is_closed()
56    }
57
58    pub fn queue(self) -> QueuedServerReply<T> {
59        QueuedServerReply {
60            inner: self,
61            queue: Arc::new(Mutex::new(Vec::new())),
62            hold: Arc::new(Mutex::new(true)),
63        }
64    }
65}
66
67impl<T: Send + 'static> Reply for ServerReply<T> {
68    type Data = T;
69
70    fn send(&self, data: Self::Data) -> io::Result<()> {
71        ServerReply::send(self, data)
72    }
73
74    fn clone_reply(&self) -> Box<dyn Reply<Data = Self::Data>> {
75        Box::new(self.clone())
76    }
77}
78
79/// Represents a reply where all sends are queued up but not sent until
80/// after the flush method is called. This reply supports injecting
81/// at the front of the queue in order to support sending messages
82/// but ensuring that some specific message is sent out first
83pub struct QueuedServerReply<T> {
84    inner: ServerReply<T>,
85    queue: Arc<Mutex<Vec<T>>>,
86    hold: Arc<Mutex<bool>>,
87}
88
89impl<T> Clone for QueuedServerReply<T> {
90    fn clone(&self) -> Self {
91        Self {
92            inner: self.inner.clone(),
93            queue: Arc::clone(&self.queue),
94            hold: Arc::clone(&self.hold),
95        }
96    }
97}
98
99impl<T> QueuedServerReply<T> {
100    /// Updates the hold status for the queue
101    ///
102    /// * If true, all messages are held until the queue is flushed
103    /// * If false, messages are sent directly as they come in
104    pub fn hold(&self, hold: bool) {
105        *self.hold.lock().unwrap() = hold;
106    }
107
108    /// Send this message, adding it to a queue if holding messages.
109    pub fn send(&self, data: T) -> io::Result<()> {
110        if *self.hold.lock().unwrap() {
111            self.queue.lock().unwrap().push(data);
112            Ok(())
113        } else {
114            self.inner.send(data)
115        }
116    }
117
118    /// Send this message before anything else in the queue
119    pub fn send_before(&self, data: T) -> io::Result<()> {
120        if *self.hold.lock().unwrap() {
121            self.queue.lock().unwrap().insert(0, data);
122            Ok(())
123        } else {
124            self.inner.send(data)
125        }
126    }
127
128    /// Sends all pending msgs queued up and clears the queue
129    ///
130    /// Additionally, takes `hold` to indicate whether or not new msgs
131    /// after the flush should continue to be held within the queue
132    /// or if all future msgs will be sent immediately
133    pub fn flush(&self, hold: bool) -> io::Result<()> {
134        // Lock hold so we can ensure that nothing gets sent
135        // to the queue after we clear it
136        let mut hold_lock = self.hold.lock().unwrap();
137
138        // Clear the queue by sending everything
139        for data in self.queue.lock().unwrap().drain(..) {
140            self.inner.send(data)?;
141        }
142
143        // Update hold to
144        *hold_lock = hold;
145
146        Ok(())
147    }
148
149    pub fn is_closed(&self) -> bool {
150        self.inner.is_closed()
151    }
152}
153
154impl<T: Send + 'static> Reply for QueuedServerReply<T> {
155    type Data = T;
156
157    fn send(&self, data: Self::Data) -> io::Result<()> {
158        QueuedServerReply::send(self, data)
159    }
160
161    fn clone_reply(&self) -> Box<dyn Reply<Data = Self::Data>> {
162        Box::new(self.clone())
163    }
164}