decthings_api/client/
event.rs

1use std::{
2    collections::HashMap,
3    sync::{Arc, Weak},
4};
5
6use super::{
7    rpc::{
8        debug::DebugEvent, language::LanguageEvent, spawned::SpawnedEvent, terminal::TerminalEvent,
9    },
10    StateModification,
11};
12use tokio::sync::Mutex;
13
/// An event delivered to the listeners registered on the client.
///
/// Each of the first four variants wraps the event type of one RPC module (`debug`,
/// `language`, `spawned`, `terminal`).
#[derive(Debug, Clone)]
pub enum DecthingsEvent {
    /// An event from the `debug` RPC module.
    Debug(DebugEvent),
    /// An event from the `language` RPC module.
    Language(LanguageEvent),
    /// An event from the `spawned` RPC module.
    Spawned(SpawnedEvent),
    /// An event from the `terminal` RPC module.
    Terminal(TerminalEvent),
    /// This event will be emitted when the WebSocket connection to Decthings closes unexpectedly.
    /// This means that all subscriptions are cancelled. To solve this, call subscribe for the
    /// corresponding API again (no need to call on_event again). Note that you may miss some
    /// events while the subscription is inactive.
    SubscriptionsRemoved,
}
26
27impl DecthingsEvent {
28    pub(super) fn deserialize(
29        api: &[u8],
30        data: &[u8],
31        mut blobs: Vec<bytes::Bytes>,
32    ) -> Result<(DecthingsEvent, StateModification), ()> {
33        match api {
34            b"Debug" => {
35                let mut deserialized: DebugEvent = serde_json::from_slice(data).map_err(|_| ())?;
36                let state_modification = match &mut deserialized {
37                    DebugEvent::Exit {
38                        debug_session_id,
39                        reason: _,
40                    } => StateModification {
41                        add_events: vec![],
42                        remove_events: vec![debug_session_id.to_owned()],
43                    },
44                    DebugEvent::Stdout {
45                        debug_session_id: _,
46                        data,
47                    } => {
48                        if blobs.is_empty() {
49                            return Err(());
50                        }
51                        *data = blobs.remove(0);
52                        StateModification::empty()
53                    }
54                    DebugEvent::Stderr {
55                        debug_session_id: _,
56                        data,
57                    } => {
58                        if blobs.is_empty() {
59                            return Err(());
60                        }
61                        *data = blobs.remove(0);
62                        StateModification::empty()
63                    }
64                    DebugEvent::Initialized {
65                        debug_session_id: _,
66                    } => StateModification::empty(),
67                    DebugEvent::RemoteInspectorData {
68                        debug_session_id: _,
69                        data,
70                    } => {
71                        if blobs.is_empty() {
72                            return Err(());
73                        }
74                        *data = blobs.remove(0);
75                        StateModification::empty()
76                    }
77                };
78                Ok((DecthingsEvent::Debug(deserialized), state_modification))
79            }
80            b"Language" => {
81                let mut deserialized: LanguageEvent =
82                    serde_json::from_slice(data).map_err(|_| ())?;
83                let state_modification = match &mut deserialized {
84                    LanguageEvent::Exit {
85                        language_server_id,
86                        reason: _,
87                    } => StateModification {
88                        add_events: vec![],
89                        remove_events: vec![language_server_id.to_owned()],
90                    },
91                    LanguageEvent::Data {
92                        language_server_id: _,
93                        data,
94                    } => {
95                        if blobs.is_empty() {
96                            return Err(());
97                        }
98                        *data = blobs.remove(0);
99                        StateModification::empty()
100                    }
101                };
102                Ok((DecthingsEvent::Language(deserialized), state_modification))
103            }
104            b"Spawned" => {
105                let mut deserialized: SpawnedEvent =
106                    serde_json::from_slice(data).map_err(|_| ())?;
107                let state_modification = match &mut deserialized {
108                    SpawnedEvent::Exit {
109                        spawned_command_id,
110                        reason: _,
111                    } => StateModification {
112                        add_events: vec![],
113                        remove_events: vec![spawned_command_id.to_owned()],
114                    },
115                    SpawnedEvent::Stdout {
116                        spawned_command_id: _,
117                        data,
118                    } => {
119                        if blobs.is_empty() {
120                            return Err(());
121                        }
122                        *data = blobs.remove(0);
123                        StateModification::empty()
124                    }
125                    SpawnedEvent::Stderr {
126                        spawned_command_id: _,
127                        data,
128                    } => {
129                        if blobs.is_empty() {
130                            return Err(());
131                        }
132                        *data = blobs.remove(0);
133                        StateModification::empty()
134                    }
135                };
136                Ok((DecthingsEvent::Spawned(deserialized), state_modification))
137            }
138            b"Terminal" => {
139                let mut deserialized: TerminalEvent =
140                    serde_json::from_slice(data).map_err(|_| ())?;
141                let state_modification = match &mut deserialized {
142                    TerminalEvent::Exit {
143                        terminal_session_id,
144                        reason: _,
145                    } => StateModification {
146                        add_events: vec![],
147                        remove_events: vec![terminal_session_id.to_owned()],
148                    },
149                    TerminalEvent::Data {
150                        terminal_session_id: _,
151                        data,
152                    } => {
153                        if blobs.is_empty() {
154                            return Err(());
155                        }
156                        *data = blobs.remove(0);
157                        StateModification::empty()
158                    }
159                };
160                Ok((DecthingsEvent::Terminal(deserialized), state_modification))
161            }
162            _ => Err(()),
163        }
164    }
165}
166
/// A boxed, thread-safe (`Send + Sync`) callback that is handed a reference to each event.
type DecthingsClientEventListener = Box<dyn Fn(&DecthingsEvent) + Send + Sync>;
168
/// Registry of event listener callbacks, each stored under a numeric id.
pub(super) struct EventListeners {
    /// Registered callbacks keyed by id, guarded by an async mutex.
    listeners: Mutex<HashMap<u64, DecthingsClientEventListener>>,
}
172
173impl EventListeners {
174    pub fn new() -> Self {
175        Self {
176            listeners: Mutex::new(HashMap::new()),
177        }
178    }
179
180    pub async fn add(
181        self: &Arc<Self>,
182        ev: impl Fn(&DecthingsEvent) + Send + Sync + 'static,
183    ) -> EventListenerDisposer {
184        let mut lock = self.listeners.lock().await;
185        let mut id = 0;
186        while lock.contains_key(&id) {
187            id += 1;
188        }
189        lock.insert(id, Box::new(ev));
190        drop(lock);
191        EventListenerDisposer {
192            event_listeners: Arc::downgrade(self),
193            id,
194        }
195    }
196
197    pub async fn call(&self, ev: &DecthingsEvent) {
198        let locked = self.listeners.lock().await;
199        for listener in locked.values() {
200            listener(ev);
201        }
202    }
203}
204
/// Handle for a registered event listener; consumed by `dispose` to remove that listener.
pub struct EventListenerDisposer {
    /// Weak reference to the registry, so this handle does not keep the registry alive.
    event_listeners: Weak<EventListeners>,
    /// Id under which the listener is stored in the registry.
    id: u64,
}
209
210impl EventListenerDisposer {
211    pub async fn dispose(self) {
212        if let Some(event_listeners) = self.event_listeners.upgrade() {
213            let mut lock = event_listeners.listeners.lock().await;
214            lock.remove(&self.id);
215        }
216    }
217}