use core::fmt::{self, Debug};
use core::time::Duration;
use std::collections::HashMap;
use async_nats::Subscriber;
use cloudevents::event::Event;
use futures::{StreamExt, TryFutureExt};
use serde::de::DeserializeOwned;
use tokio::sync::mpsc::Receiver;
use tracing::{debug, error, instrument, trace};
use crate::types::link::InterfaceLinkDefinition;
use crate::types::ctl::{
CtlResponse, ScaleComponentCommand, StartProviderCommand, StopHostCommand, StopProviderCommand,
UpdateComponentCommand,
};
use crate::types::host::{Host, HostInventory, HostLabel};
use crate::types::registry::RegistryCredential;
use crate::types::rpc::{
ComponentAuctionAck, ComponentAuctionRequest, DeleteInterfaceLinkDefinitionRequest,
ProviderAuctionAck, ProviderAuctionRequest,
};
use crate::{
broker, json_deserialize, json_serialize, otel, parse_identifier, IdentifierKind, Result,
};
/// Lattice control interface client.
///
/// Bundles the NATS connection with the subject prefix, the lattice name and the
/// timeouts applied to requests and to reply collection.
#[derive(Clone)]
pub struct Client {
/// NATS connection used for every request, publish and subscription.
nc: async_nats::Client,
/// Optional prefix handed to the `broker` helpers when building subjects.
topic_prefix: Option<String>,
/// Name of the lattice this client talks to.
pub lattice: String,
/// Timeout applied to request/reply operations.
timeout: Duration,
/// Collection window used by `publish_and_wait` (auctions and the host query).
auction_timeout: Duration,
}
/// Diagnostic formatting for [`Client`]; the NATS connection handle is left out.
impl Debug for Client {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Destructure once so the printed fields are listed in a single place.
        // `nc` is skipped on purpose; `finish_non_exhaustive` signals the omission.
        let Self {
            topic_prefix,
            lattice,
            timeout,
            auction_timeout,
            ..
        } = self;
        let mut out = f.debug_struct("Client");
        out.field("topic_prefix", topic_prefix);
        out.field("lattice", lattice);
        out.field("timeout", timeout);
        out.field("auction_timeout", auction_timeout);
        out.finish_non_exhaustive()
    }
}
impl Client {
    /// Returns a clone of the NATS client handle held by this control client.
    #[allow(unused)]
    pub fn nats_client(&self) -> async_nats::Client {
        async_nats::Client::clone(&self.nc)
    }
}
/// Builder for [`Client`]; each field is copied into the field of the same name
/// on the client when `build` is called.
pub struct ClientBuilder {
/// NATS connection handed to the built client.
nc: async_nats::Client,
/// Optional subject prefix; `None` unless `topic_prefix` is called.
topic_prefix: Option<String>,
/// Lattice name; `ClientBuilder::new` starts it at `"default"`.
lattice: String,
/// Request timeout; `ClientBuilder::new` starts it at 2 seconds.
timeout: Duration,
/// Auction/collection timeout; `ClientBuilder::new` starts it at 5 seconds.
auction_timeout: Duration,
}
impl ClientBuilder {
    /// Starts a builder around `nc` with no topic prefix, the `"default"` lattice,
    /// a 2 second request timeout and a 5 second auction timeout.
    #[must_use]
    pub fn new(nc: async_nats::Client) -> ClientBuilder {
        Self {
            nc,
            topic_prefix: None,
            lattice: String::from("default"),
            timeout: Duration::from_secs(2),
            auction_timeout: Duration::from_secs(5),
        }
    }

    /// Sets the subject prefix passed to the `broker` helpers.
    #[must_use]
    pub fn topic_prefix(mut self, prefix: impl Into<String>) -> ClientBuilder {
        self.topic_prefix = Some(prefix.into());
        self
    }

    /// Sets the lattice name. The argument is named `prefix`, but it is stored
    /// as the lattice.
    #[must_use]
    pub fn lattice(mut self, prefix: impl Into<String>) -> ClientBuilder {
        self.lattice = prefix.into();
        self
    }

    /// Sets the timeout used for request/reply operations.
    #[must_use]
    pub fn timeout(mut self, timeout: Duration) -> ClientBuilder {
        self.timeout = timeout;
        self
    }

    /// Sets the timeout used while collecting auction-style replies.
    #[must_use]
    pub fn auction_timeout(mut self, timeout: Duration) -> ClientBuilder {
        self.auction_timeout = timeout;
        self
    }

