tx5_connection/
lib.rs

1#![deny(missing_docs)]
2#![deny(unsafe_code)]
3#![doc = tx5_core::__doc_header!()]
4//! # tx5-connection
5//!
6//! Holochain webrtc connection.
7//! Starts by sending messages over the sbd signal server; if we can
8//! upgrade to a proper webrtc p2p connection, we do so.
9//!
10//! # WebRTC Backend Features
11//!
12//! Tx5 can be backed currently by 1 of 2 backend webrtc libraries.
13//!
14//! - <b><i>`*`DEFAULT`*`</i></b> `backend-go-pion` - The pion webrtc library
15//!   written in go (golang).
16//!   - [https://github.com/pion/webrtc](https://github.com/pion/webrtc)
17//! - `backend-webrtc-rs` - The rust webrtc library.
18//!   - [https://github.com/webrtc-rs/webrtc](https://github.com/webrtc-rs/webrtc)
19//!
20//! The go pion library is currently the default as it is more mature
21//! and well tested, but comes with some overhead of calling into a different
22//! memory/runtime. When the rust library is stable enough for holochain's
23//! needs, we will switch the default. To switch now, or if you want to
24//! make sure the backend doesn't change out from under you, set
25//! no-default-features and explicitly enable the backend of your choice.
26
27pub use tx5_core::Tx5InitConfig;
28
29use std::collections::HashMap;
30use std::io::{Error, ErrorKind, Result};
31use std::sync::{Arc, Mutex, Weak};
32
33pub use tx5_signal;
34use tx5_signal::PubKey;
35
36struct AbortTask<R>(tokio::task::JoinHandle<R>);
37
38impl<R> Drop for AbortTask<R> {
39    fn drop(&mut self) {
40        self.0.abort();
41    }
42}
43
44struct CloseRecv<T: 'static + Send>(futures::channel::mpsc::Receiver<T>);
45
46impl<T: 'static + Send> CloseRecv<T> {
47    pub async fn recv(&mut self) -> Option<T> {
48        use futures::stream::StreamExt;
49        self.0.next().await
50    }
51}
52
53struct CloseSend<T: 'static + Send> {
54    sender: Arc<Mutex<Option<futures::channel::mpsc::Sender<T>>>>,
55    close_on_drop: bool,
56}
57
58impl<T: 'static + Send> Clone for CloseSend<T> {
59    fn clone(&self) -> Self {
60        Self {
61            sender: self.sender.clone(),
62            close_on_drop: false,
63        }
64    }
65}
66
67impl<T: 'static + Send> Drop for CloseSend<T> {
68    fn drop(&mut self) {
69        if self.close_on_drop {
70            let s = self.sender.lock().unwrap().take();
71            if let Some(mut s) = s {
72                s.close_channel();
73            }
74        }
75    }
76}
77
78impl<T: 'static + Send> CloseSend<T> {
79    pub fn sized_channel(size: usize) -> (Self, CloseRecv<T>) {
80        let (s, r) = futures::channel::mpsc::channel(size);
81        (
82            Self {
83                sender: Arc::new(Mutex::new(Some(s))),
84                close_on_drop: false,
85            },
86            CloseRecv(r),
87        )
88    }
89
90    pub fn set_close_on_drop(&mut self, close_on_drop: bool) {
91        self.close_on_drop = close_on_drop;
92    }
93
94    pub fn send_or_close(&self, t: T) -> Result<()> {
95        let mut lock = self.sender.lock().unwrap();
96        if let Some(sender) = &mut *lock {
97            if sender.try_send(t).is_ok() {
98                Ok(())
99            } else {
100                sender.close_channel();
101                *lock = None;
102                Err(ErrorKind::BrokenPipe.into())
103            }
104        } else {
105            Err(ErrorKind::BrokenPipe.into())
106        }
107    }
108
109    pub async fn send(&self, t: T) -> Result<()> {
110        let res = tokio::time::timeout(
111            // hard-coded to 1 minute for now. This indicates a system
112            // is very backed up, and is here just to prevent forever hangs
113            std::time::Duration::from_secs(60),
114            async {
115                let sender = self.sender.lock().unwrap().clone();
116                if let Some(mut sender) = sender {
117                    use futures::sink::SinkExt;
118                    if sender.send(t).await.is_ok() {
119                        Result::Ok(())
120                    } else {
121                        Err(ErrorKind::BrokenPipe.into())
122                    }
123                } else {
124                    Err(ErrorKind::BrokenPipe.into())
125                }
126            },
127        )
128        .await;
129
130        match res {
131            Err(_) | Ok(Err(_)) => {
132                let mut lock = self.sender.lock().unwrap();
133                if let Some(sender) = &mut *lock {
134                    sender.close_channel();
135                }
136                *lock = None;
137                Err(ErrorKind::BrokenPipe.into())
138            }
139            _ => Ok(()),
140        }
141    }
142}
143
144mod config;
145pub use config::*;
146
147mod webrtc;
148
149mod hub;
150pub use hub::*;
151
152mod conn;
153pub use conn::*;
154
155mod proto;
156
157mod framed;
158pub use framed::*;
159
160#[cfg(test)]
161mod test;