extern crate log;
#[macro_use]
extern crate serde;
use std::{collections::HashMap, path::PathBuf, time::Duration};
use crossbeam::Sender;
use wascap::prelude::*;
use controlplane::{
LaunchAck, LaunchAuctionRequest, LaunchAuctionResponse, LaunchCommand, TerminateCommand,
};
pub use events::{BusEvent, CloudEvent};
use crate::controlplane::{
LaunchProviderCommand, ProviderAuctionRequest, ProviderAuctionResponse, ProviderLaunchAck,
};
pub mod controlplane;
mod events;
/// Subject suffix for the actor inventory query issued by `Client::get_actors`.
pub const INVENTORY_ACTORS: &str = "inventory.actors";
/// Subject suffix for the host inventory query issued by `Client::get_hosts`.
pub const INVENTORY_HOSTS: &str = "inventory.hosts";
/// Subject suffix for the binding inventory query issued by `Client::get_bindings`.
pub const INVENTORY_BINDINGS: &str = "inventory.bindings";
/// Subject suffix for the capability inventory query issued by `Client::get_capabilities`.
pub const INVENTORY_CAPABILITIES: &str = "inventory.capabilities";
/// Subject suffix that `Client::watch_events` subscribes to.
pub const EVENTS: &str = "events";
/// Timeout, in seconds, handed to `timeout_iter` while collecting auction responses.
/// The auction methods use this value instead of the client's configured call timeout.
const AUCTION_TIMEOUT_SECONDS: u64 = 5;
/// A reply to one of the inventory queries issued by `Client`.
///
/// The `get_*` methods on `Client` deserialize every reply from JSON into this
/// enum and keep only the variant that matches the query they sent.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub enum InventoryResponse {
/// Profile of a responding host; collected by `Client::get_hosts`.
Host(HostProfile),
/// Actors reported for `host`; `Client::get_actors` groups them by that string.
/// NOTE(review): `host` is presumably the host's ID — confirm against the sender.
Actors {
host: String,
actors: Vec<Claims<Actor>>,
},
/// Bindings reported for `host`; `Client::get_bindings` groups them by that string.
Bindings {
host: String,
bindings: Vec<Binding>,
},
/// Capabilities reported for `host`; `Client::get_capabilities` groups them by that string.
Capabilities {
host: String,
capabilities: Vec<HostedCapability>,
},
}
/// Description of a single host, as carried by `InventoryResponse::Host`.
#[derive(Debug, Deserialize, Serialize, Clone, PartialEq)]
pub struct HostProfile {
/// Identifier of the host.
pub id: String,
/// String key/value labels attached to the host.
pub labels: HashMap<String, String>,
/// Host uptime; presumably in milliseconds, going by the field name — TODO confirm.
pub uptime_ms: u128,
}
/// A capability reported in `InventoryResponse::Capabilities`, pairing a binding
/// name with the capability's descriptor.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub struct HostedCapability {
/// Name of the binding associated with this capability.
pub binding_name: String,
/// Descriptor of the capability, as defined by `wascc_codec`.
pub descriptor: wascc_codec::capabilities::CapabilityDescriptor,
}
/// A binding reported in `InventoryResponse::Bindings`.
///
/// NOTE(review): presumably this links an actor to a named capability instance
/// together with its configuration — confirm against the host implementation.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub struct Binding {
/// The actor side of the binding.
pub actor: String,
/// Identifier of the capability the actor is bound to.
pub capability_id: String,
/// Name of the binding.
pub binding_name: String,
/// String key/value configuration attached to the binding.
pub configuration: HashMap<String, String>,
}
/// Client for querying and controlling hosts over a NATS connection.
pub struct Client {
// NATS connection used for every request, publish and subscription.
nc: nats::Connection,
// Optional prefix that `gen_subject` prepends to every subject it builds.
namespace: Option<String>,
// Timeout applied to inventory queries and to launch requests.
timeout: Duration,
}
impl Client {
pub fn new(
host: &str,
credsfile: Option<PathBuf>,
call_timeout: Duration,
namespace: Option<String>,
) -> Self {
Client {
nc: get_connection(host, credsfile),
timeout: call_timeout,
namespace,
}
}
pub fn with_connection(
nc: nats::Connection,
call_timeout: Duration,
namespace: Option<String>,
) -> Self {
Client {
nc,
timeout: call_timeout,
namespace,
}
}
pub fn get_hosts(&self) -> std::result::Result<Vec<HostProfile>, Box<dyn std::error::Error>> {
let mut hosts = vec![];
let sub = self
.nc
.request_multi(self.gen_subject(INVENTORY_HOSTS).as_ref(), &[])?;
for msg in sub.timeout_iter(self.timeout) {
let ir: InventoryResponse = serde_json::from_slice(&msg.data)?;
if let InventoryResponse::Host(h) = ir {
hosts.push(h);
}
}
Ok(hosts)
}
/// Queries the binding inventory subject and returns the reported bindings grouped by host.
///
/// Replies are read through `timeout_iter` with the client's configured timeout.
/// When the same host string appears in several replies, their bindings are
/// appended to one list. Replies of any other variant are ignored.
///
/// # Errors
///
/// Returns an error if the request cannot be sent or if any reply is not valid
/// JSON for `InventoryResponse`.
pub fn get_bindings(
    &self,
) -> std::result::Result<HashMap<String, Vec<Binding>>, Box<dyn std::error::Error>> {
    let mut host_bindings: HashMap<String, Vec<Binding>> = HashMap::new();
    let sub = self
        .nc
        .request_multi(self.gen_subject(INVENTORY_BINDINGS).as_ref(), &[])?;
    for msg in sub.timeout_iter(self.timeout) {
        let ir: InventoryResponse = serde_json::from_slice(&msg.data)?;
        if let InventoryResponse::Bindings { bindings, host } = ir {
            // Move the decoded bindings into the host's list instead of cloning them.
            host_bindings.entry(host).or_default().extend(bindings);
        }
    }
    Ok(host_bindings)
}
/// Queries the actor inventory subject and returns the reported actor claims grouped by host.
///
/// Replies are read through `timeout_iter` with the client's configured timeout.
/// When the same host string appears in several replies, their actors are
/// appended to one list. Replies of any other variant are ignored.
///
/// # Errors
///
/// Returns an error if the request cannot be sent or if any reply is not valid
/// JSON for `InventoryResponse`.
pub fn get_actors(
    &self,
) -> std::result::Result<HashMap<String, Vec<Claims<Actor>>>, Box<dyn std::error::Error>> {
    let mut host_actors: HashMap<String, Vec<Claims<Actor>>> = HashMap::new();
    let sub = self
        .nc
        .request_multi(self.gen_subject(INVENTORY_ACTORS).as_ref(), &[])?;
    for msg in sub.timeout_iter(self.timeout) {
        let ir: InventoryResponse = serde_json::from_slice(&msg.data)?;
        if let InventoryResponse::Actors { host, actors } = ir {
            // Move the decoded claims into the host's list instead of cloning them.
            host_actors.entry(host).or_default().extend(actors);
        }
    }
    Ok(host_actors)
}
/// Queries the capability inventory subject and returns the reported capabilities grouped by host.
///
/// Replies are read through `timeout_iter` with the client's configured timeout.
/// When the same host string appears in several replies, their capabilities are
/// appended to one list. Replies of any other variant are ignored.
///
/// # Errors
///
/// Returns an error if the request cannot be sent or if any reply is not valid
/// JSON for `InventoryResponse`.
pub fn get_capabilities(
    &self,
) -> std::result::Result<HashMap<String, Vec<HostedCapability>>, Box<dyn std::error::Error>>
{
    let mut host_caps: HashMap<String, Vec<HostedCapability>> = HashMap::new();
    let sub = self
        .nc
        .request_multi(self.gen_subject(INVENTORY_CAPABILITIES).as_ref(), &[])?;
    for msg in sub.timeout_iter(self.timeout) {
        let ir: InventoryResponse = serde_json::from_slice(&msg.data)?;
        if let InventoryResponse::Capabilities { host, capabilities } = ir {
            // Move the decoded capabilities into the host's list instead of cloning them.
            host_caps.entry(host).or_default().extend(capabilities);
        }
    }
    Ok(host_caps)
}
pub fn watch_events(&self, sender: Sender<BusEvent>) -> Result<(), Box<dyn std::error::Error>> {
let _sub = self
.nc
.subscribe(self.gen_subject(EVENTS).as_ref())?
.with_handler(move |msg| {
let ce: CloudEvent = serde_json::from_slice(&msg.data).unwrap();
let be: BusEvent = serde_json::from_str(&ce.data).unwrap();
let _ = sender.send(be);
Ok(())
});
Ok(())
}
/// Publishes an actor launch auction request and collects the responses.
///
/// * `actor_id` - the actor the auction is for.
/// * `constraints` - key/value constraints forwarded in the auction request.
///
/// Responses are read through `timeout_iter` with `AUCTION_TIMEOUT_SECONDS`,
/// not with the client's configured call timeout.
///
/// # Errors
///
/// Returns an error if the request cannot be serialized or sent, or if any
/// response is not valid JSON for `LaunchAuctionResponse`.
pub fn perform_actor_launch_auction(
    &self,
    actor_id: &str,
    constraints: HashMap<String, String>,
) -> Result<Vec<LaunchAuctionResponse>, Box<dyn std::error::Error>> {
    let request = LaunchAuctionRequest::new(actor_id, constraints);
    let topic = format!(
        "{}.{}",
        controlplane::CPLANE_PREFIX,
        controlplane::AUCTION_REQ
    );
    let subject = self.gen_subject(&topic);
    let payload = serde_json::to_vec(&request)?;
    let replies = self.nc.request_multi(&subject, &payload)?;
    let window = Duration::from_secs(AUCTION_TIMEOUT_SECONDS);
    // Collecting into `Result` stops at the first response that fails to decode.
    let responses = replies
        .timeout_iter(window)
        .map(|reply| serde_json::from_slice::<LaunchAuctionResponse>(&reply.data))
        .collect::<Result<Vec<_>, _>>()?;
    Ok(responses)
}
/// Publishes a provider launch auction request and collects the responses.
///
/// * `provider_ref` - reference of the provider the auction is for.
/// * `binding_name` - binding name forwarded in the auction request.
/// * `constraints` - key/value constraints forwarded in the auction request.
///
/// Responses are read through `timeout_iter` with `AUCTION_TIMEOUT_SECONDS`,
/// not with the client's configured call timeout.
///
/// # Errors
///
/// Returns an error if the request cannot be serialized or sent, or if any
/// response is not valid JSON for `ProviderAuctionResponse`.
pub fn perform_provider_launch_auction(
    &self,
    provider_ref: &str,
    binding_name: &str,
    constraints: HashMap<String, String>,
) -> Result<Vec<ProviderAuctionResponse>, Box<dyn std::error::Error>> {
    let request = ProviderAuctionRequest::new(provider_ref, binding_name, constraints);
    let topic = format!(
        "{}.{}",
        controlplane::CPLANE_PREFIX,
        controlplane::PROVIDER_AUCTION_REQ
    );
    let subject = self.gen_subject(&topic);
    let payload = serde_json::to_vec(&request)?;
    let replies = self.nc.request_multi(&subject, &payload)?;
    let window = Duration::from_secs(AUCTION_TIMEOUT_SECONDS);
    // Collecting into `Result` stops at the first response that fails to decode.
    let responses = replies
        .timeout_iter(window)
        .map(|reply| serde_json::from_slice::<ProviderAuctionResponse>(&reply.data))
        .collect::<Result<Vec<_>, _>>()?;
    Ok(responses)
}
/// Sends a provider launch command to a specific host and returns its acknowledgement.
///
/// * `provider_ref` - reference of the provider to launch.
/// * `host_id` - host used to build the launch subject.
/// * `binding_name` - binding name carried in the command.
///
/// # Errors
///
/// Returns an error if the command cannot be serialized, if the request fails
/// or gets no reply within the client's timeout, or if the reply is not valid
/// JSON for `ProviderLaunchAck`.
pub fn launch_provider_on_host(
    &self,
    provider_ref: &str,
    host_id: &str,
    binding_name: &str,
) -> Result<ProviderLaunchAck, Box<dyn std::error::Error>> {
    let command = LaunchProviderCommand {
        provider_ref: provider_ref.to_string(),
        binding_name: binding_name.to_string(),
    };
    let subject = self.gen_launch_provider_subject(host_id);
    let payload = serde_json::to_vec(&command)?;
    let reply = self.nc.request_timeout(&subject, &payload, self.timeout)?;
    Ok(serde_json::from_slice::<ProviderLaunchAck>(&reply.data)?)
}
/// Sends an actor launch command to a specific host and returns its acknowledgement.
///
/// * `actor_id` - the actor to launch.
/// * `host_id` - host used to build the launch subject.
///
/// # Errors
///
/// Returns an error if the command cannot be serialized, if the request fails
/// or gets no reply within the client's timeout, or if the reply is not valid
/// JSON for `LaunchAck`.
pub fn launch_actor_on_host(
    &self,
    actor_id: &str,
    host_id: &str,
) -> Result<LaunchAck, Box<dyn std::error::Error>> {
    let command = LaunchCommand {
        actor_id: actor_id.to_string(),
    };
    let subject = self.gen_launch_actor_subject(host_id);
    let payload = serde_json::to_vec(&command)?;
    let reply = self.nc.request_timeout(&subject, &payload, self.timeout)?;
    Ok(serde_json::from_slice::<LaunchAck>(&reply.data)?)
}
/// Publishes a terminate command for an actor to a specific host.
///
/// * `actor_id` - the actor to terminate.
/// * `host_id` - host used to build the terminate subject.
///
/// The command is published without waiting for a reply, and the connection
/// is then flushed.
///
/// # Errors
///
/// Returns an error if the command cannot be serialized, if publishing fails,
/// or if flushing the connection fails.
pub fn stop_actor_on_host(
    &self,
    actor_id: &str,
    host_id: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    let msg = TerminateCommand {
        actor_id: actor_id.to_string(),
    };
    self.nc.publish(
        &self.gen_terminate_actor_subject(host_id),
        &serde_json::to_vec(&msg)?,
    )?;
    // Propagate flush failures so a failed flush is not reported as success.
    self.nc.flush()?;
    Ok(())
}
/// Builds the full subject for `subject`.
///
/// The result is `<namespace>.wasmbus.<subject>` when the client has a
/// namespace, and `wasmbus.<subject>` otherwise.
fn gen_subject(&self, subject: &str) -> String {
    if let Some(ns) = &self.namespace {
        format!("{}.wasmbus.{}", ns, subject)
    } else {
        format!("wasmbus.{}", subject)
    }
}
}
/// Opens a NATS connection to `host`, named "waSCC Lattice".
///
/// Uses the credentials file when one is supplied, and default options otherwise.
///
/// # Panics
///
/// Panics if the connection cannot be established; the panic message names the
/// host and the underlying error. The return type is kept infallible so the
/// signature callers rely on does not change.
fn get_connection(host: &str, credsfile: Option<PathBuf>) -> nats::Connection {
    let opts = match credsfile {
        Some(creds) => nats::Options::with_credentials(creds),
        None => nats::Options::new(),
    };
    opts.with_name("waSCC Lattice")
        .connect(host)
        .unwrap_or_else(|e| panic!("failed to connect to NATS server at '{}': {}", host, e))
}