1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
extern crate websocket;
use self::websocket::WebSocketError;
use futures::future::Future;
use futures::stream::Stream;
use std::cell::RefCell;
use std::rc::Rc;
use self::websocket::server::upgrade::async::IntoWs;
use super::readdebt::{DebtHandling, ReadDebt};
use super::ws_peer::{Mode1, PeerForWs, WsReadWrapper, WsWriteWrapper};
use super::{box_up_err, io_other_error, BoxedNewPeerFuture, Peer};
use super::{ConstructParams, PeerConstructor, Specifier};
/// Overlay specifier that wraps an inner specifier `T` and turns every peer
/// the inner specifier constructs into a server-side WebSocket peer by
/// running it through [`ws_upgrade_peer`].
#[derive(Debug)]
pub struct WsServer<T: Specifier>(pub T);

impl<T: Specifier> Specifier for WsServer<T> {
    /// Builds the inner peer constructor and maps each produced peer through
    /// the WebSocket upgrade.
    ///
    /// The outgoing frame mode is chosen from
    /// `program_options.websocket_text_mode` (text when set, binary
    /// otherwise); `program_options.read_debt_handling` is forwarded to the
    /// upgrade unchanged.
    fn construct(&self, cp: ConstructParams) -> PeerConstructor {
        let frame_mode = match cp.program_options.websocket_text_mode {
            true => Mode1::Text,
            false => Mode1::Binary,
        };
        // The inner specifier receives its own copy of the parameters because
        // `cp` itself is moved into the mapping closure below.
        self.0.construct(cp.clone()).map(move |peer| {
            ws_upgrade_peer(peer, frame_mode, cp.program_options.read_debt_handling)
        })
    }
    specifier_boilerplate!(typ=WebSocket noglobalstate has_subspec);
    self_0_is_subspecifier!(proxy_is_multiconnect);
}
// Declares `WsServerClass`, the specifier class whose target is `WsServer`.
// The invocation lists four command-line prefixes, requests sub-specifier
// argument handling (`arg_handling = subspec`) and marks the class as an
// overlay. `MessageOriented` and `MulticonnectnessDependsOnInnerType` are
// passed to the macro as-is; see the `specifier_class!` definition for their
// exact effect.
// The `help` raw string is user-facing text and is emitted verbatim.
specifier_class!(
name = WsServerClass,
target = WsServer,
prefixes = ["ws-upgrade:", "upgrade-ws:", "ws-u:", "u-ws:"],
arg_handling = subspec,
overlay = true,
MessageOriented,
MulticonnectnessDependsOnInnerType,
help = r#"
WebSocket upgrader / raw server. Specify your own protocol instead of usual TCP. [A]
All other WebSocket server modes actually use this overlay under the hood.
Example: serve incoming connection from socat
socat tcp-l:1234,fork,reuseaddr exec:'websocat -t ws-u\:stdio\: mirror\:'
"#
);
// Declares the alias class `WsTcpServerClass`: the four listed prefixes are
// aliased to the specifier string "ws-u:tcp-l:", i.e. the WebSocket upgrade
// overlay on top of a `tcp-l:` sub-specifier.
// NOTE(review): the "more verbose" example in the help text starts with
// `ws-l:tcp-l:` rather than `ws-u:tcp-l:`. If alias expansion is a plain
// prefix substitution, that would yield `ws-u:tcp-l:tcp-l:...` — confirm the
// example against the specifier parser.
specifier_alias!(
name = WsTcpServerClass,
prefixes = ["ws-listen:", "ws-l:", "l-ws:", "listen-ws:"],
alias = "ws-u:tcp-l:",
help = r#"
WebSocket server. Argument is host and port to listen.
Example: Dump all incoming websocket data to console
websocat ws-l:127.0.0.1:8808 -
Example: the same, but more verbose:
websocat ws-l:tcp-l:127.0.0.1:8808 reuse:-
"#
);
// Declares the alias class `WsInetdServerClass`: the prefixes `inetd-ws:` and
// `ws-inetd:` are aliased to "ws-u:inetd:", i.e. the WebSocket upgrade
// overlay on top of an `inetd:` sub-specifier.
// The help text still carries an upstream TODO about adding an example.
specifier_alias!(
name = WsInetdServerClass,
prefixes = ["inetd-ws:", "ws-inetd:"],
alias = "ws-u:inetd:",
help = r#"
WebSocket inetd server. [A]
TODO: transfer the example here
"#
);
// Declares the alias class `WsUnixServerClass`: the prefix `l-ws-unix:` is
// aliased to "ws-u:unix-l:", i.e. the WebSocket upgrade overlay on top of a
// `unix-l:` sub-specifier.
specifier_alias!(
name = WsUnixServerClass,
prefixes = ["l-ws-unix:"],
alias = "ws-u:unix-l:",
help = r#"
WebSocket UNIX socket-based server. [A]
"#
);
// Declares the alias class `WsAbstractUnixServerClass`: the prefix
// `l-ws-abstract:` is aliased to "ws-l:abstract-l:".
// NOTE(review): every other alias in this file targets the `ws-u:` overlay
// directly (e.g. "ws-u:unix-l:"), whereas this one targets `ws-l:`, which is
// itself declared above as an alias for "ws-u:tcp-l:". If aliases expand by
// plain prefix substitution this would resolve to
// "ws-u:tcp-l:abstract-l:..." — confirm whether "ws-u:abstract-l:" was
// intended.
specifier_alias!(
name = WsAbstractUnixServerClass,
prefixes = ["l-ws-abstract:"],
alias = "ws-l:abstract-l:",
help = r#"
WebSocket abstract-namespaced UNIX socket server. [A]
"#
);
/// Runs the server side of a WebSocket upgrade over `inner_peer` and returns
/// a future resolving to the resulting WebSocket-framed [`Peer`].
///
/// * `inner_peer` - the already-connected transport; it is wrapped in
///   `PeerForWs` and handed to the `websocket` crate's `into_ws()`.
/// * `mode1` - passed to the `WsWriteWrapper` that forms the write half of
///   the resulting peer.
/// * `ws_read_debt_handling` - stored in the `ReadDebt` of the
///   `WsReadWrapper` that forms the read half.
///
/// A failure while parsing the incoming request is converted to
/// `WebSocketError::IoError` via `io_other_error`; every error of the
/// combined future is finally passed through `box_up_err`.
///
/// The request target is logged at `info` level and the request, its headers
/// and the response headers at `debug` level.
pub fn ws_upgrade_peer(
    inner_peer: Peer,
    mode1: Mode1,
    ws_read_debt_handling: DebtHandling,
) -> BoxedNewPeerFuture {
    // The explicit annotation pins the boxed future type returned by `into_ws()`.
    let handshake: Box<
        Future<Item = self::websocket::server::upgrade::async::Upgrade<_>, Error = _>,
    > = PeerForWs(inner_peer).into_ws();

    let upgraded = handshake
        // Only the error component of the failure tuple is kept.
        .map_err(|(_, _, _, cause)| WebSocketError::IoError(io_other_error(cause)))
        .and_then(move |upgrade| {
            info!("Incoming connection to websocket: {}", upgrade.request.subject.1);
            debug!("{:?}", upgrade.request);
            debug!("{:?}", upgrade.headers);
            upgrade.accept().map(move |(client, headers)| {
                debug!("{:?}", headers);
                info!("Upgraded");
                let (sink, stream) = client.split();
                // One sink, two owners: the reader holds it as `pingreply`
                // and the writer uses it for outgoing messages.
                let shared_sink = Rc::new(RefCell::new(sink));
                let reader = WsReadWrapper {
                    s: stream,
                    pingreply: shared_sink.clone(),
                    debt: ReadDebt(Default::default(), ws_read_debt_handling),
                };
                // NOTE(review): the meaning of the trailing `true` is defined
                // by `WsWriteWrapper` in `ws_peer`; confirm there.
                let writer = WsWriteWrapper(shared_sink, mode1, true);
                Peer::new(reader, writer)
            })
        });

    Box::new(upgraded.map_err(box_up_err)) as BoxedNewPeerFuture
}