    /// Consumes the builder and produces the configured [`Client`].
    #[must_use]
    pub fn build(self) -> Client {
        let Self {
            nc,
            topic_prefix,
            lattice,
            timeout,
            auction_timeout,
        } = self;
        Client {
            nc,
            topic_prefix,
            lattice,
            timeout,
            auction_timeout,
        }
    }
}
impl Client {
/// Creates a client over `nc` using the defaults supplied by [`ClientBuilder::new`].
#[must_use]
pub fn new(nc: async_nats::Client) -> Client {
    let builder = ClientBuilder::new(nc);
    builder.build()
}
/// Sends `payload` to `subject` as a NATS request (with tracing headers injected)
/// and waits at most `timeout` for the reply.
///
/// # Errors
///
/// Returns an `std::io::Error` of kind `TimedOut` when no reply arrives in time,
/// or the converted NATS request error when the request itself fails.
#[instrument(level = "debug", skip_all)]
pub(crate) async fn request_timeout(
    &self,
    subject: String,
    payload: Vec<u8>,
    timeout: Duration,
) -> Result<async_nats::Message> {
    let pending = self.nc.request_with_headers(
        subject,
        otel::HeaderInjector::default_with_span().into(),
        payload.into(),
    );
    // The outer layer is the timer outcome; the inner layer is the request outcome.
    let Ok(outcome) = tokio::time::timeout(timeout, pending).await else {
        return Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "timed out").into());
    };
    Ok(outcome?)
}
/// Publishes a host query and gathers every reply collected by `publish_and_wait`.
///
/// # Errors
///
/// Propagates any error returned by `publish_and_wait`.
#[instrument(level = "debug", skip_all)]
pub async fn get_hosts(&self) -> Result<Vec<CtlResponse<Host>>> {
    let topic = broker::v1::queries::hosts(&self.topic_prefix, &self.lattice);
    debug!("get_hosts:publish {}", &topic);
    // The query carries no body.
    let empty_body: Vec<u8> = Vec::new();
    self.publish_and_wait(topic, empty_body).await
}
/// Requests the inventory of the host identified by `host_id`.
///
/// # Errors
///
/// Fails when `host_id` is rejected by `parse_identifier`, when the request
/// errors or times out, or when the reply cannot be deserialized.
#[instrument(level = "debug", skip_all)]
pub async fn get_host_inventory(&self, host_id: &str) -> Result<CtlResponse<HostInventory>> {
    let host_id = parse_identifier(&IdentifierKind::HostId, host_id)?;
    let subject =
        broker::v1::queries::host_inventory(&self.topic_prefix, &self.lattice, host_id.as_str());
    debug!("get_host_inventory:request {}", &subject);
    let reply = self
        .request_timeout(subject, Vec::new(), self.timeout)
        .await
        .map_err(|e| format!("Did not receive host inventory from target host: {e}"))?;
    json_deserialize(&reply.payload)
}
/// Requests the claims from the lattice.
///
/// # Errors
///
/// Fails when the request errors or times out, or when the reply cannot be
/// deserialized.
#[instrument(level = "debug", skip_all)]
pub async fn get_claims(&self) -> Result<CtlResponse<Vec<HashMap<String, String>>>> {
    let subject = broker::v1::queries::claims(&self.topic_prefix, &self.lattice);
    debug!("get_claims:request {}", &subject);
    let reply = self
        .request_timeout(subject, Vec::new(), self.timeout)
        .await
        .map_err(|e| format!("Did not receive claims from lattice: {e}"))?;
    json_deserialize(&reply.payload)
}
/// Publishes a component auction request and returns the acknowledgements
/// collected by `publish_and_wait`.
///
/// # Errors
///
/// Fails when `component_ref` or `component_id` is rejected by
/// `parse_identifier`, when the request cannot be serialized, or when
/// `publish_and_wait` fails.
#[instrument(level = "debug", skip_all)]
pub async fn perform_component_auction(
    &self,
    component_ref: &str,
    component_id: &str,
    constraints: HashMap<String, String>,
) -> Result<Vec<CtlResponse<ComponentAuctionAck>>> {
    let subject = broker::v1::component_auction_subject(&self.topic_prefix, &self.lattice);
    // Validate in field order so the first invalid identifier is the one reported.
    let component_ref = parse_identifier(&IdentifierKind::ActorRef, component_ref)?;
    let component_id = parse_identifier(&IdentifierKind::ComponentId, component_id)?;
    let request = ComponentAuctionRequest {
        component_ref,
        component_id,
        constraints,
    };
    let bytes = json_serialize(request)?;
    debug!("component_auction:publish {}", &subject);
    self.publish_and_wait(subject, bytes).await
}
/// Publishes a provider auction request and returns the acknowledgements
/// collected by `publish_and_wait`.
///
/// # Errors
///
/// Fails when `provider_ref` or `provider_id` is rejected by
/// `parse_identifier`, when the request cannot be serialized, or when
/// `publish_and_wait` fails.
#[instrument(level = "debug", skip_all)]
pub async fn perform_provider_auction(
    &self,
    provider_ref: &str,
    provider_id: &str,
    constraints: HashMap<String, String>,
) -> Result<Vec<CtlResponse<ProviderAuctionAck>>> {
    let subject = broker::v1::provider_auction_subject(&self.topic_prefix, &self.lattice);
    // Validate in field order so the first invalid identifier is the one reported.
    let provider_ref = parse_identifier(&IdentifierKind::ProviderRef, provider_ref)?;
    let provider_id = parse_identifier(&IdentifierKind::ComponentId, provider_id)?;
    let request = ProviderAuctionRequest {
        provider_ref,
        provider_id,
        constraints,
    };
    let bytes = json_serialize(request)?;
    debug!("provider_auction:publish {}", &subject);
    self.publish_and_wait(subject, bytes).await
}
/// Sends a scale-component command to the host identified by `host_id`.
///
/// # Errors
///
/// Fails when any identifier is rejected by `parse_identifier`, when the
/// command cannot be serialized, when the request errors or times out, or when
/// the reply cannot be deserialized.
#[instrument(level = "debug", skip_all)]
pub async fn scale_component(
    &self,
    host_id: &str,
    component_ref: &str,
    component_id: &str,
    max_instances: u32,
    annotations: Option<HashMap<String, String>>,
    config: Vec<String>,
) -> Result<CtlResponse<()>> {
    let host_id = parse_identifier(&IdentifierKind::HostId, host_id)?;
    let subject = broker::v1::commands::scale_component(
        &self.topic_prefix,
        &self.lattice,
        host_id.as_str(),
    );
    debug!("scale_component:request {}", &subject);
    // The remaining identifiers are checked only after the subject has been logged.
    let component_ref = parse_identifier(&IdentifierKind::ActorRef, component_ref)?;
    let component_id = parse_identifier(&IdentifierKind::ComponentId, component_id)?;
    let command = ScaleComponentCommand {
        max_instances,
        component_ref,
        component_id,
        host_id,
        annotations,
        config,
    };
    let bytes = json_serialize(command)?;
    let reply = self
        .request_timeout(subject, bytes, self.timeout)
        .await
        .map_err(|e| format!("Did not receive scale component acknowledgement: {e}"))?;
    json_deserialize(&reply.payload)
}
/// Publishes a registry credential map to the lattice.
///
/// This is a plain publish, not a request: no reply is awaited, and the returned
/// `CtlResponse::success()` is constructed locally once the publish call succeeds.
///
/// # Errors
///
/// Fails when `registries` cannot be serialized or when the NATS publish fails.
#[instrument(level = "debug", skip_all)]
pub async fn put_registries(
    &self,
    registries: HashMap<String, RegistryCredential>,
) -> Result<CtlResponse<()>> {
    let subject = broker::v1::publish_registries(&self.topic_prefix, &self.lattice);
    debug!("put_registries:publish {}", &subject);
    let bytes = json_serialize(&registries)?;
    self.nc
        .publish_with_headers(
            subject,
            otel::HeaderInjector::default_with_span().into(),
            bytes.into(),
        )
        .await
        .map_err(|e| format!("Failed to push registry credential map: {e}"))?;
    Ok(CtlResponse::success())
}
#[instrument(level = "debug", skip_all)]
pub async fn put_link(&self, link: InterfaceLinkDefinition) -> Result<CtlResponse<()>> {
parse_identifier(&IdentifierKind::ComponentId, &link.source_id)?;
parse_identifier(&IdentifierKind::ComponentId, &link.target)?;
parse_identifier(&IdentifierKind::LinkName, &link.name)?;
let subject = broker::v1::put_link(&self.topic_prefix, &self.lattice);
debug!("put_link:request {}", &subject);
let bytes = crate::json_serialize(&link)?;
match self.request_timeout(subject, bytes, self.timeout).await {
Ok(msg) => Ok(json_deserialize(&msg.payload)?),
Err(e) => Err(format!("Did not receive put link acknowledgement: {e}").into()),
}
}
#[instrument(level = "debug", skip_all)]
pub async fn delete_link(
&self,
source_id: &str,
link_name: &str,
wit_namespace: &str,
wit_package: &str,
) -> Result<CtlResponse<()>> {
let subject = broker::v1::delete_link(&self.topic_prefix, &self.lattice);
let ld = DeleteInterfaceLinkDefinitionRequest {
source_id: parse_identifier(&IdentifierKind::ComponentId, source_id)?,
name: parse_identifier(&IdentifierKind::LinkName, link_name)?,
wit_namespace: wit_namespace.to_string(),
wit_package: wit_package.to_string(),
};
let bytes = crate::json_serialize(&ld)?;
match self.request_timeout(subject, bytes, self.timeout).await {
Ok(msg) => Ok(json_deserialize(&msg.payload)?),
Err(e) => Err(format!("Did not receive delete link acknowledgement: {e}").into()),
}
}
/// Requests the link definitions from the lattice.
///
/// # Errors
///
/// Fails when the request errors or times out, or when the reply cannot be
/// deserialized.
#[instrument(level = "debug", skip_all)]
pub async fn get_links(&self) -> Result<CtlResponse<Vec<InterfaceLinkDefinition>>> {
    let subject = broker::v1::queries::link_definitions(&self.topic_prefix, &self.lattice);
    debug!("get_links:request {}", &subject);
    let reply = self
        .request_timeout(subject, Vec::new(), self.timeout)
        .await
        .map_err(|e| format!("Did not receive a response to get links: {e}"))?;
    json_deserialize(&reply.payload)
}
/// Serializes `config` as JSON and sends it on the put-config subject built
/// for `config_name`.
///
/// # Errors
///
/// Fails when the map cannot be serialized, when the request errors or times
/// out, or when the reply cannot be deserialized.
#[instrument(level = "debug", skip_all)]
pub async fn put_config(
    &self,
    config_name: &str,
    config: impl Into<HashMap<String, String>>,
) -> Result<CtlResponse<()>> {
    let subject = broker::v1::put_config(&self.topic_prefix, &self.lattice, config_name);
    debug!(%subject, %config_name, "Putting config");
    let entries: HashMap<String, String> = config.into();
    let data = serde_json::to_vec(&entries)?;
    let reply = self
        .request_timeout(subject, data, self.timeout)
        .await
        .map_err(|e| format!("Did not receive a response to put config request: {e}"))?;
    json_deserialize(&reply.payload)
}
/// Sends an empty-bodied request on the delete-config subject built for
/// `config_name`.
///
/// # Errors
///
/// Fails when the request errors or times out, or when the reply cannot be
/// deserialized.
#[instrument(level = "debug", skip_all)]
pub async fn delete_config(&self, config_name: &str) -> Result<CtlResponse<()>> {
    let subject = broker::v1::delete_config(&self.topic_prefix, &self.lattice, config_name);
    debug!(%subject, %config_name, "Delete config");
    let reply = self
        .request_timeout(subject, Vec::new(), self.timeout)
        .await
        .map_err(|e| format!("Did not receive a response to delete config request: {e}"))?;
    json_deserialize(&reply.payload)
}
/// Sends an empty-bodied request on the config query subject built for
/// `config_name` and returns the decoded reply.
///
/// # Errors
///
/// Fails when the request errors or times out, or when the reply cannot be
/// deserialized.
#[instrument(level = "debug", skip_all)]
pub async fn get_config(
    &self,
    config_name: &str,
) -> Result<CtlResponse<HashMap<String, String>>> {
    let subject = broker::v1::queries::config(&self.topic_prefix, &self.lattice, config_name);
    debug!(%subject, %config_name, "Getting config");
    let reply = self
        .request_timeout(subject, Vec::new(), self.timeout)
        .await
        .map_err(|e| format!("Did not receive a response to get config request: {e}"))?;
    json_deserialize(&reply.payload)
}
pub async fn put_label(
&self,
host_id: &str,
key: &str,
value: &str,
) -> Result<CtlResponse<()>> {
let subject = broker::v1::put_label(&self.topic_prefix, &self.lattice, host_id);
debug!(%subject, "putting label");
let bytes = json_serialize(HostLabel {
key: key.to_string(),
value: value.to_string(),
})?;
match self.request_timeout(subject, bytes, self.timeout).await {
Ok(msg) => Ok(json_deserialize(&msg.payload)?),
Err(e) => Err(format!("Did not receive put label acknowledgement: {e}").into()),
}
}
pub async fn delete_label(&self, host_id: &str, key: &str) -> Result<CtlResponse<()>> {
let subject = broker::v1::delete_label(&self.topic_prefix, &self.lattice, host_id);
debug!(%subject, "removing label");
let bytes = json_serialize(HostLabel {
key: key.to_string(),
value: String::new(), })?;
match self.request_timeout(subject, bytes, self.timeout).await {
Ok(msg) => Ok(json_deserialize(&msg.payload)?),
Err(e) => Err(format!("Did not receive remove label acknowledgement: {e}").into()),
}
}
/// Sends an update-component command to the host identified by `host_id`,
/// naming the existing component id and the new component reference.
///
/// # Errors
///
/// Fails when any identifier is rejected by `parse_identifier`, when the
/// command cannot be serialized, when the request errors or times out, or when
/// the reply cannot be deserialized.
#[instrument(level = "debug", skip_all)]
pub async fn update_component(
    &self,
    host_id: &str,
    existing_component_id: &str,
    new_component_ref: &str,
    annotations: Option<HashMap<String, String>>,
) -> Result<CtlResponse<()>> {
    let host_id = parse_identifier(&IdentifierKind::HostId, host_id)?;
    let subject = broker::v1::commands::update_component(
        &self.topic_prefix,
        &self.lattice,
        host_id.as_str(),
    );
    debug!("update_component:request {}", &subject);
    // The remaining identifiers are checked only after the subject has been logged.
    let component_id = parse_identifier(&IdentifierKind::ComponentId, existing_component_id)?;
    let new_component_ref = parse_identifier(&IdentifierKind::ActorRef, new_component_ref)?;
    let command = UpdateComponentCommand {
        host_id,
        component_id,
        new_component_ref,
        annotations,
    };
    let bytes = json_serialize(command)?;
    let reply = self
        .request_timeout(subject, bytes, self.timeout)
        .await
        .map_err(|e| format!("Did not receive update component acknowledgement: {e}"))?;
    json_deserialize(&reply.payload)
}
/// Sends a start-provider command to the host identified by `host_id`.
///
/// `provider_configuration` is sent as the command's `config` field.
///
/// # Errors
///
/// Fails when any identifier is rejected by `parse_identifier`, when the
/// command cannot be serialized, when the request errors or times out, or when
/// the reply cannot be deserialized.
#[instrument(level = "debug", skip_all)]
pub async fn start_provider(
    &self,
    host_id: &str,
    provider_ref: &str,
    provider_id: &str,
    annotations: Option<HashMap<String, String>>,
    provider_configuration: Vec<String>,
) -> Result<CtlResponse<()>> {
    let host_id = parse_identifier(&IdentifierKind::HostId, host_id)?;
    let subject = broker::v1::commands::start_provider(
        &self.topic_prefix,
        &self.lattice,
        host_id.as_str(),
    );
    debug!("start_provider:request {}", &subject);
    // The remaining identifiers are checked only after the subject has been logged.
    let provider_ref = parse_identifier(&IdentifierKind::ProviderRef, provider_ref)?;
    let provider_id = parse_identifier(&IdentifierKind::ComponentId, provider_id)?;
    let command = StartProviderCommand {
        host_id,
        provider_ref,
        provider_id,
        annotations,
        config: provider_configuration,
    };
    let bytes = json_serialize(command)?;
    let reply = self
        .request_timeout(subject, bytes, self.timeout)
        .await
        .map_err(|e| format!("Did not receive start provider acknowledgement: {e}"))?;
    json_deserialize(&reply.payload)
}
/// Sends a stop-provider command for `provider_id` to the host identified by
/// `host_id`.
///
/// # Errors
///
/// Fails when either identifier is rejected by `parse_identifier`, when the
/// command cannot be serialized, when the request errors or times out, or when
/// the reply cannot be deserialized.
#[instrument(level = "debug", skip_all)]
pub async fn stop_provider(&self, host_id: &str, provider_id: &str) -> Result<CtlResponse<()>> {
    let host_id = parse_identifier(&IdentifierKind::HostId, host_id)?;
    let subject = broker::v1::commands::stop_provider(
        &self.topic_prefix,
        &self.lattice,
        host_id.as_str(),
    );
    debug!("stop_provider:request {}", &subject);
    // The provider id is checked only after the subject has been logged.
    let provider_id = parse_identifier(&IdentifierKind::ComponentId, provider_id)?;
    let command = StopProviderCommand {
        host_id,
        provider_id,
    };
    let bytes = json_serialize(command)?;
    let reply = self
        .request_timeout(subject, bytes, self.timeout)
        .await
        .map_err(|e| format!("Did not receive stop provider acknowledgement: {e}"))?;
    json_deserialize(&reply.payload)
}
/// Sends a stop-host command to the host identified by `host_id`.
///
/// `timeout_ms` is forwarded unchanged as the command's `timeout` field.
///
/// # Errors
///
/// Fails when `host_id` is rejected by `parse_identifier`, when the command
/// cannot be serialized, when the request errors or times out, or when the
/// reply cannot be deserialized.
#[instrument(level = "debug", skip_all)]
pub async fn stop_host(
    &self,
    host_id: &str,
    timeout_ms: Option<u64>,
) -> Result<CtlResponse<()>> {
    let host_id = parse_identifier(&IdentifierKind::HostId, host_id)?;
    let subject =
        broker::v1::commands::stop_host(&self.topic_prefix, &self.lattice, host_id.as_str());
    debug!("stop_host:request {}", &subject);
    let command = StopHostCommand {
        host_id,
        timeout: timeout_ms,
    };
    let bytes = json_serialize(command)?;
    let reply = self
        .request_timeout(subject, bytes, self.timeout)
        .await
        .map_err(|e| format!("Did not receive stop host acknowledgement: {e}"))?;
    json_deserialize(&reply.payload)
}
/// Publishes `payload` to `subject` with a fresh reply inbox and collects the
/// replies that arrive within `auction_timeout`.
///
/// # Errors
///
/// Fails when the inbox subscription or the publish fails. A failed flush is
/// only logged.
async fn publish_and_wait<D>(&self, subject: String, payload: Vec<u8>) -> Result<Vec<D>>
where
    D: DeserializeOwned,
{
    // Subscribe to the inbox before publishing.
    let inbox = self.nc.new_inbox();
    let replies = self.nc.subscribe(inbox.clone()).await?;
    self.nc
        .publish_with_reply_and_headers(
            subject.clone(),
            inbox,
            otel::HeaderInjector::default_with_span().into(),
            payload.into(),
        )
        .await?;
    // Flush on a separate task so collection starts without waiting for it.
    let flusher = self.nc.clone();
    tokio::spawn(async move {
        if let Err(error) = flusher.flush().await {
            error!(%error, "flush after publish");
        }
    });
    let window = self.auction_timeout;
    let collected = collect_sub_timeout::<D>(replies, window, subject.as_str()).await;
    Ok(collected)
}
/// Subscribes to `wasmbus.evt.<lattice>.<event_type>` for each requested event
/// type and forwards every payload that decodes as a CloudEvent into the
/// returned channel (capacity 5000).
///
/// A background task does the forwarding. Payloads that do not decode are
/// logged and skipped, and the task stops once the receiver has been dropped.
///
/// # Errors
///
/// Fails when any of the subscriptions cannot be created.
#[allow(clippy::missing_errors_doc)]
pub async fn events_receiver(&self, event_types: Vec<String>) -> Result<Receiver<Event>> {
    let (sender, receiver) = tokio::sync::mpsc::channel(5000);
    let lattice = self.lattice.as_str();
    let pending = event_types.into_iter().map(|event_type| {
        let subject = format!("wasmbus.evt.{}.{}", lattice, event_type);
        self.nc
            .subscribe(subject)
            .map_err(|err| Box::new(err) as Box<dyn std::error::Error + Send + Sync>)
    });
    // Every subscription must succeed; the first failure aborts the call.
    let mut subs: Vec<Subscriber> = Vec::new();
    for outcome in futures::future::join_all(pending).await {
        subs.push(outcome?);
    }
    let mut merged = futures::stream::select_all(subs);
    tokio::spawn(async move {
        while let Some(msg) = merged.next().await {
            match json_deserialize::<Event>(&msg.payload) {
                Ok(evt) => {
                    trace!("received event: {:?}", evt);
                    if sender.send(evt).await.is_err() {
                        break;
                    }
                }
                Err(_) => {
                    error!("Object received on event stream was not a CloudEvent");
                }
            }
        }
    });
    Ok(receiver)
}
}
pub async fn collect_sub_timeout<T: DeserializeOwned>(
mut sub: async_nats::Subscriber,
timeout: Duration,
reason: &str,
) -> Vec<T> {
let mut items = Vec::new();
let sleep = tokio::time::sleep(timeout);
tokio::pin!(sleep);
loop {
tokio::select! {
msg = sub.next() => {
let Some(msg) = msg else {
break;
};
if msg.payload.is_empty() {
break;
}
match json_deserialize::<T>(&msg.payload) {
Ok(item) => items.push(item),
Err(error) => {
error!(%reason, %error,
"deserialization error in auction - results may be incomplete",
);
break;
}
}
},
() = &mut sleep => { break; }
}
}
items
}
#[cfg(test)]
mod tests {
use super::*;
/// Manual smoke test: connects to NATS on 127.0.0.1:4222, subscribes to the
/// `foobar` event type and prints whatever arrives for 120 seconds.
#[tokio::test]
#[ignore]
async fn test_events_receiver() {
    let nc = async_nats::connect("127.0.0.1:4222").await.unwrap();
    let one_second = Duration::from_millis(1000);
    let client = ClientBuilder::new(nc)
        .timeout(one_second)
        .auction_timeout(one_second)
        .build();
    let event_types = vec![String::from("foobar")];
    let mut receiver = client.events_receiver(event_types).await.unwrap();
    tokio::spawn(async move {
        loop {
            match receiver.recv().await {
                Some(evt) => println!("Event received: {evt:?}"),
                None => break,
            }
        }
    });
    println!("Listening to Cloud Events for 120 seconds. Then we will quit.");
    tokio::time::sleep(Duration::from_secs(120)).await;
}
/// Checks that `parse_identifier` rejects empty and blank input with the
/// expected messages, and trims surrounding whitespace from valid input.
#[test]
fn test_parse_identifier() -> Result<()> {
    // Empty and whitespace-only host IDs are both rejected.
    for blank in ["", " "] {
        assert!(parse_identifier(&IdentifierKind::HostId, blank).is_err());
    }
    let host_err = parse_identifier(&IdentifierKind::HostId, " ")
        .expect_err("parsing host id should have failed");
    assert!(host_err.to_string().contains("Host ID cannot be empty"));
    let provider_err = parse_identifier(&IdentifierKind::ProviderRef, "")
        .expect_err("parsing provider ref should have failed");
    assert!(provider_err
        .to_string()
        .contains("Provider OCI reference cannot be empty"));
    assert!(parse_identifier(&IdentifierKind::HostId, "host_id").is_ok());
    // Surrounding whitespace is stripped from an otherwise valid identifier.
    let trimmed = parse_identifier(&IdentifierKind::ComponentId, " iambatman ")?;
    assert_eq!(trimmed, "iambatman");
    Ok(())
}
/// End-to-end walk through the control interface against a live environment.
///
/// Ignored by default: it connects to NATS on 127.0.0.1:4222 and asserts that
/// exactly one host answers. The steps build on each other (auction, scale,
/// provider start, links, labels, registries, config, inventory) and the final
/// step stops the host.
#[tokio::test]
#[ignore]
async fn ctl_response_comprehensive() {
let client = Client::new(
async_nats::connect("127.0.0.1:4222")
.await
.expect("should be able to connect to local NATS"),
);
// Host query: exactly one host is expected to reply.
let hosts = client
.get_hosts()
.await
.expect("should be able to fetch at least a host");
assert_eq!(hosts.len(), 1);
let host = hosts.first().expect("one host to exist");
assert!(host.success);
assert!(host.message.is_empty());
assert!(host.response.is_some());
let host = host.response.as_ref().unwrap();
// Component auction, then scale the component using the values from the ack.
let auction_response = client
.perform_component_auction(
"ghcr.io/brooksmtownsend/http-hello-world-rust:0.1.0",
"echo",
HashMap::new(),
)
.await
.expect("should be able to auction an component");
assert_eq!(auction_response.len(), 1);
let first_ack = auction_response.first().expect("a single component ack");
let auction_ack = first_ack.response.as_ref().unwrap();
let (component_ref, component_id) = (&auction_ack.component_ref, &auction_ack.component_id);
let scale_response = client
.scale_component(
&host.id,
component_ref,
component_id,
1,
None,
Vec::with_capacity(0),
)
.await
.expect("should be able to scale component");
assert!(scale_response.success);
assert!(scale_response.message.is_empty());
assert!(scale_response.response.is_none());
// Updating an unknown component: the request goes through but reports failure.
let update_component_resp = client
.update_component(
&host.id,
"nonexistantcomponentID",
"wasmcloud.azurecr.io/kvcounter:0.4.0",
None,
)
.await
.expect("should be able to issue update component request");
assert!(!update_component_resp.success);
assert_eq!(
update_component_resp.message,
"component not found".to_string()
);
assert_eq!(update_component_resp.response, None);
// Provider auction, then start the provider using the values from the ack.
let provider_acks = client
.perform_provider_auction(
"wasmcloud.azurecr.io/httpserver:0.19.1",
"httpserver",
HashMap::new(),
)
.await
.expect("should be able to hold provider auction");
assert_eq!(provider_acks.len(), 1);
let provider_ack = provider_acks.first().expect("a single provider ack");
assert!(provider_ack.success);
assert!(provider_ack.message.is_empty());
assert!(provider_ack.response.is_some());
let auction_ack = provider_ack.response.as_ref().unwrap();
let (provider_ref, provider_id) = (&auction_ack.provider_ref, &auction_ack.provider_id);
let start_response = client
.start_provider(&host.id, provider_ref, provider_id, None, vec![])
.await
.expect("should be able to start provider");
assert!(start_response.success);
assert!(start_response.message.is_empty());
assert!(start_response.response.is_none());
// Stopping an unknown provider: the request goes through but reports failure.
let stop_response = client
.stop_provider(&host.id, "notarealproviderID")
.await
.expect("should be able to issue stop provider request");
assert!(!stop_response.success);
assert_eq!(
stop_response.message,
"provider with that ID is not running".to_string()
);
assert!(stop_response.response.is_none());
// NOTE(review): fixed 5 second pause before the link steps; presumably it gives
// the component and provider time to start — confirm.
tokio::time::sleep(Duration::from_secs(5)).await;
// Links: put one, read it back, then delete it.
let link_put = client
.put_link(InterfaceLinkDefinition {
source_id: "echo".to_string(),
target: "httpserver".to_string(),
name: "default".to_string(),
wit_namespace: "wasi".to_string(),
wit_package: "http".to_string(),
interfaces: vec!["incoming-handler".to_string()],
source_config: vec![],
target_config: vec![],
})
.await
.expect("should be able to put link");
assert!(link_put.success);
assert!(link_put.message.is_empty());
assert!(link_put.response.is_none());
let links_get = client
.get_links()
.await
.expect("should be able to get links");
assert!(links_get.success);
assert!(links_get.message.is_empty());
assert!(links_get.response.is_some());
let link_get = links_get.response.as_ref().unwrap().first().unwrap();
assert_eq!(link_get.source_id, "echo");
assert_eq!(link_get.target, "httpserver");
assert_eq!(link_get.name, "default");
assert_eq!(link_get.wit_namespace, "wasi");
assert_eq!(link_get.wit_package, "http");
let link_del = client
.delete_link("echo", "default", "wasi", "http")
.await
.expect("should be able to delete link");
assert!(link_del.success);
assert!(link_del.message.is_empty());
assert!(link_del.response.is_none());
// Labels: put two, delete the first; the inventory check below relies on this.
let label_one = client
.put_label(&host.id, "idk", "lol")
.await
.expect("should be able to put label");
assert!(label_one.success);
assert!(label_one.message.is_empty());
assert!(label_one.response.is_none());
let label_two = client
.put_label(&host.id, "foo", "bar")
.await
.expect("should be able to put another label");
assert!(label_two.success);
assert!(label_two.message.is_empty());
assert!(label_two.response.is_none());
let del_label_one = client
.delete_label(&host.id, "idk")
.await
.expect("should be able to delete label");
assert!(del_label_one.success);
assert!(del_label_one.message.is_empty());
assert!(del_label_one.response.is_none());
// Registry credentials.
let registry_put = client
.put_registries(HashMap::from_iter([(
"mycloud.io".to_string(),
RegistryCredential {
username: Some("user".to_string()),
password: Some("pass".to_string()),
registry_type: "oci".to_string(),
token: None,
},
)]))
.await
.expect("should be able to put registries");
assert!(registry_put.success);
assert!(registry_put.message.is_empty());
assert!(registry_put.response.is_none());
// Config: put, read back, delete.
let config_put = client
.put_config(
"test_config",
HashMap::from_iter([("sup".to_string(), "hey".to_string())]),
)
.await
.expect("should be able to put config");
assert!(config_put.success);
assert!(config_put.message.is_empty());
assert!(config_put.response.is_none());
let config_get = client
.get_config("test_config")
.await
.expect("should be able to get config");
assert!(config_get.success);
assert!(config_get.message.is_empty());
assert!(config_get
.response
.is_some_and(|r| r.get("sup").is_some_and(|s| s == "hey")));
let config_del = client
.delete_config("test_config")
.await
.expect("should be able to delete config");
assert!(config_del.success);
assert!(config_del.message.is_empty());
assert!(config_del.response.is_none());
// Inventory: every listed component is "echo", label "idk" is gone and "foo" is "bar".
let inventory = client
.get_host_inventory(&host.id)
.await
.expect("should be able to fetch at least a host");
assert!(inventory.success);
assert!(inventory.message.is_empty());
assert!(inventory.response.is_some());
let host_inventory = inventory.response.unwrap();
assert!(host_inventory.components.iter().all(|a| a.id == "echo"));
assert!(host_inventory.labels.get("idk").is_none());
assert!(host_inventory
.labels
.get("foo")
.is_some_and(|f| f == &"bar".to_string()));
// Last step: stop the host, passing Some(1234) as the command timeout.
let stop_host = client
.stop_host(&host.id, Some(1234))
.await
.expect("should be able to stop host");
assert!(stop_host.success);
assert!(stop_host.message.is_empty());
assert!(stop_host.response.is_none());
}
}