1#![deny(missing_docs)]
3
4use std::collections::HashMap;
13use std::io::{Error, Result};
14use std::sync::{Arc, Mutex, Weak};
15
16pub use sbd_client::PubKey;
17
18mod sodoken_crypto;
19pub use sodoken_crypto::*;
20
/// Configuration for an end-to-end encrypted sbd client connection.
///
/// Construct with [`Default::default`] and override individual fields as
/// needed.
#[derive(Debug, Clone)]
pub struct Config {
    /// Whether an incoming message from a peer we have no connection entry
    /// for is allowed to open a new connection.
    ///
    /// When `false`, such incoming messages are silently ignored. Outgoing
    /// `assert` / `send` calls may always open a connection regardless of
    /// this flag.
    pub listener: bool,

    /// Passed through unchanged to the `allow_plain_text` setting of the
    /// underlying sbd client configuration.
    ///
    /// NOTE(review): presumably permits non-TLS connections to the server;
    /// confirm against the `sbd_client` documentation.
    pub allow_plain_text: bool,

    /// How long a closed peer connection stays in the cooldown state.
    ///
    /// While cooling down, `assert` / `send` to that peer fail and incoming
    /// messages from it are dropped.
    pub cooldown: std::time::Duration,

    /// Maximum number of peer entries tracked at once.
    ///
    /// Entries that are cooling down count against this limit as well as
    /// active ones. Attempts to open a connection beyond it fail.
    pub max_connections: usize,

