1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
extern crate futures;
extern crate tokio_io;

use futures::future::ok;
use std::cell::RefCell;
use std::rc::Rc;

use super::{BoxedNewPeerFuture, Peer};

use std::io::{Error as IoError, Read, Write};
use tokio_io::{AsyncRead, AsyncWrite};

use super::{once, ConstructParams, PeerConstructor, Specifier};
use futures::Future;
use std::ops::DerefMut;

/// Overlay specifier that wraps an inner specifier (field `0`).
///
/// Its `construct` implementation goes through `connection_reuser`, so the
/// inner specifier is only asked for a connection while the shared peer slot
/// is still empty; otherwise the caller receives a handle to the peer that is
/// already stored there.
#[derive(Debug)]
pub struct Reuser(pub Rc<dyn Specifier>);
impl Specifier for Reuser {
    /// Builds a single-connection `PeerConstructor` backed by the shared peer slot.
    ///
    /// The `GlobalState` is obtained from `p.global(..)` (created with
    /// `GlobalState::default` when needed) and handed to `connection_reuser`
    /// together with a closure that constructs the inner specifier and takes
    /// only its first connection. `connection_reuser` calls that closure only
    /// when the slot does not hold a peer yet.
    fn construct(&self, p: ConstructParams) -> PeerConstructor {
        let send_zero_msg_on_disconnect = p.program_options.reuser_send_zero_msg_on_disconnect;
        // One clone is sufficient: `GlobalState` wraps an `Rc`, so the clone
        // still refers to the same shared slot while releasing the borrow of `p`.
        let mut reuser = p.global(GlobalState::default).clone();
        // Must be read before `p` is moved into the closure below.
        let l2r = p.left_to_right.clone();
        let inner = || self.0.construct(p).get_only_first_conn(l2r);
        once(connection_reuser(
            &mut reuser,
            inner,
            send_zero_msg_on_disconnect,
        ))
    }
    specifier_boilerplate!(singleconnect has_subspec globalstate);
    self_0_is_subspecifier!(...);
}

// Declares `ReuserClass`, the specifier class whose target is `Reuser`.
// The usage examples in the help text use the `reuse-raw:` prefix, which is
// one of the prefixes registered by this class.
specifier_class!(
    name = ReuserClass,
    target = Reuser,
    prefixes = ["reuse-raw:", "raw-reuse:"],
    arg_handling = subspec,
    overlay = true,
    MessageBoundaryStatusDependsOnInnerType,
    SingleConnect,
    help = r#"
Reuse subspecifier for serving multiple clients: unpredictable mode. [A]

Better used with --unidirectional, otherwise replies get directed to
random connected client.

Example: Forward multiple parallel WebSocket connections to a single persistent TCP connection

    websocat -u ws-l:0.0.0.0:8800 reuse-raw:tcp:127.0.0.1:4567

Example (unreliable): don't disconnect SSH when websocket reconnects

    websocat ws-l:[::]:8088 reuse-raw:tcp:127.0.0.1:22
"#
);

/// Reference-counted, interior-mutable slot that holds the shared `Peer`,
/// or `None` while no peer has been stored yet.
type PeerSlot = Rc<RefCell<Option<Peer>>>;

/// State shared between `Reuser` constructions: a single `PeerSlot`.
///
/// `Default` yields an empty slot (`None`); `Clone` copies the `Rc`, so all
/// clones refer to the same slot.
#[derive(Default, Clone)]
pub struct GlobalState(PeerSlot);

/// I/O handle that forwards reads and writes to the peer stored in a `PeerSlot`.
///
/// Field `0` is the shared slot. Field `1` is the `send_zero_msg_on_disconnect`
/// flag: when set, `shutdown` writes an empty buffer to the peer before
/// reporting completion.
#[derive(Clone)]
struct PeerHandle(PeerSlot, bool);

