1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
//! A client library for interfacing with ssb.
//!
//! ```rust,ignore
//! sodiumoxide::init();
//! let addr = SocketAddr::new(Ipv6Addr::localhost().into(), DEFAULT_TCP_PORT);
//!
//! current_thread::run(|_| {
//!     current_thread::spawn(TcpStream::connect(&addr)
//!     .and_then(|tcp| easy_ssb(tcp).unwrap().map_err(|err| panic!("{:?}", err)))
//!     .map_err(|err| panic!("{:?}", err))
//!     .map(|(mut client, receive, _)| {
//!         current_thread::spawn(receive.map_err(|err| panic!("{:?}", err)));
//!
//!         let (send_request, response) = client.whoami();
//!
//!         current_thread::spawn(send_request.map_err(|err| panic!("{:?}", err)));
//!         current_thread::spawn(response
//!                                   .map(|res| println!("{:?}", res))
//!                                   .map_err(|err| panic!("{:?}", err))
//!                                   .and_then(|_| {
//!                                                 client.close().map_err(|err| panic!("{:?}", err))
//!                                             }));
//!     }))
//! });
//! ```

#![deny(missing_docs)]
#![feature(try_from)]
#![feature(ip_constructors)] // only for tests

#[macro_use]
extern crate futures;
extern crate muxrpc;
extern crate ssb_common;
extern crate ssb_rpc;
extern crate tokio_io;
#[macro_use]
extern crate lazy_static;
extern crate serde_json;
extern crate ssb_keyfile;
extern crate secret_stream;
extern crate secret_handshake;
extern crate box_stream;
extern crate sodiumoxide;
#[cfg(test)]
extern crate tokio;

use std::convert::{From, TryInto};
use std::io;

use futures::prelude::*;
use futures::future::Then;
use futures::unsync::oneshot::Canceled;
use muxrpc::{muxrpc, RpcIn, RpcOut, Closed as RpcClosed, CloseRpc, RpcError, OutSync};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::io::{ReadHalf, WriteHalf};
use ssb_keyfile::{KeyfileError, load_or_create_keys};
use secret_stream::OwningClient;
use ssb_common::MAINNET_IDENTIFIER;
use sodiumoxide::crypto::box_;
use box_stream::BoxDuplex;
use secret_handshake::ClientHandshakeFailure;

#[cfg(feature = "ssb")]
mod ssb;
#[cfg(feature = "ssb")]
pub use ssb::Whoami;

/// Take ownership of an AsyncRead and an AsyncWrite to create an ssb client.
///
/// # Example
///
/// ```rust,ignore
/// sodiumoxide::init();
///
/// let (pk, sk) = load_or_create_keys().unwrap();
/// let pk = pk.try_into().unwrap();
/// let sk = sk.try_into().unwrap();
/// let (ephemeral_pk, ephemeral_sk) = box_::gen_keypair();
///
/// let addr = SocketAddr::new(Ipv6Addr::localhost().into(), DEFAULT_TCP_PORT);
///
/// current_thread::run(|_| {
///     current_thread::spawn(TcpStream::connect(&addr)
///                               .and_then(move |tcp| {
///         // Performs a secret-handshake and yields an encrypted duplex connection.
///         OwningClient::new(tcp,
///                           &MAINNET_IDENTIFIER,
///                           &pk, &sk,
///                           &ephemeral_pk, &ephemeral_sk,
///                           &pk)
///                 .map_err(|(err, _)| err)
///     })
///       .map_err(|err| panic!("{:?}", err))
///       .map(move |connection| {
///         let (read, write) = connection.unwrap().split();
///         let (mut client, receive, _) = ssb(read, write);
///         current_thread::spawn(receive.map_err(|err| panic!("{:?}", err)));
///
///         let (send_request, response) = client.whoami();
///
///         current_thread::spawn(send_request.map_err(|err| panic!("{:?}", err)));
///         current_thread::spawn(response
///                                   .map(|res| println!("{:?}", res))
///                                   .map_err(|err| panic!("{:?}", err))
///                                   .and_then(|_| client.close().map_err(|err| panic!("{:?}", err))));
///     }))
/// });
/// ```
pub fn ssb<R: AsyncRead, W: AsyncWrite>(r: R, w: W) -> (Client<R, W>, Receive<R, W>, Closed<W>) {
    let (rpc_in, rpc_out, rpc_closed) = muxrpc(r, w);
    (Client(rpc_out), Receive(rpc_in), Closed(rpc_closed))
}

