wash_lib/
wait.rs

1use anyhow::{anyhow, bail, Result};
2use cloudevents::event::{AttributesReader, Event};
3use tokio::sync::mpsc::Receiver;
4use tokio::time::{Duration, Instant};
5
6use crate::component::ComponentScaledInfo;
7
8/// Useful parts of a `CloudEvent` coming in from the wasmbus.
9#[derive(Debug)]
10struct CloudEventData {
11    event_type: String,
12    source: String,
13    data: serde_json::Value,
14}
15
16/// Small helper to easily get a String value out of a JSON object.
17fn get_string_data_from_json(json: &serde_json::Value, key: &str) -> Result<String> {
18    Ok(json
19        .get(key)
20        .ok_or_else(|| anyhow!("No {} key found in json data", key))?
21        .as_str()
22        .ok_or_else(|| anyhow!("{} is not a string", key))?
23        .to_string())
24}
25
26/// Get the useful parts out of a wasmbus cloud event.
27fn get_wasmbus_event_info(event: Event) -> Result<CloudEventData> {
28    let data: serde_json::Value = event
29        .data()
30        .ok_or_else(|| anyhow!("No data in event"))?
31        .clone()
32        .try_into()?;
33
34    Ok(CloudEventData {
35        event_type: event.ty().to_string(),
36        source: event.source().to_string(),
37        data,
38    })
39}
40
41/// The potential outcomes of an event that has been found.
42/// It can either succeed or fail. This enum should only be returned if we found the applicable event.
43/// If we did not find the event or another error occurred, use the `Err` variant of a `Result` wrapping around this enum.
44pub enum FindEventOutcome<T> {
45    Success(T),
46    Failure(anyhow::Error),
47}
48
49/// The potential outcomes of a function check on an event.
50/// Because we can pass events that are not applicable to the event we are looking for, we need the `NotApplicable` variant to skip these events.
51pub enum EventCheckOutcome<T> {
52    Success(T),
53    Failure(anyhow::Error),
54    NotApplicable,
55}
56
57/// Uses the NATS receiver to read events being published to the wasmCloud lattice event subject, up until the given timeout duration.
58///
59/// Takes a `check_function`, which receives each event coming in from the receiver. This function must return a `Result<EventCheckOutcome>`.
60///
61/// If the applicable response event is found (either started or failed to start), the `Ok` variant of the `Result` will be returned,
62/// with the `FindEventOutcome` enum containing the success or failure state of the event.
63///
64/// If the timeout is reached or another error occurs, the `Err` variant of the `Result` will be returned.
65///
66/// You can use the generics in `EventCheckOutcome` and `FindEventOutcome` to return any data from the event out of your `check_function`.
67async fn find_event<T>(
68    receiver: &mut Receiver<Event>,
69    timeout: Duration,
70    check_function: impl Fn(Event) -> Result<EventCheckOutcome<T>>,
71) -> Result<FindEventOutcome<T>> {
72    let start = Instant::now();
73    loop {
74        let elapsed = start.elapsed();
75        if elapsed >= timeout {
76            bail!("Timeout waiting for event");
77        }
78
79        match tokio::time::timeout(timeout - elapsed, receiver.recv()).await {
80            Ok(Some(event)) => {
81                let outcome = check_function(event)?;
82
83                match outcome {
84                    EventCheckOutcome::Success(success_data) => {
85                        return Ok(FindEventOutcome::Success(success_data))
86                    }
87                    EventCheckOutcome::Failure(e) => return Ok(FindEventOutcome::Failure(e)),
88                    EventCheckOutcome::NotApplicable => continue,
89                }
90            }
91            Err(_e) => {
92                return Ok(FindEventOutcome::Failure(anyhow!(
93                    "Timed out waiting for applicable event, operation may have failed"
94                )))
95            }
96            // Should only happen due to an internal failure with the events receiver
97            Ok(None) => {
98                return Ok(FindEventOutcome::Failure(anyhow!(
99                    "Channel dropped before event was received, please report this at https://github.com/wasmCloud/wasmCloud/issues with details to reproduce"
100                )))
101            }
102
103        }
104    }
105}
106
107/// Uses the NATS receiver to read events being published to the wasmCloud lattice event subject, up until the given timeout duration.
108///
109/// If the applicable component start response event is found (either started or failed to start), the `Ok` variant of the `Result` will be returned,
110/// with the `FindEventOutcome` enum containing the success or failure state of the event.
111///
112/// If the timeout is reached or another error occurs, the `Err` variant of the `Result` will be returned.
113pub async fn wait_for_component_scaled_event(
114    receiver: &mut Receiver<Event>,
115    timeout: Duration,
116    host_id: impl AsRef<str>,
117    component_ref: impl AsRef<str>,
118) -> Result<FindEventOutcome<ComponentScaledInfo>> {
119    let host_id = host_id.as_ref();
120    let component_ref = component_ref.as_ref();
121    let check_function = move |event: Event| {
122        let cloud_event = get_wasmbus_event_info(event)?;
123
124        if cloud_event.source != host_id {
125            return Ok(EventCheckOutcome::NotApplicable);
126        }
127
128        match cloud_event.event_type.as_str() {
129            "com.wasmcloud.lattice.component_scaled" => {
130                let image_ref = get_string_data_from_json(&cloud_event.data, "image_ref")?;
131
132                if image_ref == component_ref {
133                    let component_id =
134                        get_string_data_from_json(&cloud_event.data, "component_id")?;
135                    return Ok(EventCheckOutcome::Success(ComponentScaledInfo {
136                        host_id: host_id.into(),
137                        component_ref: component_ref.into(),
138                        component_id: component_id.as_str().into(),
139                    }));
140                }
141            }
142            "com.wasmcloud.lattice.component_scale_failed" => {
143                let returned_component_ref =
144                    get_string_data_from_json(&cloud_event.data, "image_ref")?;
145
146                if returned_component_ref == component_ref {
147                    let error = anyhow!(
148                        "{}",
149                        cloud_event
150                            .data
151                            .get("error")
152                            .ok_or_else(|| anyhow!("No error found in data"))?
153                            .as_str()
154                            .ok_or_else(|| anyhow!("error is not a string"))?
155                    );
156
157                    return Ok(EventCheckOutcome::Failure(error));
158                }
159            }
160            _ => {}
161        }
162
163        Ok(EventCheckOutcome::NotApplicable)
164    };
165
166    let event = find_event(receiver, timeout, check_function).await?;
167    Ok(event)
168}
169
/// Information related to a provider start.
///
/// Derives the common value-type traits so callers can log, copy and compare it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProviderStartedInfo {
    /// ID of the host the start was awaited on.
    pub host_id: String,
    /// Reference of the provider that was started.
    pub provider_ref: String,
    /// ID of the provider, as reported by the start event.
    pub provider_id: String,
}
176
177/// Uses the NATS receiver to read events being published to the wasmCloud lattice event subject, up until the given timeout duration.
178///
179/// If the applicable provider start response event is found (either started or failed to start), the `Ok` variant of the `Result` will be returned,
180/// with the `FindEventOutcome` enum containing the success or failure state of the event.
181///
182/// If the timeout is reached or another error occurs, the `Err` variant of the `Result` will be returned.
183pub async fn wait_for_provider_start_event(
184    receiver: &mut Receiver<Event>,
185    timeout: Duration,
186    host_id: String,
187    provider_ref: String,
188) -> Result<FindEventOutcome<ProviderStartedInfo>> {
189    let check_function = move |event: Event| {
190        let cloud_event = get_wasmbus_event_info(event)?;
191
192        if cloud_event.source != host_id.as_str() {
193            return Ok(EventCheckOutcome::NotApplicable);
194        }
195
196        match cloud_event.event_type.as_str() {
197            "com.wasmcloud.lattice.provider_started" => {
198                let image_ref = get_string_data_from_json(&cloud_event.data, "image_ref")?;
199
200                if image_ref == provider_ref {
201                    let provider_id = get_string_data_from_json(&cloud_event.data, "provider_id")?;
202
203                    return Ok(EventCheckOutcome::Success(ProviderStartedInfo {
204                        host_id: host_id.as_str().into(),
205                        provider_ref: provider_ref.as_str().into(),
206                        provider_id,
207                    }));
208                }
209            }
210            "com.wasmcloud.lattice.provider_start_failed" => {
211                let returned_provider_ref =
212                    get_string_data_from_json(&cloud_event.data, "provider_ref")?;
213
214                if returned_provider_ref == provider_ref {
215                    let error = anyhow!(
216                        "{}",
217                        cloud_event
218                            .data
219                            .get("error")
220                            .ok_or_else(|| anyhow!("No error found in data"))?
221                            .as_str()
222                            .ok_or_else(|| anyhow!("error is not a string"))?
223                    );
224
225                    return Ok(EventCheckOutcome::Failure(error));
226                }
227            }
228            _ => {}
229        }
230
231        Ok(EventCheckOutcome::NotApplicable)
232    };
233
234    let event = find_event(receiver, timeout, check_function).await?;
235    Ok(event)
236}
237
/// Information related to a provider stop.
///
/// Derives the common value-type traits so callers can log, copy and compare it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProviderStoppedInfo {
    /// ID of the host the stop was awaited on.
    pub host_id: String,
    /// ID of the provider, as reported by the stop event.
    pub provider_id: String,
}
243
244/// Uses the NATS receiver to read events being published to the wasmCloud lattice event subject, up until the given timeout duration.
245///
246/// If the applicable provider stop response event is found (either stopped or failed to stop), the `Ok` variant of the `Result` will be returned,
247/// with the `FindEventOutcome` enum containing the success or failure state of the event.
248///
249/// If the timeout is reached or another error occurs, the `Err` variant of the `Result` will be returned.
250pub async fn wait_for_provider_stop_event(
251    receiver: &mut Receiver<Event>,
252    timeout: Duration,
253    host_id: String,
254    provider_id: String,
255) -> Result<FindEventOutcome<ProviderStoppedInfo>> {
256    let check_function = move |event: Event| {
257        let cloud_event = get_wasmbus_event_info(event)?;
258
259        if cloud_event.source != host_id.as_str() {
260            return Ok(EventCheckOutcome::NotApplicable);
261        }
262
263        match cloud_event.event_type.as_str() {
264            "com.wasmcloud.lattice.provider_stopped" => {
265                let returned_provider_id =
266                    get_string_data_from_json(&cloud_event.data, "provider_id")?;
267
268                if returned_provider_id == provider_id {
269                    return Ok(EventCheckOutcome::Success(ProviderStoppedInfo {
270                        host_id: host_id.as_str().into(),
271                        provider_id: returned_provider_id,
272                    }));
273                }
274            }
275            "com.wasmcloud.lattice.provider_stop_failed" => {
276                let returned_provider_id =
277                    get_string_data_from_json(&cloud_event.data, "public_key")?;
278
279                if returned_provider_id == provider_id {
280                    let error = anyhow!(
281                        "{}",
282                        cloud_event
283                            .data
284                            .get("error")
285                            .ok_or_else(|| anyhow!("No error found in data"))?
286                            .as_str()
287                            .ok_or_else(|| anyhow!("error is not a string"))?
288                    );
289
290                    return Ok(EventCheckOutcome::Failure(error));
291                }
292            }
293            _ => {}
294        }
295
296        Ok(EventCheckOutcome::NotApplicable)
297    };
298
299    let event = find_event(receiver, timeout, check_function).await?;
300    Ok(event)
301}
302
/// Information related to a component stop.
///
/// Derives the common value-type traits so callers can log, copy and compare it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ComponentStoppedInfo {
    /// ID of the host the stop was awaited on.
    pub host_id: String,
    /// ID of the component that was stopped.
    pub component_id: String,
}
308
309/// Uses the NATS receiver to read events being published to the wasmCloud lattice event subject, up until the given timeout duration.
310///
311/// If the applicable stop component response event is found (either started or failed to start), the `Ok` variant of the `Result` will be returned,
312/// with the `FindEventOutcome` enum containing the success or failure state of the event.
313///
314/// If the timeout is reached or another error occurs, the `Err` variant of the `Result` will be returned.
315pub async fn wait_for_component_stop_event(
316    receiver: &mut Receiver<Event>,
317    timeout: Duration,
318    host_id: String,
319    component_id: String,
320) -> Result<FindEventOutcome<ComponentStoppedInfo>> {
321    let check_function = move |event: Event| {
322        let cloud_event = get_wasmbus_event_info(event)?;
323
324        if cloud_event.source != host_id.as_str() {
325            return Ok(EventCheckOutcome::NotApplicable);
326        }
327
328        match cloud_event.event_type.as_str() {
329            "com.wasmcloud.lattice.component_scaled" => {
330                let returned_component_id =
331                    get_string_data_from_json(&cloud_event.data, "public_key")?;
332                if returned_component_id == component_id {
333                    return Ok(EventCheckOutcome::Success(ComponentStoppedInfo {
334                        host_id: host_id.as_str().into(),
335                        component_id: returned_component_id,
336                    }));
337                }
338            }
339            "com.wasmcloud.lattice.component_scale_failed" => {
340                let returned_component_id =
341                    get_string_data_from_json(&cloud_event.data, "public_key")?;
342
343                if returned_component_id == component_id {
344                    let error = anyhow!(
345                        "{}",
346                        cloud_event
347                            .data
348                            .get("error")
349                            .ok_or_else(|| anyhow!("No error found in data"))?
350                            .as_str()
351                            .ok_or_else(|| anyhow!("error is not a string"))?
352                    );
353
354                    return Ok(EventCheckOutcome::Failure(error));
355                }
356            }
357            _ => {}
358        }
359
360        Ok(EventCheckOutcome::NotApplicable)
361    };
362
363    let event = find_event(receiver, timeout, check_function).await?;
364    Ok(event)
365}