1#![deny(missing_docs)]
2#![deny(unsafe_code)]
3#![doc = tx5_core::__doc_header!()]
4pub use tx5_core::Tx5InitConfig;
28
29use std::collections::HashMap;
30use std::io::{Error, ErrorKind, Result};
31use std::sync::{Arc, Mutex, Weak};
32
33pub use tx5_signal;
34use tx5_signal::PubKey;
35
36struct AbortTask<R>(tokio::task::JoinHandle<R>);
37
38impl<R> Drop for AbortTask<R> {
39 fn drop(&mut self) {
40 self.0.abort();
41 }
42}
43
44struct CloseRecv<T: 'static + Send>(futures::channel::mpsc::Receiver<T>);
45
46impl<T: 'static + Send> CloseRecv<T> {
47 pub async fn recv(&mut self) -> Option<T> {
48 use futures::stream::StreamExt;
49 self.0.next().await
50 }
51}
52
53struct CloseSend<T: 'static + Send> {
54 sender: Arc<Mutex<Option<futures::channel::mpsc::Sender<T>>>>,
55 close_on_drop: bool,
56}
57
58impl<T: 'static + Send> Clone for CloseSend<T> {
59 fn clone(&self) -> Self {
60 Self {
61 sender: self.sender.clone(),
62 close_on_drop: false,
63 }
64 }
65}
66
67impl<T: 'static + Send> Drop for CloseSend<T> {
68 fn drop(&mut self) {
69 if self.close_on_drop {
70 let s = self.sender.lock().unwrap().take();
71 if let Some(mut s) = s {
72 s.close_channel();
73 }
74 }
75 }
76}
77
78impl<T: 'static + Send> CloseSend<T> {
79 pub fn sized_channel(size: usize) -> (Self, CloseRecv<T>) {
80 let (s, r) = futures::channel::mpsc::channel(size);
81 (
82 Self {
83 sender: Arc::new(Mutex::new(Some(s))),
84 close_on_drop: false,
85 },
86 CloseRecv(r),
87 )
88 }
89
90 pub fn set_close_on_drop(&mut self, close_on_drop: bool) {
91 self.close_on_drop = close_on_drop;
92 }
93
94 pub fn send_or_close(&self, t: T) -> Result<()> {
95 let mut lock = self.sender.lock().unwrap();
96 if let Some(sender) = &mut *lock {
97 if sender.try_send(t).is_ok() {
98 Ok(())
99 } else {
100 sender.close_channel();
101 *lock = None;
102 Err(ErrorKind::BrokenPipe.into())
103 }
104 } else {
105 Err(ErrorKind::BrokenPipe.into())
106 }
107 }
108
109 pub async fn send(&self, t: T) -> Result<()> {
110 let res = tokio::time::timeout(
111 std::time::Duration::from_secs(60),
114 async {
115 let sender = self.sender.lock().unwrap().clone();
116 if let Some(mut sender) = sender {
117 use futures::sink::SinkExt;
118 if sender.send(t).await.is_ok() {
119 Result::Ok(())
120 } else {
121 Err(ErrorKind::BrokenPipe.into())
122 }
123 } else {
124 Err(ErrorKind::BrokenPipe.into())
125 }
126 },
127 )
128 .await;
129
130 match res {
131 Err(_) | Ok(Err(_)) => {
132 let mut lock = self.sender.lock().unwrap();
133 if let Some(sender) = &mut *lock {
134 sender.close_channel();
135 }
136 *lock = None;
137 Err(ErrorKind::BrokenPipe.into())
138 }
139 _ => Ok(()),
140 }
141 }
142}
143
144mod config;
145pub use config::*;
146
147mod webrtc;
148
149mod hub;
150pub use hub::*;
151
152mod conn;
153pub use conn::*;
154
155mod proto;
156
157mod framed;
158pub use framed::*;
159
160#[cfg(test)]
161mod test;