use crate::capability_binding::{CapabilityRegistry, CapabilityResult};
use crate::context_fabric::{ContextFabric, ContextQuery, TimeWindow};
use crate::thought_stream::{ThoughtEvent, ThoughtEventType};
use crate::trace::{TraceEvent, TraceRecorder, ExecutionTrace};
use crate::types::{ProvenanceChain, StructuredContent, Timestamp};
use crate::Result;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use uuid::Uuid;
/// Lifecycle phase of an [`ExecutionEngine`] run.
///
/// Serde renames every variant to `snake_case`, which matches the strings
/// returned by [`ExecutionState::as_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ExecutionState {
    /// State a freshly constructed engine starts in.
    Initialized,
    /// The intent is being ingested.
    Perceiving,
    /// The intent is being decomposed into subgoals.
    Reasoning,
    /// Subgoals are being assigned to layers.
    Assigning,
    /// Subgoals are being executed.
    Executing,
    /// The finished run is being reflected upon.
    Reflecting,
    /// The run finished without error.
    Completed,
    /// Set by `ExecutionEngine::execute` when executing a subgoal returns an error.
    Failed,
    /// NOTE(review): never assigned by the engine code in this section —
    /// presumably reserved for pause/approval flows; confirm.
    Paused,
}
impl ExecutionState {
pub fn as_str(&self) -> &'static str {
match self {
Self::Initialized => "initialized",
Self::Perceiving => "perceiving",
Self::Reasoning => "reasoning",
Self::Assigning => "assigning",
Self::Executing => "executing",
Self::Reflecting => "reflecting",
Self::Completed => "completed",
Self::Failed => "failed",
Self::Paused => "paused",
}
}
}
/// Tunable settings carried by an [`ExecutionEngine`].
///
/// NOTE(review): the engine code in this section stores a config but never
/// reads any of these fields, so the per-field notes below describe the
/// defaults only; the intended semantics should be confirmed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionConfig {
    /// Defaults to `true`.
    pub tracing_enabled: bool,
    /// Defaults to `true`.
    pub auto_finalize: bool,
    /// Defaults to `false`.
    pub pause_on_failure: bool,
    /// Defaults to `false`.
    pub require_approval: bool,
    /// Optional execution time limit; defaults to `Some(3600)`
    /// (presumably seconds — TODO confirm the unit).
    pub max_execution_time: Option<u64>,
    /// Defaults to `10` (presumably "reflect every N steps" — TODO confirm).
    pub reflection_frequency: usize,
}
impl Default for ExecutionConfig {
fn default() -> Self {
Self {
tracing_enabled: true,
auto_finalize: true,
pause_on_failure: false,
require_approval: false,
max_execution_time: Some(3600), reflection_frequency: 10,
}
}
}
/// A request handed to [`ExecutionEngine::execute`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ExecutionIntent {
    /// Unique id; `ExecutionIntent::new` generates a random v4 UUID.
    pub id: Uuid,
    /// Free-form description of what should be achieved.
    pub description: String,
    /// Priority value. `ExecutionIntent::new` starts at `50` and
    /// `with_priority` caps it at `100`.
    pub priority: u32,
    /// Creation time, taken from `Timestamp::now()` in `ExecutionIntent::new`.
    pub created_at: Timestamp,
    /// Optional deadline; `None` unless set through `with_deadline`.
    pub deadline: Option<Timestamp>,
    /// Caller-supplied context payload.
    pub context: StructuredContent,
}
impl ExecutionIntent {
pub fn new(description: impl Into<String>, context: StructuredContent) -> Self {
Self {
id: Uuid::new_v4(),
description: description.into(),
priority: 50, created_at: Timestamp::now(),
deadline: None,
context,
}
}
pub fn with_priority(mut self, priority: u32) -> Self {
self.priority = priority.clamp(0, 100);
self
}
pub fn with_deadline(mut self, deadline: Timestamp) -> Self {
self.deadline = Some(deadline);
self
}
}
/// One unit of work derived from an [`ExecutionIntent`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Subgoal {
    /// Unique id; `Subgoal::new` generates a random v4 UUID.
    pub id: Uuid,
    /// Id of the parent; `ExecutionEngine::decompose_intent` passes the
    /// intent's id here.
    pub parent_id: Uuid,
    /// Free-form description of the subgoal.
    pub description: String,
    /// Capability names, appended through `Subgoal::requires`.
    /// `ExecutionEngine::execute_subgoal` invokes only the first entry.
    pub required_capabilities: Vec<String>,
    /// Precondition labels, appended through `Subgoal::with_precondition`.
    pub preconditions: Vec<String>,
    /// Layer assignment; `Subgoal::new` leaves it `None`.
    pub assigned_layer: Option<String>,
    /// Current status; `Subgoal::new` starts at [`SubgoalStatus::Pending`].
    pub status: SubgoalStatus,
}
impl Subgoal {
pub fn new(parent_id: Uuid, description: impl Into<String>) -> Self {
Self {
id: Uuid::new_v4(),
parent_id,
description: description.into(),
required_capabilities: Vec::new(),
preconditions: Vec::new(),
assigned_layer: None,
status: SubgoalStatus::Pending,
}
}
pub fn requires(mut self, capability: impl Into<String>) -> Self {
self.required_capabilities.push(capability.into());
self
}
pub fn with_precondition(mut self, precondition: impl Into<String>) -> Self {
self.preconditions.push(precondition.into());
self
}
}
/// Progress marker of a [`Subgoal`].
///
/// Serde renames every variant to `snake_case`. The enum is fieldless, so it
/// derives `Copy` and `Eq` in addition to `Clone`/`PartialEq`, in line with
/// [`ExecutionState`]; this is purely additive for existing callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SubgoalStatus {
    /// Initial status assigned by `Subgoal::new`.
    Pending,
    /// Execution of the subgoal has started.
    InProgress,
    /// The subgoal finished successfully.
    Completed,
    /// The subgoal finished with an error.
    Failed,
    /// NOTE(review): not assigned by the code in this section — presumably
    /// means a precondition is unmet; confirm.
    Blocked,
    /// NOTE(review): not assigned by the code in this section — presumably
    /// tied to `ExecutionConfig::require_approval`; confirm.
    RequiresApproval,
}
/// Outcome summary of an execution run.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ExecutionResult {
    /// `true` when built through `ExecutionResult::success`, `false` when
    /// built through `ExecutionResult::failure`.
    pub success: bool,
    /// Result payload; `ExecutionResult::failure` fills in an empty text value.
    pub data: StructuredContent,
    /// Number of steps that were executed.
    pub steps_executed: usize,
    /// Id of the trace associated with the run.
    pub trace_id: Uuid,
    /// Error message; `Some` only for results built through
    /// `ExecutionResult::failure`.
    pub error: Option<String>,
}
impl ExecutionResult {
pub fn success(data: StructuredContent, steps: usize, trace_id: Uuid) -> Self {
Self {
success: true,
data,
steps_executed: steps,
trace_id,
error: None,
}
}
pub fn failure(error: String, steps: usize, trace_id: Uuid) -> Self {
Self {
success: false,
data: StructuredContent::text(""),
steps_executed: steps,
trace_id,
error: Some(error),
}
}
}
/// Counters an [`ExecutionEngine`] keeps about its work; every field starts
/// at zero through the derived `Default`.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct ExecutionStats {
    /// Bumped once by each of the engine's `ingest_intent`,
    /// `decompose_intent` and `assign_layers` calls.
    pub total_steps: usize,
    /// Bumped by `ExecutionEngine::execute_subgoal`.
    pub completed_steps: usize,
    /// Number of steps that failed.
    pub failed_steps: usize,
    /// Average step duration in milliseconds.
    /// NOTE(review): not written by the engine code in this section.
    pub avg_step_duration_ms: u64,
    /// Wall-clock duration in milliseconds, written by `ExecutionEngine::execute`.
    pub total_duration_ms: u64,
    /// NOTE(review): not written by the engine code in this section —
    /// presumably counts manual interventions; confirm.
    pub interventions: usize,
}
/// Drives an [`ExecutionIntent`] through the perceive → reason → assign →
/// execute → reflect pipeline.
///
/// Collaborators are shared through `Arc`; the engine's own mutable state sits
/// behind async `RwLock`s so every method can take `&self`.
pub struct ExecutionEngine {
    /// Stream that thought events are emitted to.
    thought_stream: Arc<crate::thought_stream::ThoughtStream>,
    /// Registry used to invoke a subgoal's capability by name.
    capability_registry: Arc<crate::capability_binding::CapabilityRegistry>,
    /// Context store queried during reflection.
    context_fabric: Arc<crate::context_fabric::ContextFabric>,
    /// Recorder that receives every emitted thought and is finalized when a
    /// run succeeds.
    trace_recorder: Arc<TraceRecorder>,
    /// Current lifecycle phase.
    state: Arc<RwLock<ExecutionState>>,
    /// Intent list returned (as a clone) by `ExecutionEngine::intents`.
    intents: Arc<RwLock<Vec<ExecutionIntent>>>,
    /// Subgoal list returned (as a clone) by `ExecutionEngine::subgoals`.
    subgoals: Arc<RwLock<Vec<Subgoal>>>,
    /// Engine configuration; initialised to `ExecutionConfig::default()`.
    config: Arc<RwLock<ExecutionConfig>>,
    /// Running statistics returned (as a clone) by `ExecutionEngine::stats`.
    stats: Arc<RwLock<ExecutionStats>>,
}
impl ExecutionEngine {
    /// Creates an engine wired to the given thought stream, capability
    /// registry and context fabric.
    ///
    /// The engine starts in [`ExecutionState::Initialized`] with the default
    /// configuration, zeroed statistics, empty intent/subgoal stores and a
    /// fresh trace recorder named `"execution_engine"`.
    pub fn new(
        thought_stream: Arc<crate::thought_stream::ThoughtStream>,
        capability_registry: Arc<crate::capability_binding::CapabilityRegistry>,
        context_fabric: Arc<crate::context_fabric::ContextFabric>,
    ) -> Self {
        Self {
            // The handle is moved in directly; no extra `Arc` clone is needed.
            thought_stream,
            capability_registry,
            context_fabric,
            trace_recorder: Arc::new(TraceRecorder::new("execution_engine")),
            state: Arc::new(RwLock::new(ExecutionState::Initialized)),
            intents: Arc::new(RwLock::new(Vec::new())),
            subgoals: Arc::new(RwLock::new(Vec::new())),
            config: Arc::new(RwLock::new(ExecutionConfig::default())),
            stats: Arc::new(RwLock::new(ExecutionStats::default())),
        }
    }