impl Read for PeerHandle {
    /// Reads from the reading half (`.0`) of the peer stored in the shared slot.
    ///
    /// Hits `unreachable!()` if the slot is empty; handles are only created
    /// by `connection_reuser` once a peer is in place.
    fn read(&mut self, b: &mut [u8]) -> Result<usize, IoError> {
        let mut slot = self.0.borrow_mut();
        match *slot {
            Some(ref mut peer) => peer.0.read(b),
            None => unreachable!(),
        }
    }
}
// No methods are overridden: the trait's provided methods are used as-is.
impl AsyncRead for PeerHandle {}

impl Write for PeerHandle {
    /// Writes `b` to the writing half (`.1`) of the peer stored in the shared slot.
    ///
    /// Hits `unreachable!()` if the slot is empty.
    fn write(&mut self, b: &[u8]) -> Result<usize, IoError> {
        let mut slot = self.0.borrow_mut();
        match *slot {
            Some(ref mut peer) => peer.1.write(b),
            None => unreachable!(),
        }
    }

    /// Flushes the writing half (`.1`) of the peer stored in the shared slot.
    ///
    /// Hits `unreachable!()` if the slot is empty.
    fn flush(&mut self) -> Result<(), IoError> {
        let mut slot = self.0.borrow_mut();
        match *slot {
            Some(ref mut peer) => peer.1.flush(),
            None => unreachable!(),
        }
    }
}
impl AsyncWrite for PeerHandle {
    /// Reports the shutdown as complete without shutting down the shared peer.
    ///
    /// When the handle's flag (`.1`) is set, an empty buffer is first written
    /// to the peer; the result of that write is discarded.
    /// Hits `unreachable!()` if the slot is empty.
    fn shutdown(&mut self) -> futures::Poll<(), IoError> {
        let send_zero_msg = self.1;
        if send_zero_msg {
            let _ = self.write(b"");
        }
        match *self.0.borrow_mut() {
            // The shutdown request is intentionally not forwarded to the
            // stored peer's writer.
            Some(_) => Ok(futures::Async::Ready(())),
            None => unreachable!(),
        }
    }
}

/// Returns a future resolving to a `Peer` backed by the shared slot in `s`.
///
/// * `s` - global state holding the shared peer slot.
/// * `inner_peer` - produces the future of the real peer; called only when
///   the slot is empty at the time of this call.
/// * `send_zero_msg_on_disconnect` - flag stored in the returned handles; when
///   set, their `shutdown` writes an empty buffer to the shared peer.
///
/// If the slot already holds a peer, the returned future is immediately ready.
/// Otherwise the inner peer is stored into the slot once its future resolves,
/// and a handle-backed `Peer` is yielded afterwards.
///
/// NOTE(review): the emptiness check happens when this function is called,
/// while the slot is filled only when the inner future resolves, so a second
/// call made in between also takes the initializing path — confirm that this
/// is acceptable for callers.
pub fn connection_reuser<F: FnOnce() -> BoxedNewPeerFuture>(
    s: &mut GlobalState,
    inner_peer: F,
    send_zero_msg_on_disconnect: bool,
) -> BoxedNewPeerFuture {
    // Builds a `Peer` whose read and write halves are both handles onto `slot`.
    fn handle_peer(slot: PeerSlot, send_zero_msg_on_disconnect: bool) -> Peer {
        let reader = PeerHandle(slot, send_zero_msg_on_disconnect);
        let writer = reader.clone();
        Peer::new(reader, writer, None /* TODO */)
    }

    let already_connected = s.0.borrow().is_some();
    let slot: PeerSlot = s.0.clone();

    if already_connected {
        info!("Reusing");
        let peer = handle_peer(slot, send_zero_msg_on_disconnect);
        return Box::new(ok(peer)) as BoxedNewPeerFuture;
    }

    info!("Initializing");
    Box::new(inner_peer().and_then(move |inner| {
        // Publish the freshly connected peer before any handle is handed out.
        *slot.borrow_mut() = Some(inner);
        ok(handle_peer(slot, send_zero_msg_on_disconnect))
    })) as BoxedNewPeerFuture
}