1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
use crate::error::{Error, Result};
use crate::event::Event;
use crate::ipc::stream_emitter::StreamEmitter;
use std::collections::HashMap;
use std::mem;
use std::sync::Arc;
use tokio::sync::oneshot::Sender;
use tokio::sync::{oneshot, Mutex, RwLock};
use typemap_rev::TypeMap;
#[derive(Clone)]
pub struct Context {
pub emitter: StreamEmitter,
pub data: Arc<RwLock<TypeMap>>,
stop_sender: Arc<Mutex<Option<Sender<()>>>>,
reply_listeners: Arc<Mutex<HashMap<u64, oneshot::Sender<Event>>>>,
}
impl Context {
pub(crate) fn new(
emitter: StreamEmitter,
data: Arc<RwLock<TypeMap>>,
stop_sender: Option<Sender<()>>,
) -> Self {
Self {
emitter,
reply_listeners: Arc::new(Mutex::new(HashMap::new())),
data,
stop_sender: Arc::new(Mutex::new(stop_sender)),
}
}
pub async fn await_reply(&self, message_id: u64) -> Result<Event> {
let (rx, tx) = oneshot::channel();
{
let mut listeners = self.reply_listeners.lock().await;
listeners.insert(message_id, rx);
}
let event = tx.await?;
Ok(event)
}
pub async fn stop(self) -> Result<()> {
let mut sender = self.stop_sender.lock().await;
if let Some(sender) = mem::take(&mut *sender) {
sender.send(()).map_err(|_| Error::SendError)?;
}
Ok(())
}
pub(crate) async fn get_reply_sender(&self, ref_id: u64) -> Option<oneshot::Sender<Event>> {
let mut listeners = self.reply_listeners.lock().await;
listeners.remove(&ref_id)
}
}