/// An ssb client. This struct is used to send rpcs to the server.
pub struct Client<R: AsyncRead, W>(RpcOut<R, W>);

impl<R: AsyncRead, W: AsyncWrite> Client<R, W> {
    /// Close the connection to the server. If there are still active rpcs, it is not closed
    /// immediately. It will get closed once the last of them is done.
    pub fn close(self) -> Close<R, W> {
        Close(self.0.close())
    }

    /// Give access to the underlying muxrpc `RpcOut`, to send rpcs which are not directly
    /// supported by this module.
    pub fn muxrpc(&mut self) -> &mut RpcOut<R, W> {
        &mut self.0
    }

    #[cfg(feature = "ssb")]
    /// Query information about the current user.
    pub fn whoami(&mut self) -> (SendRpc<W>, Whoami<R>) {
        ssb::whoami(self)
    }
}

/// A future that has to be polled in order to receive responses from the server.
pub struct Receive<R: AsyncRead, W>(RpcIn<R, W>);

impl<R: AsyncRead, W: AsyncWrite> Future for Receive<R, W> {
    /// Yielded when the server properly terminates the connection.
    type Item = ();
    type Error = RpcError;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        while let Some(_) = try_ready!(self.0.poll()) {}
        Ok(Async::Ready(()))
    }
}

/// A future that indicates when the write-half of the channel to the server has been closed.
pub struct Closed<W>(RpcClosed<W>);

impl<W> Future for Closed<W> {
    type Item = W;
    /// This can only be emitted if some rpc future/sink/stream has been polled but was then
    /// dropped before it was done. If all handles are properly polled/closed, this is never
    /// emitted.
    type Error = Canceled;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        self.0.poll()
    }
}

/// A future for closing the connection to the server. If there are still active rpcs, it is not
/// closed immediately. It will get closed once the last of them is done.
pub struct Close<R: AsyncRead, W>(CloseRpc<R, W>);

impl<R: AsyncRead, W: AsyncWrite> Future for Close<R, W> {
    type Item = ();

    type Error = Option<io::Error>;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        self.0.poll()
    }
}

/// A future for sending an rpc to the server. If this isn't polled, the rpc is never sent.
pub struct SendRpc<W: AsyncWrite>(_SendRpc<W>);

impl<W: AsyncWrite> SendRpc<W> {
    fn new_sync(out_sync: OutSync<W>) -> SendRpc<W> {
        SendRpc(_SendRpc::Sync(out_sync))
    }
}

impl<W: AsyncWrite> Future for SendRpc<W> {
    type Item = ();
    /// `Some(err)` signals that a fatal io error occured when trying to send this rpc. `None`
    /// signals that a fatal io error happend upon sending another rpc.
    type Error = Option<io::Error>;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        match self.0 {
            _SendRpc::Sync(ref mut out_sync) => out_sync.poll(),
        }
    }
}

enum _SendRpc<W: AsyncWrite> {
    Sync(OutSync<W>),
}

/// Return a future for setting up an encrypted connection via the given transport
/// (using keys from the ssb keyfile) and then calling `ssb` on the connection.
///
/// This function performs blocking file io (for reading the keyfile).
///
/// This function uses libsodium internally, so ensure that `sodiumoxide::init()` has been called
/// before using this function.
///
/// # Example
///
/// ```rust,ignore
/// sodiumoxide::init();
/// let addr = SocketAddr::new(Ipv6Addr::localhost().into(), DEFAULT_TCP_PORT);
///
/// current_thread::run(|_| {
///     current_thread::spawn(TcpStream::connect(&addr)
///     .and_then(|tcp| easy_ssb(tcp).unwrap().map_err(|err| panic!("{:?}", err)))
///     .map_err(|err| panic!("{:?}", err))
///     .map(|(mut client, receive, _)| {
///         current_thread::spawn(receive.map_err(|err| panic!("{:?}", err)));
///
///         let (send_request, response) = client.whoami();
///
///         current_thread::spawn(send_request.map_err(|err| panic!("{:?}", err)));
///         current_thread::spawn(response
///                                   .map(|res| println!("{:?}", res))
///                                   .map_err(|err| panic!("{:?}", err))
///                                   .and_then(|_| {
///                                                 client.close().map_err(|err| panic!("{:?}", err))
///                                             }));
///     }))
/// });
/// ```
pub fn easy_ssb<T: AsyncRead + AsyncWrite>(transport: T) -> Result<EasySsb<T>, KeyfileError> {
    let (pk, sk) = load_or_create_keys()?;
    let pk = pk.try_into().unwrap();
    let sk = sk.try_into().unwrap();
    let (ephemeral_pk, ephemeral_sk) = box_::gen_keypair();

    Ok(EasySsb::new(OwningClient::new(transport,
                                      &MAINNET_IDENTIFIER,
                                      &pk,
                                      &sk,
                                      &ephemeral_pk,
                                      &ephemeral_sk,
                                      &pk)))
}

