tx5_connection/
lib.rs

1#![deny(missing_docs)]
2#![deny(unsafe_code)]
3#![doc = tx5_core::__doc_header!()]
4//! # tx5-connection
5//!
6//! Holochain webrtc connection.
//! Starts by sending messages over the sbd signal server; if we can
//! upgrade to a proper webrtc p2p connection, we do so.
9//!
10//! # WebRTC Backend Features
11//!
12//! Tx5 can be backed currently by 1 of 2 backend webrtc libraries.
13//!
14//! - <b><i>`*`DEFAULT`*`</i></b> `backend-go-pion` - The pion webrtc library
//!   written in go (golang).
16//!   - [https://github.com/pion/webrtc](https://github.com/pion/webrtc)
17//! - `backend-webrtc-rs` - The rust webrtc library.
18//!   - [https://github.com/webrtc-rs/webrtc](https://github.com/webrtc-rs/webrtc)
19//!
20//! The go pion library is currently the default as it is more mature
21//! and well tested, but comes with some overhead of calling into a different
22//! memory/runtime. When the rust library is stable enough for holochain's
23//! needs, we will switch the default. To switch now, or if you want to
24//! make sure the backend doesn't change out from under you, set
25//! no-default-features and explicitly enable the backend of your choice.
26
27pub use tx5_core::Tx5InitConfig;
28
// Runs the supplied statements inside an async block that is bounded by
// `Tx5InitConfig::get().slow_app_timeout`.
//
// The statements are expanded inside a `loop` that unconditionally breaks
// after one pass, so the body may use a bare `break` to bail out early.
// The async block always finishes with `std::io::Result::Ok(())`, which pins
// its output type to `std::io::Result<()>`.
//
// The macro expands to the awaited `tokio::time::timeout(..)` call, so it
// must be used from an async context; the outer `Err` of the resulting
// value signals that the timeout elapsed.
macro_rules! breakable_timeout {
    ($($t:tt)*) => {
        tokio::time::timeout(
            ::tx5_core::Tx5InitConfig::get().slow_app_timeout,
            async {
                // Single-pass loop: exists only to give the body a `break` target.
                loop {
                    {$($t)*}
                    break;
                }
                std::io::Result::Ok(())
            }
        ).await
    };
}
43
44use std::collections::HashMap;
45use std::future::Future;
46use std::io::{Error, ErrorKind, Result};
47use std::sync::{Arc, Mutex, Weak};
48
49pub use tx5_signal;
50use tx5_signal::PubKey;
51
/// Owns a tokio task handle and aborts that task when this wrapper is dropped.
struct AbortTask<R>(tokio::task::JoinHandle<R>);
53
54impl<R> Drop for AbortTask<R> {
55    fn drop(&mut self) {
56        self.0.abort();
57    }
58}
59
/// Receiving half of the channel created by `CloseSend::channel` /
/// `CloseSend::sized_channel`; wraps a `futures` mpsc receiver.
struct CloseRecv<T: 'static + Send>(futures::channel::mpsc::Receiver<T>);
61
62impl<T: 'static + Send> CloseRecv<T> {
63    pub async fn recv(&mut self) -> Option<T> {
64        use futures::stream::StreamExt;
65        self.0.next().await
66    }
67}
68
/// Cloneable sending half of a bounded `futures` mpsc channel whose
/// underlying sender can be closed and removed for every clone at once.
struct CloseSend<T: 'static + Send> {
    /// Sender shared between all clones. It becomes `None` once the channel
    /// has been closed, either by dropping a handle that has `close_on_drop`
    /// set or by a failed `send_or_close`.
    sender: Arc<Mutex<Option<futures::channel::mpsc::Sender<T>>>>,
    /// When `true`, dropping this particular handle closes the channel.
    /// Freshly created handles and clones start with `false`.
    close_on_drop: bool,
}
73
74impl<T: 'static + Send> Clone for CloseSend<T> {
75    fn clone(&self) -> Self {
76        Self {
77            sender: self.sender.clone(),
78            close_on_drop: false,
79        }
80    }
81}
82
83impl<T: 'static + Send> Drop for CloseSend<T> {
84    fn drop(&mut self) {
85        if self.close_on_drop {
86            let s = self.sender.lock().unwrap().take();
87            if let Some(mut s) = s {
88                s.close_channel();
89            }
90        }
91    }
92}
93
94impl<T: 'static + Send> CloseSend<T> {
95    pub fn channel() -> (Self, CloseRecv<T>) {
96        Self::sized_channel(32)
97    }
98
99    pub fn sized_channel(size: usize) -> (Self, CloseRecv<T>) {
100        let (s, r) = futures::channel::mpsc::channel(size);
101        (
102            Self {
103                sender: Arc::new(Mutex::new(Some(s))),
104                close_on_drop: false,
105            },
106            CloseRecv(r),
107        )
108    }
109
110    pub fn set_close_on_drop(&mut self, close_on_drop: bool) {
111        self.close_on_drop = close_on_drop;
112    }
113
114    #[allow(dead_code)] // only used in libdatachannel backend
115    pub fn send_or_close(&self, t: T) {
116        let mut lock = self.sender.lock().unwrap();
117        if let Some(sender) = &mut *lock {
118            if sender.try_send(t).is_err() {
119                sender.close_channel();
120                *lock = None;
121            }
122        }
123    }
124
125    pub fn send(
126        &self,
127        t: T,
128    ) -> impl Future<Output = Result<()>> + 'static + Send {
129        use futures::sink::SinkExt;
130        let s = self.sender.lock().unwrap().clone();
131        async move {
132            match s {
133                Some(mut s) => {
134                    s.send(t).await.map_err(|_| ErrorKind::BrokenPipe.into())
135                }
136                None => Err(ErrorKind::BrokenPipe.into()),
137            }
138        }
139    }
140
141    #[allow(dead_code)] // only used in go_pion backend
142    pub fn send_slow_app(
143        &self,
144        t: T,
145    ) -> impl Future<Output = Result<()>> + 'static + Send {
146        use futures::sink::SinkExt;
147
148        let s = self.sender.lock().unwrap().clone();
149        async move {
150            match s {
151                Some(mut s) => {
152                    match tokio::time::timeout(
153                        tx5_core::Tx5InitConfig::get().slow_app_timeout,
154                        s.send(t),
155                    )
156                    .await
157                    {
158                        Err(_) => {
159                            tracing::warn!(
160                                "Closing connection due to slow app"
161                            );
162                            Err(ErrorKind::TimedOut.into())
163                        }
164                        Ok(Err(_)) => Err(ErrorKind::BrokenPipe.into()),
165                        Ok(Ok(_)) => Ok(()),
166                    }
167                }
168                None => Err(ErrorKind::BrokenPipe.into()),
169            }
170        }
171    }
172}
173
174mod config;
175pub use config::*;
176
177mod webrtc;
178
179mod hub;
180pub use hub::*;
181
182mod conn;
183pub use conn::*;
184
185mod proto;
186
187mod framed;
188pub use framed::*;
189
190#[cfg(test)]
191mod test;