#![warn(clippy::pedantic)]
#![forbid(clippy::unwrap_used)]
pub mod chunking;
pub mod logging;
use logging::Level;
use core::fmt;
use std::collections::HashMap;
use anyhow::{anyhow, bail, ensure, Context};
use nkeys::{KeyPair, KeyPairType};
use serde::ser::SerializeMap;
use serde::{Deserialize, Serialize, Serializer};
use sha2::{Digest, Sha256};
use ulid::Ulid;
use uuid::Uuid;
use wascap::jwt;
use wascap::prelude::Claims;
pub type ActorLinks = Vec<LinkDefinition>;
pub type ClusterIssuerKey = String;
pub type ClusterIssuers = Vec<ClusterIssuerKey>;
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct HealthCheckRequest {}
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct HealthCheckResponse {
#[serde(default)]
pub healthy: bool,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub message: Option<String>,
}
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct HostData {
#[serde(default)]
pub host_id: String,
#[serde(default)]
pub lattice_rpc_prefix: String,
#[serde(default)]
pub link_name: String,
#[serde(default)]
pub lattice_rpc_user_jwt: String,
#[serde(default)]
pub lattice_rpc_user_seed: String,
#[serde(default)]
pub lattice_rpc_url: String,
#[serde(default)]
pub provider_key: String,
#[serde(default)]
pub invocation_seed: String,
#[serde(
serialize_with = "serialize_wit_map",
deserialize_with = "deserialize_wit_map"
)]
pub env_values: HostEnvValues,
#[serde(default)]
pub instance_id: String,
pub link_definitions: ActorLinks,
pub cluster_issuers: ClusterIssuers,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub config_json: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub default_rpc_timeout_ms: Option<u64>,
#[serde(default)]
pub structured_logging: bool,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub log_level: Option<Level>,
pub otel_config: OtelConfig,
}
pub type HostEnvValues = WitMap<String>;
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct OtelConfig {
pub traces_exporter: Option<String>,
pub exporter_otlp_endpoint: Option<String>,
}
pub fn invocation_hash(
target_url: impl AsRef<str>,
origin_url: impl AsRef<str>,
op: impl AsRef<str>,
msg: impl AsRef<[u8]>,
) -> String {
let mut hash = Sha256::default();
hash.update(origin_url.as_ref());
hash.update(target_url.as_ref());
hash.update(op.as_ref());
hash.update(msg.as_ref());
hex::encode_upper(hash.finalize())
}
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct Invocation {
pub origin: WasmCloudEntity,
pub target: WasmCloudEntity,
#[serde(default)]
pub operation: String,
#[serde(with = "serde_bytes")]
#[serde(default)]
pub msg: Vec<u8>,
#[serde(default)]
pub id: String,
#[serde(default)]
pub encoded_claims: String,
#[serde(default)]
pub host_id: String,
pub content_length: u64,
#[serde(rename = "traceContext")]
#[serde(
default,
serialize_with = "serialize_wit_map",
deserialize_with = "deserialize_wit_map"
)]
pub trace_context: TraceContext,
}
impl Invocation {
#[allow(clippy::missing_errors_doc)] pub fn new(
cluster_key: &KeyPair,
host_key: &KeyPair,
origin: WasmCloudEntity,
target: WasmCloudEntity,
operation: impl Into<String>,
msg: Vec<u8>,
trace_context: TraceContext,
) -> anyhow::Result<Invocation> {
let operation = operation.into();
let (_, operation) = operation
.rsplit_once('/')
.context("failed to parse operation")?;
let id = Uuid::from_u128(Ulid::new().into()).to_string();
let target_url = format!("{}/{operation}", target.url());
let claims = jwt::Claims::<jwt::Invocation>::new(
cluster_key.public_key(),
id.to_string(),
&target_url,
&origin.url(),
&invocation_hash(&target_url, origin.url(), operation, &msg),
);
let encoded_claims = claims
.encode(cluster_key)
.context("failed to encode claims")?;
let operation = operation.to_string();
Ok(Invocation {
content_length: msg.len() as _,
origin,
target,
operation,
msg,
id,
encoded_claims,
host_id: host_key.public_key(),
trace_context,
})
}
#[must_use]
pub fn origin_url(&self) -> String {
self.origin.url()
}
#[must_use]
pub fn target_url(&self) -> String {
format!("{}/{}", self.target.url(), self.operation)
}
#[must_use]
pub fn hash(&self) -> String {
invocation_hash(
self.target_url(),
self.origin_url(),
&self.operation,
&self.msg,
)
}
#[allow(clippy::missing_errors_doc)] pub fn validate_antiforgery(&self, valid_issuers: &[String]) -> anyhow::Result<()> {
match KeyPair::from_public_key(&self.host_id) {
Ok(kp) if kp.key_pair_type() == KeyPairType::Server => (),
_ => bail!("invalid host ID on invocation: '{}'", self.host_id),
}
let token_validation =
jwt::validate_token::<wascap::prelude::Invocation>(&self.encoded_claims)
.map_err(|e| anyhow!(e))?;
ensure!(!token_validation.expired, "invocation claims token expired");
ensure!(
!token_validation.cannot_use_yet,
"attempt to use invocation before claims token allows"
);
ensure!(
token_validation.signature_valid,
"invocation claims signature invalid"
);
let claims = Claims::<wascap::prelude::Invocation>::decode(&self.encoded_claims)
.map_err(|e| anyhow!(e))?;
ensure!(
valid_issuers.contains(&claims.issuer),
"issuer of this invocation is not among the list of valid issuers"
);
let inv_claims = claims
.metadata
.context("no wascap metadata found on claims")?;
ensure!(
inv_claims.target_url == self.target_url(),
"invocation claims and invocation target URL do not match"
);
ensure!(
inv_claims.origin_url == self.origin_url(),
"invocation claims and invocation origin URL do not match"
);
if !self.msg.is_empty() && inv_claims.invocation_hash != self.hash() {
bail!(
"invocation hash does not match signed claims hash ({} / {})",
inv_claims.invocation_hash,
self.hash()
);
}
Ok(())
}
}
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct InvocationResponse {
#[serde(with = "serde_bytes")]
#[serde(default)]
pub msg: Vec<u8>,
#[serde(default)]
pub invocation_id: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
pub content_length: u64,
#[serde(rename = "traceContext")]
#[serde(
default,
serialize_with = "serialize_wit_map",
deserialize_with = "deserialize_wit_map"
)]
pub trace_context: TraceContext,
}
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct LinkDefinition {
#[serde(default)]
pub actor_id: String,
#[serde(default)]
pub provider_id: String,
#[serde(default)]
pub link_name: String,
#[serde(default)]
pub contract_id: String,
#[serde(
serialize_with = "serialize_wit_map",
deserialize_with = "deserialize_wit_map"
)]
pub values: LinkSettings,
}
pub type LinkSettings = WitMap<String>;
pub type TraceContext = WitMap<String>;
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct WasmCloudEntity {
#[serde(default)]
pub public_key: String,
#[serde(default)]
pub link_name: String,
#[serde(default)]
pub contract_id: String,
}
impl WasmCloudEntity {
#[must_use]
pub fn url(&self) -> String {
if self.public_key.to_uppercase().starts_with('M') {
format!("wasmbus://{}", self.public_key)
} else {
format!(
"wasmbus://{}/{}/{}",
self.contract_id
.replace(':', "/")
.replace(' ', "_")
.to_lowercase(),
self.link_name.replace(' ', "_").to_lowercase(),
self.public_key
)
}
}
#[must_use]
pub fn is_actor(&self) -> bool {
self.link_name.is_empty() || self.contract_id.is_empty()
}
#[must_use]
pub fn is_provider(&self) -> bool {
!self.is_actor()
}
}
impl fmt::Display for WasmCloudEntity {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let url = self.url();
write!(f, "{url}")
}
}
pub trait HealthCheck {
fn health_request(&self) -> HealthCheckResponse;
}
pub type WitMap<T> = Vec<(String, T)>;
fn serialize_wit_map<S: Serializer, T>(map: &WitMap<T>, serializer: S) -> Result<S::Ok, S::Error>
where
T: Serialize,
{
let mut seq = serializer.serialize_map(Some(map.len()))?;
for (key, val) in map {
seq.serialize_entry(key, val)?;
}
seq.end()
}
fn deserialize_wit_map<'de, D: serde::Deserializer<'de>, T>(
deserializer: D,
) -> Result<WitMap<T>, D::Error>
where
T: Deserialize<'de>,
{
let values = HashMap::<String, T>::deserialize(deserializer)?;
Ok(values.into_iter().collect())
}