type AR<T> = ReadHalf<BoxDuplex<T>>;
type AW<T> = WriteHalf<BoxDuplex<T>>;
type ClientTriple<T> = (Client<AR<T>, AW<T>>, Receive<AR<T>, AW<T>>, Closed<AW<T>>);

/// A future for setting up an encrypted connection via the given AsyncRead and AsyncWrite
/// (using keys from the ssb keyfile) and then calling `ssb` on the connection.
pub struct EasySsb<T: AsyncRead + AsyncWrite>(Then<OwningClient<T>,
                                                    Result<ClientTriple<T>, EasySsbError<T>>,
                                                    fn(Result<Result<BoxDuplex<T>,
                                                                     (ClientHandshakeFailure,
                                                                      T)>,
                                                              (io::Error, T)>)
                                                       -> Result<ClientTriple<T>,
                                                                  EasySsbError<T>>>);

impl<T: AsyncRead + AsyncWrite> EasySsb<T> {
    fn new(secret_client: OwningClient<T>) -> EasySsb<T> {
        EasySsb(secret_client.then(|res| match res {
                                       Ok(Ok(duplex)) => {
            let (read, write) = duplex.split();
            Ok(ssb(read, write))
        }
                                       Ok(Err((failure, transport))) => {
                                           Err(EasySsbError::FailedHandshake(failure, transport))
                                       }
                                       Err((err, transport)) => {
                                           Err(EasySsbError::IoError(err, transport))
                                       }
                                   }))
    }
}

impl<T: AsyncRead + AsyncWrite> Future for EasySsb<T> {
    type Item = (Client<AR<T>, AW<T>>, Receive<AR<T>, AW<T>>, Closed<AW<T>>);
    type Error = EasySsbError<T>;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        self.0.poll()
    }
}

/// Everything that can go wrong when creating a client via `easy_ssb`.
#[derive(Debug)]
pub enum EasySsbError<T> {
    /// The handshake was performed without io errors but did not terminate successfully.
    FailedHandshake(ClientHandshakeFailure, T),
    /// An io error happened during the handshake.
    IoError(io::Error, T),
}

#[cfg(test)]
mod tests {
    use std::net::{SocketAddr, Ipv6Addr};

    use tokio::executor::current_thread;
    use tokio::net::TcpStream;
    use ssb_common::*;

    use super::*;

    #[test]
    fn test_easy_ssb() {
        sodiumoxide::init();
        let addr = SocketAddr::new(Ipv6Addr::localhost().into(), DEFAULT_TCP_PORT);

        current_thread::run(|_| {
            current_thread::spawn(TcpStream::connect(&addr)
            .and_then(|tcp| easy_ssb(tcp).unwrap().map_err(|err| panic!("{:?}", err)))
            .map_err(|err| panic!("{:?}", err))
            .map(|(mut client, receive, _)| {
                current_thread::spawn(receive.map_err(|err| panic!("{:?}", err)));

                let (send_request, response) = client.whoami();

                current_thread::spawn(send_request.map_err(|err| panic!("{:?}", err)));
                current_thread::spawn(response
                                          .map(|res| println!("{:?}", res))
                                          .map_err(|err| panic!("{:?}", err))
                                          .and_then(|_| {
                                                        client.close().map_err(|err| panic!("{:?}", err))
                                                    }));
            }))
        });
    }
}