tx5_connection/
lib.rs

1#![deny(missing_docs)]
2#![deny(unsafe_code)]
3#![doc = tx5_core::__doc_header!()]
4//! # tx5-connection
5//!
6//! Holochain webrtc connection.
//! Starts by sending messages over the sbd signal server. If we can
//! upgrade to a proper webrtc p2p connection, we do so.
9//!
10//! # WebRTC Backend Features
11//!
//! Tx5 can currently be backed by one of two webrtc backend libraries.
13//!
14//! - <b><i>`*`DEFAULT`*`</i></b> `backend-libdatachannel` - WebRTC library
15//!   written in C++.
16//!   - [https://github.com/paullouisageneau/libdatachannel](https://github.com/paullouisageneau/libdatachannel)
17//! - `backend-go-pion` - The pion webrtc library
18//!   written in Go (golang).
19//!   - [https://github.com/pion/webrtc](https://github.com/pion/webrtc)
20//!
21//! The go pion library was the original implementation, but as libdatachannel
22//! has reached stability, we have switched it over to be the default as
23//! it is much easier to write rust FFI bindings to C++ code than Go code.
24
25pub use tx5_core::Tx5InitConfig;
26
27use std::collections::HashMap;
28use std::io::{Error, ErrorKind, Result};
29use std::sync::{Arc, Mutex, Weak};
30
31pub use tx5_signal;
32use tx5_signal::PubKey;
33
34struct AbortTask<R>(tokio::task::JoinHandle<R>);
35
36impl<R> Drop for AbortTask<R> {
37    fn drop(&mut self) {
38        self.0.abort();
39    }
40}
41
42struct CloseRecv<T: 'static + Send>(futures::channel::mpsc::Receiver<T>);
43
44impl<T: 'static + Send> CloseRecv<T> {
45    pub async fn recv(&mut self) -> Option<T> {
46        use futures::stream::StreamExt;
47        self.0.next().await
48    }
49}
50
51struct CloseSend<T: 'static + Send> {
52    sender: Arc<Mutex<Option<futures::channel::mpsc::Sender<T>>>>,
53    close_on_drop: bool,
54}
55
56impl<T: 'static + Send> Clone for CloseSend<T> {
57    fn clone(&self) -> Self {
58        Self {
59            sender: self.sender.clone(),
60            close_on_drop: false,
61        }
62    }
63}
64
65impl<T: 'static + Send> Drop for CloseSend<T> {
66    fn drop(&mut self) {
67        if self.close_on_drop {
68            let s = self.sender.lock().unwrap().take();
69            if let Some(mut s) = s {
70                s.close_channel();
71            }
72        }
73    }
74}
75
76impl<T: 'static + Send> CloseSend<T> {
77    pub fn sized_channel(size: usize) -> (Self, CloseRecv<T>) {
78        let (s, r) = futures::channel::mpsc::channel(size);
79        (
80            Self {
81                sender: Arc::new(Mutex::new(Some(s))),
82                close_on_drop: false,
83            },
84            CloseRecv(r),
85        )
86    }
87
88    pub fn set_close_on_drop(&mut self, close_on_drop: bool) {
89        self.close_on_drop = close_on_drop;
90    }
91
92    pub fn send_or_close(&self, t: T) -> Result<()> {
93        let mut lock = self.sender.lock().unwrap();
94        if let Some(sender) = &mut *lock {
95            if sender.try_send(t).is_ok() {
96                Ok(())
97            } else {
98                tracing::warn!("Failed to send message, closing channel");
99                sender.close_channel();
100                *lock = None;
101                Err(ErrorKind::BrokenPipe.into())
102            }
103        } else {
104            Err(ErrorKind::BrokenPipe.into())
105        }
106    }
107
108    pub async fn send(&self, t: T) -> Result<()> {
109        let res = tokio::time::timeout(
110            // hard-coded to 1 minute for now. This indicates a system
111            // is very backed up, and is here just to prevent forever hangs
112            std::time::Duration::from_secs(60),
113            async {
114                let sender = self.sender.lock().unwrap().clone();
115                if let Some(mut sender) = sender {
116                    use futures::sink::SinkExt;
117                    if sender.send(t).await.is_ok() {
118                        Result::Ok(())
119                    } else {
120                        Err(ErrorKind::BrokenPipe.into())
121                    }
122                } else {
123                    Err(ErrorKind::BrokenPipe.into())
124                }
125            },
126        )
127        .await;
128
129        match res {
130            Err(_) | Ok(Err(_)) => {
131                let mut lock = self.sender.lock().unwrap();
132                if let Some(sender) = &mut *lock {
133                    tracing::warn!(
134                        ?res,
135                        "Failed to send message, closing channel"
136                    );
137                    sender.close_channel();
138                }
139                *lock = None;
140                Err(ErrorKind::BrokenPipe.into())
141            }
142            _ => Ok(()),
143        }
144    }
145}
146
147mod config;
148pub use config::*;
149
150mod webrtc;
151
152mod hub;
153pub use hub::*;
154
155mod conn;
156pub use conn::*;
157
158mod proto;
159
160mod framed;
161pub use framed::*;
162
163#[cfg(test)]
164mod test;