tx5_connection/
lib.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#![deny(missing_docs)]
#![deny(unsafe_code)]
#![doc = tx5_core::__doc_header!()]
//! # tx5-connection
//!
//! Holochain webrtc connection.
//! Starts by sending messages over the sbd signal server. If we can
//! upgrade to a proper webrtc p2p connection, we do so.
//!
//! # WebRTC Backend Features
//!
//! Tx5 can be backed currently by 1 of 2 backend webrtc libraries.
//!
//! - <b><i>`*`DEFAULT`*`</i></b> `backend-go-pion` - The pion webrtc library
//!   written in go (golang).
//!   - [https://github.com/pion/webrtc](https://github.com/pion/webrtc)
//! - `backend-webrtc-rs` - The rust webrtc library.
//!   - [https://github.com/webrtc-rs/webrtc](https://github.com/webrtc-rs/webrtc)
//!
//! The go pion library is currently the default as it is more mature
//! and well tested, but comes with some overhead of calling into a different
//! memory/runtime. When the rust library is stable enough for holochain's
//! needs, we will switch the default. To switch now, or if you want to
//! make sure the backend doesn't change out from under you, set
//! no-default-features and explicitly enable the backend of your choice.

pub use tx5_core::Tx5InitConfig;

use std::collections::HashMap;
use std::io::{Error, ErrorKind, Result};
use std::sync::{Arc, Mutex, Weak};

pub use tx5_signal;
use tx5_signal::PubKey;

/// Owns a tokio task handle and aborts that task when this wrapper is dropped.
struct AbortTask<R>(tokio::task::JoinHandle<R>);

impl<R> Drop for AbortTask<R> {
    /// Request cancellation of the wrapped task as the wrapper goes away.
    fn drop(&mut self) {
        let Self(handle) = self;
        handle.abort();
    }
}

/// Receiving half produced by `CloseSend::sized_channel`, wrapping a
/// bounded futures mpsc receiver.
struct CloseRecv<T: 'static + Send>(futures::channel::mpsc::Receiver<T>);

impl<T: 'static + Send> CloseRecv<T> {
    /// Wait for the next item from the underlying receiver.
    ///
    /// Returns `None` once the underlying stream has ended.
    pub async fn recv(&mut self) -> Option<T> {
        let Self(receiver) = self;
        futures::stream::StreamExt::next(receiver).await
    }
}

/// Sending half of a bounded channel whose sender slot is shared between
/// all clones of this handle.
struct CloseSend<T: 'static + Send> {
    /// The shared sender slot. It is set to `None` once the channel has been
    /// closed, either by a failed `send_or_close` or by dropping a handle
    /// that has `close_on_drop` set.
    sender: Arc<Mutex<Option<futures::channel::mpsc::Sender<T>>>>,
    /// When `true`, dropping this particular handle takes the shared sender
    /// and closes the channel. New channels and clones start with `false`.
    close_on_drop: bool,
}

impl<T: 'static + Send> Clone for CloseSend<T> {
    /// Produce another handle onto the same shared sender slot.
    ///
    /// The clone never inherits `close_on_drop`; it always starts out `false`.
    fn clone(&self) -> Self {
        let sender = Arc::clone(&self.sender);
        Self {
            sender,
            close_on_drop: false,
        }
    }
}

impl<T: 'static + Send> Drop for CloseSend<T> {
    /// If this handle was marked `close_on_drop`, remove the shared sender
    /// and close the channel; otherwise do nothing.
    fn drop(&mut self) {
        if !self.close_on_drop {
            return;
        }

        // Take the sender in its own statement so the mutex guard is
        // released before the channel is closed.
        let taken = self.sender.lock().unwrap().take();
        if let Some(mut tx) = taken {
            tx.close_channel();
        }
    }
}

impl<T: 'static + Send> CloseSend<T> {
    pub fn sized_channel(size: usize) -> (Self, CloseRecv<T>) {
        let (s, r) = futures::channel::mpsc::channel(size);
        (
            Self {
                sender: Arc::new(Mutex::new(Some(s))),
                close_on_drop: false,
            },
            CloseRecv(r),
        )
    }

    pub fn set_close_on_drop(&mut self, close_on_drop: bool) {
        self.close_on_drop = close_on_drop;
    }

    pub fn send_or_close(&self, t: T) -> Result<()> {
        let mut lock = self.sender.lock().unwrap();
        if let Some(sender) = &mut *lock {
            if sender.try_send(t).is_ok() {
                Ok(())
            } else {
                sender.close_channel();
                *lock = None;
                Err(ErrorKind::BrokenPipe.into())
            }
        } else {
            Err(ErrorKind::BrokenPipe.into())
        }
    }
}

mod config;
pub use config::*;

mod webrtc;

mod hub;
pub use hub::*;

mod conn;
pub use conn::*;

mod proto;

mod framed;
pub use framed::*;

#[cfg(test)]
mod test;