    /// Runs `intent` through the full pipeline (ingest, decompose, assign,
    /// execute every subgoal, reflect) and returns the payload of the last
    /// executed subgoal together with the step count and the trace id.
    ///
    /// The elapsed wall-clock time is stored in
    /// `ExecutionStats::total_duration_ms` whether the run succeeds or fails.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by any phase. In that case the engine
    /// state is set to [`ExecutionState::Failed`] and the trace is not
    /// finalized.
    pub async fn execute(&self, intent: ExecutionIntent) -> Result<ExecutionResult> {
        let started = std::time::Instant::now();
        let outcome = self.run_pipeline(&intent).await;

        // `as_millis` yields a `u128`; saturate instead of silently wrapping.
        let elapsed_ms = started.elapsed().as_millis().min(u128::from(u64::MAX)) as u64;
        self.update_stats(|s| s.total_duration_ms = elapsed_ms).await;

        match outcome {
            Ok((data, steps)) => {
                self.set_state(ExecutionState::Completed).await;
                let trace = self.trace_recorder.finalize().await;
                let trace_id = trace.id;
                Ok(ExecutionResult::success(data, steps, trace_id))
            }
            Err(e) => {
                // Every failing phase ends here, so the state can never stay
                // stuck in an intermediate phase after an early error.
                self.set_state(ExecutionState::Failed).await;
                Err(e)
            }
        }
    }

