1#![deny(missing_docs)]
2#![deny(unsafe_code)]
3#![doc = tx5_core::__doc_header!()]
4pub use tx5_core::Tx5InitConfig;
28
/// Run a block of async statements under the configured slow-app timeout.
///
/// The statements are placed inside a `loop` that runs at most once on the
/// normal path, so the caller's code may use `break` to leave early; doing so
/// still yields the inner `Ok(())`. A `continue` in the caller's code restarts
/// the statements from the top.
///
/// The expansion is a single expression: the awaited result of
/// `tokio::time::timeout`, whose success value is a `std::io::Result<()>`.
/// The time limit is read from `Tx5InitConfig::get().slow_app_timeout`.
macro_rules! breakable_timeout {
    ($($body:tt)*) => {
        tokio::time::timeout(
            ::tx5_core::Tx5InitConfig::get().slow_app_timeout,
            async {
                // The loop exists only to give the caller's statements a
                // `break` target; the trailing `break` ends it after one pass.
                loop {
                    { $($body)* }
                    break;
                }
                std::result::Result::<(), std::io::Error>::Ok(())
            },
        )
        .await
    };
}
43
44use std::collections::HashMap;
45use std::future::Future;
46use std::io::{Error, ErrorKind, Result};
47use std::sync::{Arc, Mutex, Weak};
48
49pub use tx5_signal;
50use tx5_signal::PubKey;
51
52struct AbortTask<R>(tokio::task::JoinHandle<R>);
53
54impl<R> Drop for AbortTask<R> {
55 fn drop(&mut self) {
56 self.0.abort();
57 }
58}
59
60struct CloseRecv<T: 'static + Send>(futures::channel::mpsc::Receiver<T>);
61
62impl<T: 'static + Send> CloseRecv<T> {
63 pub async fn recv(&mut self) -> Option<T> {
64 use futures::stream::StreamExt;
65 self.0.next().await
66 }
67}
68
69struct CloseSend<T: 'static + Send> {
70 sender: Arc<Mutex<Option<futures::channel::mpsc::Sender<T>>>>,
71 close_on_drop: bool,
72}
73
74impl<T: 'static + Send> Clone for CloseSend<T> {
75 fn clone(&self) -> Self {
76 Self {
77 sender: self.sender.clone(),
78 close_on_drop: false,
79 }
80 }
81}
82
83impl<T: 'static + Send> Drop for CloseSend<T> {
84 fn drop(&mut self) {
85 if self.close_on_drop {
86 let s = self.sender.lock().unwrap().take();
87 if let Some(mut s) = s {
88 s.close_channel();
89 }
90 }
91 }
92}
93
94impl<T: 'static + Send> CloseSend<T> {
95 pub fn channel() -> (Self, CloseRecv<T>) {
96 Self::sized_channel(32)
97 }
98
99 pub fn sized_channel(size: usize) -> (Self, CloseRecv<T>) {
100 let (s, r) = futures::channel::mpsc::channel(size);
101 (
102 Self {
103 sender: Arc::new(Mutex::new(Some(s))),
104 close_on_drop: false,
105 },
106 CloseRecv(r),
107 )
108 }
109
110 pub fn set_close_on_drop(&mut self, close_on_drop: bool) {
111 self.close_on_drop = close_on_drop;
112 }
113
114 #[allow(dead_code)] pub fn send_or_close(&self, t: T) {
116 let mut lock = self.sender.lock().unwrap();
117 if let Some(sender) = &mut *lock {
118 if sender.try_send(t).is_err() {
119 sender.close_channel();
120 *lock = None;
121 }
122 }
123 }
124
125 pub fn send(
126 &self,
127 t: T,
128 ) -> impl Future<Output = Result<()>> + 'static + Send {
129 use futures::sink::SinkExt;
130 let s = self.sender.lock().unwrap().clone();
131 async move {
132 match s {
133 Some(mut s) => {
134 s.send(t).await.map_err(|_| ErrorKind::BrokenPipe.into())
135 }
136 None => Err(ErrorKind::BrokenPipe.into()),
137 }
138 }
139 }
140
141 #[allow(dead_code)] pub fn send_slow_app(
143 &self,
144 t: T,
145 ) -> impl Future<Output = Result<()>> + 'static + Send {
146 use futures::sink::SinkExt;
147
148 let s = self.sender.lock().unwrap().clone();
149 async move {
150 match s {
151 Some(mut s) => {
152 match tokio::time::timeout(
153 tx5_core::Tx5InitConfig::get().slow_app_timeout,
154 s.send(t),
155 )
156 .await
157 {
158 Err(_) => {
159 tracing::warn!(
160 "Closing connection due to slow app"
161 );
162 Err(ErrorKind::TimedOut.into())
163 }
164 Ok(Err(_)) => Err(ErrorKind::BrokenPipe.into()),
165 Ok(Ok(_)) => Ok(()),
166 }
167 }
168 None => Err(ErrorKind::BrokenPipe.into()),
169 }
170 }
171 }
172}
173
174mod config;
175pub use config::*;
176
177mod webrtc;
178
179mod hub;
180pub use hub::*;
181
182mod conn;
183pub use conn::*;
184
185mod proto;
186
187mod framed;
188pub use framed::*;
189
190#[cfg(test)]
191mod test;