use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use std::{future::Future, task::{Poll, Waker}};
use crate::Promise;
#[derive(Debug)]
pub struct Producer<T, E> {
promise: Arc<Mutex<Inner<T, E>>>,
}
#[derive(Clone)]
pub struct Consumer<T, E> {
promise: Arc<Mutex<Inner<T, E>>>,
}
#[derive(Debug)]
struct Inner<T, E> {
value: Option<Arc<Result<T, E>>>,
waker: Vec<Waker>, }
impl<T, E> Promise for Producer<T,E> {
type Output = T;
type Error = E;
type Waiter = Consumer<T,E>;
#[allow(dead_code)]
fn resolve(self, value: T) {
let mut promise = self.promise.lock().unwrap();
promise.value = Some(Arc::new(Ok(value)));
for waker in promise.waker.drain(..) {
waker.wake()
}
}
#[allow(dead_code)]
fn reject(self, err: E) {
let mut promise = self.promise.lock().unwrap();
promise.value = Some(Arc::new(Err(err)));
for waker in promise.waker.drain(..) {
waker.wake()
}
}
fn new() -> (Self, Self::Waiter) {
let producer = Self {
promise: Arc::new(Mutex::new(Inner {
value: None,
waker: vec![],
})),
};
let consumer = Consumer { promise: producer.promise.clone() };
(producer, consumer)
}
}
impl<T, E> Future for Consumer<T, E> {
type Output = Arc<Result<T, E>>;
fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let mut promise = self.promise.lock().unwrap();
match promise.value {
Some(ref value) => Poll::Ready(value.clone()),
None => {
promise.waker.push(cx.waker().clone());
Poll::Pending
}
}
}
}
#[cfg(test)]
mod tests {
#[allow(unused_imports)]
use futures::executor::block_on;
#[allow(unused_imports)]
use std::thread;
use super::Producer;
use crate::Promise;
#[allow(unused_must_use)]
#[test]
fn test_promise_out_resolve() {
let (op, op_a) = Producer::<String, ()>::new();
let task1 = thread::spawn(move || {
block_on(async {
println!("我等到了{:?}", op_a.await);
})
});
let task2 = thread::spawn(move || {
block_on(async {
println!("我发送了了{:?}", op.resolve(String::from("🍓")));
})
});
task1.join().expect("The task1 thread has panicked");
task2.join().expect("The task2 thread has panicked");
}
#[allow(unused_must_use)]
#[test]
fn test_two_promises_out_resolve() {
let (op, op_a) = Producer::<String, ()>::new();
let op_b = op_a.clone();
let task1 = thread::spawn(move || {
block_on(async {
println!("我等到了{:?} task1", op_a.await);
})
});
let task2 = thread::spawn(move || {
block_on(async {
println!("我等到了{:?} task2", op_b.await);
})
});
let task3 = thread::spawn(move || {
block_on(async {
println!("我发送了了{:?} task3", op.resolve(String::from("🍓")));
})
});
task1.join().expect("The task1 thread has panicked");
task2.join().expect("The task2 thread has panicked");
task3.join().expect("The task3 thread has panicked");
}
#[test]
fn test_promise_out_reject() {
let (a, b) = Producer::<String, String>::new();
let task1 = thread::spawn(|| {
block_on(async {
println!("我等到了{:?}", b.await);
})
});
let task2 = thread::spawn(|| {
block_on(async {
println!("我发送了了{:?}", a.reject(String::from("reject!!")));
})
});
task1.join().expect("The task1 thread has panicked");
task2.join().expect("The task2 thread has panicked");
}
#[allow(unused_must_use)]
#[test]
fn test_promise_resolve_twice() {
let (a, _b) = Producer::<String, ()>::new();
a.resolve("hi".into());
}
}