tx5_connection/lib.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
#![deny(missing_docs)]
#![deny(unsafe_code)]
#![doc = tx5_core::__doc_header!()]
//! # tx5-connection
//!
//! Holochain webrtc connection.
//! Starts by sending messages over the sbd signal server, if we can
//! upgrade to a proper webrtc p2p connection, we do so.
//!
//! # WebRTC Backend Features
//!
//! Tx5 can be backed currently by 1 of 2 backend webrtc libraries.
//!
//! - <b><i>`*`DEFAULT`*`</i></b> `backend-go-pion` - The pion webrtc library
//! writen in go (golang).
//! - [https://github.com/pion/webrtc](https://github.com/pion/webrtc)
//! - `backend-webrtc-rs` - The rust webrtc library.
//! - [https://github.com/webrtc-rs/webrtc](https://github.com/webrtc-rs/webrtc)
//!
//! The go pion library is currently the default as it is more mature
//! and well tested, but comes with some overhead of calling into a different
//! memory/runtime. When the rust library is stable enough for holochain's
//! needs, we will switch the default. To switch now, or if you want to
//! make sure the backend doesn't change out from under you, set
//! no-default-features and explicitly enable the backend of your choice.
pub use tx5_core::Tx5InitConfig;
use std::collections::HashMap;
use std::io::{Error, ErrorKind, Result};
use std::sync::{Arc, Mutex, Weak};
pub use tx5_signal;
use tx5_signal::PubKey;
struct AbortTask<R>(tokio::task::JoinHandle<R>);
impl<R> Drop for AbortTask<R> {
fn drop(&mut self) {
self.0.abort();
}
}
struct CloseRecv<T: 'static + Send>(futures::channel::mpsc::Receiver<T>);
impl<T: 'static + Send> CloseRecv<T> {
pub async fn recv(&mut self) -> Option<T> {
use futures::stream::StreamExt;
self.0.next().await
}
}
struct CloseSend<T: 'static + Send> {
sender: Arc<Mutex<Option<futures::channel::mpsc::Sender<T>>>>,
close_on_drop: bool,
}
impl<T: 'static + Send> Clone for CloseSend<T> {
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
close_on_drop: false,
}
}
}
impl<T: 'static + Send> Drop for CloseSend<T> {
fn drop(&mut self) {
if self.close_on_drop {
let s = self.sender.lock().unwrap().take();
if let Some(mut s) = s {
s.close_channel();
}
}
}
}
impl<T: 'static + Send> CloseSend<T> {
pub fn sized_channel(size: usize) -> (Self, CloseRecv<T>) {
let (s, r) = futures::channel::mpsc::channel(size);
(
Self {
sender: Arc::new(Mutex::new(Some(s))),
close_on_drop: false,
},
CloseRecv(r),
)
}
pub fn set_close_on_drop(&mut self, close_on_drop: bool) {
self.close_on_drop = close_on_drop;
}
pub fn send_or_close(&self, t: T) -> Result<()> {
let mut lock = self.sender.lock().unwrap();
if let Some(sender) = &mut *lock {
if sender.try_send(t).is_ok() {
Ok(())
} else {
sender.close_channel();
*lock = None;
Err(ErrorKind::BrokenPipe.into())
}
} else {
Err(ErrorKind::BrokenPipe.into())
}
}
pub async fn send(&self, t: T) -> Result<()> {
let res = tokio::time::timeout(
// hard-coded to 1 minute for now. This indicates a system
// is very backed up, and is here just to prevent forever hangs
std::time::Duration::from_secs(60),
async {
let sender = self.sender.lock().unwrap().clone();
if let Some(mut sender) = sender {
use futures::sink::SinkExt;
if sender.send(t).await.is_ok() {
Result::Ok(())
} else {
Err(ErrorKind::BrokenPipe.into())
}
} else {
Err(ErrorKind::BrokenPipe.into())
}
},
)
.await;
match res {
Err(_) | Ok(Err(_)) => {
let mut lock = self.sender.lock().unwrap();
if let Some(sender) = &mut *lock {
sender.close_channel();
}
*lock = None;
Err(ErrorKind::BrokenPipe.into())
}
_ => Ok(()),
}
}
}
mod config;
pub use config::*;
mod webrtc;
mod hub;
pub use hub::*;
mod conn;
pub use conn::*;
mod proto;
mod framed;
pub use framed::*;
#[cfg(test)]
mod test;