sbd_e2e_crypto_client/
lib.rs

1//! Sbd end to end encryption client.
2#![deny(missing_docs)]
3
4// Mutex strategy in this file is built on the assumption that this will
5// largely be network bound. Since we only have the one rate-limited connection
6// to the sbd server, it is okay to wrap it with a tokio Mutex and do the
7// encryption / decryption while that mutex is locked. Without this top-level
8// locking it is much easier to send secretstream headers out of order,
9// especially on the receiving new connection side when a naive implementation
10// trying to be clever might not lock the send side correctly.
11
12use std::collections::HashMap;
13use std::io::{Error, Result};
14use std::sync::{Arc, Mutex, Weak};
15
16pub use sbd_client::PubKey;
17
18mod sodoken_crypto;
19pub use sodoken_crypto::*;
20
21/// Configuration for setting up an SbdClientCrypto connection.
22pub struct Config {
23    /// If `true` we will accept incoming "connections", otherwise
24    /// messages from nodes we didn't explicitly "connect" to will
25    /// be ignored.
26    pub listener: bool,
27
28    /// If `true` we will allow connecting to insecure plaintext servers.
29    pub allow_plain_text: bool,
30
31    /// Cooldown time to prevent comms on "connection" close.
32    pub cooldown: std::time::Duration,
33
34    /// Max connection count.
35    pub max_connections: usize,
36
37    /// Max time without receiving before a connection is "closed".
38    pub max_idle: std::time::Duration,
39}
40
41impl Default for Config {
42    fn default() -> Self {
43        Self {
44            listener: false,
45            allow_plain_text: false,
46            cooldown: std::time::Duration::from_secs(10),
47            max_connections: 4096,
48            max_idle: std::time::Duration::from_secs(10),
49        }
50    }
51}
52
53enum Conn {
54    Cooldown(tokio::time::Instant),
55    Active {
56        last_active: tokio::time::Instant,
57        enc: sodoken_crypto::Encryptor,
58        dec: sodoken_crypto::Decryptor,
59    },
60}
61
62struct Inner {
63    config: Arc<Config>,
64    crypto: sodoken_crypto::SodokenCrypto,
65    client: sbd_client::SbdClient,
66    map: HashMap<PubKey, Conn>,
67}
68
69fn do_close_peer(pk: &PubKey, conn: &mut Conn, cooldown: std::time::Duration) {
70    tracing::debug!(
71        target: "NETAUDIT",
72        pub_key = ?pk,
73        cooldown_s = cooldown.as_secs_f64(),
74        m = "sbd-e2e-crypto-client",
75        a = "close_peer",
76    );
77    *conn = Conn::Cooldown(tokio::time::Instant::now() + cooldown);
78}
79
80impl Inner {
81    pub async fn close(&mut self) {
82        self.client.close().await;
83    }
84
85    pub fn close_peer(&mut self, pk: &PubKey) {
86        if let Some(conn) = self.map.get_mut(pk) {
87            do_close_peer(pk, conn, self.config.cooldown);
88        }
89    }
90
91    pub async fn assert(&mut self, pk: &PubKey) -> Result<()> {
92        let Self {
93            config,
94            crypto,
95            client,
96            map,
97        } = self;
98
99        let (conn, hdr) = Self::priv_assert_con(pk, config, crypto, map, true)?;
100
101        match conn {
102            Conn::Cooldown(_) => {
103                Err(Error::other("connection still cooling down"))
104            }
105            Conn::Active { .. } => {
106                if let Err(err) = async {
107                    if let Some(hdr) = hdr {
108                        client.send(pk, &hdr).await
109                    } else {
110                        Ok(())
111                    }
112                }
113                .await
114                {
115                    do_close_peer(pk, conn, config.cooldown);
116                    Err(err)
117                } else {
118                    Ok(())
119                }
120            }
121        }
122    }
123
124    pub async fn recv(
125        &mut self,
126        msg: sbd_client::Msg,
127    ) -> Result<Option<(PubKey, Vec<u8>)>> {
128        let Self {
129            config,
130            crypto,
131            client,
132            map,
133        } = self;
134
135        let pk = msg.pub_key();
136
137        match Self::priv_assert_con(&pk, config, crypto, map, config.listener) {
138            Err(_) => Ok(None),
139            Ok((conn, hdr)) => {
140                if let Some(hdr) = hdr {
141                    client.send(&pk, &hdr).await?;
142                }
143
144                match conn {
145                    Conn::Cooldown(_) => Ok(None),
146                    Conn::Active {
147                        last_active, dec, ..
148                    } => {
149                        *last_active = tokio::time::Instant::now();
150
151                        match dec.decrypt(msg.message()) {
152                            Err(_) => {
153                                do_close_peer(&pk, conn, config.cooldown);
154                                Ok(None)
155                            }
156                            Ok(None) => Ok(None),
157                            Ok(Some(msg)) => Ok(Some((pk, msg))),
158                        }
159                    }
160                }
161            }
162        }
163    }
164
165    pub async fn send(&mut self, pk: &PubKey, msg: &[u8]) -> Result<()> {
166        let Self {
167            config,
168            crypto,
169            client,
170            map,
171        } = self;
172
173        let (conn, hdr) = Self::priv_assert_con(pk, config, crypto, map, true)?;
174
175        match conn {
176            Conn::Cooldown(_) => {
177                Err(Error::other("connection still cooling down"))
178            }
179            Conn::Active { enc, .. } => {
180                if let Err(err) = async {
181                    if let Some(hdr) = hdr {
182                        client.send(pk, &hdr).await?;
183                    }
184                    let msg = enc.encrypt(msg)?;
185                    client.send(pk, &msg).await
186                }
187                .await
188                {
189                    do_close_peer(pk, conn, config.cooldown);
190                    Err(err)
191                } else {
192                    Ok(())
193                }
194            }
195        }
196    }
197
198    fn prune(config: &Config, map: &mut HashMap<PubKey, Conn>) {
199        let now = tokio::time::Instant::now();
200
201        map.retain(|pk, c| {
202            if let Conn::Active { last_active, .. } = c {
203                if now - *last_active > config.max_idle {
204                    do_close_peer(pk, c, config.cooldown);
205                }
206            }
207
208            if let Conn::Cooldown(at) = c {
209                now < *at
210            } else {
211                true
212            }
213        })
214    }
215
216    fn priv_assert_con<'a>(
217        pk: &PubKey,
218        config: &Config,
219        crypto: &sodoken_crypto::SodokenCrypto,
220        map: &'a mut HashMap<PubKey, Conn>,
221        do_create: bool,
222    ) -> Result<(&'a mut Conn, Option<[u8; 24]>)> {
223        use std::collections::hash_map::Entry;
224
225        // TODO - more efficient to only prune if we need to
226        //        but then, we'd need to manage expired cooldowns
227        //        in-line, lest we keep denying connections
228        //if map.len() >= config.max_connections {
229        //    Self::prune(config, map);
230        //}
231        // instead, for now, we just always prune
232        Self::prune(config, map);
233
234        let len = map.len();
235
236        match map.entry(pk.clone()) {
237            Entry::Occupied(e) => Ok((e.into_mut(), None)),
238            Entry::Vacant(e) => {
239                if !do_create {
240                    return Err(Error::other("ignore"));
241                }
242                if len >= config.max_connections {
243                    tracing::debug!(
244                        target: "NETAUDIT",
245                        pub_key = ?pk,
246                        m = "sbd-e2e-crypto-client",
247                        "cannot open: too many connections",
248                    );
249                    return Err(Error::other("too many connections"));
250                }
251                tracing::debug!(
252                    target: "NETAUDIT",
253                    pub_key = ?pk,
254                    m = "sbd-e2e-crypto-client",
255                    a = "open_peer",
256                );
257                let (enc, hdr, dec) = crypto.new_enc(pk)?;
258                Ok((
259                    e.insert(Conn::Active {
260                        last_active: tokio::time::Instant::now(),
261                        enc,
262                        dec,
263                    }),
264                    Some(hdr),
265                ))
266            }
267        }
268    }
269}
270
271async fn close_inner(inner: &mut Option<Inner>) {
272    if let Some(mut inner) = inner.take() {
273        inner.close().await;
274    }
275}
276
277/// Handle to receive data from the crypto connection.
278pub struct MsgRecv {
279    inner: Weak<tokio::sync::Mutex<Option<Inner>>>,
280    recv: sbd_client::MsgRecv,
281}
282
283impl MsgRecv {
284    /// Receive data from the crypto connection.
285    pub async fn recv(&mut self) -> Option<(PubKey, Vec<u8>)> {
286        loop {
287            let raw_msg = match self.recv.recv().await {
288                None => return None,
289                Some(raw_msg) => raw_msg,
290            };
291
292            if let Some(inner) = self.inner.upgrade() {
293                let mut lock = inner.lock().await;
294
295                if let Some(inner) = &mut *lock {
296                    match inner.recv(raw_msg).await {
297                        Err(_) => (),
298                        Ok(None) => continue,
299                        Ok(Some(o)) => return Some(o),
300                    }
301                } else {
302                    return None;
303                }
304
305                // the only code path leading out of the branches above
306                // is the error one where we need to close the connection
307                close_inner(&mut lock).await;
308            } else {
309                return None;
310            }
311        }
312    }
313}
314
315/// An encrypted connection to peers through an Sbd server.
316pub struct SbdClientCrypto {
317    pub_key: PubKey,
318    inner: Arc<tokio::sync::Mutex<Option<Inner>>>,
319}
320
321impl SbdClientCrypto {
322    /// Establish a new connection.
323    pub async fn new(
324        url: &str,
325        config: Arc<Config>,
326    ) -> Result<(Self, MsgRecv)> {
327        let client_config = sbd_client::SbdClientConfig {
328            allow_plain_text: config.allow_plain_text,
329            ..Default::default()
330        };
331        let crypto = sodoken_crypto::SodokenCrypto::new()?;
332        use sbd_client::Crypto;
333        let pub_key = PubKey(Arc::new(*crypto.pub_key()));
334        let (client, recv) =
335            sbd_client::SbdClient::connect_config(url, &crypto, client_config)
336                .await?;
337        let inner = Arc::new(tokio::sync::Mutex::new(Some(Inner {
338            config,
339            crypto,
340            client,
341            map: HashMap::default(),
342        })));
343        let weak_inner = Arc::downgrade(&inner);
344        Ok((
345            Self { pub_key, inner },
346            MsgRecv {
347                inner: weak_inner,
348                recv,
349            },
350        ))
351    }
352
353    /// Get the public key of this node.
354    pub fn pub_key(&self) -> &PubKey {
355        &self.pub_key
356    }
357
358    /// Assert that we are connected to a peer without sending any data.
359    pub async fn assert(&self, pk: &PubKey) -> Result<()> {
360        let mut lock = self.inner.lock().await;
361        if let Some(inner) = &mut *lock {
362            inner.assert(pk).await
363        } else {
364            Err(Error::other("closed"))
365        }
366    }
367
368    /// Send a message to a peer.
369    pub async fn send(&self, pk: &PubKey, msg: &[u8]) -> Result<()> {
370        const SBD_MAX: usize = 20_000;
371        const SBD_HDR: usize = 32;
372        const SS_ABYTES: usize = sodoken::secretstream::ABYTES;
373        const MAX_MSG: usize = SBD_MAX - SBD_HDR - SS_ABYTES;
374
375        if msg.len() > MAX_MSG {
376            return Err(Error::other("message too long"));
377        }
378
379        let mut lock = self.inner.lock().await;
380        if let Some(inner) = &mut *lock {
381            inner.send(pk, msg).await
382        } else {
383            Err(Error::other("closed"))
384        }
385    }
386
387    /// Close a connection to a specific peer.
388    pub async fn close_peer(&self, pk: &PubKey) {
389        if let Some(inner) = self.inner.lock().await.as_mut() {
390            inner.close_peer(pk);
391        }
392    }
393
394    /// Close the entire sbd client connection.
395    pub async fn close(&self) {
396        close_inner(&mut *self.inner.lock().await).await;
397    }
398}
399
400#[cfg(test)]
401mod test;