    /// Phase sequencing for [`ExecutionEngine::execute`]: advances the state
    /// before each phase and stops at the first error. Returns the payload of
    /// the last subgoal and the number of subgoals executed.
    async fn run_pipeline(&self, intent: &ExecutionIntent) -> Result<(StructuredContent, usize)> {
        self.set_state(ExecutionState::Perceiving).await;
        self.ingest_intent(intent).await?;

        self.set_state(ExecutionState::Reasoning).await;
        let subgoals = self.decompose_intent(intent).await?;

        self.set_state(ExecutionState::Assigning).await;
        self.assign_layers(&subgoals).await?;

        self.set_state(ExecutionState::Executing).await;
        let mut steps = 0usize;
        let mut final_result = StructuredContent::text("");
        for subgoal in subgoals {
            final_result = self.execute_subgoal(subgoal).await?;
            steps += 1;
        }

        self.set_state(ExecutionState::Reflecting).await;
        self.reflect_on_execution(intent, steps).await?;
        Ok((final_result, steps))
    }

    /// Registers `intent` with the engine: stores a copy in the intent list
    /// (so [`ExecutionEngine::intents`] reflects it), bumps `total_steps`, and
    /// publishes an observation thought with the intent's id, description and
    /// priority.
    pub async fn ingest_intent(&self, intent: &ExecutionIntent) -> Result<()> {
        self.update_stats(|s| s.total_steps += 1).await;
        self.intents.write().await.push(intent.clone());

        let event = ThoughtEvent::observation(
            StructuredContent::json(serde_json::json!({
                "intent_id": intent.id,
                "description": &intent.description,
                "priority": intent.priority,
            })),
            ProvenanceChain::new(),
        );
        self.publish(event).await;
        Ok(())
    }

