decthings_api/client/
event.rs1use std::{
2 collections::HashMap,
3 sync::{Arc, Weak},
4};
5
6use super::{
7 rpc::{
8 debug::DebugEvent, language::LanguageEvent, spawned::SpawnedEvent, terminal::TerminalEvent,
9 },
10 StateModification,
11};
12use tokio::sync::Mutex;
13
14#[derive(Debug, Clone)]
15pub enum DecthingsEvent {
16 Debug(DebugEvent),
17 Language(LanguageEvent),
18 Spawned(SpawnedEvent),
19 Terminal(TerminalEvent),
20 SubscriptionsRemoved,
25}
26
27impl DecthingsEvent {
28 pub(super) fn deserialize(
29 api: &[u8],
30 data: &[u8],
31 mut blobs: Vec<bytes::Bytes>,
32 ) -> Result<(DecthingsEvent, StateModification), ()> {
33 match api {
34 b"Debug" => {
35 let mut deserialized: DebugEvent = serde_json::from_slice(data).map_err(|_| ())?;
36 let state_modification = match &mut deserialized {
37 DebugEvent::Exit {
38 debug_session_id,
39 reason: _,
40 } => StateModification {
41 add_events: vec![],
42 remove_events: vec![debug_session_id.to_owned()],
43 },
44 DebugEvent::Stdout {
45 debug_session_id: _,
46 data,
47 } => {
48 if blobs.is_empty() {
49 return Err(());
50 }
51 *data = blobs.remove(0);
52 StateModification::empty()
53 }
54 DebugEvent::Stderr {
55 debug_session_id: _,
56 data,
57 } => {
58 if blobs.is_empty() {
59 return Err(());
60 }
61 *data = blobs.remove(0);
62 StateModification::empty()
63 }
64 DebugEvent::Initialized {
65 debug_session_id: _,
66 } => StateModification::empty(),
67 DebugEvent::RemoteInspectorData {
68 debug_session_id: _,
69 data,
70 } => {
71 if blobs.is_empty() {
72 return Err(());
73 }
74 *data = blobs.remove(0);
75 StateModification::empty()
76 }
77 };
78 Ok((DecthingsEvent::Debug(deserialized), state_modification))
79 }
80 b"Language" => {
81 let mut deserialized: LanguageEvent =
82 serde_json::from_slice(data).map_err(|_| ())?;
83 let state_modification = match &mut deserialized {
84 LanguageEvent::Exit {
85 language_server_id,
86 reason: _,
87 } => StateModification {
88 add_events: vec![],
89 remove_events: vec![language_server_id.to_owned()],
90 },
91 LanguageEvent::Data {
92 language_server_id: _,
93 data,
94 } => {
95 if blobs.is_empty() {
96 return Err(());
97 }
98 *data = blobs.remove(0);
99 StateModification::empty()
100 }
101 };
102 Ok((DecthingsEvent::Language(deserialized), state_modification))
103 }
104 b"Spawned" => {
105 let mut deserialized: SpawnedEvent =
106 serde_json::from_slice(data).map_err(|_| ())?;
107 let state_modification = match &mut deserialized {
108 SpawnedEvent::Exit {
109 spawned_command_id,
110 reason: _,
111 } => StateModification {
112 add_events: vec![],
113 remove_events: vec![spawned_command_id.to_owned()],
114 },
115 SpawnedEvent::Stdout {
116 spawned_command_id: _,
117 data,
118 } => {
119 if blobs.is_empty() {
120 return Err(());
121 }
122 *data = blobs.remove(0);
123 StateModification::empty()
124 }
125 SpawnedEvent::Stderr {
126 spawned_command_id: _,
127 data,
128 } => {
129 if blobs.is_empty() {
130 return Err(());
131 }
132 *data = blobs.remove(0);
133 StateModification::empty()
134 }
135 };
136 Ok((DecthingsEvent::Spawned(deserialized), state_modification))
137 }
138 b"Terminal" => {
139 let mut deserialized: TerminalEvent =
140 serde_json::from_slice(data).map_err(|_| ())?;
141 let state_modification = match &mut deserialized {
142 TerminalEvent::Exit {
143 terminal_session_id,
144 reason: _,
145 } => StateModification {
146 add_events: vec![],
147 remove_events: vec![terminal_session_id.to_owned()],
148 },
149 TerminalEvent::Data {
150 terminal_session_id: _,
151 data,
152 } => {
153 if blobs.is_empty() {
154 return Err(());
155 }
156 *data = blobs.remove(0);
157 StateModification::empty()
158 }
159 };
160 Ok((DecthingsEvent::Terminal(deserialized), state_modification))
161 }
162 _ => Err(()),
163 }
164 }
165}
166
167type DecthingsClientEventListener = Box<dyn Fn(&DecthingsEvent) + Send + Sync>;
168
169pub(super) struct EventListeners {
170 listeners: Mutex<HashMap<u64, DecthingsClientEventListener>>,
171}
172
173impl EventListeners {
174 pub fn new() -> Self {
175 Self {
176 listeners: Mutex::new(HashMap::new()),
177 }
178 }
179
180 pub async fn add(
181 self: &Arc<Self>,
182 ev: impl Fn(&DecthingsEvent) + Send + Sync + 'static,
183 ) -> EventListenerDisposer {
184 let mut lock = self.listeners.lock().await;
185 let mut id = 0;
186 while lock.contains_key(&id) {
187 id += 1;
188 }
189 lock.insert(id, Box::new(ev));
190 drop(lock);
191 EventListenerDisposer {
192 event_listeners: Arc::downgrade(self),
193 id,
194 }
195 }
196
197 pub async fn call(&self, ev: &DecthingsEvent) {
198 let locked = self.listeners.lock().await;
199 for listener in locked.values() {
200 listener(ev);
201 }
202 }
203}
204
205pub struct EventListenerDisposer {
206 event_listeners: Weak<EventListeners>,
207 id: u64,
208}
209
210impl EventListenerDisposer {
211 pub async fn dispose(self) {
212 if let Some(event_listeners) = self.event_listeners.upgrade() {
213 let mut lock = event_listeners.listeners.lock().await;
214 lock.remove(&self.id);
215 }
216 }
217}