1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
//! TLS-secured Cap'n Proto RPC microservice.
//!
//! This crate wraps a `microservice` Cap'n Proto interface in a
//! `Microservice` type that can either serve the interface over TLS
//! (`Microservice::serve`) or connect to it as a client
//! (`Microservice::get_client`), both driven by a `tokio_core` reactor.
#![cfg_attr(feature="clippy", feature(plugin))]
#![deny(missing_docs)]
// `#[macro_use]` is needed for the `pry!` macro used in the server implementation.
#[macro_use]
extern crate capnp_rpc;
extern crate capnp;
extern crate futures;
extern crate native_tls;
extern crate tokio_core;
extern crate tokio_io;
extern crate tokio_tls;
#[macro_use]
extern crate error_chain;
// Provides the `info!` / `error!` logging macros used throughout this file.
#[macro_use]
extern crate log;
/// Error types of this crate. Its glob import supplies the single-parameter
/// `Result` alias used in this file (presumably generated with
/// `error_chain!` — confirm in the `errors` module source).
#[macro_use]
pub mod errors;
/// Cap'n Proto bindings for the `microservice` schema.
///
/// The module body is textually included from
/// `$OUT_DIR/proto/microservice_capnp.rs` (presumably emitted by the build
/// script at compile time — confirm in `build.rs`).
pub mod microservice_capnp {
// The crate denies `missing_docs`; the included file is presumably not
// documented, so the lint is relaxed for this module only.
#![allow(missing_docs)]
include!(concat!(env!("OUT_DIR"), "/proto/microservice_capnp.rs"));
}
use std::net::ToSocketAddrs;
use std::fs::File;
use std::io::{self, Read};
use capnp::capability::Promise;
use capnp_rpc::{RpcSystem, twoparty, rpc_twoparty_capnp, Server};
use futures::{Future, Stream};
use native_tls::{Certificate, Pkcs12, TlsAcceptor, TlsConnector};
use tokio_core::{net, reactor};
use tokio_io::AsyncRead;
use tokio_tls::{TlsAcceptorExt, TlsConnectorExt};
use errors::*;
use microservice_capnp::microservice;
/// Endpoint description of one TLS-secured microservice.
///
/// Holds the resolved socket address together with the path of the server
/// certificate; the same value is used both to run the server (`serve`) and
/// to create a client connected to it (`get_client`).
#[derive(Debug, Clone)]
pub struct Microservice {
    /// Resolved address the server binds to and the client connects to.
    socket_addr: std::net::SocketAddr,
    /// Path of the server certificate file that `serve` reads.
    cert_filename: String,
}
impl Microservice {
    /// Creates a `Microservice` for the given address and server certificate.
    ///
    /// `address` is resolved via `ToSocketAddrs` and the first resulting
    /// socket address is kept. `cert_filename` is only stored here; the file
    /// is opened later by `serve`.
    ///
    /// # Errors
    ///
    /// Fails if `address` cannot be resolved or resolves to no address at all.
    pub fn new(address: &str, cert_filename: &str) -> Result<Self> {
        let parsed_address = address
            .to_socket_addrs()?
            .next()
            .ok_or("Could not parse socket address.")?;
        info!("Parsed socket address: {:}", parsed_address);
        Ok(Microservice {
            socket_addr: parsed_address,
            cert_filename: cert_filename.to_owned(),
        })
    }

    /// Reads the complete content of `filename` into memory.
    fn read_file(filename: &str) -> Result<Vec<u8>> {
        let mut bytes = vec![];
        File::open(filename)?.read_to_end(&mut bytes)?;
        Ok(bytes)
    }

    /// Runs the TLS server on the configured socket address.
    ///
    /// The server certificate is loaded from the configured certificate file,
    /// which must be a DER-encoded PKCS #12 archive with an empty password.
    /// Every accepted connection gets its own RPC system, spawned on the
    /// reactor. This call blocks inside the event loop and only returns once
    /// the stream of incoming connections ends or fails.
    ///
    /// # Errors
    ///
    /// Fails if the reactor cannot be created, the socket cannot be bound,
    /// the certificate cannot be read or parsed, the TLS acceptor cannot be
    /// built, or accepting connections fails. Errors of a single TLS
    /// handshake or RPC session are only logged.
    pub fn serve(&self) -> Result<()> {
        info!("Creating server and binding socket.");
        let mut core = reactor::Core::new()?;
        let handle = core.handle();
        let socket = net::TcpListener::bind(&self.socket_addr, &handle)?;

        info!("Opening server certificate");
        let bytes = Self::read_file(&self.cert_filename)?;
        let cert = Pkcs12::from_der(&bytes, "")?;
        let tls_acceptor = TlsAcceptor::builder(cert)?.build()?;

        let server_impl = microservice::ToClient::new(ServerImplementation).from_server::<Server>();

        // Stage 1: start a TLS handshake for every accepted TCP connection.
        let connections = socket.incoming();
        let tls_handshake = connections.map(|(socket, _addr)| {
            // Failing to set TCP_NODELAY is not fatal, so it is only logged.
            if let Err(e) = socket.set_nodelay(true) {
                error!("Unable to set socket to nodelay: {:}", e);
            }
            tls_acceptor.accept_async(socket)
        });

        // Stage 2: once a handshake completes, run an RPC system on the stream.
        let server = tls_handshake.map(|acceptor| {
            // Each connection future needs its own handle and bootstrap client.
            let handle = handle.clone();
            let server_impl = server_impl.clone();
            acceptor.and_then(move |socket| {
                let (reader, writer) = socket.split();
                let network = twoparty::VatNetwork::new(
                    reader,
                    writer,
                    rpc_twoparty_capnp::Side::Server,
                    Default::default(),
                );
                let rpc_system = RpcSystem::new(Box::new(network), Some(server_impl.client));
                handle.spawn(rpc_system.map_err(|e| error!("{}", e)));
                Ok(())
            })
        });

        info!("Running server");
        // Handshakes are spawned as separate tasks, so a slow or failing
        // handshake does not stall the accept loop.
        core.run(server.for_each(|client| {
            handle.spawn(client.map_err(|e| error!("{}", e)));
            Ok(())
        }))?;
        Ok(())
    }

