1#![deny(missing_docs)]
2#![deny(unsafe_code)]
3#![doc = tx5_core::__doc_header!()]
4pub use tx5_core::Tx5InitConfig;
26
27use std::collections::HashMap;
28use std::io::{Error, ErrorKind, Result};
29use std::sync::{Arc, Mutex, Weak};
30
31pub use tx5_signal;
32use tx5_signal::PubKey;
33
34struct AbortTask<R>(tokio::task::JoinHandle<R>);
35
36impl<R> Drop for AbortTask<R> {
37 fn drop(&mut self) {
38 self.0.abort();
39 }
40}
41
42struct CloseRecv<T: 'static + Send>(futures::channel::mpsc::Receiver<T>);
43
44impl<T: 'static + Send> CloseRecv<T> {
45 pub async fn recv(&mut self) -> Option<T> {
46 use futures::stream::StreamExt;
47 self.0.next().await
48 }
49}
50
51struct CloseSend<T: 'static + Send> {
52 sender: Arc<Mutex<Option<futures::channel::mpsc::Sender<T>>>>,
53 close_on_drop: bool,
54}
55
56impl<T: 'static + Send> Clone for CloseSend<T> {
57 fn clone(&self) -> Self {
58 Self {
59 sender: self.sender.clone(),
60 close_on_drop: false,
61 }
62 }
63}
64
65impl<T: 'static + Send> Drop for CloseSend<T> {
66 fn drop(&mut self) {
67 if self.close_on_drop {
68 let s = self.sender.lock().unwrap().take();
69 if let Some(mut s) = s {
70 s.close_channel();
71 }
72 }
73 }
74}
75
76impl<T: 'static + Send> CloseSend<T> {
77 pub fn sized_channel(size: usize) -> (Self, CloseRecv<T>) {
78 let (s, r) = futures::channel::mpsc::channel(size);
79 (
80 Self {
81 sender: Arc::new(Mutex::new(Some(s))),
82 close_on_drop: false,
83 },
84 CloseRecv(r),
85 )
86 }
87
88 pub fn set_close_on_drop(&mut self, close_on_drop: bool) {
89 self.close_on_drop = close_on_drop;
90 }
91
92 pub fn send_or_close(&self, t: T) -> Result<()> {
93 let mut lock = self.sender.lock().unwrap();
94 if let Some(sender) = &mut *lock {
95 if sender.try_send(t).is_ok() {
96 Ok(())
97 } else {
98 tracing::warn!("Failed to send message, closing channel");
99 sender.close_channel();
100 *lock = None;
101 Err(ErrorKind::BrokenPipe.into())
102 }
103 } else {
104 Err(ErrorKind::BrokenPipe.into())
105 }
106 }
107
108 pub async fn send(&self, t: T) -> Result<()> {
109 let res = tokio::time::timeout(
110 std::time::Duration::from_secs(60),
113 async {
114 let sender = self.sender.lock().unwrap().clone();
115 if let Some(mut sender) = sender {
116 use futures::sink::SinkExt;
117 if sender.send(t).await.is_ok() {
118 Result::Ok(())
119 } else {
120 Err(ErrorKind::BrokenPipe.into())
121 }
122 } else {
123 Err(ErrorKind::BrokenPipe.into())
124 }
125 },
126 )
127 .await;
128
129 match res {
130 Err(_) | Ok(Err(_)) => {
131 let mut lock = self.sender.lock().unwrap();
132 if let Some(sender) = &mut *lock {
133 tracing::warn!(
134 ?res,
135 "Failed to send message, closing channel"
136 );
137 sender.close_channel();
138 }
139 *lock = None;
140 Err(ErrorKind::BrokenPipe.into())
141 }
142 _ => Ok(()),
143 }
144 }
145}
146
147mod config;
148pub use config::*;
149
150mod webrtc;
151
152mod hub;
153pub use hub::*;
154
155mod conn;
156pub use conn::*;
157
158mod proto;
159
160mod framed;
161pub use framed::*;
162
163#[cfg(test)]
164mod test;