#![deny(missing_docs)]
#![deny(unsafe_code)]
#![doc = tx5_core::__doc_header!()]
#[cfg(any(
not(any(feature = "backend-go-pion", feature = "backend-webrtc-rs")),
all(feature = "backend-go-pion", feature = "backend-webrtc-rs"),
))]
compile_error!("Must specify exactly 1 webrtc backend");
pub mod deps {
pub use tx5_core;
pub use tx5_core::deps::*;
pub use tx5_signal;
pub use tx5_signal::deps::*;
}
pub use tx5_core::{Error, ErrorExt, Id, Result, Tx5InitConfig, Tx5Url};
mod ep3;
pub use ep3::*;
pub(crate) mod back_buf;
pub(crate) use back_buf::*;
pub(crate) mod proto;
#[derive(Clone)]
struct AbortableTimedSharedFuture<T: Clone> {
f: futures::future::Shared<
futures::future::BoxFuture<'static, std::result::Result<T, Error>>,
>,
a: std::sync::Arc<
std::sync::Mutex<Option<tokio::sync::oneshot::Sender<Error>>>,
>,
}
impl<T: Clone> AbortableTimedSharedFuture<T> {
pub fn new<F>(
timeout: std::time::Duration,
timeout_err: Error,
f: F,
) -> Self
where
F: std::future::Future<Output = std::result::Result<T, Error>>
+ 'static
+ Send,
{
let (a, ar) = tokio::sync::oneshot::channel();
let a = std::sync::Arc::new(std::sync::Mutex::new(Some(a)));
Self {
f: futures::future::FutureExt::shared(
futures::future::FutureExt::boxed(async move {
tokio::time::timeout(
timeout,
async move {
tokio::select! {
r = async {
Err(ar.await.map_err(|_| Error::id("AbortHandleDropped"))?)
} => r,
r = f => r,
}
},
)
.await
.map_err(|_| timeout_err)?
}),
),
a,
}
}
pub fn abort(&self, err: Error) {
let a = self.a.lock().unwrap().take();
if let Some(a) = a {
let _ = a.send(err);
}
}
}
impl<T: Clone> std::future::Future for AbortableTimedSharedFuture<T> {
type Output = std::result::Result<T, Error>;
fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
std::pin::Pin::new(&mut self.f).poll(cx)
}
}
#[cfg(test)]
mod test_behavior;
#[cfg(test)]
mod test {
use super::*;
#[tokio::test(flavor = "multi_thread")]
async fn atsf_traits() {
fn check<F>(_f: F)
where
F: Send + Sync + Unpin,
{
}
let a = AbortableTimedSharedFuture::new(
std::time::Duration::from_millis(10),
Error::str("my timeout err").into(),
async move { Ok(()) },
);
check(a);
}
#[tokio::test(flavor = "multi_thread")]
async fn atsf_happy() {
AbortableTimedSharedFuture::new(
std::time::Duration::from_secs(1),
Error::id("to").into(),
async move { Ok(()) },
)
.await
.unwrap();
}
#[tokio::test(flavor = "multi_thread")]
async fn atsf_timeout() {
let r = AbortableTimedSharedFuture::new(
std::time::Duration::from_millis(1),
Error::id("to").into(),
async move {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
Ok(())
},
)
.await;
assert_eq!("to", r.unwrap_err().to_string());
}
#[tokio::test(flavor = "multi_thread")]
async fn atsf_abort() {
let a = AbortableTimedSharedFuture::new(
std::time::Duration::from_secs(1),
Error::id("to").into(),
async move {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
Ok(())
},
);
{
let a = a.clone();
tokio::task::spawn(async move {
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
a.abort(Error::id("abort").into());
});
}
let r = a.await;
assert_eq!("abort", r.unwrap_err().to_string());
}
}