1#![deny(missing_docs)]
2#![deny(unsafe_code)]
3#![doc = tx5_core::__doc_header!()]
4pub use tx5_core::Tx5InitConfig;
26
27use std::collections::HashMap;
28use std::io::{Error, ErrorKind, Result};
29use std::sync::{Arc, Mutex, Weak};
30
31pub use tx5_signal;
32use tx5_signal::PubKey;
33
34struct AbortTask<R>(tokio::task::JoinHandle<R>);
35
36impl<R> Drop for AbortTask<R> {
37 fn drop(&mut self) {
38 self.0.abort();
39 }
40}
41
42struct CloseRecv<T: 'static + Send>(futures::channel::mpsc::Receiver<T>);
43
44impl<T: 'static + Send> CloseRecv<T> {
45 pub async fn recv(&mut self) -> Option<T> {
46 use futures::stream::StreamExt;
47 self.0.next().await
48 }
49}
50
51struct CloseSend<T: 'static + Send> {
52 sender: Arc<Mutex<Option<futures::channel::mpsc::Sender<T>>>>,
53 close_on_drop: bool,
54}
55
56impl<T: 'static + Send> Clone for CloseSend<T> {
57 fn clone(&self) -> Self {
58 Self {
59 sender: self.sender.clone(),
60 close_on_drop: false,
61 }
62 }
63}
64
65impl<T: 'static + Send> Drop for CloseSend<T> {
66 fn drop(&mut self) {
67 if self.close_on_drop {
68 let s = self.sender.lock().unwrap().take();
69 if let Some(mut s) = s {
70 s.close_channel();
71 }
72 }
73 }
74}
75
76impl<T: 'static + Send> CloseSend<T> {
77 pub fn sized_channel(size: usize) -> (Self, CloseRecv<T>) {
78 let (s, r) = futures::channel::mpsc::channel(size);
79 (
80 Self {
81 sender: Arc::new(Mutex::new(Some(s))),
82 close_on_drop: false,
83 },
84 CloseRecv(r),
85 )
86 }
87
88 pub fn set_close_on_drop(&mut self, close_on_drop: bool) {
89 self.close_on_drop = close_on_drop;
90 }
91
92 pub fn send_or_close(&self, t: T) -> Result<()> {
93 let mut lock = self.sender.lock().unwrap();
94 if let Some(sender) = &mut *lock {
95 if sender.try_send(t).is_ok() {
96 Ok(())
97 } else {
98 sender.close_channel();
99 *lock = None;
100 Err(ErrorKind::BrokenPipe.into())
101 }
102 } else {
103 Err(ErrorKind::BrokenPipe.into())
104 }
105 }
106
107 pub async fn send(&self, t: T) -> Result<()> {
108 let res = tokio::time::timeout(
109 std::time::Duration::from_secs(60),
112 async {
113 let sender = self.sender.lock().unwrap().clone();
114 if let Some(mut sender) = sender {
115 use futures::sink::SinkExt;
116 if sender.send(t).await.is_ok() {
117 Result::Ok(())
118 } else {
119 Err(ErrorKind::BrokenPipe.into())
120 }
121 } else {
122 Err(ErrorKind::BrokenPipe.into())
123 }
124 },
125 )
126 .await;
127
128 match res {
129 Err(_) | Ok(Err(_)) => {
130 let mut lock = self.sender.lock().unwrap();
131 if let Some(sender) = &mut *lock {
132 sender.close_channel();
133 }
134 *lock = None;
135 Err(ErrorKind::BrokenPipe.into())
136 }
137 _ => Ok(()),
138 }
139 }
140}
141
142mod config;
143pub use config::*;
144
145mod webrtc;
146
147mod hub;
148pub use hub::*;
149
150mod conn;
151pub use conn::*;
152
153mod proto;
154
155mod framed;
156pub use framed::*;
157
158#[cfg(test)]
159mod test;