1use anyhow::{anyhow, bail, Result};
2use cloudevents::event::{AttributesReader, Event};
3use tokio::sync::mpsc::Receiver;
4use tokio::time::{Duration, Instant};
5
6use crate::component::ComponentScaledInfo;
7
8#[derive(Debug)]
10struct CloudEventData {
11 event_type: String,
12 source: String,
13 data: serde_json::Value,
14}
15
16fn get_string_data_from_json(json: &serde_json::Value, key: &str) -> Result<String> {
18 Ok(json
19 .get(key)
20 .ok_or_else(|| anyhow!("No {} key found in json data", key))?
21 .as_str()
22 .ok_or_else(|| anyhow!("{} is not a string", key))?
23 .to_string())
24}
25
26fn get_wasmbus_event_info(event: Event) -> Result<CloudEventData> {
28 let data: serde_json::Value = event
29 .data()
30 .ok_or_else(|| anyhow!("No data in event"))?
31 .clone()
32 .try_into()?;
33
34 Ok(CloudEventData {
35 event_type: event.ty().to_string(),
36 source: event.source().to_string(),
37 data,
38 })
39}
40
41pub enum FindEventOutcome<T> {
45 Success(T),
46 Failure(anyhow::Error),
47}
48
49pub enum EventCheckOutcome<T> {
52 Success(T),
53 Failure(anyhow::Error),
54 NotApplicable,
55}
56
57async fn find_event<T>(
68 receiver: &mut Receiver<Event>,
69 timeout: Duration,
70 check_function: impl Fn(Event) -> Result<EventCheckOutcome<T>>,
71) -> Result<FindEventOutcome<T>> {
72 let start = Instant::now();
73 loop {
74 let elapsed = start.elapsed();
75 if elapsed >= timeout {
76 bail!("Timeout waiting for event");
77 }
78
79 match tokio::time::timeout(timeout - elapsed, receiver.recv()).await {
80 Ok(Some(event)) => {
81 let outcome = check_function(event)?;
82
83 match outcome {
84 EventCheckOutcome::Success(success_data) => {
85 return Ok(FindEventOutcome::Success(success_data))
86 }
87 EventCheckOutcome::Failure(e) => return Ok(FindEventOutcome::Failure(e)),
88 EventCheckOutcome::NotApplicable => continue,
89 }
90 }
91 Err(_e) => {
92 return Ok(FindEventOutcome::Failure(anyhow!(
93 "Timed out waiting for applicable event, operation may have failed"
94 )))
95 }
96 Ok(None) => {
98 return Ok(FindEventOutcome::Failure(anyhow!(
99 "Channel dropped before event was received, please report this at https://github.com/wasmCloud/wasmCloud/issues with details to reproduce"
100 )))
101 }
102
103 }
104 }
105}
106
107pub async fn wait_for_component_scaled_event(
114 receiver: &mut Receiver<Event>,
115 timeout: Duration,
116 host_id: impl AsRef<str>,
117 component_ref: impl AsRef<str>,
118) -> Result<FindEventOutcome<ComponentScaledInfo>> {
119 let host_id = host_id.as_ref();
120 let component_ref = component_ref.as_ref();
121 let check_function = move |event: Event| {
122 let cloud_event = get_wasmbus_event_info(event)?;
123
124 if cloud_event.source != host_id {
125 return Ok(EventCheckOutcome::NotApplicable);
126 }
127
128 match cloud_event.event_type.as_str() {
129 "com.wasmcloud.lattice.component_scaled" => {
130 let image_ref = get_string_data_from_json(&cloud_event.data, "image_ref")?;
131
132 if image_ref == component_ref {
133 let component_id =
134 get_string_data_from_json(&cloud_event.data, "component_id")?;
135 return Ok(EventCheckOutcome::Success(ComponentScaledInfo {
136 host_id: host_id.into(),
137 component_ref: component_ref.into(),
138 component_id: component_id.as_str().into(),
139 }));
140 }
141 }
142 "com.wasmcloud.lattice.component_scale_failed" => {
143 let returned_component_ref =
144 get_string_data_from_json(&cloud_event.data, "image_ref")?;
145
146 if returned_component_ref == component_ref {
147 let error = anyhow!(
148 "{}",
149 cloud_event
150 .data
151 .get("error")
152 .ok_or_else(|| anyhow!("No error found in data"))?
153 .as_str()
154 .ok_or_else(|| anyhow!("error is not a string"))?
155 );
156
157 return Ok(EventCheckOutcome::Failure(error));
158 }
159 }
160 _ => {}
161 }
162
163 Ok(EventCheckOutcome::NotApplicable)
164 };
165
166 let event = find_event(receiver, timeout, check_function).await?;
167 Ok(event)
168}
169
170pub struct ProviderStartedInfo {
172 pub host_id: String,
173 pub provider_ref: String,
174 pub provider_id: String,
175}
176
177pub async fn wait_for_provider_start_event(
184 receiver: &mut Receiver<Event>,
185 timeout: Duration,
186 host_id: String,
187 provider_ref: String,
188) -> Result<FindEventOutcome<ProviderStartedInfo>> {
189 let check_function = move |event: Event| {
190 let cloud_event = get_wasmbus_event_info(event)?;
191
192 if cloud_event.source != host_id.as_str() {
193 return Ok(EventCheckOutcome::NotApplicable);
194 }
195
196 match cloud_event.event_type.as_str() {
197 "com.wasmcloud.lattice.provider_started" => {
198 let image_ref = get_string_data_from_json(&cloud_event.data, "image_ref")?;
199
200 if image_ref == provider_ref {
201 let provider_id = get_string_data_from_json(&cloud_event.data, "provider_id")?;
202
203 return Ok(EventCheckOutcome::Success(ProviderStartedInfo {
204 host_id: host_id.as_str().into(),
205 provider_ref: provider_ref.as_str().into(),
206 provider_id,
207 }));
208 }
209 }
210 "com.wasmcloud.lattice.provider_start_failed" => {
211 let returned_provider_ref =
212 get_string_data_from_json(&cloud_event.data, "provider_ref")?;
213
214 if returned_provider_ref == provider_ref {
215 let error = anyhow!(
216 "{}",
217 cloud_event
218 .data
219 .get("error")
220 .ok_or_else(|| anyhow!("No error found in data"))?
221 .as_str()
222 .ok_or_else(|| anyhow!("error is not a string"))?
223 );
224
225 return Ok(EventCheckOutcome::Failure(error));
226 }
227 }
228 _ => {}
229 }
230
231 Ok(EventCheckOutcome::NotApplicable)
232 };
233
234 let event = find_event(receiver, timeout, check_function).await?;
235 Ok(event)
236}
237
238pub struct ProviderStoppedInfo {
240 pub host_id: String,
241 pub provider_id: String,
242}
243
244pub async fn wait_for_provider_stop_event(
251 receiver: &mut Receiver<Event>,
252 timeout: Duration,
253 host_id: String,
254 provider_id: String,
255) -> Result<FindEventOutcome<ProviderStoppedInfo>> {
256 let check_function = move |event: Event| {
257 let cloud_event = get_wasmbus_event_info(event)?;
258
259 if cloud_event.source != host_id.as_str() {
260 return Ok(EventCheckOutcome::NotApplicable);
261 }
262
263 match cloud_event.event_type.as_str() {
264 "com.wasmcloud.lattice.provider_stopped" => {
265 let returned_provider_id =
266 get_string_data_from_json(&cloud_event.data, "provider_id")?;
267
268 if returned_provider_id == provider_id {
269 return Ok(EventCheckOutcome::Success(ProviderStoppedInfo {
270 host_id: host_id.as_str().into(),
271 provider_id: returned_provider_id,
272 }));
273 }
274 }
275 "com.wasmcloud.lattice.provider_stop_failed" => {
276 let returned_provider_id =
277 get_string_data_from_json(&cloud_event.data, "public_key")?;
278
279 if returned_provider_id == provider_id {
280 let error = anyhow!(
281 "{}",
282 cloud_event
283 .data
284 .get("error")
285 .ok_or_else(|| anyhow!("No error found in data"))?
286 .as_str()
287 .ok_or_else(|| anyhow!("error is not a string"))?
288 );
289
290 return Ok(EventCheckOutcome::Failure(error));
291 }
292 }
293 _ => {}
294 }
295
296 Ok(EventCheckOutcome::NotApplicable)
297 };
298
299 let event = find_event(receiver, timeout, check_function).await?;
300 Ok(event)
301}
302
303pub struct ComponentStoppedInfo {
305 pub host_id: String,
306 pub component_id: String,
307}
308
309pub async fn wait_for_component_stop_event(
316 receiver: &mut Receiver<Event>,
317 timeout: Duration,
318 host_id: String,
319 component_id: String,
320) -> Result<FindEventOutcome<ComponentStoppedInfo>> {
321 let check_function = move |event: Event| {
322 let cloud_event = get_wasmbus_event_info(event)?;
323
324 if cloud_event.source != host_id.as_str() {
325 return Ok(EventCheckOutcome::NotApplicable);
326 }
327
328 match cloud_event.event_type.as_str() {
329 "com.wasmcloud.lattice.component_scaled" => {
330 let returned_component_id =
331 get_string_data_from_json(&cloud_event.data, "public_key")?;
332 if returned_component_id == component_id {
333 return Ok(EventCheckOutcome::Success(ComponentStoppedInfo {
334 host_id: host_id.as_str().into(),
335 component_id: returned_component_id,
336 }));
337 }
338 }
339 "com.wasmcloud.lattice.component_scale_failed" => {
340 let returned_component_id =
341 get_string_data_from_json(&cloud_event.data, "public_key")?;
342
343 if returned_component_id == component_id {
344 let error = anyhow!(
345 "{}",
346 cloud_event
347 .data
348 .get("error")
349 .ok_or_else(|| anyhow!("No error found in data"))?
350 .as_str()
351 .ok_or_else(|| anyhow!("error is not a string"))?
352 );
353
354 return Ok(EventCheckOutcome::Failure(error));
355 }
356 }
357 _ => {}
358 }
359
360 Ok(EventCheckOutcome::NotApplicable)
361 };
362
363 let event = find_event(receiver, timeout, check_function).await?;
364 Ok(event)
365}