use anyhow::{anyhow, bail, Result};
use cloudevents::{event::Event, AttributesReader};
use std::time::{Duration, Instant};
use tokio::sync::mpsc::Receiver;
/// Reads events from `receiver` until `check_function` settles the wait or `timeout` runs out.
///
/// Each received event is handed to `check_function`:
/// * `EventCheckOutcome::Success` and `EventCheckOutcome::Failure` end the wait and are mapped to
///   the matching [`FindEventOutcome`] variant.
/// * `EventCheckOutcome::NotApplicable` discards the event and keeps waiting with whatever is
///   left of the time budget.
///
/// # Errors
///
/// Returns `Err` if `check_function` itself fails, or if the time budget is already used up when
/// a new iteration starts. A timeout while waiting on the channel, or the channel being closed,
/// is instead reported as `Ok(FindEventOutcome::Failure(..))`.
async fn find_event<T>(
    receiver: &mut Receiver<Event>,
    timeout: Duration,
    check_function: impl Fn(Event) -> Result<EventCheckOutcome<T>>,
) -> Result<FindEventOutcome<T>> {
    let started_at = Instant::now();

    loop {
        // Only wait for the part of the budget that previous iterations have not consumed.
        let remaining = match timeout.checked_sub(started_at.elapsed()) {
            Some(left) if !left.is_zero() => left,
            _ => bail!("Timeout waiting for event"),
        };

        let event = match tokio::time::timeout(remaining, receiver.recv()).await {
            Ok(Some(event)) => event,
            Ok(None) => {
                return Ok(FindEventOutcome::Failure(anyhow!(
                    "Channel dropped before event was received, please report this at https://github.com/wasmCloud/wash/issues with details to reproduce"
                )))
            }
            Err(_elapsed) => {
                return Ok(FindEventOutcome::Failure(anyhow!(
                    "Timed out waiting for applicable event, operation may have failed"
                )))
            }
        };

        match check_function(event)? {
            EventCheckOutcome::Success(payload) => return Ok(FindEventOutcome::Success(payload)),
            EventCheckOutcome::Failure(cause) => return Ok(FindEventOutcome::Failure(cause)),
            // Unrelated event: loop around and wait for the next one.
            EventCheckOutcome::NotApplicable => {}
        }
    }
}
/// Final result of waiting for an event, as returned by the `wait_for_*` functions in this file.
pub enum FindEventOutcome<T> {
/// An event satisfying the check was received; carries the data the check produced.
Success(T),
/// The wait ended without success: the check reported a failure, receiving from the channel
/// timed out, or the channel was closed. Carries the error describing the cause.
Failure(anyhow::Error),
}
/// Verdict a check function gives `find_event` for a single received event.
enum EventCheckOutcome<T> {
/// The event is the one being waited for; `find_event` stops and returns a success with this data.
Success(T),
/// The event reports that the awaited operation failed; `find_event` stops and returns a
/// failure carrying this error.
Failure(anyhow::Error),
/// The event is unrelated; `find_event` discards it and keeps waiting.
NotApplicable,
}
pub(crate) async fn wait_for_actor_start_event(
receiver: &mut Receiver<Event>,
timeout: Duration,
host_id: String,
actor_ref: String,
) -> Result<FindEventOutcome<()>> {
let check_function = move |event: Event| {
let cloud_event = get_wasmbus_event_info(event)?;
if cloud_event.source != host_id.as_str() {
return Ok(EventCheckOutcome::NotApplicable);
}
match cloud_event.event_type.as_str() {
"com.wasmcloud.lattice.actor_started" => {
let image_ref = get_string_data_from_json(&cloud_event.data, "image_ref")?;
if image_ref == actor_ref {
return Ok(EventCheckOutcome::Success(()));
}
}
"com.wasmcloud.lattice.actor_start_failed" => {
let returned_actor_ref = get_string_data_from_json(&cloud_event.data, "actor_ref")?;
if returned_actor_ref == actor_ref {
let error = anyhow!(
"{}",
cloud_event
.data
.get("error")
.ok_or_else(|| anyhow!("No error found in data"))?
.as_str()
.ok_or_else(|| anyhow!("error is not a string"))?
);
return Ok(EventCheckOutcome::Failure(error));
}
}
_ => {}
}
Ok(EventCheckOutcome::NotApplicable)
};
let event = find_event(receiver, timeout, check_function).await?;
Ok(event)
}
pub async fn wait_for_provider_start_event(
receiver: &mut Receiver<Event>,
timeout: Duration,
host_id: String,
provider_ref: String,
) -> Result<FindEventOutcome<()>> {
let check_function = move |event: Event| {
let cloud_event = get_wasmbus_event_info(event)?;
if cloud_event.source != host_id.as_str() {
return Ok(EventCheckOutcome::NotApplicable);
}
match cloud_event.event_type.as_str() {
"com.wasmcloud.lattice.provider_started" => {
let image_ref = get_string_data_from_json(&cloud_event.data, "image_ref")?;
if image_ref == provider_ref {
return Ok(EventCheckOutcome::Success(()));
}
}
"com.wasmcloud.lattice.provider_start_failed" => {
let returned_provider_ref =
get_string_data_from_json(&cloud_event.data, "provider_ref")?;
if returned_provider_ref == provider_ref {
let error = anyhow!(
"{}",
cloud_event
.data
.get("error")
.ok_or_else(|| anyhow!("No error found in data"))?
.as_str()
.ok_or_else(|| anyhow!("error is not a string"))?
);
return Ok(EventCheckOutcome::Failure(error));
}
}
_ => {}
}
Ok(EventCheckOutcome::NotApplicable)
};
let event = find_event(receiver, timeout, check_function).await?;
Ok(event)
}
pub async fn wait_for_provider_stop_event(
receiver: &mut Receiver<Event>,
timeout: Duration,
host_id: String,
provider_id: String,
) -> Result<FindEventOutcome<()>> {
let check_function = move |event: Event| {
let cloud_event = get_wasmbus_event_info(event)?;
if cloud_event.source != host_id.as_str() {
return Ok(EventCheckOutcome::NotApplicable);
}
match cloud_event.event_type.as_str() {
"com.wasmcloud.lattice.provider_stopped" => {
let returned_provider_id =
get_string_data_from_json(&cloud_event.data, "public_key")?;
if returned_provider_id == provider_id {
return Ok(EventCheckOutcome::Success(()));
}
}
"com.wasmcloud.lattice.provider_stop_failed" => {
let returned_provider_id =
get_string_data_from_json(&cloud_event.data, "public_key")?;
if returned_provider_id == provider_id {
let error = anyhow!(
"{}",
cloud_event
.data
.get("error")
.ok_or_else(|| anyhow!("No error found in data"))?
.as_str()
.ok_or_else(|| anyhow!("error is not a string"))?
);
return Ok(EventCheckOutcome::Failure(error));
}
}
_ => {}
}
Ok(EventCheckOutcome::NotApplicable)
};
let event = find_event(receiver, timeout, check_function).await?;
Ok(event)
}
pub async fn wait_for_actor_stop_event(
receiver: &mut Receiver<Event>,
timeout: Duration,
host_id: String,
actor_id: String,
) -> Result<FindEventOutcome<()>> {
let check_function = move |event: Event| {
let cloud_event = get_wasmbus_event_info(event)?;
if cloud_event.source != host_id.as_str() {
return Ok(EventCheckOutcome::NotApplicable);
}
match cloud_event.event_type.as_str() {
"com.wasmcloud.lattice.actor_stopped" => {
let returned_actor_id = get_string_data_from_json(&cloud_event.data, "public_key")?;
if returned_actor_id == actor_id {
return Ok(EventCheckOutcome::Success(()));
}
}
"com.wasmcloud.lattice.actor_stop_failed" => {
let returned_actor_id = get_string_data_from_json(&cloud_event.data, "public_key")?;
if returned_actor_id == actor_id {
let error = anyhow!(
"{}",
cloud_event
.data
.get("error")
.ok_or_else(|| anyhow!("No error found in data"))?
.as_str()
.ok_or_else(|| anyhow!("error is not a string"))?
);
return Ok(EventCheckOutcome::Failure(error));
}
}
_ => {}
}
Ok(EventCheckOutcome::NotApplicable)
};
let event = find_event(receiver, timeout, check_function).await?;
Ok(event)
}
/// The parts of a received cloud event that the wait functions inspect, as extracted by
/// `get_wasmbus_event_info`.
struct CloudEventData {
/// The event's type attribute, rendered as a string.
event_type: String,
/// The event's source attribute, rendered as a string; the wait functions compare it to a host ID.
source: String,
/// The event's data payload converted to a JSON value.
data: serde_json::Value,
}
fn get_wasmbus_event_info(event: Event) -> Result<CloudEventData> {
let data: serde_json::Value = event
.data()
.ok_or_else(|| anyhow!("No data in event"))?
.clone()
.try_into()?;
Ok(CloudEventData {
event_type: event.ty().to_string(),
source: event.source().to_string(),
data,
})
}
fn get_string_data_from_json(json: &serde_json::Value, key: &str) -> Result<String> {
Ok(json
.get(key)
.ok_or_else(|| anyhow!("No {} key found in json data", key))?
.as_str()
.ok_or_else(|| anyhow!("{} is not a string", key))?
.to_string())
}