1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#![deny(missing_docs)]
#![deny(unsafe_code)]
#![doc = tx5_core::__doc_header!()]
//! # tx5-connection
//!
//! Holochain webrtc connection.
//! Starts by sending messages over the sbd signal server, if we can
//! upgrade to a proper webrtc p2p connection, we do so.
//!
//! # WebRTC Backend Features
//!
//! Tx5 can be backed currently by 1 of 2 backend webrtc libraries.
//!
//! - <b><i>`*`DEFAULT`*`</i></b> `backend-go-pion` - The pion webrtc library
//!   writen in go (golang).
//!   - [https://github.com/pion/webrtc](https://github.com/pion/webrtc)
//! - `backend-webrtc-rs` - The rust webrtc library.
//!   - [https://github.com/webrtc-rs/webrtc](https://github.com/webrtc-rs/webrtc)
//!
//! The go pion library is currently the default as it is more mature
//! and well tested, but comes with some overhead of calling into a different
//! memory/runtime. When the rust library is stable enough for holochain's
//! needs, we will switch the default. To switch now, or if you want to
//! make sure the backend doesn't change out from under you, set
//! no-default-features and explicitly enable the backend of your choice.

#[cfg(any(
    not(any(feature = "backend-go-pion", feature = "backend-webrtc-rs")),
    all(feature = "backend-go-pion", feature = "backend-webrtc-rs"),
))]
compile_error!("Must specify exactly 1 webrtc backend");

#[cfg(feature = "backend-go-pion")]
pub use tx5_go_pion::Tx5InitConfig;

use std::collections::HashMap;
use std::future::Future;
use std::io::{Error, ErrorKind, Result};
use std::sync::{Arc, Mutex, Weak};

pub use tx5_signal;
use tx5_signal::PubKey;

struct AbortTask<R>(tokio::task::JoinHandle<R>);

impl<R> Drop for AbortTask<R> {
    fn drop(&mut self) {
        self.0.abort();
    }
}

struct CloseSend<T: 'static + Send> {
    sender: Arc<Mutex<Option<tokio::sync::mpsc::Sender<T>>>>,
    close_on_drop: bool,
}

impl<T: 'static + Send> Clone for CloseSend<T> {
    fn clone(&self) -> Self {
        Self {
            sender: self.sender.clone(),
            close_on_drop: false,
        }
    }
}

impl<T: 'static + Send> Drop for CloseSend<T> {
    fn drop(&mut self) {
        if self.close_on_drop {
            self.sender.lock().unwrap().take();
        }
    }
}

impl<T: 'static + Send> CloseSend<T> {
    pub fn channel() -> (Self, tokio::sync::mpsc::Receiver<T>) {
        let (s, r) = tokio::sync::mpsc::channel(32);
        (
            Self {
                sender: Arc::new(Mutex::new(Some(s))),
                close_on_drop: false,
            },
            r,
        )
    }

    pub fn set_close_on_drop(&mut self, close_on_drop: bool) {
        self.close_on_drop = close_on_drop;
    }

    pub fn send(
        &self,
        t: T,
    ) -> impl Future<Output = Result<()>> + 'static + Send {
        let s = self.sender.lock().unwrap().clone();
        async move {
            match s {
                Some(s) => {
                    s.send(t).await.map_err(|_| ErrorKind::BrokenPipe.into())
                }
                None => Err(ErrorKind::BrokenPipe.into()),
            }
        }
    }

    pub fn send_slow_app(
        &self,
        t: T,
    ) -> impl Future<Output = Result<()>> + 'static + Send {
        // Grace time to allow a slow app to catch up before we close a
        // connection to prevent our memory from filling up with backlogged
        // message data.
        const SLOW_APP_TO: std::time::Duration =
            std::time::Duration::from_millis(99);

        let s = self.sender.lock().unwrap().clone();
        async move {
            match s {
                Some(s) => match s.send_timeout(t, SLOW_APP_TO).await {
                    Err(
                        tokio::sync::mpsc::error::SendTimeoutError::Timeout(_),
                    ) => {
                        tracing::warn!("Closing connection due to slow app");
                        Err(ErrorKind::TimedOut.into())
                    }
                    Err(_) => Err(ErrorKind::BrokenPipe.into()),
                    Ok(_) => Ok(()),
                },
                None => Err(ErrorKind::BrokenPipe.into()),
            }
        }
    }
}

mod webrtc;

mod hub;
pub use hub::*;

mod conn;
pub use conn::*;

mod proto;

mod framed;
pub use framed::*;

#[cfg(test)]
mod test;