distant_net/server/
reply.rs1use std::io;
2use std::sync::{Arc, Mutex};
3
4use tokio::sync::mpsc;
5
6use crate::common::{Id, Response};
7
8pub trait Reply: Send + Sync {
10 type Data;
11
12 fn send(&self, data: Self::Data) -> io::Result<()>;
14
15 fn clone_reply(&self) -> Box<dyn Reply<Data = Self::Data>>;
17}
18
19impl<T: Send + 'static> Reply for mpsc::UnboundedSender<T> {
20 type Data = T;
21
22 fn send(&self, data: Self::Data) -> io::Result<()> {
23 mpsc::UnboundedSender::send(self, data)
24 .map_err(|x| io::Error::new(io::ErrorKind::Other, x.to_string()))
25 }
26
27 fn clone_reply(&self) -> Box<dyn Reply<Data = Self::Data>> {
28 Box::new(self.clone())
29 }
30}
31
32pub struct ServerReply<T> {
34 pub(crate) origin_id: Id,
35 pub(crate) tx: mpsc::UnboundedSender<Response<T>>,
36}
37
38impl<T> Clone for ServerReply<T> {
39 fn clone(&self) -> Self {
40 Self {
41 origin_id: self.origin_id.clone(),
42 tx: self.tx.clone(),
43 }
44 }
45}
46
47impl<T> ServerReply<T> {
48 pub fn send(&self, data: T) -> io::Result<()> {
49 self.tx
50 .send(Response::new(self.origin_id.clone(), data))
51 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "Connection reply closed"))
52 }
53
54 pub fn is_closed(&self) -> bool {
55 self.tx.is_closed()
56 }
57
58 pub fn queue(self) -> QueuedServerReply<T> {
59 QueuedServerReply {
60 inner: self,
61 queue: Arc::new(Mutex::new(Vec::new())),
62 hold: Arc::new(Mutex::new(true)),
63 }
64 }
65}
66
67impl<T: Send + 'static> Reply for ServerReply<T> {
68 type Data = T;
69
70 fn send(&self, data: Self::Data) -> io::Result<()> {
71 ServerReply::send(self, data)
72 }
73
74 fn clone_reply(&self) -> Box<dyn Reply<Data = Self::Data>> {
75 Box::new(self.clone())
76 }
77}
78
79pub struct QueuedServerReply<T> {
84 inner: ServerReply<T>,
85 queue: Arc<Mutex<Vec<T>>>,
86 hold: Arc<Mutex<bool>>,
87}
88
89impl<T> Clone for QueuedServerReply<T> {
90 fn clone(&self) -> Self {
91 Self {
92 inner: self.inner.clone(),
93 queue: Arc::clone(&self.queue),
94 hold: Arc::clone(&self.hold),
95 }
96 }
97}
98
99impl<T> QueuedServerReply<T> {
100 pub fn hold(&self, hold: bool) {
105 *self.hold.lock().unwrap() = hold;
106 }
107
108 pub fn send(&self, data: T) -> io::Result<()> {
110 if *self.hold.lock().unwrap() {
111 self.queue.lock().unwrap().push(data);
112 Ok(())
113 } else {
114 self.inner.send(data)
115 }
116 }
117
118 pub fn send_before(&self, data: T) -> io::Result<()> {
120 if *self.hold.lock().unwrap() {
121 self.queue.lock().unwrap().insert(0, data);
122 Ok(())
123 } else {
124 self.inner.send(data)
125 }
126 }
127
128 pub fn flush(&self, hold: bool) -> io::Result<()> {
134 let mut hold_lock = self.hold.lock().unwrap();
137
138 for data in self.queue.lock().unwrap().drain(..) {
140 self.inner.send(data)?;
141 }
142
143 *hold_lock = hold;
145
146 Ok(())
147 }
148
149 pub fn is_closed(&self) -> bool {
150 self.inner.is_closed()
151 }
152}
153
154impl<T: Send + 'static> Reply for QueuedServerReply<T> {
155 type Data = T;
156
157 fn send(&self, data: Self::Data) -> io::Result<()> {
158 QueuedServerReply::send(self, data)
159 }
160
161 fn clone_reply(&self) -> Box<dyn Reply<Data = Self::Data>> {
162 Box::new(self.clone())
163 }
164}