1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
use std::{fmt, future::Future};
use async_channel::Receiver;
#[derive(Debug, PartialEq, Eq)]
pub struct InterruptError { }
impl InterruptError {
pub fn new() -> Self {
Self { }
}
}
impl std::error::Error for InterruptError { }
impl fmt::Display for InterruptError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Async fiber interrupted.")
}
}
pub async fn interruptible_straight<T, E: From<InterruptError>>(
rx: Receiver<()>,
f: impl Future<Output=Result<T, E>>
) -> Result<T, E>
{
tokio::select!{
r = f => r,
_ = async {
let _ = rx.recv().await;
} => Err(InterruptError::new().into()),
}
}
pub async fn interruptible<T, E: From<InterruptError>>(
rx: Receiver<()>,
f: impl Future<Output=Result<T, E>> + Unpin
) -> Result<T, E>
{
interruptible_straight(rx, f).await
}
pub async fn interruptible_sendable<T, E: From<InterruptError>>(
rx: Receiver<()>,
f: impl Future<Output=Result<T, E>> + Send + Unpin
) -> Result<T, E>
{
interruptible_straight(rx, f).await
}
#[cfg(test)]
mod tests {
use std::future::Future;
use async_channel::bounded;
use futures::executor::block_on;
use crate::{InterruptError, interruptible, interruptible_sendable};
#[derive(Debug, PartialEq, Eq)]
struct AnotherError { }
impl AnotherError {
pub fn new() -> Self {
return Self { }
}
}
#[derive(Debug, PartialEq, Eq)]
enum MyError {
Interrupted(InterruptError),
Another(AnotherError)
}
impl From<InterruptError> for MyError {
fn from(value: InterruptError) -> Self {
Self::Interrupted(value)
}
}
impl From<AnotherError> for MyError {
fn from(value: AnotherError) -> Self {
Self::Another(value)
}
}
struct Test {
}
impl Test {
pub fn new() -> Self {
Self {
}
}
pub async fn g(self) -> Result<u8, MyError> {
let (_tx, rx) = bounded(1);
interruptible(rx, Box::pin(async move {
Ok(123)
})).await
}
pub async fn h(self) -> Result<u8, MyError> {
let (_tx, rx) = bounded(1);
interruptible(rx, Box::pin(async move {
Err(AnotherError::new().into())
})).await
}
}
#[test]
fn interrupted() {
let test = Test::new();
block_on(async {
assert_eq!(test.g().await, Ok(123));
});
let test = Test::new();
block_on(async {
assert_eq!(test.h().await, Err(AnotherError::new().into()));
});
}
#[test]
fn check_interruptible_sendable() {
let (_tx, rx) = bounded(1);
let _: &(dyn Future<Output = Result<i32, InterruptError>> + Send) = &interruptible_sendable(rx, Box::pin(async move {
Ok(123)
}));
}
}