tx5_connection/
lib.rs

1#![deny(missing_docs)]
2#![deny(unsafe_code)]
3#![doc = tx5_core::__doc_header!()]
4//! # tx5-connection
5//!
6//! Holochain webrtc connection.
//! Starts by sending messages over the sbd signal server; if we can
//! upgrade to a proper webrtc p2p connection, we do so.
9//!
10//! # WebRTC Backend Features
11//!
//! Tx5 can currently be backed by one of two webrtc backend libraries.
13//!
14//! - <b><i>`*`DEFAULT`*`</i></b> `backend-libdatachannel` - WebRTC library
15//!   written in C++.
16//!   - [https://github.com/paullouisageneau/libdatachannel](https://github.com/paullouisageneau/libdatachannel)
17//! - `backend-go-pion` - The pion webrtc library
18//!   written in Go (golang).
19//!   - [https://github.com/pion/webrtc](https://github.com/pion/webrtc)
20//!
21//! The go pion library was the original implementation, but as libdatachannel
22//! has reached stability, we have switched it over to be the default as
23//! it is much easier to write rust FFI bindings to C++ code than Go code.
24
25pub use tx5_core::Tx5InitConfig;
26
27use std::collections::HashMap;
28use std::io::{Error, ErrorKind, Result};
29use std::sync::{Arc, Mutex, Weak};
30
31pub use tx5_signal;
32use tx5_signal::PubKey;
33
34struct AbortTask<R>(tokio::task::JoinHandle<R>);
35
36impl<R> Drop for AbortTask<R> {
37    fn drop(&mut self) {
38        self.0.abort();
39    }
40}
41
42struct CloseRecv<T: 'static + Send>(futures::channel::mpsc::Receiver<T>);
43
44impl<T: 'static + Send> CloseRecv<T> {
45    pub async fn recv(&mut self) -> Option<T> {
46        use futures::stream::StreamExt;
47        self.0.next().await
48    }
49}
50
51struct CloseSend<T: 'static + Send> {
52    sender: Arc<Mutex<Option<futures::channel::mpsc::Sender<T>>>>,
53    close_on_drop: bool,
54}
55
56impl<T: 'static + Send> Clone for CloseSend<T> {
57    fn clone(&self) -> Self {
58        Self {
59            sender: self.sender.clone(),
60            close_on_drop: false,
61        }
62    }
63}
64
65impl<T: 'static + Send> Drop for CloseSend<T> {
66    fn drop(&mut self) {
67        if self.close_on_drop {
68            let s = self.sender.lock().unwrap().take();
69            if let Some(mut s) = s {
70                s.close_channel();
71            }
72        }
73    }
74}
75
76impl<T: 'static + Send> CloseSend<T> {
77    pub fn sized_channel(size: usize) -> (Self, CloseRecv<T>) {
78        let (s, r) = futures::channel::mpsc::channel(size);
79        (
80            Self {
81                sender: Arc::new(Mutex::new(Some(s))),
82                close_on_drop: false,
83            },
84            CloseRecv(r),
85        )
86    }
87
88    pub fn set_close_on_drop(&mut self, close_on_drop: bool) {
89        self.close_on_drop = close_on_drop;
90    }
91
92    pub fn send_or_close(&self, t: T) -> Result<()> {
93        let mut lock = self.sender.lock().unwrap();
94        if let Some(sender) = &mut *lock {
95            if sender.try_send(t).is_ok() {
96                Ok(())
97            } else {
98                sender.close_channel();
99                *lock = None;
100                Err(ErrorKind::BrokenPipe.into())
101            }
102        } else {
103            Err(ErrorKind::BrokenPipe.into())
104        }
105    }
106
107    pub async fn send(&self, t: T) -> Result<()> {
108        let res = tokio::time::timeout(
109            // hard-coded to 1 minute for now. This indicates a system
110            // is very backed up, and is here just to prevent forever hangs
111            std::time::Duration::from_secs(60),
112            async {
113                let sender = self.sender.lock().unwrap().clone();
114                if let Some(mut sender) = sender {
115                    use futures::sink::SinkExt;
116                    if sender.send(t).await.is_ok() {
117                        Result::Ok(())
118                    } else {
119                        Err(ErrorKind::BrokenPipe.into())
120                    }
121                } else {
122                    Err(ErrorKind::BrokenPipe.into())
123                }
124            },
125        )
126        .await;
127
128        match res {
129            Err(_) | Ok(Err(_)) => {
130                let mut lock = self.sender.lock().unwrap();
131                if let Some(sender) = &mut *lock {
132                    sender.close_channel();
133                }
134                *lock = None;
135                Err(ErrorKind::BrokenPipe.into())
136            }
137            _ => Ok(()),
138        }
139    }
140}
141
142mod config;
143pub use config::*;
144
145mod webrtc;
146
147mod hub;
148pub use hub::*;
149
150mod conn;
151pub use conn::*;
152
153mod proto;
154
155mod framed;
156pub use framed::*;
157
158#[cfg(test)]
159mod test;