websocat/
foreachmsg_peer.rs1use futures::future::ok;
2
3use std::rc::Rc;
4
5use super::{BoxedNewPeerFuture, Peer};
6use super::{ConstructParams, PeerConstructor, Specifier};
7
8use std::cell::RefCell;
9
10use std::io::{Error as IoError, Read, Write};
11use tokio_io::{AsyncRead, AsyncWrite};
12
13use super::{once, simple_err, wouldblock};
14use futures::{Async, Future, Poll};
15
16#[derive(Debug)]
17pub struct Foreachmsg(pub Rc<dyn Specifier>);
18impl Specifier for Foreachmsg {
19 fn construct(&self, cp: ConstructParams) -> PeerConstructor {
20 once(foreachmsg_peer(self.0.clone(), cp))
21 }
22 specifier_boilerplate!(singleconnect noglobalstate has_subspec);
23 self_0_is_subspecifier!(...);
24}
25specifier_class!(
26 name = ForeachmsgClass,
27 target = Foreachmsg,
28 prefixes = ["foreachmsg:"],
29 arg_handling = subspec,
30 overlay = true,
31 MessageBoundaryStatusDependsOnInnerType,
32 SingleConnect,
33 help = r#"
34Execute something for each incoming message.
35
36Somewhat the reverse of the `autoreconnect:`.
37
38Example:
39
40 websocat -t -u ws://server/listen_for_updates foreachmsg:writefile:status.txt
41
42This keeps only recent incoming message in file and discards earlier messages.
43"#
44);
45
46#[derive(Default)]
47struct State2 {
48 already_warned: bool,
49}
50
51#[derive(Clone)]
52enum Phase {
53 Idle,
54 WriteDebt(Vec<u8>),
55 Flushing,
56 Closing,
57 WaitingForReadToFinish,
58}
59
60struct State {
61 s: Rc<dyn Specifier>,
62 p: Option<Peer>,
63 n: Option<BoxedNewPeerFuture>,
64 cp: ConstructParams,
65 aux: State2,
66 ph: Phase,
67 finished_reading: bool,
68 read_waiter_tx: Option<futures::sync::oneshot::Sender<()>>,
69 read_waiter_rx: Option<futures::sync::oneshot::Receiver<()>>,
70 wait_for_new_peer_tx: Option<futures::sync::oneshot::Sender<()>>,
71 wait_for_new_peer_rx: Option<futures::sync::oneshot::Receiver<()>>,
72 need_wait_for_reading: bool,
73}
74
75impl State {
77 fn poll(&mut self) -> Poll<&mut Peer, Box<dyn (::std::error::Error)>> {
81 let pp = &mut self.p;
82 let nn = &mut self.n;
83
84 let aux = &mut self.aux;
85
86 loop {
87 let cp = self.cp.clone();
88 if let Some(ref mut p) = *pp {
89 return Ok(Async::Ready(p));
90 }
91
92 if let Some(mut bnpf) = nn.take() {
95 match bnpf.poll() {
96 Ok(Async::Ready(p)) => {
97 *pp = Some(p);
98 if let Some(tx) = self.wait_for_new_peer_tx.take() {
99 let _ = tx.send(());
100 }
101 continue;
102 }
103 Ok(Async::NotReady) => {
104 *nn = Some(bnpf);
105 return Ok(Async::NotReady);
106 }
107 Err(_x) => {
108 if !aux.already_warned {
114 aux.already_warned = true;
115 error!("Reconnecting failed. Trying again in tight endless loop.");
116 }
117 }
118 }
119 }
120 let l2r = cp.left_to_right.clone();
121 let pc: PeerConstructor = self.s.construct(cp);
122 *nn = Some(pc.get_only_first_conn(l2r));
123 self.finished_reading = false;
124 self.ph = Phase::Idle;
125 self.read_waiter_tx = None;
126 self.read_waiter_rx = None;
127 }
128 }
129}
130
131#[derive(Clone)]
132struct PeerHandle(Rc<RefCell<State>>);
133
134macro_rules! getpeer {
135 ($state:ident -> $p:ident) => {
136 let $p: &mut Peer = match $state.poll() {
137 Ok(Async::Ready(p)) => p,
138 Ok(Async::NotReady) => return wouldblock(),
139 Err(e) => {
140 return Err(simple_err(format!("{}", e)));
141 }
142 };
143 };
144}
145
146impl State {
147 fn reconnect(&mut self) {
148 info!("Reconnect");
149 self.p = None;
150 self.ph = Phase::Idle;
151 self.finished_reading = false;
152 self.read_waiter_tx = None;
153 self.read_waiter_rx = None;
154 }
155}
156
157impl Read for PeerHandle {
158 fn read(&mut self, b: &mut [u8]) -> Result<usize, IoError> {
159 let mut state = self.0.borrow_mut();
160 loop {
161 if let Some(w) = state.wait_for_new_peer_rx.as_mut() {
162 match w.poll() {
163 Ok(Async::NotReady) => return wouldblock(),
164 _ => {
165 state.wait_for_new_peer_rx = None;
166 }
167 }
168 }
169 let p : &mut Peer = match state.poll() {
170 Ok(Async::Ready(p)) => p,
171 Ok(Async::NotReady) => return wouldblock(),
172 Err(e) => {
173 return Err(simple_err(format!("{}", e)));
174 }
175 };
176 #[allow(unused_assignments)]
177 let mut finished_but_loop_around = false;
178 match p.0.read(b) {
179 Ok(0) => {
180 state.finished_reading = true;
181 if state.need_wait_for_reading {
182 finished_but_loop_around = true;
183 } else {
184 return Ok(0);
185 }
186 }
187 Err(e) => {
188 if e.kind() == ::std::io::ErrorKind::WouldBlock {
189 return Err(e);
190 }
191 state.finished_reading = true;
192 warn!("{}", e);
193
194 if state.need_wait_for_reading {
195 finished_but_loop_around = true;
197 } else {
198 return Err(e);
199 }
200 }
201 Ok(x) => {
202 return Ok(x);
203 }
204 }
205 if finished_but_loop_around {
206 state.finished_reading = true;
207 let (tx,rx) = futures::sync::oneshot::channel();
208 state.wait_for_new_peer_tx = Some(tx);
209 state.wait_for_new_peer_rx = Some(rx);
210 if let Some(rw) = state.read_waiter_tx.take() {
211 let _ = rw.send(());
212 }
213 }
214 }
215 }
216}
217impl AsyncRead for PeerHandle {}
218
219impl Write for PeerHandle {
220 fn write(&mut self, b: &[u8]) -> Result<usize, IoError> {
221 let mut state = self.0.borrow_mut();
222
223 let mut do_reconnect = false;
224 let mut finished = false;
225 loop {
226 if do_reconnect {
227 state.reconnect();
228 do_reconnect = false;
229 } else if finished {
230 state.p = None;
231 state.ph = Phase::Idle;
232 return Ok(b.len());
233 } else {
234 let mut ph = state.ph.clone();
235 {
236 getpeer!(state -> p);
237
238 match ph {
239 Phase::Idle => {
240 match p.1.write(b) {
241 Ok(0) => {
242 info!("End-of-file write?");
243 return Ok(0);
244 }
245 Err(e) => {
246 if e.kind() == ::std::io::ErrorKind::WouldBlock {
247 return Err(e);
248 }
249 warn!("{}", e);
250 return Err(e);
251 }
252 Ok(x) if x == b.len() => {
253 debug!("Full write");
254 ph = Phase::Flushing;
256 },
257 Ok(x) => {
258 debug!("Partial write of {} bytes", x);
259 let debt = b[x..b.len()].to_vec();
261 ph = Phase::WriteDebt(debt);
262 }
263 }
264 },
265 Phase::WriteDebt(d) => {
266 match p.1.write(&d[..]) {
267 Ok(0) => {
268 info!("End-of-file write v2?");
269 return Ok(0);
270 }
271 Err(e) => {
272 if e.kind() == ::std::io::ErrorKind::WouldBlock {
273 return Err(e);
274 }
275 warn!("{}", e);
276 return Err(e);
277 }
278 Ok(x) if x == d.len() => {
279 debug!("Closing the debt");
280 ph = Phase::Flushing;
282 },
283 Ok(x) => {
284 debug!("Partial write of {} debt bytes", x);
285 let debt = d[x..d.len()].to_vec();
287 ph = Phase::WriteDebt(debt);
288 }
289 }
290 },
291 Phase::Flushing => {
292 match p.1.flush() {
293 Err(e) => {
294 if e.kind() == ::std::io::ErrorKind::WouldBlock {
295 return Err(e);
296 }
297 warn!("{}", e);
298 return Err(e);
299 }
300 Ok(()) => {
301 debug!("Flushed");
302 ph = Phase::Closing;
303 }
304 }
305 },
306 Phase::Closing => {
307 match p.1.shutdown() {
308 Err(e) => {
309 if e.kind() == ::std::io::ErrorKind::WouldBlock {
310 return Err(e);
311 }
312 warn!("{}", e);
313 return Err(e);
314 },
315 Ok(Async::NotReady) => {
316 return wouldblock();
317 },
318 Ok(Async::Ready(())) => {
319 if state.need_wait_for_reading {
320 if state.finished_reading {
321 debug!("Closed and reading is also done");
322 finished=true;
323 } else {
324 debug!("Closed, but need to wait for other direction to finish");
325 ph = Phase::WaitingForReadToFinish;
326 let (tx,rx) = futures::sync::oneshot::channel();
327 state.read_waiter_tx = Some(tx);
328 state.read_waiter_rx = Some(rx);
329 }
330 } else {
331 debug!("Closed");
332 finished=true;
333 }
334 }
335 }
336 },
337 Phase::WaitingForReadToFinish => {
338 match state.read_waiter_rx.as_mut().unwrap().poll() {
339 Ok(Async::NotReady) => {
340 return wouldblock();
341 }
342 _ => {
343 debug!("Waited for read to finish");
344 finished=true;
345 }
346 }
347 }
348 }
349 }
350 state.ph = ph;
351 }
352 }
353 }
354 fn flush(&mut self) -> Result<(), IoError> {
355 Ok(())
357 }
358}
359impl AsyncWrite for PeerHandle {
360 fn shutdown(&mut self) -> futures::Poll<(), IoError> {
361 Ok(Async::Ready(()))
363 }
364}
365
366pub fn foreachmsg_peer(s: Rc<dyn Specifier>, cp: ConstructParams) -> BoxedNewPeerFuture {
367 let need_wait_for_reading = cp.program_options.foreachmsg_wait_reads;
368 let s = Rc::new(RefCell::new(State {
369 cp,
370 s,
371 p: None,
372 n: None,
373 aux: Default::default(),
374 ph: Phase::Idle,
375 finished_reading: false,
376 read_waiter_tx: None,
377 read_waiter_rx: None,
378 wait_for_new_peer_rx: None,
379 wait_for_new_peer_tx: None,
380 need_wait_for_reading,
381 }));
382 let ph1 = PeerHandle(s.clone());
383 let ph2 = PeerHandle(s);
384 let peer = Peer::new(ph1, ph2, None );
385 Box::new(ok(peer)) as BoxedNewPeerFuture
386}