// lapin_async/wait.rs

1use parking_lot::Mutex;
2
3use std::{
4  fmt,
5  sync::{
6    Arc,
7    mpsc::{SyncSender, Receiver, sync_channel},
8  },
9};
10
11use crate::error::Error;
12
13#[deprecated(note = "use lapin instead")]
14pub struct Wait<T> {
15  recv: Receiver<Result<T, Error>>,
16  send: SyncSender<Result<T, Error>>,
17  task: Arc<Mutex<Option<Box<dyn NotifyReady + Send>>>>,
18}
19
20#[derive(Clone)]
21#[deprecated(note = "use lapin instead")]
22pub struct WaitHandle<T> {
23  send: SyncSender<Result<T, Error>>,
24  task: Arc<Mutex<Option<Box<dyn NotifyReady + Send>>>>,
25}
26
/// Hook a subscriber implements to be told that a result was pushed.
///
/// A boxed implementor can be stored in a `Wait`; a `WaitHandle` takes it out
/// and calls it after sending a value or an error.
#[deprecated(note = "use lapin instead")]
pub trait NotifyReady {
  /// Invoked after an outcome has been sent towards the `Wait`.
  fn notify(&self);
}
31
32impl<T> Wait<T> {
33  pub(crate) fn new() -> (Self, WaitHandle<T>) {
34    let (send, recv) = sync_channel(1);
35    let wait         = Self { recv, send, task: Arc::new(Mutex::new(None)) };
36    let wait_handle  = wait.handle();
37    (wait, wait_handle)
38  }
39
40  fn handle(&self) -> WaitHandle<T> {
41    WaitHandle { send: self.send.clone(), task: self.task.clone() }
42  }
43
44  pub(crate) fn try_wait(&self) -> Option<Result<T, Error>> {
45    self.recv.try_recv().ok()
46  }
47
48  pub(crate) fn wait(&self) -> Result<T, Error> {
49    self.recv.recv().unwrap()
50  }
51
52  pub(crate) fn subscribe(&self, task: Box<dyn NotifyReady + Send>) {
53    *self.task.lock() = Some(task);
54  }
55
56  pub(crate) fn has_subscriber(&self) -> bool {
57    self.task.lock().is_some()
58  }
59}
60
61impl<T> WaitHandle<T> {
62  pub(crate) fn finish(&self, val: T) {
63    let _ = self.send.send(Ok(val));
64    self.notify();
65  }
66
67  pub(crate) fn error(&self, error: Error) {
68    let _ = self.send.send(Err(error));
69    self.notify();
70  }
71
72  fn notify(&self) {
73    if let Some(task) = self.task.lock().take() {
74      task.notify();
75    }
76  }
77}
78
79impl<T> fmt::Debug for Wait<T> {
80  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
81    write!(f, "Wait")
82  }
83}
84
85impl<T> fmt::Debug for WaitHandle<T> {
86  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
87    write!(f, "WaitHandle")
88  }
89}