    /// Connects to the server and returns an RPC client plus its reactor.
    ///
    /// Equivalent to `get_client_with_domain` with the domain `"localhost"`.
    /// `client_cert_file` is a DER-encoded certificate that is added as a
    /// trusted root for the connection.
    ///
    /// The RPC system is spawned on the returned `reactor::Core`, so the
    /// caller has to keep that core and run request futures on it.
    ///
    /// # Errors
    ///
    /// Fails if the certificate cannot be read or parsed, the reactor or TLS
    /// connector cannot be created, or the TCP connection / TLS handshake
    /// fails.
    pub fn get_client(&self, client_cert_file: &str) -> Result<(microservice::Client, reactor::Core)> {
        self.get_client_with_domain(client_cert_file, "localhost")
    }

    /// Connects to the server, verifying its certificate against `domain`.
    ///
    /// `client_cert_file` is a DER-encoded certificate that is added as a
    /// trusted root for the connection. `domain` is the name passed to the
    /// TLS connector for the handshake.
    ///
    /// The RPC system is spawned on the returned `reactor::Core`, so the
    /// caller has to keep that core and run request futures on it.
    ///
    /// # Errors
    ///
    /// Fails if the certificate cannot be read or parsed, the reactor or TLS
    /// connector cannot be created, or the TCP connection / TLS handshake
    /// fails.
    pub fn get_client_with_domain(
        &self,
        client_cert_file: &str,
        domain: &str,
    ) -> Result<(microservice::Client, reactor::Core)> {
        info!("Opening TLS client certificate.");
        let bytes = Self::read_file(client_cert_file)?;
        let ca_cert = Certificate::from_der(&bytes)?;

        info!("Creating client.");
        let mut core = reactor::Core::new()?;
        let handle = core.handle();
        let socket = net::TcpStream::connect(&self.socket_addr, &handle);

        let mut builder = TlsConnector::builder()?;
        builder.add_root_certificate(ca_cert)?;
        let cx = builder.build()?;

        let tls_handshake = socket.and_then(|socket| {
            // Failing to set TCP_NODELAY is not fatal, so it is only logged.
            if let Err(e) = socket.set_nodelay(true) {
                error!("Unable to set socket to nodelay: {:}", e);
            }
            // The connect future yields `io::Error`, so the TLS error is wrapped.
            cx.connect_async(domain, socket).map_err(|e| {
                io::Error::new(io::ErrorKind::Other, e)
            })
        });

        // Block until the TCP connection and the TLS handshake are done.
        let stream = core.run(tls_handshake)?;
        let (reader, writer) = stream.split();
        let network = Box::new(twoparty::VatNetwork::new(
            reader,
            writer,
            rpc_twoparty_capnp::Side::Client,
            Default::default(),
        ));

        let mut rpc_system = RpcSystem::new(network, None);
        let client: microservice::Client = rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server);
        handle.spawn(rpc_system.map_err(|e| error!("{}", e)));

        info!("Client creation successful.");
        Ok((client, core))
    }
}
/// Stateless handler that implements the `microservice::Server` RPC trait.
struct ServerImplementation;
impl microservice::Server for ServerImplementation {
    /// Handles the `hello` call by answering with the request text reversed.
    ///
    /// The text is reversed `char` by `char` (Unicode scalar values), and both
    /// the request and the response are logged at info level.
    fn hello(
        &mut self,
        params: microservice::HelloParams,
        mut results: microservice::HelloResults,
    ) -> Promise<(), capnp::Error> {
        // `pry!` is the `Promise` counterpart of `try!`: on a decoding error
        // it returns early from this method with the error.
        let request = pry!(pry!(params.get()).get_request());
        info!("Got request: {}", request);

        let response: String = request.chars().rev().collect();
        results.get().set_response(&response);
        info!("Returned response: {}", response);

        Promise::ok(())
    }
}