tx5_connection/
lib.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
#![deny(missing_docs)]
#![deny(unsafe_code)]
#![doc = tx5_core::__doc_header!()]
//! # tx5-connection
//!
//! Holochain webrtc connection.
//! Starts by sending messages over the sbd signal server, if we can
//! upgrade to a proper webrtc p2p connection, we do so.
//!
//! # WebRTC Backend Features
//!
//! Tx5 can be backed currently by 1 of 2 backend webrtc libraries.
//!
//! - <b><i>`*`DEFAULT`*`</i></b> `backend-go-pion` - The pion webrtc library
//!   writen in go (golang).
//!   - [https://github.com/pion/webrtc](https://github.com/pion/webrtc)
//! - `backend-webrtc-rs` - The rust webrtc library.
//!   - [https://github.com/webrtc-rs/webrtc](https://github.com/webrtc-rs/webrtc)
//!
//! The go pion library is currently the default as it is more mature
//! and well tested, but comes with some overhead of calling into a different
//! memory/runtime. When the rust library is stable enough for holochain's
//! needs, we will switch the default. To switch now, or if you want to
//! make sure the backend doesn't change out from under you, set
//! no-default-features and explicitly enable the backend of your choice.

pub use tx5_core::Tx5InitConfig;

use std::collections::HashMap;
use std::io::{Error, ErrorKind, Result};
use std::sync::{Arc, Mutex, Weak};

pub use tx5_signal;
use tx5_signal::PubKey;

struct AbortTask<R>(tokio::task::JoinHandle<R>);

impl<R> Drop for AbortTask<R> {
    fn drop(&mut self) {
        self.0.abort();
    }
}

struct CloseRecv<T: 'static + Send>(futures::channel::mpsc::Receiver<T>);

impl<T: 'static + Send> CloseRecv<T> {
    pub async fn recv(&mut self) -> Option<T> {
        use futures::stream::StreamExt;
        self.0.next().await
    }
}

struct CloseSend<T: 'static + Send> {
    sender: Arc<Mutex<Option<futures::channel::mpsc::Sender<T>>>>,
    close_on_drop: bool,
}

impl<T: 'static + Send> Clone for CloseSend<T> {
    fn clone(&self) -> Self {
        Self {
            sender: self.sender.clone(),
            close_on_drop: false,
        }
    }
}

impl<T: 'static + Send> Drop for CloseSend<T> {
    fn drop(&mut self) {
        if self.close_on_drop {
            let s = self.sender.lock().unwrap().take();
            if let Some(mut s) = s {
                s.close_channel();
            }
        }
    }
}

impl<T: 'static + Send> CloseSend<T> {
    pub fn sized_channel(size: usize) -> (Self, CloseRecv<T>) {
        let (s, r) = futures::channel::mpsc::channel(size);
        (
            Self {
                sender: Arc::new(Mutex::new(Some(s))),
                close_on_drop: false,
            },
            CloseRecv(r),
        )
    }

    pub fn set_close_on_drop(&mut self, close_on_drop: bool) {
        self.close_on_drop = close_on_drop;
    }

    pub fn send_or_close(&self, t: T) -> Result<()> {
        let mut lock = self.sender.lock().unwrap();
        if let Some(sender) = &mut *lock {
            if sender.try_send(t).is_ok() {
                Ok(())
            } else {
                sender.close_channel();
                *lock = None;
                Err(ErrorKind::BrokenPipe.into())
            }
        } else {
            Err(ErrorKind::BrokenPipe.into())
        }
    }

    pub async fn send(&self, t: T) -> Result<()> {
        let res = tokio::time::timeout(
            // hard-coded to 1 minute for now. This indicates a system
            // is very backed up, and is here just to prevent forever hangs
            std::time::Duration::from_secs(60),
            async {
                let sender = self.sender.lock().unwrap().clone();
                if let Some(mut sender) = sender {
                    use futures::sink::SinkExt;
                    if sender.send(t).await.is_ok() {
                        Result::Ok(())
                    } else {
                        Err(ErrorKind::BrokenPipe.into())
                    }
                } else {
                    Err(ErrorKind::BrokenPipe.into())
                }
            },
        )
        .await;

        match res {
            Err(_) | Ok(Err(_)) => {
                let mut lock = self.sender.lock().unwrap();
                if let Some(sender) = &mut *lock {
                    sender.close_channel();
                }
                *lock = None;
                Err(ErrorKind::BrokenPipe.into())
            }
            _ => Ok(()),
        }
    }
}

mod config;
pub use config::*;

mod webrtc;

mod hub;
pub use hub::*;

mod conn;
pub use conn::*;

mod proto;

mod framed;
pub use framed::*;

#[cfg(test)]
mod test;