use std::fmt;
use std::path::{Path, PathBuf};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use crate::context::CapsuleContext;
use crate::error::{CapsuleError, CapsuleResult};
use crate::manifest::CapsuleManifest;
#[derive(Debug, Clone)]
pub enum InterceptResult {
Continue(Vec<u8>),
Final(Vec<u8>),
Deny { reason: String },
}
impl InterceptResult {
pub fn from_guest_bytes(bytes: Vec<u8>) -> Self {
if bytes.is_empty() {
return Self::Continue(Vec::new());
}
match bytes[0] {
0x00 => Self::Continue(bytes[1..].to_vec()),
0x01 => Self::Final(bytes[1..].to_vec()),
0x02 => {
let reason = String::from_utf8_lossy(&bytes[1..]).into_owned();
Self::Deny { reason }
},
_ => Self::Continue(bytes),
}
}
pub fn from_capsule_result(action: &str, data: Option<&str>) -> Self {
let payload = data.map(|d| d.as_bytes().to_vec()).unwrap_or_default();
match action {
"continue" => Self::Continue(payload),
"final" => Self::Final(payload),
"deny" | "abort" => Self::Deny {
reason: data.unwrap_or("denied by interceptor").to_string(),
},
_ => Self::Continue(payload),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub struct CapsuleId(String);
impl<'de> Deserialize<'de> for CapsuleId {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
Self::new(s).map_err(serde::de::Error::custom)
}
}
impl CapsuleId {
pub fn new(id: impl Into<String>) -> CapsuleResult<Self> {
let id = id.into();
Self::validate(&id)?;
Ok(Self(id))
}
#[must_use]
pub fn from_static(id: &str) -> Self {
Self(id.to_string())
}
#[must_use]
pub fn as_str(&self) -> &str {
&self.0
}
fn validate(id: &str) -> CapsuleResult<()> {
if id.is_empty() {
return Err(CapsuleError::UnsupportedEntryPoint(
"capsule id must not be empty".into(),
));
}
if !id
.chars()
.all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '-')
{
return Err(CapsuleError::UnsupportedEntryPoint(format!(
"capsule id must contain only lowercase alphanumeric characters and hyphens, got: {id}"
)));
}
Ok(())
}
}
impl fmt::Display for CapsuleId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(&self.0)
}
}
impl AsRef<str> for CapsuleId {
fn as_ref(&self) -> &str {
&self.0
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadyStatus {
Ready,
Timeout,
Crashed,
}
impl ReadyStatus {
#[must_use]
pub fn is_ready(self) -> bool {
self == Self::Ready
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CapsuleState {
Unloaded,
Loading,
Ready,
Failed(String),
Unloading,
}
#[async_trait]
pub trait Capsule: Send + Sync {
fn id(&self) -> &CapsuleId;
fn manifest(&self) -> &CapsuleManifest;
fn state(&self) -> CapsuleState;
async fn load(&mut self, ctx: &CapsuleContext) -> CapsuleResult<()>;
async fn unload(&mut self) -> CapsuleResult<()>;
fn take_inbound_rx(
&mut self,
) -> Option<tokio::sync::mpsc::Receiver<astrid_core::InboundMessage>> {
None
}
async fn wait_ready(&self, _timeout: std::time::Duration) -> ReadyStatus {
ReadyStatus::Ready
}
async fn invoke_interceptor(
&self,
_action: &str,
_payload: &[u8],
_caller: Option<&astrid_events::ipc::IpcMessage>,
) -> CapsuleResult<InterceptResult> {
Err(CapsuleError::NotSupported(
"interceptors not supported".into(),
))
}
fn check_health(&self) -> CapsuleState {
self.state()
}
fn source_dir(&self) -> Option<&Path> {
None
}
}
pub(crate) struct CompositeCapsule {
id: CapsuleId,
manifest: CapsuleManifest,
state: CapsuleState,
engines: Vec<Box<dyn crate::engine::ExecutionEngine>>,
capsule_dir: Option<PathBuf>,
}
impl CompositeCapsule {
pub(crate) fn new(manifest: CapsuleManifest) -> CapsuleResult<Self> {
let id = CapsuleId::new(manifest.package.name.clone())?;
Ok(Self {
id,
manifest,
state: CapsuleState::Unloaded,
engines: Vec::new(),
capsule_dir: None,
})
}
pub(crate) fn set_source_dir(&mut self, dir: PathBuf) {
self.capsule_dir = Some(dir);
}
pub(crate) fn add_engine(&mut self, engine: Box<dyn crate::engine::ExecutionEngine>) {
self.engines.push(engine);
}
}
#[async_trait]
impl Capsule for CompositeCapsule {
fn id(&self) -> &CapsuleId {
&self.id
}
fn manifest(&self) -> &CapsuleManifest {
&self.manifest
}
fn state(&self) -> CapsuleState {
self.state.clone()
}
async fn load(&mut self, ctx: &CapsuleContext) -> CapsuleResult<()> {
self.state = CapsuleState::Loading;
for engine in &mut self.engines {
if let Err(e) = engine.load(ctx).await {
self.state = CapsuleState::Failed(e.to_string());
return Err(e);
}
}
self.state = CapsuleState::Ready;
Ok(())
}
async fn unload(&mut self) -> CapsuleResult<()> {
self.state = CapsuleState::Unloading;
for engine in &mut self.engines {
let _ = engine.unload().await;
}
self.state = CapsuleState::Unloaded;
Ok(())
}
async fn wait_ready(&self, timeout: std::time::Duration) -> ReadyStatus {
let deadline = tokio::time::Instant::now() + timeout;
for engine in &self.engines {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
return ReadyStatus::Timeout;
}
let status = engine.wait_ready(remaining).await;
if !status.is_ready() {
return status;
}
}
ReadyStatus::Ready
}
fn take_inbound_rx(
&mut self,
) -> Option<tokio::sync::mpsc::Receiver<astrid_core::InboundMessage>> {
for engine in &mut self.engines {
if let Some(rx) = engine.take_inbound_rx() {
return Some(rx);
}
}
None
}
async fn invoke_interceptor(
&self,
action: &str,
payload: &[u8],
caller: Option<&astrid_events::ipc::IpcMessage>,
) -> CapsuleResult<InterceptResult> {
for engine in &self.engines {
match engine.invoke_interceptor(action, payload, caller).await {
Ok(result) => return Ok(result),
Err(CapsuleError::NotSupported(_)) => continue,
Err(e) => return Err(e),
}
}
Err(CapsuleError::NotSupported(
"no engine supports interceptors".into(),
))
}
fn check_health(&self) -> CapsuleState {
for engine in &self.engines {
let health = engine.check_health();
if let CapsuleState::Failed(_) = &health {
return health;
}
}
self.state.clone()
}
fn source_dir(&self) -> Option<&Path> {
self.capsule_dir.as_deref()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::engine::ExecutionEngine;
use crate::manifest::{CapabilitiesDef, PackageDef};
use async_trait::async_trait;
struct HealthyEngine;
#[async_trait]
impl ExecutionEngine for HealthyEngine {
async fn load(&mut self, _ctx: &crate::context::CapsuleContext) -> CapsuleResult<()> {
Ok(())
}
async fn unload(&mut self) -> CapsuleResult<()> {
Ok(())
}
}
struct FailedEngine;
#[async_trait]
impl ExecutionEngine for FailedEngine {
async fn load(&mut self, _ctx: &crate::context::CapsuleContext) -> CapsuleResult<()> {
Ok(())
}
async fn unload(&mut self) -> CapsuleResult<()> {
Ok(())
}
fn check_health(&self) -> CapsuleState {
CapsuleState::Failed("engine crashed".into())
}
}
fn test_manifest() -> CapsuleManifest {
CapsuleManifest {
package: PackageDef {
name: "test-capsule".into(),
version: "0.0.1".into(),
description: None,
authors: Vec::new(),
repository: None,
homepage: None,
documentation: None,
license: None,
license_file: None,
readme: None,
keywords: Vec::new(),
categories: Vec::new(),
astrid_version: None,
publish: None,
include: None,
exclude: None,
metadata: None,
},
components: Vec::new(),
imports: std::collections::HashMap::new(),
exports: std::collections::HashMap::new(),
capabilities: CapabilitiesDef::default(),
env: std::collections::HashMap::new(),
context_files: Vec::new(),
commands: Vec::new(),
mcp_servers: Vec::new(),
skills: Vec::new(),
uplinks: Vec::new(),
publishes: ::std::collections::HashMap::new(),
subscribes: ::std::collections::HashMap::new(),
tools: ::std::vec::Vec::new(),
}
}
#[test]
fn composite_check_health_all_healthy() {
let mut capsule = CompositeCapsule::new(test_manifest()).unwrap();
capsule.state = CapsuleState::Ready;
capsule.add_engine(Box::new(HealthyEngine));
capsule.add_engine(Box::new(HealthyEngine));
assert_eq!(capsule.check_health(), CapsuleState::Ready);
}
#[test]
fn composite_check_health_returns_first_failure() {
let mut capsule = CompositeCapsule::new(test_manifest()).unwrap();
capsule.state = CapsuleState::Ready;
capsule.add_engine(Box::new(HealthyEngine));
capsule.add_engine(Box::new(FailedEngine));
assert_eq!(
capsule.check_health(),
CapsuleState::Failed("engine crashed".into())
);
}
#[test]
fn composite_check_health_no_engines_returns_state() {
let mut capsule = CompositeCapsule::new(test_manifest()).unwrap();
capsule.state = CapsuleState::Ready;
assert_eq!(capsule.check_health(), CapsuleState::Ready);
}
struct SlowEngine;
#[async_trait]
impl ExecutionEngine for SlowEngine {
async fn load(&mut self, _ctx: &crate::context::CapsuleContext) -> CapsuleResult<()> {
Ok(())
}
async fn unload(&mut self) -> CapsuleResult<()> {
Ok(())
}
async fn wait_ready(&self, timeout: std::time::Duration) -> ReadyStatus {
tokio::time::sleep(timeout).await;
ReadyStatus::Timeout
}
}
#[tokio::test]
async fn composite_wait_ready_first_engine_timeout_starves_second() {
let mut capsule = CompositeCapsule::new(test_manifest()).unwrap();
capsule.add_engine(Box::new(SlowEngine));
capsule.add_engine(Box::new(HealthyEngine));
let status = capsule
.wait_ready(std::time::Duration::from_millis(50))
.await;
assert_eq!(status, ReadyStatus::Timeout);
}
#[tokio::test]
async fn composite_wait_ready_all_healthy() {
let mut capsule = CompositeCapsule::new(test_manifest()).unwrap();
capsule.add_engine(Box::new(HealthyEngine));
capsule.add_engine(Box::new(HealthyEngine));
let status = capsule
.wait_ready(std::time::Duration::from_millis(100))
.await;
assert_eq!(status, ReadyStatus::Ready);
}
}