    /// Splits `intent` into subgoals, stores them in the engine and publishes
    /// an intention thought with the subgoal count.
    ///
    /// Currently always yields a single "Execute main goal" subgoal that
    /// requires the `execute` capability and the `intent_validated`
    /// precondition.
    pub async fn decompose_intent(&self, intent: &ExecutionIntent) -> Result<Vec<Subgoal>> {
        self.update_stats(|s| s.total_steps += 1).await;

        let subgoals = vec![Subgoal::new(intent.id, "Execute main goal")
            .requires("execute")
            .with_precondition("intent_validated")];

        // The write guard is a temporary that is released at the end of this
        // statement, so the store is not locked across the awaits below.
        self.subgoals.write().await.extend(subgoals.iter().cloned());

        let event = ThoughtEvent::intention(
            StructuredContent::json(serde_json::json!({
                "intent_id": intent.id,
                "subgoals_count": subgoals.len(),
            })),
            ProvenanceChain::new(),
        );
        self.publish(event).await;
        Ok(subgoals)
    }

    /// Publishes one reflection thought per subgoal announcing its layer
    /// assignment (always `"SI"` at present) and bumps `total_steps` once.
    pub async fn assign_layers(&self, subgoals: &[Subgoal]) -> Result<()> {
        self.update_stats(|s| s.total_steps += 1).await;
        for subgoal in subgoals {
            let event = ThoughtEvent::reflection(
                StructuredContent::json(serde_json::json!({
                    "subgoal_id": subgoal.id,
                    "assigned_layer": "SI",
                })),
                ProvenanceChain::new(),
            );
            self.publish(event).await;
        }
        Ok(())
    }

