1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
pub use crate::wait::NotifyReady;
use crate::{wait::Wait, Error, Result};
use std::fmt;
/// Handle on an operation outcome of type `T` that may not be available yet.
///
/// The outcome is read with `try_wait` (returns `None` while nothing is
/// available) or `wait` (consumes the handle). `I` is the value type of the
/// wrapped confirmation when this one was produced by `map`; it defaults to
/// `()` for confirmations built directly on a `Wait<T>`.
#[must_use = "Confirmation should be used or you can miss errors"]
pub struct Confirmation<T, I = ()> {
// Either a direct wait on `T`, or another confirmation plus a mapping to `T`.
kind: ConfirmationKind<T, I>,
}
impl<T, I> Confirmation<T, I> {
    /// Wraps `wait` in a confirmation that yields its result unchanged.
    pub(crate) fn new(wait: Wait<T>) -> Self {
        let kind = ConfirmationKind::Wait(wait);
        Self { kind }
    }

    /// Builds a confirmation backed by a fresh `Wait` whose handle has
    /// already been given `error`.
    pub(crate) fn new_error(error: Error) -> Self {
        let (wait, handle) = Wait::new();
        handle.error(error);
        Self {
            kind: ConfirmationKind::Wait(wait),
        }
    }

    /// Consumes the confirmation and reports only whether an error is
    /// currently available.
    ///
    /// Returns `Err` when `try_wait` yields an error right now. Returns
    /// `Ok(())` both when a success value is available (it is discarded) and
    /// when nothing is available yet.
    pub(crate) fn into_error(self) -> Result<()> {
        let outcome = self.try_wait();
        match outcome {
            Some(Err(error)) => Err(error),
            Some(Ok(_)) | None => Ok(()),
        }
    }

    /// Forwards `task` to the underlying wait (through the inner
    /// confirmation for a mapped one).
    ///
    /// NOTE(review): presumably `task` is notified once a result becomes
    /// available; confirm against `Wait::subscribe`.
    pub fn subscribe(&self, task: Box<dyn NotifyReady + Send>) {
        match &self.kind {
            ConfirmationKind::Wait(inner) => inner.subscribe(task),
            ConfirmationKind::Map(inner, _) => inner.subscribe(task),
        }
    }

    /// Reports whether the underlying wait says it has a subscriber.
    pub fn has_subscriber(&self) -> bool {
        match &self.kind {
            ConfirmationKind::Wait(inner) => inner.has_subscriber(),
            ConfirmationKind::Map(inner, _) => inner.has_subscriber(),
        }
    }

    /// Returns the result if the underlying wait has one, `None` otherwise.
    ///
    /// For a mapped confirmation, a success value of the inner confirmation
    /// is passed through the mapping function; errors are returned as-is.
    pub fn try_wait(&self) -> Option<Result<T>> {
        match &self.kind {
            ConfirmationKind::Wait(inner) => inner.try_wait(),
            ConfirmationKind::Map(inner, transform) => {
                // `?` leaves early with `None` while the inner result is missing.
                let outcome = inner.try_wait()?;
                Some(outcome.map(transform))
            }
        }
    }

    /// Consumes the confirmation and returns the result of the underlying
    /// `wait` call, applying the mapping function to a success value if this
    /// is a mapped confirmation.
    ///
    /// NOTE(review): presumably blocks until the result is available;
    /// confirm against `Wait::wait`.
    pub fn wait(self) -> Result<T> {
        match self.kind {
            ConfirmationKind::Wait(inner) => inner.wait(),
            ConfirmationKind::Map(inner, transform) => inner.wait().map(transform),
        }
    }
}
impl<T> Confirmation<T> {
    /// Converts this confirmation into one yielding `M`, by boxing `self` and
    /// pairing it with `f`, which is applied to each success value read from it.
    pub(crate) fn map<M>(self, f: Box<dyn Fn(T) -> M + Send + 'static>) -> Confirmation<M, T> {
        let inner = Box::new(self);
        let kind = ConfirmationKind::Map(inner, f);
        Confirmation { kind }
    }
}
/// Internal representation backing a `Confirmation<T, I>`.
enum ConfirmationKind<T, I> {
/// The result comes straight from a `Wait<T>`.
Wait(Wait<T>),
/// The result comes from an inner `Confirmation<I>`; the boxed function
/// converts its success value from `I` to `T`.
Map(Box<Confirmation<I>>, Box<dyn Fn(I) -> T + Send + 'static>),
}
/// Turns the outcome of creating a wait into a confirmation: an `Ok` wait is
/// wrapped directly, an `Err` becomes a confirmation carrying that error.
impl<T> From<Result<Wait<T>>> for Confirmation<T> {
    fn from(res: Result<Wait<T>>) -> Self {
        res.map_or_else(Self::new_error, Self::new)
    }
}
/// Opaque `Debug` output: every confirmation prints as `Confirmation`.
///
/// Implemented for every `I`, not only the default `()`, so that mapped
/// confirmations (`Confirmation<M, T>` as returned by `map`) are `Debug` too.
/// Neither `T` nor `I` needs to be `Debug` because no inner value is printed.
impl<T, I> fmt::Debug for Confirmation<T, I> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Confirmation")
    }
}
/// `Future` support for `Confirmation`, compiled only with the `futures` feature.
#[cfg(feature = "futures")]
pub(crate) mod futures {
    use super::*;
    use std::{
        future::Future,
        pin::Pin,
        task::{Context, Poll, Waker},
    };

    impl<T, I> Future for Confirmation<T, I> {
        type Output = Result<T>;

        /// Registers a waker-backed subscriber if there is none yet, then
        /// reports `Ready` with the result when `try_wait` has one and
        /// `Pending` otherwise.
        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // Subscribe before checking the result, as the original ordering does.
            // NOTE(review): only the first waker seen is registered; a later poll
            // with a different waker does not replace it. Whether re-subscribing
            // would be safe depends on `Wait::subscribe` semantics - confirm.
            if !self.has_subscriber() {
                let watcher = Watcher(cx.waker().clone());
                self.subscribe(Box::new(watcher));
            }

            match self.try_wait() {
                Some(result) => Poll::Ready(result),
                None => Poll::Pending,
            }
        }
    }

    /// Adapter that lets a task `Waker` be used as a `NotifyReady` subscriber.
    pub(crate) struct Watcher(pub(crate) Waker);

    impl NotifyReady for Watcher {
        /// Wakes the stored waker without consuming it.
        fn notify(&self) {
            let Watcher(waker) = self;
            waker.wake_by_ref();
        }
    }
}