bzzz-core 0.1.0

Bzzz core library - Declarative orchestration engine for AI Agents
Documentation
//! RuntimeAdapter v1
//!
//! RuntimeAdapter defines the uniform execution contract between Bzzz orchestration
//! and concrete runtime environments.
//!
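//! Required operations:
//! - create: Create a new execution context
//! - execute: Execute an Agent or Swarm
//! - status: Query execution status
//! - destroy: Clean up execution resources
//!
//! The trait additionally requires `kind`, `execute_background`, `cancel`, and `wait`;
//! none of its methods has a default implementation.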
//!
//! All local, Docker, HTTP, and future cloud runtimes must conform to the same contract.
//! Any implementation that weakens lifecycle semantics for convenience fails the Bzzz architecture.

use serde::{Deserialize, Serialize};
use serde_json::Value;

use crate::{AgentSpec, ArtifactId, ExecutionHandle, Run, RunError, RunId, RunStatus, RuntimeKind};

/// Execution context for a runtime
///
/// Plain data describing one execution environment: which runtime it belongs to,
/// where it runs, which environment variables it sees, and which resource limits
/// apply. `RuntimeAdapter::create` returns a value of this type, and
/// `RuntimeAdapter::execute`, `execute_background` and `destroy` take it by reference.
///
/// The type is serde-serializable; all fields are public.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionContext {
    /// Unique identifier for this context
    pub id: String,
    /// Runtime kind this context was created for
    pub runtime_kind: RuntimeKind,
    /// Working directory; `None` when no directory has been set
    pub working_dir: Option<std::path::PathBuf>,
    /// Environment variables, keyed by variable name
    pub env: std::collections::HashMap<String, String>,
    /// Resource limits (see [`ResourceLimits`]; a zero field means unlimited)
    pub limits: ResourceLimits,
}

impl ExecutionContext {
    /// Create a new execution context.
    ///
    /// The returned context has no working directory, an empty environment and
    /// `ResourceLimits::default()` (every limit zero, i.e. unlimited).
    ///
    /// * `id` - identifier stored in [`ExecutionContext::id`]
    /// * `runtime_kind` - the runtime this context belongs to
    #[must_use]
    pub fn new(id: impl Into<String>, runtime_kind: RuntimeKind) -> Self {
        Self {
            id: id.into(),
            runtime_kind,
            working_dir: None,
            env: std::collections::HashMap::new(),
            limits: ResourceLimits::default(),
        }
    }

    /// Set working directory.
    ///
    /// Consumes the context and returns it with `working_dir` replaced by `path`.
    #[must_use = "builder methods return the updated context instead of mutating in place"]
    pub fn with_working_dir(mut self, path: std::path::PathBuf) -> Self {
        self.working_dir = Some(path);
        self
    }

    /// Add an environment variable.
    ///
    /// Consumes the context and returns it with `key` mapped to `value`.
    /// An existing entry for the same key is overwritten.
    #[must_use = "builder methods return the updated context instead of mutating in place"]
    pub fn with_env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.env.insert(key.into(), value.into());
        self
    }

    /// Set resource limits.
    ///
    /// Consumes the context and returns it with `limits` replacing the previous limits.
    #[must_use = "builder methods return the updated context instead of mutating in place"]
    pub fn with_limits(mut self, limits: ResourceLimits) -> Self {
        self.limits = limits;
        self
    }
}

/// Resource limits for execution
///
/// Each field uses `0` as the "unlimited" sentinel. The derived `Default`
/// yields zero for every field, so a default value imposes no limits.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ResourceLimits {
    /// Maximum CPU cores (0 = unlimited)
    pub max_cpu_cores: u32,
    /// Maximum memory in bytes (0 = unlimited)
    pub max_memory_bytes: u64,
    /// Maximum execution time in seconds (0 = unlimited)
    pub max_time_seconds: u64,
}

/// Execution result from a runtime
///
/// Final outcome of a run, as returned by `RuntimeAdapter::wait`. Construct it with
/// `ExecutionResult::new` or one of the status-specific constructors
/// (`completed`, `failed`, `cancelled`) and refine it with the `with_*` methods.
#[derive(Debug, Clone)]
pub struct ExecutionResult {
    /// Run ID
    pub run_id: RunId,
    /// Final status
    pub status: RunStatus,
    /// Output artifacts
    pub artifacts: Vec<ArtifactId>,
    /// Error describing why the run did not complete; populated by the
    /// `failed` and `cancelled` constructors, `None` otherwise unless set explicitly
    pub error: Option<RunError>,
    /// Execution metrics
    pub metrics: ExecutionMetrics,
    /// Output data (JSON value for parameter substitution)
    ///
    /// This field contains the worker's output data that can be used
    /// for parameter substitution in subsequent steps.
    /// For example, `{{steps.worker_name.output.field}}`.
    pub output: Option<Value>,
}

impl ExecutionResult {
    /// Create a new ExecutionResult with default values
    pub fn new(run_id: RunId, status: RunStatus) -> Self {
        ExecutionResult {
            run_id,
            status,
            artifacts: Vec::new(),
            error: None,
            metrics: ExecutionMetrics::default(),
            output: None,
        }
    }

    /// Create a completed result
    pub fn completed(run_id: RunId) -> Self {
        ExecutionResult::new(run_id, RunStatus::Completed)
    }

    /// Create a failed result
    pub fn failed(run_id: RunId, error: RunError) -> Self {
        ExecutionResult {
            run_id,
            status: RunStatus::Failed,
            artifacts: Vec::new(),
            error: Some(error),
            metrics: ExecutionMetrics::default(),
            output: None,
        }
    }

    /// Create a cancelled result
    pub fn cancelled(run_id: RunId, reason: String) -> Self {
        ExecutionResult {
            run_id,
            status: RunStatus::Cancelled,
            artifacts: Vec::new(),
            error: Some(RunError::Cancelled { reason }),
            metrics: ExecutionMetrics::default(),
            output: None,
        }
    }

    /// Set artifacts
    pub fn with_artifacts(mut self, artifacts: Vec<ArtifactId>) -> Self {
        self.artifacts = artifacts;
        self
    }

    /// Set error
    pub fn with_error(mut self, error: RunError) -> Self {
        self.error = Some(error);
        self
    }

    /// Set metrics
    pub fn with_metrics(mut self, metrics: ExecutionMetrics) -> Self {
        self.metrics = metrics;
        self
    }

    /// Set output
    pub fn with_output(mut self, output: Value) -> Self {
        self.output = Some(output);
        self
    }

    /// Set exposed output (semantic alias for with_output)
    ///
    /// Use this method when the output is the result of expose mapping resolution.
    /// This provides clearer intent when working with SwarmFile capability exposure.
    pub fn with_exposed_output(mut self, output: Value) -> Self {
        self.output = Some(output);
        self
    }
}

/// Execution metrics
///
/// Measurements attached to an `ExecutionResult`. The derived `Default`
/// yields zero for every counter and `None` for the selection trace.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ExecutionMetrics {
    /// Wall clock time in milliseconds
    pub wall_time_ms: u64,
    /// CPU time in milliseconds
    pub cpu_time_ms: u64,
    /// Peak memory usage in bytes
    pub peak_memory_bytes: u64,
    /// Number of retries
    pub retries: u32,
    /// Selection trace for dynamic worker selection (P2-5)
    ///
    /// Records the decision process for worker selection:
    /// - Expression evaluated
    /// - Resolved worker name
    /// - Fallback used (if applicable)
    ///
    /// Example: "expr={{input.type}} resolved=processor_a fallback=none"
    ///
    /// `None` when no trace was recorded.
    pub selection_trace: Option<String>,
}

/// Status query result
///
/// Point-in-time snapshot of a run, as returned by `RuntimeAdapter::status`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StatusResult {
    /// Run ID
    pub run_id: RunId,
    /// Current status
    pub status: RunStatus,
    /// Current step (if running)
    pub current_step: Option<String>,
    /// Progress percentage (0-100); the `u8` type itself does not enforce the upper bound
    pub progress: u8,
    /// Elapsed time in milliseconds
    pub elapsed_ms: u64,
    /// Output artifacts so far
    pub artifacts: Vec<ArtifactId>,
}

/// RuntimeAdapter trait - the uniform execution contract
///
/// All runtime implementations (Local, Docker, HTTP, Cloud) must implement this trait.
///
/// Implementors must be `Send + Sync`. No method has a default implementation, so
/// every operation below is required. The async methods are declared through the
/// `async_trait` attribute macro, and every fallible operation reports failure
/// as a `RunError`.
#[async_trait::async_trait]
pub trait RuntimeAdapter: Send + Sync {
    /// Get the runtime kind
    fn kind(&self) -> RuntimeKind;

    /// Create an execution context
    ///
    /// Builds an `ExecutionContext` from the given agent spec.
    async fn create(&self, spec: &AgentSpec) -> Result<ExecutionContext, RunError>;

    /// Execute an Agent in the given context (blocking)
    ///
    /// NOTE(review): despite "blocking", this returns an `ExecutionHandle` rather than
    /// an `ExecutionResult`; presumably the run has finished by the time the future
    /// resolves and the outcome is fetched via `wait()` - confirm against implementations.
    async fn execute(&self, ctx: &ExecutionContext, run: &Run)
        -> Result<ExecutionHandle, RunError>;

    /// Execute in background (non-blocking)
    ///
    /// Returns immediately with a handle. Use status() or wait() to check progress.
    async fn execute_background(
        &self,
        ctx: &ExecutionContext,
        run: &Run,
    ) -> Result<ExecutionHandle, RunError>;

    /// Query execution status
    ///
    /// Returns a `StatusResult` snapshot for the execution identified by `handle`.
    async fn status(&self, handle: &ExecutionHandle) -> Result<StatusResult, RunError>;

    /// Destroy an execution context
    ///
    /// Counterpart of `create`; the module docs describe it as cleaning up
    /// execution resources.
    async fn destroy(&self, ctx: &ExecutionContext) -> Result<(), RunError>;

    /// Cancel a running execution
    async fn cancel(&self, handle: &ExecutionHandle) -> Result<(), RunError>;

    /// Wait for execution to complete
    ///
    /// Resolves to the final `ExecutionResult` of the execution identified by `handle`.
    async fn wait(&self, handle: &ExecutionHandle) -> Result<ExecutionResult, RunError>;
}

#[cfg(test)]
mod tests {
    use super::*;

    /// The builder must store exactly what was passed in and leave untouched
    /// fields at the defaults established by `ExecutionContext::new`.
    #[test]
    fn test_execution_context() {
        let ctx = ExecutionContext::new("test-ctx", RuntimeKind::Local)
            .with_working_dir(std::path::PathBuf::from("/tmp"))
            .with_env("KEY", "value");

        assert_eq!(ctx.id, "test-ctx");
        assert_eq!(ctx.runtime_kind, RuntimeKind::Local);
        // Check the stored path itself, not merely that one is present.
        assert_eq!(ctx.working_dir.as_deref(), Some(std::path::Path::new("/tmp")));
        assert_eq!(ctx.env.len(), 1);
        assert_eq!(ctx.env.get("KEY").map(String::as_str), Some("value"));

        // Limits were never set, so they must still be the all-zero default.
        assert_eq!(ctx.limits.max_cpu_cores, 0);
        assert_eq!(ctx.limits.max_memory_bytes, 0);
        assert_eq!(ctx.limits.max_time_seconds, 0);
    }

    /// `with_limits` must replace the default limits wholesale.
    #[test]
    fn test_execution_context_with_limits() {
        let ctx = ExecutionContext::new("limited", RuntimeKind::Local).with_limits(ResourceLimits {
            max_cpu_cores: 2,
            max_memory_bytes: 512,
            max_time_seconds: 30,
        });

        assert!(ctx.working_dir.is_none());
        assert!(ctx.env.is_empty());
        assert_eq!(ctx.limits.max_cpu_cores, 2);
        assert_eq!(ctx.limits.max_memory_bytes, 512);
        assert_eq!(ctx.limits.max_time_seconds, 30);
    }

    /// Every field of a literal `ResourceLimits` must read back unchanged.
    #[test]
    fn test_resource_limits() {
        let limits = ResourceLimits {
            max_cpu_cores: 4,
            max_memory_bytes: 1024 * 1024 * 1024, // 1GB
            max_time_seconds: 3600,
        };

        assert_eq!(limits.max_cpu_cores, 4);
        assert_eq!(limits.max_memory_bytes, 1024 * 1024 * 1024);
        assert_eq!(limits.max_time_seconds, 3600);
    }
}