    /// Executes one subgoal by invoking its first required capability.
    ///
    /// The status of the matching stored subgoal (if any) is moved to
    /// `InProgress` and then to `Completed` or `Failed`. `completed_steps` is
    /// bumped only on success and `failed_steps` only on failure. A subgoal
    /// without required capabilities completes immediately with a fixed text
    /// payload.
    ///
    /// # Errors
    ///
    /// Returns the error from the capability invocation, after publishing a
    /// warning thought that carries its message.
    pub async fn execute_subgoal(&self, subgoal: Subgoal) -> Result<StructuredContent> {
        self.set_subgoal_status(subgoal.id, SubgoalStatus::InProgress).await;

        let cap_name = match subgoal.required_capabilities.first() {
            Some(name) => name,
            None => {
                self.set_subgoal_status(subgoal.id, SubgoalStatus::Completed).await;
                self.update_stats(|s| s.completed_steps += 1).await;
                return Ok(StructuredContent::text("No capabilities required"));
            }
        };

        let input = StructuredContent::json(serde_json::json!({
            "subgoal_id": subgoal.id,
            "description": subgoal.description,
        }));

        match self.capability_registry.invoke(cap_name, input).await {
            Ok(result) => {
                self.set_subgoal_status(subgoal.id, SubgoalStatus::Completed).await;
                self.update_stats(|s| s.completed_steps += 1).await;
                let event = ThoughtEvent::new(
                    ThoughtEventType::Success,
                    StructuredContent::json(serde_json::json!({
                        "subgoal_id": subgoal.id,
                        "result": "completed",
                    })),
                    ProvenanceChain::new(),
                    1.0,
                );
                self.publish(event).await;
                Ok(result.data)
            }
            Err(e) => {
                self.set_subgoal_status(subgoal.id, SubgoalStatus::Failed).await;
                self.update_stats(|s| s.failed_steps += 1).await;
                let event = ThoughtEvent::new(
                    ThoughtEventType::Warning,
                    StructuredContent::json(serde_json::json!({
                        "subgoal_id": subgoal.id,
                        "error": e.to_string(),
                    })),
                    ProvenanceChain::new(),
                    1.0,
                );
                self.publish(event).await;
                Err(e)
            }
        }
    }

    /// Queries the context fabric for `"execution."` entries between the
    /// intent's creation and now, and publishes a reflection thought
    /// summarising the run.
    ///
    /// Reflection is best-effort: when the query fails, no thought is
    /// published and `Ok(())` is still returned. Statistics are left
    /// untouched, so the duration recorded by `execute` is preserved.
    pub async fn reflect_on_execution(&self, intent: &ExecutionIntent, steps: usize) -> Result<()> {
        let window = TimeWindow::new(intent.created_at, Timestamp::now());
        let query = ContextQuery::recall("execution.", window);
        if let Ok(result) = self.context_fabric.query(query).await {
            let reflection = serde_json::json!({
                "intent_id": intent.id,
                "steps_executed": steps,
                "thoughts_count": result.events.len(),
                "success": true,
            });
            let event = ThoughtEvent::reflection(
                StructuredContent::json(reflection),
                ProvenanceChain::new(),
            );
            self.publish(event).await;
        }
        Ok(())
    }

    /// Returns the current lifecycle phase.
    pub async fn state(&self) -> ExecutionState {
        *self.state.read().await
    }

    /// Returns a snapshot of the running statistics.
    pub async fn stats(&self) -> ExecutionStats {
        self.stats.read().await.clone()
    }

    /// Applies `updater` to the statistics while holding the write lock.
    async fn update_stats<F>(&self, updater: F)
    where
        F: FnOnce(&mut ExecutionStats),
    {
        let mut stats = self.stats.write().await;
        updater(&mut stats);
    }

    /// Replaces the current lifecycle phase.
    async fn set_state(&self, next: ExecutionState) {
        *self.state.write().await = next;
    }

    /// Writes `status` to the stored subgoal with the given id; does nothing
    /// when no stored subgoal matches (e.g. a subgoal built by the caller).
    async fn set_subgoal_status(&self, id: Uuid, status: SubgoalStatus) {
        let mut stored = self.subgoals.write().await;
        if let Some(entry) = stored.iter_mut().find(|candidate| candidate.id == id) {
            entry.status = status;
        }
    }

    /// Emits `event` on the thought stream and records it in the trace.
    ///
    /// The outcome of `emit` is deliberately discarded: a stream that cannot
    /// accept the event must not panic or abort the run, and the trace
    /// recorder still captures the event either way.
    async fn publish(&self, event: ThoughtEvent) {
        let _ = self.thought_stream.emit(event.clone());
        self.trace_recorder.record_thought(event).await;
    }

