#![warn(clippy::pedantic)]
mod broker;
mod otel;
mod types;
use async_nats::Subscriber;
pub use types::*;
use core::fmt::{self, Debug};
use core::time::Duration;
use std::collections::HashMap;
use cloudevents::event::Event;
use futures::{StreamExt, TryFutureExt};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::Receiver;
use tracing::{debug, error, instrument, trace};
type Result<T> = ::std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
#[derive(Clone)]
pub struct Client {
nc: async_nats::Client,
topic_prefix: Option<String>,
pub lattice: String,
timeout: Duration,
auction_timeout: Duration,
}
impl Debug for Client {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Client")
.field("topic_prefix", &self.topic_prefix)
.field("lattice", &self.lattice)
.field("timeout", &self.timeout)
.field("auction_timeout", &self.auction_timeout)
.finish_non_exhaustive()
}
}
pub struct ClientBuilder {
nc: async_nats::Client,
topic_prefix: Option<String>,
lattice: String,
timeout: Duration,
auction_timeout: Duration,
}
impl ClientBuilder {
#[must_use]
pub fn new(nc: async_nats::Client) -> ClientBuilder {
ClientBuilder {
nc,
topic_prefix: None,
lattice: "default".to_string(),
timeout: Duration::from_secs(2),
auction_timeout: Duration::from_secs(5),
}
}
#[must_use]
pub fn topic_prefix(self, prefix: impl Into<String>) -> ClientBuilder {
ClientBuilder {
topic_prefix: Some(prefix.into()),
..self
}
}
#[must_use]
pub fn lattice(self, prefix: impl Into<String>) -> ClientBuilder {
ClientBuilder {
lattice: prefix.into(),
..self
}
}
#[must_use]
pub fn timeout(self, timeout: Duration) -> ClientBuilder {
ClientBuilder { timeout, ..self }
}
#[must_use]
pub fn auction_timeout(self, timeout: Duration) -> ClientBuilder {
ClientBuilder {
auction_timeout: timeout,
..self
}
}
#[must_use]
pub fn build(self) -> Client {
Client {
nc: self.nc,
topic_prefix: self.topic_prefix,
lattice: self.lattice,
timeout: self.timeout,
auction_timeout: self.auction_timeout,
}
}
}
impl Client {
#[must_use]
pub fn new(nc: async_nats::Client) -> Client {
ClientBuilder::new(nc).build()
}
#[instrument(level = "debug", skip_all)]
pub(crate) async fn request_timeout(
&self,
subject: String,
payload: Vec<u8>,
timeout: Duration,
) -> Result<async_nats::Message> {
match tokio::time::timeout(
timeout,
self.nc.request_with_headers(
subject,
otel::HeaderInjector::default_with_span().into(),
payload.into(),
),
)
.await
{
Err(_) => Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "timed out").into()),
Ok(Ok(message)) => Ok(message),
Ok(Err(e)) => Err(e.into()),
}
}
#[instrument(level = "debug", skip_all)]
pub async fn get_hosts(&self) -> Result<Vec<Host>> {
let subject = broker::queries::hosts(&self.topic_prefix, &self.lattice);
debug!("get_hosts:publish {}", &subject);
self.publish_and_wait(subject, Vec::new()).await
}
#[instrument(level = "debug", skip_all)]
pub async fn get_host_inventory(&self, host_id: &str) -> Result<HostInventory> {
let subject = broker::queries::host_inventory(
&self.topic_prefix,
&self.lattice,
parse_identifier(&IdentifierKind::HostId, host_id)?.as_str(),
);
debug!("get_host_inventory:request {}", &subject);
match self.request_timeout(subject, vec![], self.timeout).await {
Ok(msg) => Ok(json_deserialize(&msg.payload)?),
Err(e) => Err(format!("Did not receive host inventory from target host: {e}").into()),
}
}
#[instrument(level = "debug", skip_all)]
pub async fn get_claims(&self) -> Result<Vec<HashMap<String, String>>> {
let subject = broker::queries::claims(&self.topic_prefix, &self.lattice);
debug!("get_claims:request {}", &subject);
match self.request_timeout(subject, vec![], self.timeout).await {
Ok(msg) => {
let list: GetClaimsResponse = json_deserialize(&msg.payload)?;
Ok(list.claims)
}
Err(e) => Err(format!("Did not receive claims from lattice: {e}").into()),
}
}
#[instrument(level = "debug", skip_all)]
pub async fn perform_actor_auction(
&self,
actor_ref: &str,
constraints: HashMap<String, String>,
) -> Result<Vec<ActorAuctionAck>> {
let subject = broker::actor_auction_subject(&self.topic_prefix, &self.lattice);
let bytes = json_serialize(ActorAuctionRequest {
actor_ref: parse_identifier(&IdentifierKind::ActorRef, actor_ref)?,
constraints,
})?;
debug!("actor_auction:publish {}", &subject);
self.publish_and_wait(subject, bytes).await
}
#[instrument(level = "debug", skip_all)]
pub async fn perform_provider_auction(
&self,
provider_ref: &str,
link_name: &str,
constraints: HashMap<String, String>,
) -> Result<Vec<ProviderAuctionAck>> {
let subject = broker::provider_auction_subject(&self.topic_prefix, &self.lattice);
let bytes = json_serialize(ProviderAuctionRequest {
provider_ref: parse_identifier(&IdentifierKind::ProviderRef, provider_ref)?,
link_name: parse_identifier(&IdentifierKind::LinkName, link_name)?,
constraints,
})?;
debug!("provider_auction:publish {}", &subject);
self.publish_and_wait(subject, bytes).await
}
#[instrument(level = "debug", skip_all)]
pub async fn scale_actor(
&self,
host_id: &str,
actor_ref: &str,
max_instances: u32,
annotations: Option<HashMap<String, String>>,
) -> Result<CtlOperationAck> {
let host_id = parse_identifier(&IdentifierKind::HostId, host_id)?;
let subject =
broker::commands::scale_actor(&self.topic_prefix, &self.lattice, host_id.as_str());
debug!("scale_actor:request {}", &subject);
let bytes = json_serialize(ScaleActorCommand {
max_instances,
actor_ref: parse_identifier(&IdentifierKind::ActorRef, actor_ref)?,
host_id,
annotations,
})?;
match self.request_timeout(subject, bytes, self.timeout).await {
Ok(msg) => Ok(json_deserialize(&msg.payload)?),
Err(e) => Err(format!("Did not receive scale actor acknowledgement: {e}").into()),
}
}
#[instrument(level = "debug", skip_all)]
pub async fn put_registries(&self, registries: RegistryCredentialMap) -> Result<()> {
let subject = broker::publish_registries(&self.topic_prefix, &self.lattice);
debug!("put_registries:publish {}", &subject);
let bytes = json_serialize(®istries)?;
let resp = self
.nc
.publish_with_headers(
subject,
otel::HeaderInjector::default_with_span().into(),
bytes.into(),
)
.await;
if let Err(e) = resp {
Err(format!("Failed to push registry credential map: {e}").into())
} else {
Ok(())
}
}
#[instrument(level = "debug", skip_all)]
pub async fn advertise_link(
&self,
actor_id: &str,
provider_id: &str,
contract_id: &str,
link_name: &str,
values: HashMap<String, String>,
) -> Result<CtlOperationAck> {
let ld = LinkDefinition {
actor_id: parse_identifier(&IdentifierKind::ActorId, actor_id)?,
provider_id: parse_identifier(&IdentifierKind::ProviderId, provider_id)?,
contract_id: parse_identifier(&IdentifierKind::ContractId, contract_id)?,
link_name: parse_identifier(&IdentifierKind::LinkName, link_name)?,
values,
};
let subject = broker::advertise_link(&self.topic_prefix, &self.lattice);
debug!("advertise_link:request {}", &subject);
let bytes = crate::json_serialize(&ld)?;
match self.request_timeout(subject, bytes, self.timeout).await {
Ok(msg) => Ok(json_deserialize(&msg.payload)?),
Err(e) => Err(format!("Did not receive advertise link acknowledgement: {e}").into()),
}
}
#[instrument(level = "debug", skip_all)]
pub async fn remove_link(
&self,
actor_id: &str,
contract_id: &str,
link_name: &str,
) -> Result<CtlOperationAck> {
let subject = broker::remove_link(&self.topic_prefix, &self.lattice);
debug!("remove_link:request {}", &subject);
let ld = LinkDefinition {
actor_id: parse_identifier(&IdentifierKind::ActorId, actor_id)?,
contract_id: parse_identifier(&IdentifierKind::ContractId, contract_id)?,
link_name: parse_identifier(&IdentifierKind::LinkName, link_name)?,
..Default::default()
};
let bytes = crate::json_serialize(&ld)?;
match self.request_timeout(subject, bytes, self.timeout).await {
Ok(msg) => Ok(json_deserialize(&msg.payload)?),
Err(e) => Err(format!("Did not receive remove link acknowledgement: {e}").into()),
}
}
#[instrument(level = "debug", skip_all)]
pub async fn query_links(&self) -> Result<Vec<LinkDefinition>> {
let subject = broker::queries::link_definitions(&self.topic_prefix, &self.lattice);
debug!("query_links:request {}", &subject);
match self.request_timeout(subject, vec![], self.timeout).await {
Ok(msg) => {
let list: LinkDefinitionList = json_deserialize(&msg.payload)?;
Ok(list.links)
}
Err(e) => Err(format!("Did not receive a response to links query: {e}").into()),
}
}
#[instrument(level = "debug", skip_all)]
pub async fn put_config(
&self,
entity_id: &str,
key: &str,
value: impl Into<Vec<u8>>,
) -> Result<CtlOperationAck> {
let subject = broker::put_config(&self.topic_prefix, &self.lattice, entity_id, key);
debug!(%subject, "Putting config");
match self
.request_timeout(subject, value.into(), self.timeout)
.await
{
Ok(msg) => json_deserialize(&msg.payload),
Err(e) => Err(format!("Did not receive a response to put config request: {e}").into()),
}
}
#[instrument(level = "debug", skip_all)]
pub async fn delete_config_key(&self, entity_id: &str, key: &str) -> Result<CtlOperationAck> {
let subject = broker::delete_config(&self.topic_prefix, &self.lattice, entity_id, key);
debug!(%subject, %key, "Delete config at key");
match self
.request_timeout(subject, Vec::default(), self.timeout)
.await
{
Ok(msg) => json_deserialize(&msg.payload),
Err(e) => {
Err(format!("Did not receive a response to delete config request: {e}").into())
}
}
}
#[instrument(level = "debug", skip_all)]
pub async fn clear_config(&self, entity_id: &str, key: &str) -> Result<CtlOperationAck> {
let subject = broker::clear_config(&self.topic_prefix, &self.lattice, entity_id);
debug!(%subject, %key, "Deleting all config at key");
match self
.request_timeout(subject, Vec::default(), self.timeout)
.await
{
Ok(msg) => json_deserialize(&msg.payload),
Err(e) => {
Err(format!("Did not receive a response to clear config request: {e}").into())
}
}
}
#[instrument(level = "debug", skip_all)]
pub async fn get_config(&self, entity_id: &str, key: &str) -> Result<GetConfigKeyResponse> {
let subject = broker::queries::config(&self.topic_prefix, &self.lattice, entity_id, key);
debug!(%subject, %key, "Getting config at key");
match self
.request_timeout(subject, Vec::default(), self.timeout)
.await
{
Ok(msg) => json_deserialize(&msg.payload),
Err(e) => Err(format!("Did not receive a response to get config request: {e}").into()),
}
}
#[instrument(level = "debug", skip_all)]
pub async fn get_all_config(&self, entity_id: &str) -> Result<HashMap<String, Vec<u8>>> {
let subject = broker::queries::all_config(&self.topic_prefix, &self.lattice, entity_id);
debug!(%subject, "Getting all config");
match self
.request_timeout(subject, Vec::default(), self.timeout)
.await
{
Ok(msg) => json_deserialize(&msg.payload),
Err(e) => {
Err(format!("Did not receive a response to get all config request: {e}",).into())
}
}
}
pub async fn put_label(
&self,
host_id: &str,
key: &str,
value: &str,
) -> Result<CtlOperationAck> {
let subject = broker::put_label(&self.topic_prefix, &self.lattice, host_id);
debug!(%subject, "putting label");
let bytes = json_serialize(HostLabel {
key: key.to_string(),
value: value.to_string(),
})?;
match self.request_timeout(subject, bytes, self.timeout).await {
Ok(msg) => Ok(json_deserialize(&msg.payload)?),
Err(e) => Err(format!("Did not receive put label acknowledgement: {e}").into()),
}
}
pub async fn delete_label(&self, host_id: &str, key: &str) -> Result<CtlOperationAck> {
let subject = broker::delete_label(&self.topic_prefix, &self.lattice, host_id);
debug!(%subject, "removing label");
let bytes = json_serialize(HostLabel {
key: key.to_string(),
value: String::new(), })?;
match self.request_timeout(subject, bytes, self.timeout).await {
Ok(msg) => Ok(json_deserialize(&msg.payload)?),
Err(e) => Err(format!("Did not receive remove label acknowledgement: {e}").into()),
}
}
#[instrument(level = "debug", skip_all)]
pub async fn update_actor(
&self,
host_id: &str,
existing_actor_id: &str,
new_actor_ref: &str,
annotations: Option<HashMap<String, String>>,
) -> Result<CtlOperationAck> {
let host_id = parse_identifier(&IdentifierKind::HostId, host_id)?;
let subject =
broker::commands::update_actor(&self.topic_prefix, &self.lattice, host_id.as_str());
debug!("update_actor:request {}", &subject);
let bytes = json_serialize(UpdateActorCommand {
host_id,
actor_id: parse_identifier(&IdentifierKind::ActorId, existing_actor_id)?,
new_actor_ref: parse_identifier(&IdentifierKind::ActorRef, new_actor_ref)?,
annotations,
})?;
match self.request_timeout(subject, bytes, self.timeout).await {
Ok(msg) => Ok(json_deserialize(&msg.payload)?),
Err(e) => Err(format!("Did not receive update actor acknowledgement: {e}").into()),
}
}
#[instrument(level = "debug", skip_all)]
pub async fn start_provider(
&self,
host_id: &str,
provider_ref: &str,
link_name: Option<String>,
annotations: Option<HashMap<String, String>>,
provider_configuration: Option<String>,
) -> Result<CtlOperationAck> {
let host_id = parse_identifier(&IdentifierKind::HostId, host_id)?;
let subject =
broker::commands::start_provider(&self.topic_prefix, &self.lattice, host_id.as_str());
debug!("start_provider:request {}", &subject);
let bytes = json_serialize(StartProviderCommand {
host_id,
provider_ref: parse_identifier(&IdentifierKind::ProviderRef, provider_ref)?,
link_name: parse_identifier(
&IdentifierKind::LinkName,
link_name.unwrap_or_else(|| "default".to_string()).as_str(),
)?,
annotations,
configuration: provider_configuration,
})?;
match self.request_timeout(subject, bytes, self.timeout).await {
Ok(msg) => Ok(json_deserialize(&msg.payload)?),
Err(e) => Err(format!("Did not receive start provider acknowledgement: {e}").into()),
}
}
#[instrument(level = "debug", skip_all)]
pub async fn stop_provider(
&self,
host_id: &str,
provider_ref: &str,
link_name: &str,
contract_id: &str,
annotations: Option<HashMap<String, String>>,
) -> Result<CtlOperationAck> {
let host_id = parse_identifier(&IdentifierKind::HostId, host_id)?;
let subject =
broker::commands::stop_provider(&self.topic_prefix, &self.lattice, host_id.as_str());
debug!("stop_provider:request {}", &subject);
let bytes = json_serialize(StopProviderCommand {
host_id,
provider_ref: parse_identifier(&IdentifierKind::ProviderRef, provider_ref)?,
link_name: parse_identifier(&IdentifierKind::LinkName, link_name)?,
contract_id: parse_identifier(&IdentifierKind::ContractId, contract_id)?,
annotations,
})?;
match self.request_timeout(subject, bytes, self.timeout).await {
Ok(msg) => Ok(json_deserialize(&msg.payload)?),
Err(e) => Err(format!("Did not receive stop provider acknowledgement: {e}").into()),
}
}
#[instrument(level = "debug", skip_all)]
pub async fn stop_actor(
&self,
host_id: &str,
actor_ref: &str,
annotations: Option<HashMap<String, String>>,
) -> Result<CtlOperationAck> {
let host_id = parse_identifier(&IdentifierKind::HostId, host_id)?;
let subject =
broker::commands::stop_actor(&self.topic_prefix, &self.lattice, host_id.as_str());
debug!("stop_actor:request {}", &subject);
let bytes = json_serialize(StopActorCommand {
host_id,
actor_ref: parse_identifier(&IdentifierKind::ActorRef, actor_ref)?,
annotations,
})?;
match self.request_timeout(subject, bytes, self.timeout).await {
Ok(msg) => Ok(json_deserialize(&msg.payload)?),
Err(e) => Err(format!("Did not receive stop actor acknowledgement: {e}").into()),
}
}
#[instrument(level = "debug", skip_all)]
pub async fn stop_host(
&self,
host_id: &str,
timeout_ms: Option<u64>,
) -> Result<CtlOperationAck> {
let host_id = parse_identifier(&IdentifierKind::HostId, host_id)?;
let subject =
broker::commands::stop_host(&self.topic_prefix, &self.lattice, host_id.as_str());
debug!("stop_host:request {}", &subject);
let bytes = json_serialize(StopHostCommand {
host_id,
timeout: timeout_ms,
})?;
match self.request_timeout(subject, bytes, self.timeout).await {
Ok(msg) => Ok(json_deserialize(&msg.payload)?),
Err(e) => Err(format!("Did not receive stop host acknowledgement: {e}").into()),
}
}
async fn publish_and_wait<D: DeserializeOwned>(
&self,
subject: String,
payload: Vec<u8>,
) -> Result<Vec<D>> {
let reply = self.nc.new_inbox();
let sub = self.nc.subscribe(reply.clone()).await?;
self.nc
.publish_with_reply_and_headers(
subject.clone(),
reply,
otel::HeaderInjector::default_with_span().into(),
payload.into(),
)
.await?;
let nc = self.nc.clone();
tokio::spawn(async move {
if let Err(error) = nc.flush().await {
error!(%error, "flush after publish");
}
});
Ok(collect_sub_timeout::<D>(sub, self.auction_timeout, subject.as_str()).await)
}
#[allow(clippy::missing_errors_doc)] pub async fn events_receiver(&self, event_types: Vec<String>) -> Result<Receiver<Event>> {
let (sender, receiver) = tokio::sync::mpsc::channel(5000);
let futs = event_types.into_iter().map(|event_type| {
self.nc
.subscribe(format!("wasmbus.evt.{}.{}", self.lattice, event_type))
.map_err(|err| Box::new(err) as Box<dyn std::error::Error + Send + Sync>)
});
let subs: Vec<Subscriber> = futures::future::join_all(futs)
.await
.into_iter()
.collect::<Result<_>>()?;
let mut stream = futures::stream::select_all(subs);
tokio::spawn(async move {
while let Some(msg) = stream.next().await {
let Ok(evt) = json_deserialize::<Event>(&msg.payload) else {
error!("Object received on event stream was not a CloudEvent");
continue;
};
trace!("received event: {:?}", evt);
let Ok(()) = sender.send(evt).await else {
break;
};
}
});
Ok(receiver)
}
}
fn json_serialize<T>(item: T) -> Result<Vec<u8>>
where
T: Serialize,
{
serde_json::to_vec(&item).map_err(|e| format!("JSON serialization failure: {e}").into())
}
fn json_deserialize<'de, T: Deserialize<'de>>(buf: &'de [u8]) -> Result<T> {
serde_json::from_slice(buf).map_err(|e| format!("JSON deserialization failure: {e}").into())
}
pub async fn collect_sub_timeout<T: DeserializeOwned>(
mut sub: async_nats::Subscriber,
timeout: Duration,
reason: &str,
) -> Vec<T> {
let mut items = Vec::new();
let sleep = tokio::time::sleep(timeout);
tokio::pin!(sleep);
loop {
tokio::select! {
msg = sub.next() => {
let Some(msg) = msg else {
break;
};
if msg.payload.is_empty() {
break;
}
match json_deserialize::<T>(&msg.payload) {
Ok(item) => items.push(item),
Err(error) => {
error!(%reason, %error,
"deserialization error in auction - results may be incomplete",
);
break;
}
}
},
() = &mut sleep => { break; }
}
}
items
}
enum IdentifierKind {
HostId,
ActorId,
ActorRef,
ProviderId,
ProviderRef,
ContractId,
LinkName,
}
fn assert_non_empty_string(input: &str, message: &str) -> Result<String> {
if input.trim().is_empty() {
Err(message.into())
} else {
Ok(input.trim().to_string())
}
}
fn parse_identifier<T: AsRef<str>>(kind: &IdentifierKind, value: T) -> Result<String> {
let value = value.as_ref();
match kind {
IdentifierKind::HostId => assert_non_empty_string(value, "Host ID cannot be empty"),
IdentifierKind::ActorId => assert_non_empty_string(value, "Actor ID cannot be empty"),
IdentifierKind::ActorRef => {
assert_non_empty_string(value, "Actor OCI reference cannot be empty")
}
IdentifierKind::ProviderId => assert_non_empty_string(value, "Provider ID cannot be empty"),
IdentifierKind::ProviderRef => {
assert_non_empty_string(value, "Provider OCI reference cannot be empty")
}
IdentifierKind::ContractId => assert_non_empty_string(value, "Contract ID cannot be empty"),
IdentifierKind::LinkName => assert_non_empty_string(value, "Link Name cannot be empty"),
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
#[ignore]
async fn test_events_receiver() {
let nc = async_nats::connect("127.0.0.1:4222").await.unwrap();
let client = ClientBuilder::new(nc)
.timeout(Duration::from_millis(1000))
.auction_timeout(Duration::from_millis(1000))
.build();
let mut receiver = client
.events_receiver(vec!["foobar".to_string()])
.await
.unwrap();
tokio::spawn(async move {
while let Some(evt) = receiver.recv().await {
println!("Event received: {evt:?}");
}
});
println!("Listening to Cloud Events for 120 seconds. Then we will quit.");
tokio::time::sleep(Duration::from_secs(120)).await;
}
#[test]
fn test_parse_identifier() -> Result<()> {
assert!(parse_identifier(&IdentifierKind::HostId, "").is_err());
assert!(parse_identifier(&IdentifierKind::HostId, " ").is_err());
let host_id = parse_identifier(&IdentifierKind::HostId, " ");
assert!(host_id.is_err(), "parsing host id should have failed");
assert!(host_id
.unwrap_err()
.to_string()
.contains("Host ID cannot be empty"));
let provider_ref = parse_identifier(&IdentifierKind::ProviderRef, "");
assert!(
provider_ref.is_err(),
"parsing provider ref should have failed"
);
assert!(provider_ref
.unwrap_err()
.to_string()
.contains("Provider OCI reference cannot be empty"));
assert!(parse_identifier(&IdentifierKind::HostId, "host_id").is_ok());
let actor_id = parse_identifier(&IdentifierKind::ActorId, " iambatman ")?;
assert_eq!(actor_id, "iambatman");
Ok(())
}
}