allium 0.1.3

Allium is a Rust library for onion routing.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
use crate::onion::circuit::CircuitId;
use crate::onion::crypto::SessionKey;
use crate::onion::protocol::*;
use crate::onion::tunnel::TunnelId;
use crate::utils::{ToBytes, TryFromBytes};
use crate::Result;
use bytes::{Bytes, BytesMut};
use std::fmt;
use std::net::SocketAddr;
use thiserror::Error;
use tokio::io::{self, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::time::{error::Elapsed, timeout, Duration};

/// timeout applied during a read on the socket
const READ_TIMEOUT: Duration = Duration::from_secs(5);
/// timeout applied during a write on the socket
const WRITE_TIMEOUT: Duration = Duration::from_secs(2);

#[derive(Error, Debug)]
pub(crate) enum OnionSocketError {
    /// The stream of this `OnionSocket` has been terminated and is unavailable for communication.
    /// The cause is underlying network layer stream of type  `S` threw an I/O error during interaction
    #[error("stream has been terminated")]
    StreamTerminated(#[from] io::Error),
    /// Reading or writing on the stream of this `OnionSocket` timed out. Be aware of any possible
    /// scenarios in which a partial message has been received and the buffer is partially filled.
    /// If another operation on this `OnionSocket` is called, the buffer may not be filled with one
    /// complete message as expected and `BrokenMessage` may be returned. Alternatively, clearing
    /// the buffer may cause broken message fragments to remain in the underlying stream, which will
    /// be ending up in the buffer on the next read call.
    #[error("stream operation has timed out")]
    StreamTimeout(#[from] Elapsed),
    /// The received message is of type `TEARDOWN` and the function throwing this error cannot deal
    /// with it. The `TEARDOWN` message is always allowed by protocol to indicate a closed circuit
    /// by the connected peer.
    #[error("received teardown message that cannot be handled")]
    TeardownMessage,
    /// The received message does not comply with the protocol
    /// This may be caused by:
    /// - an undefined message type or tunnel message type
    /// - an irregular message type (like waiting for an Tunnel `TRUNCATED` message, yet receiving a
    /// `TUNNEL EXTENDED` message)
    /// - a wrong circuit id
    // In the case that a command response is awaited, incoming Tunnel Data messages may or may
    // not be ignored. Either way would be conforming to the specification.
    #[error("received broken message that cannot be parsed and violates protocol")]
    BrokenMessage,
    /// Indicates that the remote peer returned a tunnel request with an error code
    #[error("tunnel request returned error")]
    Peer,
}

pub(crate) type SocketResult<T> = std::result::Result<T, OnionSocketError>;

impl From<CircuitProtocolError> for OnionSocketError {
    fn from(e: CircuitProtocolError) -> Self {
        match e {
            CircuitProtocolError::Teardown { .. } => OnionSocketError::TeardownMessage,
            CircuitProtocolError::Unknown { .. } => OnionSocketError::BrokenMessage,
        }
    }
}

impl<E: fmt::Debug> From<TunnelProtocolError<E>> for OnionSocketError {
    fn from(e: TunnelProtocolError<E>) -> Self {
        match e {
            TunnelProtocolError::Peer(_) => OnionSocketError::Peer,
            _ => OnionSocketError::BrokenMessage,
        }
    }
}

/// Wraps an underlying network primitive (eg. TCP or TLS stream) and provides methods implementing the protocol.
/// Utilizes an internal buffer for (de-)serialization.
///
/// The socket layer serves as glue between the onion protocol and higher layers.
pub(crate) struct OnionSocket<S> {
    stream: S,
    buf: BytesMut,
}

impl<S> OnionSocket<S> {
    pub(crate) fn new(stream: S) -> Self {
        OnionSocket {
            stream,
            buf: BytesMut::with_capacity(MESSAGE_SIZE),
        }
    }
}

impl<S: AsyncRead + Unpin> OnionSocket<S> {
    async fn read_buf_from_stream(&mut self) -> SocketResult<usize> {
        Ok(timeout(READ_TIMEOUT, self.stream.read_exact(&mut self.buf)).await??)
    }

    /// Listends for incoming `CIRCUIT CREATE` messages and returns the circuit id and key in this
    /// message.
    ///
    /// # Errors:
    /// - `StreamTerminated` - The stream is broken
    /// - `StreamTimeout` -  The stream operations timed out
    /// - `TeardownMessage` - A `TEARDOWN` message has been received instead of `CIRCUIT CREATE`
    /// - `BrokenMessage` - The received answer message could not be parsed
    pub(crate) async fn accept_handshake(&mut self) -> SocketResult<(CircuitId, Key)> {
        self.buf.resize(MESSAGE_SIZE, 0);
        self.read_buf_from_stream().await?;
        let msg = CircuitCreate::try_read_from(&mut self.buf)?;
        Ok((msg.circuit_id, msg.key))
    }

    /// Tries to read an entire onion protocol message before returning. This function does not
    /// apply a timeout on stream listening, so expect this function to deadlock if the stream is
    /// idle, but kept alive.
    ///
    /// Returns a `CIRCUIT OPAQUE` message if the received message could successfully be parsed. If
    /// not, an error will be returned.
    ///
    /// # Errors:
    /// - `StreamTerminated` - The stream is broken
    /// - `TeardownMessage` - A `TEARDOWN` message has been received instead of `CIRCUIT OPAQUE`
    /// - `BrokenMessage` - The received answer message could not be parsed
    pub(crate) async fn accept_opaque(
        &mut self,
    ) -> SocketResult<CircuitOpaque<CircuitOpaqueBytes>> {
        self.buf.resize(MESSAGE_SIZE, 0);
        // NOTE: no timeout applied here, parent is supposed to handle that
        self.stream.read_exact(&mut self.buf).await?;
        //.context("Error while reading CircuitOpaque")?;
        let msg = CircuitOpaque::try_read_from(&mut self.buf)?;
        Ok(msg)
    }
}

impl<S: AsyncWrite + Unpin> OnionSocket<S> {
    async fn write_buf_to_stream(&mut self) -> SocketResult<()> {
        Ok(timeout(WRITE_TIMEOUT, self.stream.write_all(self.buf.as_ref())).await??)
    }

    async fn encrypt_and_send_opaque<K: ToBytes>(
        &mut self,
        circuit_id: u16,
        session_keys: &[SessionKey],
        tunnel_res: K,
    ) -> SocketResult<()> {
        let req = CircuitOpaque {
            circuit_id,
            payload: CircuitOpaquePayload {
                msg: &tunnel_res,
                encrypt_keys: session_keys,
            },
        };

        req.write_to(&mut self.buf);
        assert_eq!(self.buf.len(), MESSAGE_SIZE);
        // TODO omit timeout here?
        self.write_buf_to_stream().await
    }

    /// Sends a `CIRCUIT CREATED` reply message to the connected peer with the given `circuit_id`
    /// and `key`.
    ///
    /// # Errors:
    /// - `StreamTerminated` - The stream is broken
    /// - `StreamTimeout` -  The stream operations timed out
    pub(crate) async fn finalize_handshake(
        &mut self,
        circuit_id: CircuitId,
        key: SignKey<'_>,
    ) -> SocketResult<()> {
        self.buf.clear();
        let res = CircuitCreated { circuit_id, key };
        res.write_padded_to(&mut self.buf, MESSAGE_SIZE);
        self.write_buf_to_stream().await?;
        Ok(())
    }

    /// Replies on this `OnionSocket` with an `EXTENDED` message to a successful `EXTEND` call.
    ///
    /// # Errors:
    /// - `StreamTerminated` - The stream is broken
    /// - `StreamTimeout` -  The stream operations timed out
    pub(crate) async fn finalize_tunnel_handshake(
        &mut self,
        circuit_id: CircuitId,
        key: VerifyKey,
        session_keys: &[SessionKey],
    ) -> SocketResult<()> {
        self.buf.clear();
        let tunnel_res = TunnelResponseExtended { peer_key: key };
        self.encrypt_and_send_opaque(circuit_id, session_keys, tunnel_res)
            .await
        //.context("Error while writing CircuitOpaque<TunnelResponse::Extended>")?;
    }

    /// Replies on this `OnionSocket` with an `EXTENDED` message to an unsuccessful `EXTEND` call
    /// with error code `error_code`.
    ///
    /// # Errors:
    /// - `StreamTerminated` - The stream is broken
    /// - `StreamTimeout` -  The stream operations timed out
    pub(crate) async fn reject_tunnel_handshake(
        &mut self,
        circuit_id: CircuitId,
        session_keys: &[SessionKey],
        error: TunnelExtendedError,
    ) -> SocketResult<()> {
        self.buf.clear();
        self.encrypt_and_send_opaque(circuit_id, session_keys, error)
            .await
        //.context("Error while writing CircuitOpaque<TunnelResponse::Extended>")?;
    }

    /// Replies on this `OnionSocket` with a `TRUNCATED` message to a successful `TRUNCATE` call.
    ///
    /// # Errors:
    /// - `StreamTerminated` - The stream is broken
    /// - `StreamTimeout` -  The stream operations timed out
    pub(crate) async fn finalize_tunnel_truncate(
        &mut self,
        circuit_id: CircuitId,
        session_keys: &[SessionKey],
    ) -> SocketResult<()> {
        self.buf.clear();
        let tunnel_res = TunnelResponseTruncated;
        self.encrypt_and_send_opaque(circuit_id, session_keys, tunnel_res)
            .await
        //.context("Error while writing CircuitOpaque<TunnelResponse::Truncated>")?;
    }

    /// Replies on this `OnionSocket` with an `TRUNCATED` message to a unsuccessful `TRUNCATE` call
    /// with error code `error_code`.
    ///
    /// # Errors:
    /// - `StreamTerminated` - The stream is broken
    /// - `StreamTimeout` -  The stream operations timed out
    pub(crate) async fn reject_tunnel_truncate(
        &mut self,
        circuit_id: CircuitId,
        session_keys: &[SessionKey],
        error: TunnelTruncatedError,
    ) -> SocketResult<()> {
        self.buf.clear();
        self.encrypt_and_send_opaque(circuit_id, session_keys, error)
            .await
        //.context("Error while writing CircuitOpaque<TunnelResponse::Extended>")?;
    }

    /// Forwards an already correctly encrypted `payload` to the stream in this `OnionSocket`
    ///
    /// # Errors:
    /// - `StreamTerminated` - The stream is broken
    /// - `StreamTimeout` -  The stream operations timed out
    pub(crate) async fn forward_opaque(
        &mut self,
        circuit_id: CircuitId,
        payload: CircuitOpaqueBytes,
    ) -> SocketResult<()> {
        self.buf.clear();
        let msg = CircuitOpaque {
            circuit_id,
            payload,
        };

        msg.write_padded_to(&mut self.buf, MESSAGE_SIZE);
        // FIXME Do we want to apply the timeout here? Generally: no, but what do we do instead?
        self.write_buf_to_stream().await?;
        //.context("Error while writing CircuitOpaque")?;
        Ok(())
    }

    /// Sends a `TEARDOWN` message via the stream.
    ///
    /// # Errors:
    /// - `StreamTerminated` - The stream is broken
    /// - `StreamTimeout` -  The stream operations timed out
    pub(crate) async fn teardown(&mut self, circuit_id: CircuitId) -> SocketResult<()> {
        self.buf.clear();
        let res = CircuitTeardown { circuit_id };
        res.write_padded_to(&mut self.buf, MESSAGE_SIZE);
        // NOTE: A timeout needs to be applied here
        self.write_buf_to_stream().await?;
        Ok(())
    }

    /// Sends a `TUNNEL BEGIN` message via this stream with the given `tunnel_id` to indicate a
    /// conversation begin to the final hop on this socket. This function does not block for
    /// responses.
    ///
    /// If the targeted hop is not the final hop or forbids the connection, it may send a `TEARDOWN`
    /// message that will not be read here, but listening on the connection for `TUNNEL DATA`
    /// packets may reveal this.
    /// If the connection is rejected, an `END` packet may be sent by the connected hop, which may
    /// be evaluated when handling the incoming packets.
    pub(crate) async fn begin(
        &mut self,
        circuit_id: CircuitId,
        tunnel_id: TunnelId,
        session_keys: &[SessionKey],
    ) -> SocketResult<()> {
        self.buf.clear();
        let tunnel_res = TunnelRequest::Begin(tunnel_id);
        self.encrypt_and_send_opaque(circuit_id, session_keys, tunnel_res)
            .await
    }

    pub(crate) async fn send_data(
        &mut self,
        circuit_id: CircuitId,
        tunnel_id: TunnelId,
        data: Bytes,
        session_keys: &[SessionKey],
    ) -> SocketResult<()> {
        self.buf.clear();
        let tunnel_req = TunnelRequest::Data(tunnel_id, data);
        self.encrypt_and_send_opaque(circuit_id, session_keys, tunnel_req)
            .await
    }

    /// Sends a `TUNNEL END` message via this stream with the given `tunnel_id` to indicate a
    /// conversation end to the final hop on this socket. This function does not block for
    /// responses.
    ///
    /// If the targeted hop is not the final hop or forbids the connection, it may send a `TEARDOWN`
    /// message that will not be read here, but listening on the connection for `TUNNEL DATA`
    /// packets may reveal this.
    pub(crate) async fn end(
        &mut self,
        circuit_id: CircuitId,
        tunnel_id: TunnelId,
        session_keys: &[SessionKey],
    ) -> SocketResult<()> {
        self.buf.clear();
        let tunnel_res = TunnelRequest::End(tunnel_id);
        self.encrypt_and_send_opaque(circuit_id, session_keys, tunnel_res)
            .await
    }

    pub(crate) async fn send_keep_alive(
        &mut self,
        circuit_id: CircuitId,
        session_keys: &[SessionKey],
    ) -> SocketResult<()> {
        self.buf.clear();
        let tunnel_req = TunnelRequest::KeepAlive;
        self.encrypt_and_send_opaque(circuit_id, session_keys, tunnel_req)
            .await
    }
}

impl<S: AsyncWrite + AsyncRead + Unpin> OnionSocket<S> {
    /// Performs a circuit handshake with the peer connected to this socket.
    /// The `CIRCUIT CREATE` message is sent with the given `key` and sent to the peer. Then, this
    /// method tries to receive a `CIRCUIT CREATED` message from the peer. If parsed correctly, the
    /// received peer's key is returned.
    ///
    /// # Errors:
    /// - `StreamTerminated` - The stream is broken
    /// - `StreamTimeout` -  The stream operations timed out
    /// - `TeardownMessage` - A `TEARDOWN`message has been received instead of `CIRCUIT CREATED`
    /// - `BrokenMessage` - The received answer message could not be parsed or has an unexpected
    ///   circuit_id
    pub(crate) async fn initiate_handshake(
        &mut self,
        circuit_id: CircuitId,
        key: Key,
    ) -> SocketResult<VerifyKey> {
        self.buf.clear();
        let req = CircuitCreate { circuit_id, key };

        req.write_padded_to(&mut self.buf, MESSAGE_SIZE);
        self.write_buf_to_stream().await?;

        self.read_buf_from_stream().await?;
        let res = CircuitCreated::try_read_from(&mut self.buf)?;
        if res.circuit_id == circuit_id {
            Ok(res.key)
        } else {
            Err(OnionSocketError::BrokenMessage)
        }
    }

    /// Initializes a tunnel handshake by forwarding the given `circuit_id` and `key` through a
    /// tunnel with the connected peer as its first hop.
    ///
    /// The last hop in the tunnel will try to extend the tunnel to the peer defined by its address
    /// in `peer_addr`.
    ///
    /// To encrypt the `OPAQUE` message, `aes_keys` will be used. The keys in `aes_keys` are
    /// expected to be in encrypt order.
    ///
    /// # Errors:
    /// - `StreamTerminated` - The stream is broken
    /// - `StreamTimeout` -  The stream operations timed out
    /// - `TeardownMessage` - A `TEARDOWN`message has been received instead of `CIRCUIT OPAQUE`
    /// - `BrokenMessage` - The received answer message could not be parsed
    pub(crate) async fn initiate_tunnel_handshake(
        &mut self,
        circuit_id: CircuitId,
        peer_addr: SocketAddr,
        key: Key,
        session_keys: &[SessionKey],
    ) -> SocketResult<VerifyKey> {
        self.buf.clear();
        let tunnel_req = TunnelRequest::Extend(peer_addr, key);
        let req = CircuitOpaque {
            circuit_id,
            payload: CircuitOpaquePayload {
                msg: &tunnel_req,
                encrypt_keys: session_keys,
            },
        };

        req.write_to(&mut self.buf);
        assert_eq!(self.buf.len(), MESSAGE_SIZE);
        // TODO Fix timeout
        self.write_buf_to_stream().await?;

        self.read_buf_from_stream().await?;
        let mut res = CircuitOpaque::try_read_from(&mut self.buf)?;

        if res.circuit_id != circuit_id {
            return Err(OnionSocketError::BrokenMessage);
            //return Err(anyhow!(
            //    "Circuit ID in Opaque response does not match ID in request"
            //));
        }

        res.decrypt(session_keys.iter().rev())
            .map_err(|_| OnionSocketError::BrokenMessage)?;
        let tunnel_res = TunnelResponseExtended::read_with_digest_from(&mut res.payload.bytes)?;
        //.context("Invalid TunnelResponse message")?;

        Ok(tunnel_res.peer_key)
    }

    /// Sends a `TUNNEL TRUNCATE` message to the socket. The receiving peer is determined by the
    /// length of `aes_keys`. For example, if `aes_keys` consists of two elements, the hop at
    /// index 1 (the second hop) will be able to read the packet and process the truncate.
    ///
    /// To encrypt the `OPAQUE` message, `aes_keys` will be used. The keys in `aes_keys` are
    /// expected to be in encrypt order.
    ///
    /// # Errors:
    /// - `StreamTerminated` - The stream is broken
    /// - `StreamTimeout` -  The stream operations timed out
    /// - `TeardownMessage` - A `TEARDOWN`message has been received instead of `CIRCUIT OPAQUE`
    /// - `BrokenMessage` - The received answer message could not be parsed
    pub(crate) async fn truncate_tunnel(
        &mut self,
        circuit_id: CircuitId,
        session_keys: &[SessionKey],
    ) -> SocketResult<()> {
        self.buf.clear();
        let tunnel_req = TunnelRequest::Truncate;
        let req = CircuitOpaque {
            circuit_id,
            payload: CircuitOpaquePayload {
                msg: &tunnel_req,
                encrypt_keys: session_keys,
            },
        };

        req.write_to(&mut self.buf);
        assert_eq!(self.buf.len(), MESSAGE_SIZE);
        // TODO Fix timeout
        self.write_buf_to_stream().await?;

        self.read_buf_from_stream().await?;
        let mut res = CircuitOpaque::try_read_from(&mut self.buf)?;

        if res.circuit_id != circuit_id {
            return Err(OnionSocketError::BrokenMessage);
            //return Err(anyhow!(
            //    "Circuit ID in Opaque response does not match ID in request"
            //));
        }

        res.decrypt(session_keys.iter().rev())
            .map_err(|_| OnionSocketError::BrokenMessage)?;
        let _tunnel_res = TunnelResponseTruncated::read_with_digest_from(&mut res.payload.bytes)?;
        //.context("Invalid TunnelResponse message")?;

        Ok(())
    }
}

impl OnionSocket<TcpStream> {
    pub(crate) fn peer_addr(&self) -> Result<SocketAddr> {
        Ok(self.stream.peer_addr()?)
    }
}

impl<S: fmt::Debug> fmt::Debug for OnionSocket<S> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("OnionSocket")
            .field("stream", &self.stream)
            .field("buf_len", &self.buf.len())
            .finish()
    }
}