1use std::cell::RefCell;
2use std::rc::Rc;
3
4use futures::channel::mpsc;
5use futures::future::BoxFuture;
6use futures::{FutureExt, SinkExt, StreamExt};
7
8use ya_runtime_api::server::*;
9
10use crate::common::IntoVec;
11use crate::ProcessId;
12
13#[derive(Clone, Debug)]
15pub enum EventKind {
16 Process(ProcessStatus),
17 Runtime(RuntimeStatus),
18}
19
20impl From<ProcessStatus> for EventKind {
21 fn from(status: ProcessStatus) -> Self {
22 Self::Process(status)
23 }
24}
25
26impl From<RuntimeStatus> for EventKind {
27 fn from(status: RuntimeStatus) -> Self {
28 Self::Runtime(status)
29 }
30}
31
32impl From<RuntimeStatusKind> for EventKind {
33 fn from(kind: RuntimeStatusKind) -> Self {
34 Self::Runtime(RuntimeStatus { kind: Some(kind) })
35 }
36}
37
38#[derive(Clone)]
40pub struct EventEmitter {
41 tx_process: mpsc::Sender<ProcessStatus>,
42 tx_runtime: mpsc::Sender<RuntimeStatus>,
43}
44
45impl EventEmitter {
46 pub fn spawn(emitter: impl RuntimeHandler + 'static) -> Self {
47 let (tx_p, rx_p) = mpsc::channel(1);
48 let (tx_r, rx_r) = mpsc::channel(1);
49 let e_p = Rc::new(RefCell::new(emitter));
50 let e_r = e_p.clone();
51
52 tokio::task::spawn_local(
53 rx_p.for_each(move |status| e_p.borrow().on_process_status(status)),
54 );
55 tokio::task::spawn_local(
56 rx_r.for_each(move |status| e_r.borrow().on_runtime_status(status)),
57 );
58
59 Self {
60 tx_process: tx_p,
61 tx_runtime: tx_r,
62 }
63 }
64}
65
66impl EventEmitter {
67 pub fn command_started(&mut self, process_id: ProcessId) -> BoxFuture<()> {
69 self.emit(ProcessStatus {
70 pid: process_id,
71 running: true,
72 return_code: 0,
73 stdout: Default::default(),
74 stderr: Default::default(),
75 })
76 }
77
78 pub fn command_stopped(&mut self, process_id: ProcessId, return_code: i32) -> BoxFuture<()> {
80 self.emit(ProcessStatus {
81 pid: process_id,
82 running: false,
83 return_code,
84 stdout: Default::default(),
85 stderr: Default::default(),
86 })
87 }
88
89 pub fn command_stdout(
91 &mut self,
92 process_id: ProcessId,
93 stdout: impl IntoVec<u8>,
94 ) -> BoxFuture<()> {
95 self.emit(ProcessStatus {
96 pid: process_id,
97 running: true,
98 return_code: 0,
99 stdout: stdout.into_vec(),
100 stderr: Default::default(),
101 })
102 }
103
104 pub fn command_stderr(
106 &mut self,
107 process_id: ProcessId,
108 stderr: impl IntoVec<u8>,
109 ) -> BoxFuture<()> {
110 self.emit(ProcessStatus {
111 pid: process_id,
112 running: true,
113 return_code: 0,
114 stdout: Default::default(),
115 stderr: stderr.into_vec(),
116 })
117 }
118
119 pub fn state(&mut self, state: RuntimeState) -> BoxFuture<()> {
121 self.emit(RuntimeStatusKind::State(state))
122 }
123
124 pub fn counter(&mut self, counter: RuntimeCounter) -> BoxFuture<()> {
126 self.emit(RuntimeStatusKind::Counter(counter))
127 }
128
129 pub fn emit(&mut self, event: impl Into<EventKind>) -> BoxFuture<()> {
131 match event.into() {
132 EventKind::Process(status) => self.tx_process.send(status).then(|_| async {}).boxed(),
133 EventKind::Runtime(status) => self.tx_runtime.send(status).then(|_| async {}).boxed(),
134 }
135 }
136}