    /// How long an active connection may go without receiving a message
    /// (or, for a fresh connection, since it was created) before it is
    /// closed and put into cooldown.
    pub max_idle: std::time::Duration,
}
40
41impl Default for Config {
42 fn default() -> Self {
43 Self {
44 listener: false,
45 allow_plain_text: false,
46 cooldown: std::time::Duration::from_secs(10),
47 max_connections: 4096,
48 max_idle: std::time::Duration::from_secs(10),
49 }
50 }
51}
52
/// State tracked for a single remote peer.
enum Conn {
    /// The connection was closed. Holds the instant at which the cooldown
    /// ends; until then the peer is neither sent to nor received from, and
    /// once it has passed the entry is removed by `Inner::prune`.
    Cooldown(tokio::time::Instant),
    /// An open encrypted session with the peer.
    Active {
        /// When this session was created or last received a message;
        /// compared against `Config::max_idle` when pruning.
        last_active: tokio::time::Instant,
        /// Encrypts outgoing messages for this peer.
        enc: sodoken_crypto::Encryptor,
        /// Decrypts incoming messages from this peer.
        dec: sodoken_crypto::Decryptor,
    },
}
61
/// Shared state behind the public client handle and its message receiver.
struct Inner {
    /// Settings supplied by the caller at construction.
    config: Arc<Config>,
    /// Local crypto handle used to create per-peer sessions (`new_enc`).
    crypto: sodoken_crypto::SodokenCrypto,
    /// Underlying sbd client used to transmit raw messages.
    client: sbd_client::SbdClient,
    /// Per-peer connection state, keyed by the peer's public key.
    map: HashMap<PubKey, Conn>,
}
68
69fn do_close_peer(pk: &PubKey, conn: &mut Conn, cooldown: std::time::Duration) {
70 tracing::debug!(
71 target: "NETAUDIT",
72 pub_key = ?pk,
73 cooldown_s = cooldown.as_secs_f64(),
74 m = "sbd-e2e-crypto-client",
75 a = "close_peer",
76 );
77 *conn = Conn::Cooldown(tokio::time::Instant::now() + cooldown);
78}
79
80impl Inner {
81 pub async fn close(&mut self) {
82 self.client.close().await;
83 }
84
85 pub fn close_peer(&mut self, pk: &PubKey) {
86 if let Some(conn) = self.map.get_mut(pk) {
87 do_close_peer(pk, conn, self.config.cooldown);
88 }
89 }
90
91 pub async fn assert(&mut self, pk: &PubKey) -> Result<()> {
92 let Self {
93 config,
94 crypto,
95 client,
96 map,
97 } = self;
98
99 let (conn, hdr) = Self::priv_assert_con(pk, config, crypto, map, true)?;
100
101 match conn {
102 Conn::Cooldown(_) => {
103 Err(Error::other("connection still cooling down"))
104 }
105 Conn::Active { .. } => {
106 if let Err(err) = async {
107 if let Some(hdr) = hdr {
108 client.send(pk, &hdr).await
109 } else {
110 Ok(())
111 }
112 }
113 .await
114 {
115 do_close_peer(pk, conn, config.cooldown);
116 Err(err)
117 } else {
118 Ok(())
119 }
120 }
121 }
122 }
123
124 pub async fn recv(
125 &mut self,
126 msg: sbd_client::Msg,
127 ) -> Result<Option<(PubKey, Vec<u8>)>> {
128 let Self {
129 config,
130 crypto,
131 client,
132 map,
133 } = self;
134
135 let pk = msg.pub_key();
136
137 match Self::priv_assert_con(&pk, config, crypto, map, config.listener) {
138 Err(_) => Ok(None),
139 Ok((conn, hdr)) => {
140 if let Some(hdr) = hdr {
141 client.send(&pk, &hdr).await?;
142 }
143
144 match conn {
145 Conn::Cooldown(_) => Ok(None),
146 Conn::Active {
147 last_active, dec, ..
148 } => {
149 *last_active = tokio::time::Instant::now();
150
151 match dec.decrypt(msg.message()) {
152 Err(_) => {
153 do_close_peer(&pk, conn, config.cooldown);
154 Ok(None)
155 }
156 Ok(None) => Ok(None),
157 Ok(Some(msg)) => Ok(Some((pk, msg))),
158 }
159 }
160 }
161 }
162 }
163 }
164
165 pub async fn send(&mut self, pk: &PubKey, msg: &[u8]) -> Result<()> {
166 let Self {
167 config,
168 crypto,
169 client,
170 map,
171 } = self;
172
173 let (conn, hdr) = Self::priv_assert_con(pk, config, crypto, map, true)?;
174
175 match conn {
176 Conn::Cooldown(_) => {
177 Err(Error::other("connection still cooling down"))
178 }
179 Conn::Active { enc, .. } => {
180 if let Err(err) = async {
181 if let Some(hdr) = hdr {
182 client.send(pk, &hdr).await?;
183 }
184 let msg = enc.encrypt(msg)?;
185 client.send(pk, &msg).await
186 }
187 .await
188 {
189 do_close_peer(pk, conn, config.cooldown);
190 Err(err)
191 } else {
192 Ok(())
193 }
194 }
195 }
196 }
197
198 fn prune(config: &Config, map: &mut HashMap<PubKey, Conn>) {
199 let now = tokio::time::Instant::now();
200
201 map.retain(|pk, c| {
202 if let Conn::Active { last_active, .. } = c {
203 if now - *last_active > config.max_idle {
204 do_close_peer(pk, c, config.cooldown);
205 }
206 }
207
208 if let Conn::Cooldown(at) = c {
209 now < *at
210 } else {
211 true
212 }
213 })
214 }
215
216 fn priv_assert_con<'a>(
217 pk: &PubKey,
218 config: &Config,
219 crypto: &sodoken_crypto::SodokenCrypto,
220 map: &'a mut HashMap<PubKey, Conn>,
221 do_create: bool,
222 ) -> Result<(&'a mut Conn, Option<[u8; 24]>)> {
223 use std::collections::hash_map::Entry;
224
225 Self::prune(config, map);
233
234 let len = map.len();
235
236 match map.entry(pk.clone()) {
237 Entry::Occupied(e) => Ok((e.into_mut(), None)),
238 Entry::Vacant(e) => {
239 if !do_create {
240 return Err(Error::other("ignore"));
241 }
242 if len >= config.max_connections {
243 tracing::debug!(
244 target: "NETAUDIT",
245 pub_key = ?pk,
246 m = "sbd-e2e-crypto-client",
247 "cannot open: too many connections",
248 );
249 return Err(Error::other("too many connections"));
250 }
251 tracing::debug!(
252 target: "NETAUDIT",
253 pub_key = ?pk,
254 m = "sbd-e2e-crypto-client",
255 a = "open_peer",
256 );
257 let (enc, hdr, dec) = crypto.new_enc(pk)?;
258 Ok((
259 e.insert(Conn::Active {
260 last_active: tokio::time::Instant::now(),
261 enc,
262 dec,
263 }),
264 Some(hdr),
265 ))
266 }
267 }
268 }
269}
270
271async fn close_inner(inner: &mut Option<Inner>) {
272 if let Some(mut inner) = inner.take() {
273 inner.close().await;
274 }
275}
276
/// Receiving half of an encrypted client, returned alongside
/// [`SbdClientCrypto`] by [`SbdClientCrypto::new`].
pub struct MsgRecv {
    /// Weak handle to the state shared with the client; once the client is
    /// dropped the upgrade fails and receiving stops.
    inner: Weak<tokio::sync::Mutex<Option<Inner>>>,
    /// Raw message receiver of the underlying sbd client.
    recv: sbd_client::MsgRecv,
}
282
283impl MsgRecv {
284 pub async fn recv(&mut self) -> Option<(PubKey, Vec<u8>)> {
286 loop {
287 let raw_msg = match self.recv.recv().await {
288 None => return None,
289 Some(raw_msg) => raw_msg,
290 };
291
292 if let Some(inner) = self.inner.upgrade() {
293 let mut lock = inner.lock().await;
294
295 if let Some(inner) = &mut *lock {
296 match inner.recv(raw_msg).await {
297 Err(_) => (),
298 Ok(None) => continue,
299 Ok(Some(o)) => return Some(o),
300 }
301 } else {
302 return None;
303 }
304
305 close_inner(&mut lock).await;
308 } else {
309 return None;
310 }
311 }
312 }
313}
314
/// An sbd client that encrypts the messages it sends to each peer.
pub struct SbdClientCrypto {
    /// Our own public key, as derived from the local crypto handle.
    pub_key: PubKey,
    /// State shared with the paired [`MsgRecv`]; `None` once closed.
    inner: Arc<tokio::sync::Mutex<Option<Inner>>>,
}
320
321impl SbdClientCrypto {
322 pub async fn new(
324 url: &str,
325 config: Arc<Config>,
326 ) -> Result<(Self, MsgRecv)> {
327 let client_config = sbd_client::SbdClientConfig {
328 allow_plain_text: config.allow_plain_text,
329 ..Default::default()
330 };
331 let crypto = sodoken_crypto::SodokenCrypto::new()?;
332 use sbd_client::Crypto;
333 let pub_key = PubKey(Arc::new(*crypto.pub_key()));
334 let (client, recv) =
335 sbd_client::SbdClient::connect_config(url, &crypto, client_config)
336 .await?;
337 let inner = Arc::new(tokio::sync::Mutex::new(Some(Inner {
338 config,
339 crypto,
340 client,
341 map: HashMap::default(),
342 })));
343 let weak_inner = Arc::downgrade(&inner);
344 Ok((
345 Self { pub_key, inner },
346 MsgRecv {
347 inner: weak_inner,
348 recv,
349 },
350 ))
351 }
352
353 pub fn pub_key(&self) -> &PubKey {
355 &self.pub_key
356 }
357
358 pub async fn assert(&self, pk: &PubKey) -> Result<()> {
360 let mut lock = self.inner.lock().await;
361 if let Some(inner) = &mut *lock {
362 inner.assert(pk).await
363 } else {
364 Err(Error::other("closed"))
365 }
366 }
367
368 pub async fn send(&self, pk: &PubKey, msg: &[u8]) -> Result<()> {
370 const SBD_MAX: usize = 20_000;
371 const SBD_HDR: usize = 32;
372 const SS_ABYTES: usize = sodoken::secretstream::ABYTES;
373 const MAX_MSG: usize = SBD_MAX - SBD_HDR - SS_ABYTES;
374
375 if msg.len() > MAX_MSG {
376 return Err(Error::other("message too long"));
377 }
378
379 let mut lock = self.inner.lock().await;
380 if let Some(inner) = &mut *lock {
381 inner.send(pk, msg).await
382 } else {
383 Err(Error::other("closed"))
384 }
385 }
386
387 pub async fn close_peer(&self, pk: &PubKey) {
389 if let Some(inner) = self.inner.lock().await.as_mut() {
390 inner.close_peer(pk);
391 }
392 }
393
394 pub async fn close(&self) {
396 close_inner(&mut *self.inner.lock().await).await;
397 }
398}
399
400#[cfg(test)]
401mod test;