websocat/
foreachmsg_peer.rs

1use futures::future::ok;
2
3use std::rc::Rc;
4
5use super::{BoxedNewPeerFuture, Peer};
6use super::{ConstructParams, PeerConstructor, Specifier};
7
8use std::cell::RefCell;
9
10use std::io::{Error as IoError, Read, Write};
11use tokio_io::{AsyncRead, AsyncWrite};
12
13use super::{once, simple_err, wouldblock};
14use futures::{Async, Future, Poll};
15
16#[derive(Debug)]
17pub struct Foreachmsg(pub Rc<dyn Specifier>);
18impl Specifier for Foreachmsg {
19    fn construct(&self, cp: ConstructParams) -> PeerConstructor {
20        once(foreachmsg_peer(self.0.clone(), cp))
21    }
22    specifier_boilerplate!(singleconnect noglobalstate has_subspec);
23    self_0_is_subspecifier!(...);
24}
25specifier_class!(
26    name = ForeachmsgClass,
27    target = Foreachmsg,
28    prefixes = ["foreachmsg:"],
29    arg_handling = subspec,
30    overlay = true,
31    MessageBoundaryStatusDependsOnInnerType,
32    SingleConnect,
33    help = r#"
34Execute something for each incoming message.
35
36Somewhat the reverse of the `autoreconnect:`.
37
38Example:
39
40    websocat -t -u ws://server/listen_for_updates foreachmsg:writefile:status.txt
41
42This keeps only recent incoming message in file and discards earlier messages.
43"#
44);
45
46#[derive(Default)]
47struct State2 {
48    already_warned: bool,
49}
50
51#[derive(Clone)]
52enum Phase {
53    Idle,
54    WriteDebt(Vec<u8>),
55    Flushing,
56    Closing,
57    WaitingForReadToFinish,
58}
59
60struct State {
61    s: Rc<dyn Specifier>,
62    p: Option<Peer>,
63    n: Option<BoxedNewPeerFuture>,
64    cp: ConstructParams,
65    aux: State2,
66    ph: Phase,
67    finished_reading: bool,
68    read_waiter_tx: Option<futures::sync::oneshot::Sender<()>>,
69    read_waiter_rx: Option<futures::sync::oneshot::Receiver<()>>,
70    wait_for_new_peer_tx: Option<futures::sync::oneshot::Sender<()>>,
71    wait_for_new_peer_rx: Option<futures::sync::oneshot::Receiver<()>>,
72    need_wait_for_reading: bool,
73}
74
75/// This implementation's poll is to be reused many times, both after returning item and error
76impl State {
77    //type Item = &'mut Peer;
78    //type Error = Box<::std::error::Error>;
79
80    fn poll(&mut self) -> Poll<&mut Peer, Box<dyn (::std::error::Error)>> {
81        let pp = &mut self.p;
82        let nn = &mut self.n;
83
84        let aux = &mut self.aux;
85
86        loop {
87            let cp = self.cp.clone();
88            if let Some(ref mut p) = *pp {
89                return Ok(Async::Ready(p));
90            }
91
92            // Peer is not present: trying to create a new one
93
94            if let Some(mut bnpf) = nn.take() {
95                match bnpf.poll() {
96                    Ok(Async::Ready(p)) => {
97                        *pp = Some(p);
98                        if let Some(tx) = self.wait_for_new_peer_tx.take() {
99                            let _ = tx.send(());
100                        }
101                        continue;
102                    }
103                    Ok(Async::NotReady) => {
104                        *nn = Some(bnpf);
105                        return Ok(Async::NotReady);
106                    }
107                    Err(_x) => {
108                        // Stop on error:
109                        //return Err(_x);
110
111                        // Just reconnect again on error
112
113                        if !aux.already_warned {
114                            aux.already_warned = true;
115                            error!("Reconnecting failed. Trying again in tight endless loop.");
116                        }
117                    }
118                }
119            }
120            let l2r = cp.left_to_right.clone();
121            let pc: PeerConstructor = self.s.construct(cp);
122            *nn = Some(pc.get_only_first_conn(l2r));
123            self.finished_reading = false;
124            self.ph = Phase::Idle;
125            self.read_waiter_tx = None;
126            self.read_waiter_rx = None;
127        }
128    }
129}
130
131#[derive(Clone)]
132struct PeerHandle(Rc<RefCell<State>>);
133
134macro_rules! getpeer {
135    ($state:ident -> $p:ident) => {
136        let $p: &mut Peer = match $state.poll() {
137            Ok(Async::Ready(p)) => p,
138            Ok(Async::NotReady) => return wouldblock(),
139            Err(e) => {
140                return Err(simple_err(format!("{}", e)));
141            }
142        };
143    };
144}
145
146impl State {
147    fn reconnect(&mut self) {
148        info!("Reconnect");
149        self.p = None;
150        self.ph = Phase::Idle;
151        self.finished_reading = false;
152        self.read_waiter_tx = None;
153        self.read_waiter_rx = None;
154    }
155}
156
157impl Read for PeerHandle {
158    fn read(&mut self, b: &mut [u8]) -> Result<usize, IoError> {
159        let mut state = self.0.borrow_mut();
160        loop {
161            if let Some(w) = state.wait_for_new_peer_rx.as_mut() {
162                match w.poll() {
163                    Ok(Async::NotReady) => return wouldblock(),
164                    _ => {
165                        state.wait_for_new_peer_rx = None;
166                    }
167                }
168            }
169            let p : &mut Peer = match state.poll() {
170                Ok(Async::Ready(p)) => p,
171                Ok(Async::NotReady) => return wouldblock(),
172                Err(e) => {
173                    return Err(simple_err(format!("{}", e)));
174                }
175            };
176            #[allow(unused_assignments)]
177            let mut finished_but_loop_around = false;
178            match p.0.read(b) {
179                Ok(0) => { 
180                    state.finished_reading = true;
181                    if state.need_wait_for_reading {
182                        finished_but_loop_around = true;
183                    } else {
184                        return Ok(0);
185                    }
186                }
187                Err(e) => {
188                    if e.kind() == ::std::io::ErrorKind::WouldBlock {
189                        return Err(e);
190                    }
191                    state.finished_reading = true;
192                    warn!("{}", e);
193
194                    if state.need_wait_for_reading {
195                        // Get a new peer to read from
196                        finished_but_loop_around = true;
197                    } else {
198                        return Err(e);
199                    }
200                }
201                Ok(x) => {
202                    return Ok(x);
203                }
204            }
205            if finished_but_loop_around {
206                state.finished_reading = true;
207                let (tx,rx) = futures::sync::oneshot::channel();
208                state.wait_for_new_peer_tx = Some(tx);
209                state.wait_for_new_peer_rx = Some(rx);
210                if let Some(rw) = state.read_waiter_tx.take() {
211                    let _ = rw.send(());
212                }
213            }
214        }
215    }
216}
217impl AsyncRead for PeerHandle {}
218
219impl Write for PeerHandle {
220    fn write(&mut self, b: &[u8]) -> Result<usize, IoError> {
221        let mut state = self.0.borrow_mut();
222        
223        let mut do_reconnect = false;
224        let mut finished = false;
225        loop {
226            if do_reconnect {
227                state.reconnect();
228                do_reconnect = false;
229            } else if finished {
230                state.p = None;
231                state.ph = Phase::Idle;
232                return Ok(b.len());
233            } else {
234                let mut ph = state.ph.clone();
235                {
236                    getpeer!(state -> p);
237
238                    match ph {
239                        Phase::Idle => {
240                            match p.1.write(b) {
241                                Ok(0) => { 
242                                    info!("End-of-file write?");
243                                    return Ok(0);
244                                }
245                                Err(e) => {
246                                    if e.kind() == ::std::io::ErrorKind::WouldBlock {
247                                        return Err(e);
248                                    }
249                                    warn!("{}", e);
250                                    return Err(e);
251                                }
252                                Ok(x) if x == b.len() => {
253                                    debug!("Full write");
254                                    // A successful write. Flushing and closing the peer.
255                                    ph = Phase::Flushing;
256                                },
257                                Ok(x) => {
258                                    debug!("Partial write of {} bytes", x);
259                                    // A partial write. Creating write debt.
260                                    let debt = b[x..b.len()].to_vec();
261                                    ph = Phase::WriteDebt(debt);
262                                }
263                            }
264                        },
265                        Phase::WriteDebt(d) => {
266                            match p.1.write(&d[..]) {
267                                Ok(0) => { 
268                                    info!("End-of-file write v2?");
269                                    return Ok(0);
270                                }
271                                Err(e) => {
272                                    if e.kind() == ::std::io::ErrorKind::WouldBlock {
273                                        return Err(e);
274                                    }
275                                    warn!("{}", e);
276                                    return Err(e);
277                                }
278                                Ok(x) if x == d.len() => {
279                                    debug!("Closing the debt");
280                                    // A successful write. Flushing and closing the peer.
281                                    ph = Phase::Flushing;
282                                },
283                                Ok(x) => {
284                                    debug!("Partial write of {} debt bytes", x);
285                                    // A partial write. Retaining the write debt.
286                                    let debt = d[x..d.len()].to_vec();
287                                    ph = Phase::WriteDebt(debt);
288                                }
289                            }
290                        },
291                        Phase::Flushing => {
292                            match p.1.flush() {
293                                Err(e) => {
294                                    if e.kind() == ::std::io::ErrorKind::WouldBlock {
295                                        return Err(e);
296                                    }
297                                    warn!("{}", e);
298                                    return Err(e);
299                                }
300                                Ok(()) => {
301                                    debug!("Flushed");
302                                    ph = Phase::Closing;
303                                }
304                            }
305                        },
306                        Phase::Closing => {
307                            match p.1.shutdown() {
308                                Err(e) => {
309                                    if e.kind() == ::std::io::ErrorKind::WouldBlock {
310                                        return Err(e);
311                                    }
312                                    warn!("{}", e);
313                                    return Err(e);
314                                },
315                                Ok(Async::NotReady) => {
316                                    return wouldblock();
317                                },
318                                Ok(Async::Ready(())) => {
319                                    if state.need_wait_for_reading {
320                                        if state.finished_reading {
321                                            debug!("Closed and reading is also done");
322                                            finished=true;
323                                        } else {
324                                            debug!("Closed, but need to wait for other direction to finish");
325                                            ph = Phase::WaitingForReadToFinish;
326                                            let (tx,rx) = futures::sync::oneshot::channel();
327                                            state.read_waiter_tx = Some(tx);
328                                            state.read_waiter_rx = Some(rx);
329                                        }
330                                    } else {
331                                        debug!("Closed");
332                                        finished=true;
333                                    }
334                                }
335                            }
336                        },
337                        Phase::WaitingForReadToFinish => {
338                            match state.read_waiter_rx.as_mut().unwrap().poll() {
339                                Ok(Async::NotReady) => {
340                                    return wouldblock();
341                                }
342                                _ => {
343                                    debug!("Waited for read to finish");
344                                    finished=true;
345                                }
346                            }
347                        }
348                    }
349                }
350                state.ph = ph;
351            }
352        }
353    }
354    fn flush(&mut self) -> Result<(), IoError> {
355        // No-op here: we flush and close after each write
356        Ok(())
357    }
358}
359impl AsyncWrite for PeerHandle {
360    fn shutdown(&mut self) -> futures::Poll<(), IoError> {
361        // No-op here: we flush and close after each write
362        Ok(Async::Ready(()))
363    }
364}
365
366pub fn foreachmsg_peer(s: Rc<dyn Specifier>, cp: ConstructParams) -> BoxedNewPeerFuture {
367    let need_wait_for_reading = cp.program_options.foreachmsg_wait_reads;
368    let s = Rc::new(RefCell::new(State {
369        cp,
370        s,
371        p: None,
372        n: None,
373        aux: Default::default(),
374        ph: Phase::Idle,
375        finished_reading: false,
376        read_waiter_tx: None,
377        read_waiter_rx: None,
378        wait_for_new_peer_rx: None,
379        wait_for_new_peer_tx: None,
380        need_wait_for_reading,
381    }));
382    let ph1 = PeerHandle(s.clone());
383    let ph2 = PeerHandle(s);
384    let peer = Peer::new(ph1, ph2, None /* we handle hups ourselves */);
385    Box::new(ok(peer)) as BoxedNewPeerFuture
386}