    /// Returns a shared handle to the engine's trace recorder.
    pub fn trace_recorder(&self) -> Arc<TraceRecorder> {
        Arc::clone(&self.trace_recorder)
    }

    /// Returns a snapshot of the intents ingested so far.
    pub async fn intents(&self) -> Vec<ExecutionIntent> {
        self.intents.read().await.clone()
    }

    /// Returns a snapshot of the stored subgoals.
    pub async fn subgoals(&self) -> Vec<Subgoal> {
        self.subgoals.read().await.clone()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // Nothing here awaits, so a plain `#[test]` is enough and avoids spinning
    // up a Tokio runtime just to run synchronous assertions.
    #[test]
    fn test_execution_intent_creation() {
        let context = StructuredContent::text("test context");
        let intent = ExecutionIntent::new("test intent", context);
        assert_eq!(intent.description, "test intent");
        assert_eq!(intent.priority, 50);
        assert!(intent.deadline.is_none());
    }

    #[test]
    fn test_execution_intent_priority_is_capped() {
        let capped = ExecutionIntent::new("test intent", StructuredContent::text("ctx"))
            .with_priority(250);
        assert_eq!(capped.priority, 100);

        let in_range = ExecutionIntent::new("test intent", StructuredContent::text("ctx"))
            .with_priority(7);
        assert_eq!(in_range.priority, 7);
    }

    #[test]
    fn test_execution_state() {
        assert_eq!(ExecutionState::Initialized.as_str(), "initialized");
        assert_eq!(ExecutionState::Perceiving.as_str(), "perceiving");
        assert_eq!(ExecutionState::Reasoning.as_str(), "reasoning");
        assert_eq!(ExecutionState::Assigning.as_str(), "assigning");
        assert_eq!(ExecutionState::Executing.as_str(), "executing");
        assert_eq!(ExecutionState::Reflecting.as_str(), "reflecting");
        assert_eq!(ExecutionState::Completed.as_str(), "completed");
        assert_eq!(ExecutionState::Failed.as_str(), "failed");
        assert_eq!(ExecutionState::Paused.as_str(), "paused");
    }

    #[test]
    fn test_execution_config_default() {
        let config = ExecutionConfig::default();
        assert!(config.tracing_enabled);
        assert!(config.auto_finalize);
        assert!(!config.pause_on_failure);
        assert!(!config.require_approval);
        assert_eq!(config.max_execution_time, Some(3600));
        assert_eq!(config.reflection_frequency, 10);
    }

    #[test]
    fn test_subgoal_creation() {
        let parent_id = Uuid::new_v4();
        let subgoal = Subgoal::new(parent_id, "test subgoal");
        assert_eq!(subgoal.parent_id, parent_id);
        assert_eq!(subgoal.description, "test subgoal");
        assert_eq!(subgoal.status, SubgoalStatus::Pending);
        assert!(subgoal.required_capabilities.is_empty());
        assert!(subgoal.preconditions.is_empty());
        assert!(subgoal.assigned_layer.is_none());
    }

    #[test]
    fn test_subgoal_builders() {
        let subgoal = Subgoal::new(Uuid::new_v4(), "test subgoal")
            .requires("execute")
            .with_precondition("intent_validated");
        assert_eq!(subgoal.required_capabilities, vec!["execute".to_string()]);
        assert_eq!(subgoal.preconditions, vec!["intent_validated".to_string()]);
    }

    #[test]
    fn test_execution_result() {
        let result = ExecutionResult::success(
            StructuredContent::text("success"),
            5,
            Uuid::new_v4(),
        );
        assert!(result.success);
        assert_eq!(result.steps_executed, 5);
        assert!(result.error.is_none());
    }

    #[test]
    fn test_execution_result_failure() {
        let trace_id = Uuid::new_v4();
        let result = ExecutionResult::failure("boom".to_string(), 2, trace_id);
        assert!(!result.success);
        assert_eq!(result.steps_executed, 2);
        assert_eq!(result.trace_id, trace_id);
        assert_eq!(result.error.as_deref(), Some("boom"));
    }
}