pub mod broker;
mod generated;
mod inv;
mod sub_stream;
pub use crate::generated::ctliface::*;
use inv::Entity;
pub use inv::{Invocation, InvocationResponse};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, time::Duration};
use sub_stream::SubscriptionStream;
use wascap::prelude::KeyPair;
type Result<T> = ::std::result::Result<T, Box<dyn ::std::error::Error + Send + Sync>>;
pub struct Client {
nc: nats::asynk::Connection,
nsprefix: Option<String>,
timeout: Duration,
key: KeyPair,
}
impl Client {
pub fn new(nc: nats::asynk::Connection, nsprefix: Option<String>, timeout: Duration) -> Self {
Client {
nc,
nsprefix,
timeout,
key: KeyPair::new_server(),
}
}
pub async fn get_hosts(&self, timeout: Duration) -> Result<Vec<Host>> {
let subject = broker::queries::hosts(&self.nsprefix);
let sub = self.nc.request_multi(&subject, vec![]).await?;
let hosts = SubscriptionStream::new(sub)
.collect(timeout, "get hosts")
.await;
Ok(hosts)
}
pub async fn perform_actor_auction(
&self,
actor_ref: &str,
constraints: HashMap<String, String>,
timeout: Duration,
) -> Result<Vec<ActorAuctionAck>> {
let subject = broker::actor_auction_subject(&self.nsprefix);
let bytes = serialize(ActorAuctionRequest {
actor_ref: actor_ref.to_string(),
constraints,
})?;
let sub = self.nc.request_multi(&subject, bytes).await?;
let actors = SubscriptionStream::new(sub)
.collect(timeout, "actor auction")
.await;
Ok(actors)
}
pub async fn perform_provider_auction(
&self,
provider_ref: &str,
link_name: &str,
constraints: HashMap<String, String>,
timeout: Duration,
) -> Result<Vec<ProviderAuctionAck>> {
let subject = broker::provider_auction_subject(&self.nsprefix);
let bytes = serialize(ProviderAuctionRequest {
provider_ref: provider_ref.to_string(),
link_name: link_name.to_string(),
constraints,
})?;
let sub = self.nc.request_multi(&subject, bytes).await?;
let providers = SubscriptionStream::new(sub)
.collect(timeout, "provider auction")
.await;
Ok(providers)
}
pub async fn get_host_inventory(&self, host_id: &str) -> Result<HostInventory> {
let subject = broker::queries::host_inventory(&self.nsprefix, host_id);
match actix_rt::time::timeout(self.timeout, self.nc.request(&subject, vec![])).await? {
Ok(msg) => {
let hi: HostInventory = deserialize(&msg.data)?;
Ok(hi)
}
Err(e) => Err(format!("Did not receive host inventory from target host: {}", e).into()),
}
}
pub async fn start_actor(&self, host_id: &str, actor_ref: &str) -> Result<StartActorAck> {
let subject = broker::commands::start_actor(&self.nsprefix, host_id);
let bytes = serialize(StartActorCommand {
actor_ref: actor_ref.to_string(),
host_id: host_id.to_string(),
})?;
match actix_rt::time::timeout(self.timeout, self.nc.request(&subject, &bytes)).await? {
Ok(msg) => {
let ack: StartActorAck = deserialize(&msg.data)?;
Ok(ack)
}
Err(e) => Err(format!("Did not receive start actor acknowledgement: {}", e).into()),
}
}
pub async fn call_actor(
&self,
target_id: &str,
operation: &str,
data: &[u8],
) -> Result<InvocationResponse> {
let subject = broker::rpc::call_actor(&self.nsprefix, target_id);
let bytes = crate::generated::ctliface::serialize(Invocation::new(
&self.key,
Entity::Actor("system".to_string()),
Entity::Actor(target_id.to_string()),
operation,
data.to_vec(),
))?;
match actix_rt::time::timeout(self.timeout, self.nc.request(&subject, &bytes)).await? {
Ok(msg) => {
let resp: InvocationResponse = crate::generated::ctliface::deserialize(&msg.data)?;
Ok(resp)
}
Err(e) => Err(format!("Actor RPC call did not succeed: {}", e).into()),
}
}
pub async fn advertise_link(
&self,
actor_id: &str,
provider_id: &str,
contract_id: &str,
link_name: &str,
values: HashMap<String, String>,
) -> Result<()> {
let subject = broker::rpc::advertise_links(&self.nsprefix);
let ld = LinkDefinition {
actor_id: actor_id.to_string(),
provider_id: provider_id.to_string(),
contract_id: contract_id.to_string(),
link_name: link_name.to_string(),
values,
};
let bytes = crate::generated::ctliface::serialize(&ld)?;
self.nc.publish(&subject, &bytes).await?;
Ok(())
}
pub async fn update_actor(
&self,
host_id: &str,
existing_actor_id: &str,
new_actor_ref: &str,
) -> Result<UpdateActorAck> {
let subject = broker::commands::update_actor(&self.nsprefix, host_id);
let bytes = serialize(UpdateActorCommand {
host_id: host_id.to_string(),
actor_id: existing_actor_id.to_string(),
new_actor_ref: new_actor_ref.to_string(),
})?;
match actix_rt::time::timeout(self.timeout, self.nc.request(&subject, &bytes)).await? {
Ok(msg) => {
let ack: UpdateActorAck = deserialize(&msg.data)?;
Ok(ack)
}
Err(e) => Err(format!("Did not receive update actor acknowledgement: {}", e).into()),
}
}
pub async fn start_provider(
&self,
host_id: &str,
provider_ref: &str,
link_name: Option<String>,
) -> Result<StartProviderAck> {
let subject = broker::commands::start_provider(&self.nsprefix, host_id);
let bytes = serialize(StartProviderCommand {
host_id: host_id.to_string(),
provider_ref: provider_ref.to_string(),
link_name: link_name.unwrap_or_else(|| "default".to_string()),
})?;
match actix_rt::time::timeout(self.timeout, self.nc.request(&subject, &bytes)).await? {
Ok(msg) => {
let ack: StartProviderAck = deserialize(&msg.data)?;
Ok(ack)
}
Err(e) => Err(format!("Did not receive start provider acknowledgement: {}", e).into()),
}
}
pub async fn stop_provider(
&self,
host_id: &str,
provider_ref: &str,
link_name: &str,
contract_id: &str,
) -> Result<StopProviderAck> {
let subject = broker::commands::stop_provider(&self.nsprefix, host_id);
let bytes = serialize(StopProviderCommand {
host_id: host_id.to_string(),
provider_ref: provider_ref.to_string(),
link_name: link_name.to_string(),
contract_id: contract_id.to_string(),
})?;
match actix_rt::time::timeout(self.timeout, self.nc.request(&subject, &bytes)).await? {
Ok(msg) => {
let ack: StopProviderAck = deserialize(&msg.data)?;
Ok(ack)
}
Err(e) => Err(format!("Did not receive stop provider acknowledgement: {}", e).into()),
}
}
pub async fn stop_actor(&self, host_id: &str, actor_ref: &str) -> Result<StopActorAck> {
let subject = broker::commands::stop_actor(&self.nsprefix, host_id);
let bytes = serialize(StopActorCommand {
host_id: host_id.to_string(),
actor_ref: actor_ref.to_string(),
})?;
match actix_rt::time::timeout(self.timeout, self.nc.request(&subject, &bytes)).await? {
Ok(msg) => {
let ack: StopActorAck = deserialize(&msg.data)?;
Ok(ack)
}
Err(e) => Err(format!("Did not receive stop actor acknowledgement: {}", e).into()),
}
}
pub async fn get_claims(&self) -> Result<ClaimsList> {
let subject = broker::queries::claims(&self.nsprefix);
match actix_rt::time::timeout(self.timeout, self.nc.request(&subject, vec![])).await? {
Ok(msg) => {
let list: ClaimsList = deserialize(&msg.data)?;
Ok(list)
}
Err(e) => Err(format!("Did not receive claims from lattice: {}", e).into()),
}
}
}
pub fn serialize<T>(
item: T,
) -> ::std::result::Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>>
where
T: Serialize,
{
serde_json::to_vec(&item).map_err(|_e| "JSON serialization failure".into())
}
pub fn deserialize<'de, T: Deserialize<'de>>(
buf: &'de [u8],
) -> ::std::result::Result<T, Box<dyn std::error::Error + Send + Sync>> {
serde_json::from_slice(buf).map_err(|_e| "JSON deserialization failure".into())
}