ya_runtime_sdk/
event.rs

1use std::cell::RefCell;
2use std::rc::Rc;
3
4use futures::channel::mpsc;
5use futures::future::BoxFuture;
6use futures::{FutureExt, SinkExt, StreamExt};
7
8use ya_runtime_api::server::*;
9
10use crate::common::IntoVec;
11use crate::ProcessId;
12
13/// Runtime event kind
14#[derive(Clone, Debug)]
15pub enum EventKind {
16    Process(ProcessStatus),
17    Runtime(RuntimeStatus),
18}
19
20impl From<ProcessStatus> for EventKind {
21    fn from(status: ProcessStatus) -> Self {
22        Self::Process(status)
23    }
24}
25
26impl From<RuntimeStatus> for EventKind {
27    fn from(status: RuntimeStatus) -> Self {
28        Self::Runtime(status)
29    }
30}
31
32impl From<RuntimeStatusKind> for EventKind {
33    fn from(kind: RuntimeStatusKind) -> Self {
34        Self::Runtime(RuntimeStatus { kind: Some(kind) })
35    }
36}
37
38/// Runtime event emitter
39#[derive(Clone)]
40pub struct EventEmitter {
41    tx_process: mpsc::Sender<ProcessStatus>,
42    tx_runtime: mpsc::Sender<RuntimeStatus>,
43}
44
45impl EventEmitter {
46    pub fn spawn(emitter: impl RuntimeHandler + 'static) -> Self {
47        let (tx_p, rx_p) = mpsc::channel(1);
48        let (tx_r, rx_r) = mpsc::channel(1);
49        let e_p = Rc::new(RefCell::new(emitter));
50        let e_r = e_p.clone();
51
52        tokio::task::spawn_local(
53            rx_p.for_each(move |status| e_p.borrow().on_process_status(status)),
54        );
55        tokio::task::spawn_local(
56            rx_r.for_each(move |status| e_r.borrow().on_runtime_status(status)),
57        );
58
59        Self {
60            tx_process: tx_p,
61            tx_runtime: tx_r,
62        }
63    }
64}
65
66impl EventEmitter {
67    /// Emit a command started event
68    pub fn command_started(&mut self, process_id: ProcessId) -> BoxFuture<()> {
69        self.emit(ProcessStatus {
70            pid: process_id,
71            running: true,
72            return_code: 0,
73            stdout: Default::default(),
74            stderr: Default::default(),
75        })
76    }
77
78    /// Emit a command stopped event
79    pub fn command_stopped(&mut self, process_id: ProcessId, return_code: i32) -> BoxFuture<()> {
80        self.emit(ProcessStatus {
81            pid: process_id,
82            running: false,
83            return_code,
84            stdout: Default::default(),
85            stderr: Default::default(),
86        })
87    }
88
89    /// Emit a command output event (stdout)
90    pub fn command_stdout(
91        &mut self,
92        process_id: ProcessId,
93        stdout: impl IntoVec<u8>,
94    ) -> BoxFuture<()> {
95        self.emit(ProcessStatus {
96            pid: process_id,
97            running: true,
98            return_code: 0,
99            stdout: stdout.into_vec(),
100            stderr: Default::default(),
101        })
102    }
103
104    /// Emit a command output event (stderr)
105    pub fn command_stderr(
106        &mut self,
107        process_id: ProcessId,
108        stderr: impl IntoVec<u8>,
109    ) -> BoxFuture<()> {
110        self.emit(ProcessStatus {
111            pid: process_id,
112            running: true,
113            return_code: 0,
114            stdout: Default::default(),
115            stderr: stderr.into_vec(),
116        })
117    }
118
119    /// Emit a state event
120    pub fn state(&mut self, state: RuntimeState) -> BoxFuture<()> {
121        self.emit(RuntimeStatusKind::State(state))
122    }
123
124    /// Emit a counter event
125    pub fn counter(&mut self, counter: RuntimeCounter) -> BoxFuture<()> {
126        self.emit(RuntimeStatusKind::Counter(counter))
127    }
128
129    /// Emit an event
130    pub fn emit(&mut self, event: impl Into<EventKind>) -> BoxFuture<()> {
131        match event.into() {
132            EventKind::Process(status) => self.tx_process.send(status).then(|_| async {}).boxed(),
133            EventKind::Runtime(status) => self.tx_runtime.send(status).then(|_| async {}).boxed(),
134        }
135    }
136}