1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
use crate::result::Result;
use crate::sendable::Sendable;
use futures::{select, FutureExt};
use js_sys::Function;
use serde::Serialize;
use serde_wasm_bindgen::*;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use wasm_bindgen::prelude::*;
use workflow_core::channel::{DuplexChannel, Multiplexer};
use workflow_core::task::*;
use workflow_log::log_error;
/// Client that relays notifications received from a [`Multiplexer`] to a
/// JavaScript callback function.
///
/// The relay itself is performed by a background task started with
/// `start_notification_task` and stopped with `stop_notification_task`
/// (exported to JavaScript as `stop`).
#[wasm_bindgen]
pub struct MultiplexerClient {
/// Currently registered JavaScript handler, if any. Wrapped in `Arc<Mutex<..>>`
/// because a clone of it is moved into the spawned notification task.
callback: Arc<Mutex<Option<Sendable<Function>>>>,
/// Set to `true` when the notification task is started and back to `false`
/// when a stop is requested.
task_running: AtomicBool,
/// Control channel for the notification task: the task exits its loop when a
/// message arrives on `request` and replies on `response` once it has cleaned up.
task_ctl: DuplexChannel,
}
impl Default for MultiplexerClient {
fn default() -> Self {
MultiplexerClient::new()
}
}
impl MultiplexerClient {
    /// Spawns a background task that relays `multiplexer` events to the
    /// registered JavaScript handler.
    ///
    /// The task registers an event channel on `multiplexer`. For every
    /// notification it receives while a handler is set, it serializes the
    /// notification with `to_value` and calls the handler with the resulting
    /// value as its single argument (`this` is `undefined`). Serialization
    /// failures and exceptions thrown by the handler are logged and otherwise
    /// ignored.
    ///
    /// The task keeps running until a message arrives on the control channel's
    /// request side. It then unregisters its event channel from `multiplexer`
    /// and sends an acknowledgement on the control channel's response side.
    ///
    /// # Panics
    ///
    /// Panics if the notification task is already marked as running, or if the
    /// callback mutex is poisoned.
    pub async fn start_notification_task<T>(&self, multiplexer: &Arc<Multiplexer<T>>) -> Result<()>
    where
        T: Clone + Serialize + Send + Sync + 'static,
    {
        // Test-and-set in a single atomic step so that two overlapping start
        // requests can never both observe the flag as clear.
        if self.task_running.swap(true, Ordering::SeqCst) {
            panic!("MultiplexerClient task is already running");
        }

        let ctl_receiver = self.task_ctl.request.receiver.clone();
        let ctl_sender = self.task_ctl.response.sender.clone();
        let callback = self.callback.clone();
        let (channel_id, _, receiver) = multiplexer.register_event_channel();
        let multiplexer = multiplexer.clone();

        spawn(async move {
            loop {
                select! {
                    _ = ctl_receiver.recv().fuse() => {
                        break;
                    },
                    msg = receiver.recv().fuse() => {
                        // NOTE(review): receive errors are ignored here; if the event
                        // channel can be closed while this task is alive, this loop
                        // would presumably spin - confirm against `Multiplexer`.
                        if let Ok(notification) = &msg {
                            // Copy the handler out and release the mutex guard before
                            // running JavaScript: the handler may call `setHandler` /
                            // `removeHandler`, which lock the same mutex.
                            let handler = callback.lock().unwrap().as_ref().map(|cb| cb.0.clone());
                            if let Some(handler) = handler {
                                match to_value(notification) {
                                    Ok(event) => {
                                        if let Err(err) = handler.call1(&JsValue::undefined(), &event) {
                                            log_error!("Error while executing notification callback: {:?}", err);
                                        }
                                    }
                                    Err(err) => {
                                        log_error!("Error while serializing notification: {:?}", err);
                                    }
                                }
                            }
                        }
                    }
                }
            }

            multiplexer.unregister_event_channel(channel_id);
            // Acknowledge the stop request; a failed send is deliberately ignored.
            ctl_sender.send(()).await.ok();
        });

        Ok(())
    }
}
#[wasm_bindgen]
impl MultiplexerClient {
    /// Creates a client with no handler registered, the running flag cleared
    /// and a fresh oneshot control channel.
    #[wasm_bindgen(constructor)]
    pub fn new() -> MultiplexerClient {
        let callback = Arc::new(Mutex::new(None));
        let task_running = AtomicBool::new(false);
        let task_ctl = DuplexChannel::oneshot();

        MultiplexerClient {
            callback,
            task_running,
            task_ctl,
        }
    }

    /// Registers `callback` as the notification handler.
    ///
    /// If `callback` is not a JavaScript function, any currently registered
    /// handler is removed instead.
    #[wasm_bindgen(js_name = "setHandler")]
    pub fn set_handler(&self, callback: JsValue) -> Result<()> {
        // Anything that is not callable clears the handler.
        if !callback.is_function() {
            return self.remove_handler();
        }

        let handler: Function = callback.into();
        let mut slot = self.callback.lock().unwrap();
        *slot = Some(handler.into());
        Ok(())
    }

    /// Removes the currently registered notification handler, if any.
    #[wasm_bindgen(js_name = "removeHandler")]
    pub fn remove_handler(&self) -> Result<()> {
        let mut slot = self.callback.lock().unwrap();
        slot.take();
        Ok(())
    }

    /// Requests termination of the notification task.
    ///
    /// Does nothing when the task is not marked as running. Otherwise the
    /// running flag is cleared and the control channel is signalled; an error
    /// reported by the control channel is converted to a JavaScript string
    /// value and returned.
    #[wasm_bindgen(js_name = "stop")]
    pub async fn stop_notification_task(&self) -> Result<()> {
        if !self.task_running.load(Ordering::SeqCst) {
            return Ok(());
        }

        self.task_running.store(false, Ordering::SeqCst);
        let outcome = self.task_ctl.signal(()).await;
        outcome.map_err(|err| JsValue::from_str(&err.to_string()))?;
        Ok(())
    }
}