Skip to main content

symbi_runtime/
lib.rs

1//! Symbiont Agent Runtime System
2//!
3//! The Agent Runtime System is the core orchestration layer of the Symbiont platform,
4//! responsible for managing the complete lifecycle of autonomous agents.
5
6pub mod communication;
7pub mod config;
8pub mod context;
9pub mod crypto;
10pub mod error_handler;
11pub mod integrations;
12pub mod lifecycle;
13pub mod logging;
14pub mod metrics;
15pub mod models;
16pub mod rag;
17pub mod reasoning;
18pub mod resource;
19pub mod routing;
20pub mod sandbox;
21pub mod scheduler;
22pub mod secrets;
23pub mod skills;
24pub mod toolclad;
25pub mod types;
26
27pub mod prelude;
28
29#[cfg(feature = "cli-executor")]
30pub mod cli_executor;
31
32#[cfg(feature = "http-api")]
33pub mod api;
34
35#[cfg(feature = "http-api")]
36use api::traits::RuntimeApiProvider;
37#[cfg(all(feature = "http-api", feature = "cron"))]
38use api::types::ScheduleRunEntry;
39#[cfg(feature = "http-api")]
40use api::types::{
41    AddIdentityMappingRequest, AgentExecutionRecord, AgentStatusResponse, ChannelActionResponse,
42    ChannelAuditResponse, ChannelDetail, ChannelHealthResponse, ChannelSummary, CreateAgentRequest,
43    CreateAgentResponse, CreateScheduleRequest, CreateScheduleResponse, DeleteAgentResponse,
44    DeleteChannelResponse, DeleteScheduleResponse, ExecuteAgentRequest, ExecuteAgentResponse,
45    GetAgentHistoryResponse, IdentityMappingEntry, NextRunsResponse, RegisterChannelRequest,
46    RegisterChannelResponse, ScheduleActionResponse, ScheduleDetail, ScheduleHistoryResponse,
47    ScheduleSummary, UpdateAgentRequest, UpdateAgentResponse, UpdateChannelRequest,
48    UpdateScheduleRequest, WorkflowExecutionRequest,
49};
50#[cfg(feature = "http-api")]
51use async_trait::async_trait;
52
53#[cfg(feature = "http-input")]
54pub mod http_input;
55
56// Re-export commonly used types
57pub use communication::{CommunicationBus, CommunicationConfig, DefaultCommunicationBus};
58pub use config::SecurityConfig;
59pub use context::{ContextManager, ContextManagerConfig, StandardContextManager};
60pub use error_handler::{DefaultErrorHandler, ErrorHandler, ErrorHandlerConfig};
61pub use lifecycle::{DefaultLifecycleController, LifecycleConfig, LifecycleController};
62pub use logging::{LoggingConfig, ModelInteractionType, ModelLogger, RequestData, ResponseData};
63pub use models::{ModelCatalog, ModelCatalogError, SlmRunner, SlmRunnerError};
64pub use resource::{DefaultResourceManager, ResourceManager, ResourceManagerConfig};
65pub use routing::{
66    DefaultRoutingEngine, RouteDecision, RoutingConfig, RoutingContext, RoutingEngine, TaskType,
67};
68pub use sandbox::{E2BSandbox, ExecutionResult, SandboxRunner, SandboxTier};
69#[cfg(feature = "cron")]
70pub use scheduler::{
71    cron_scheduler::{
72        CronMetrics, CronScheduler, CronSchedulerConfig, CronSchedulerError, CronSchedulerHealth,
73    },
74    cron_types::{
75        AuditLevel, CronJobDefinition, CronJobId, CronJobStatus, DeliveryChannel, DeliveryConfig,
76        DeliveryReceipt, JobRunRecord, JobRunStatus,
77    },
78    delivery::{CustomDeliveryHandler, DefaultDeliveryRouter, DeliveryResult, DeliveryRouter},
79    heartbeat::{
80        HeartbeatAssessment, HeartbeatConfig, HeartbeatContextMode, HeartbeatSeverity,
81        HeartbeatState,
82    },
83    job_store::{JobStore, JobStoreError, SqliteJobStore},
84    policy_gate::{
85        PolicyGate, ScheduleContext, SchedulePolicyCondition, SchedulePolicyDecision,
86        SchedulePolicyEffect, SchedulePolicyRule,
87    },
88};
89pub use scheduler::{AgentScheduler, DefaultAgentScheduler, SchedulerConfig};
90pub use secrets::{SecretStore, SecretsConfig};
91pub use types::*;
92
93use std::sync::Arc;
94use tokio::sync::RwLock;
95
/// Bounded in-memory execution log for agent history tracking.
///
/// Records are kept in a `VecDeque` guarded by a `parking_lot::RwLock`, so one log can be shared
/// between threads. Once the log holds `capacity` records, recording a new one evicts the oldest.
/// Recording is O(1) amortized. `get_history` walks the records from newest to oldest and filters
/// by agent, so in the worst case it is O(n) in the *total* number of stored records, not just in
/// the number of records belonging to the requested agent.
#[cfg(feature = "http-api")]
pub struct ExecutionLog {
    entries: parking_lot::RwLock<std::collections::VecDeque<ExecutionEntry>>,
    capacity: usize,
}

/// One recorded execution event.
#[cfg(feature = "http-api")]
#[derive(Debug, Clone)]
struct ExecutionEntry {
    /// Agent the event belongs to.
    agent_id: AgentId,
    /// Identifier passed by the caller of `ExecutionLog::record`.
    execution_id: String,
    /// Free-form status label (for example "created" or "execution_started").
    status: String,
    /// Time at which the event was recorded.
    timestamp: chrono::DateTime<chrono::Utc>,
}

#[cfg(feature = "http-api")]
impl ExecutionLog {
    /// Create an empty log that retains at most `capacity` records.
    ///
    /// A `capacity` of zero produces a log that discards every record.
    fn new(capacity: usize) -> Self {
        Self {
            entries: parking_lot::RwLock::new(std::collections::VecDeque::with_capacity(capacity)),
            capacity,
        }
    }

    /// Record an execution event for an agent, evicting the oldest records if the log is full.
    fn record(&self, agent_id: AgentId, execution_id: &str, status: &str) {
        // A zero-capacity log keeps nothing. Without this guard the eviction below would pop from
        // an empty queue and still push, growing the log past its configured bound.
        if self.capacity == 0 {
            return;
        }
        let entry = ExecutionEntry {
            agent_id,
            execution_id: execution_id.to_string(),
            status: status.to_string(),
            timestamp: chrono::Utc::now(),
        };
        let mut entries = self.entries.write();
        while entries.len() >= self.capacity {
            entries.pop_front();
        }
        entries.push_back(entry);
    }

    /// Retrieve up to `limit` execution records for a specific agent, most recent first.
    fn get_history(&self, agent_id: AgentId, limit: usize) -> Vec<ExecutionEntry> {
        let entries = self.entries.read();
        entries
            .iter()
            .rev()
            .filter(|e| e.agent_id == agent_id)
            .take(limit)
            .cloned()
            .collect()
    }
}
153
154/// Main Agent Runtime System
155#[derive(Clone)]
156pub struct AgentRuntime {
157    pub scheduler: Arc<dyn scheduler::AgentScheduler + Send + Sync>,
158    pub lifecycle: Arc<dyn lifecycle::LifecycleController + Send + Sync>,
159    pub resource_manager: Arc<dyn resource::ResourceManager + Send + Sync>,
160    pub communication: Arc<dyn communication::CommunicationBus + Send + Sync>,
161    pub error_handler: Arc<dyn error_handler::ErrorHandler + Send + Sync>,
162    pub context_manager: Arc<dyn context::ContextManager + Send + Sync>,
163    pub model_logger: Option<Arc<logging::ModelLogger>>,
164    pub model_catalog: Option<Arc<models::ModelCatalog>>,
165    /// Stable identity for system-originated messages (API calls, HTTP input).
166    /// Created once at runtime startup and reused for all internal messages
167    /// so audit trails can consistently attribute system actions.
168    pub system_agent_id: AgentId,
169    #[cfg(feature = "cron")]
170    cron_scheduler: Option<Arc<scheduler::cron_scheduler::CronScheduler>>,
171    config: Arc<RwLock<RuntimeConfig>>,
172    /// In-memory execution log for agent history tracking.
173    #[cfg(feature = "http-api")]
174    execution_log: Arc<ExecutionLog>,
175}
176
177impl AgentRuntime {
178    /// Create a new Agent Runtime System instance
179    pub async fn new(config: RuntimeConfig) -> Result<Self, RuntimeError> {
180        let config = Arc::new(RwLock::new(config));
181
182        // Initialize components
183        let scheduler = Arc::new(
184            scheduler::DefaultAgentScheduler::new(config.read().await.scheduler.clone()).await?,
185        );
186
187        let resource_manager = Arc::new(
188            resource::DefaultResourceManager::new(config.read().await.resource_manager.clone())
189                .await?,
190        );
191
192        let communication = Arc::new(
193            communication::DefaultCommunicationBus::new(config.read().await.communication.clone())
194                .await?,
195        );
196
197        let error_handler = Arc::new(
198            error_handler::DefaultErrorHandler::new(config.read().await.error_handler.clone())
199                .await?,
200        );
201
202        let lifecycle_config = lifecycle::LifecycleConfig {
203            max_agents: 1000,
204            initialization_timeout: std::time::Duration::from_secs(30),
205            termination_timeout: std::time::Duration::from_secs(30),
206            state_check_interval: std::time::Duration::from_secs(10),
207            enable_auto_recovery: true,
208            max_restart_attempts: 3,
209        };
210        let lifecycle =
211            Arc::new(lifecycle::DefaultLifecycleController::new(lifecycle_config).await?);
212
213        let context_manager = Arc::new(
214            context::StandardContextManager::new(
215                config.read().await.context_manager.clone(),
216                "runtime-system",
217            )
218            .await
219            .map_err(|e| {
220                RuntimeError::Internal(format!("Failed to create context manager: {}", e))
221            })?,
222        );
223
224        // Initialize context manager
225        context_manager.initialize().await.map_err(|e| {
226            RuntimeError::Internal(format!("Failed to initialize context manager: {}", e))
227        })?;
228
229        // Initialize model logger if enabled
230        let model_logger = if config.read().await.logging.enabled {
231            // For now, initialize without secret store to avoid type conversion issues
232            match logging::ModelLogger::new(config.read().await.logging.clone(), None) {
233                Ok(logger) => {
234                    tracing::info!("Model logging initialized successfully");
235                    Some(Arc::new(logger))
236                }
237                Err(e) => {
238                    tracing::warn!("Failed to initialize model logger: {}", e);
239                    None
240                }
241            }
242        } else {
243            tracing::info!("Model logging is disabled");
244            None
245        };
246
247        // Initialize model catalog if SLM is enabled
248        let model_catalog = if let Some(ref slm_config) = config.read().await.slm {
249            if slm_config.enabled {
250                match models::ModelCatalog::new(slm_config.clone()) {
251                    Ok(catalog) => {
252                        tracing::info!(
253                            "Model catalog initialized with {} models",
254                            catalog.list_models().len()
255                        );
256                        Some(Arc::new(catalog))
257                    }
258                    Err(e) => {
259                        tracing::warn!("Failed to initialize model catalog: {}", e);
260                        None
261                    }
262                }
263            } else {
264                tracing::info!("SLM support is disabled");
265                None
266            }
267        } else {
268            tracing::info!("No SLM configuration provided");
269            None
270        };
271
272        Ok(Self {
273            scheduler,
274            lifecycle,
275            resource_manager,
276            communication,
277            error_handler,
278            context_manager,
279            model_logger,
280            model_catalog,
281            system_agent_id: AgentId::new(),
282            #[cfg(feature = "cron")]
283            cron_scheduler: None,
284            config,
285            #[cfg(feature = "http-api")]
286            execution_log: Arc::new(ExecutionLog::new(10_000)),
287        })
288    }
289
290    /// Attach a CronScheduler to the runtime so schedule APIs become functional.
291    #[cfg(feature = "cron")]
292    pub fn with_cron_scheduler(
293        mut self,
294        cron: Arc<scheduler::cron_scheduler::CronScheduler>,
295    ) -> Self {
296        self.cron_scheduler = Some(cron);
297        self
298    }
299
300    /// Get the current runtime configuration
301    pub async fn get_config(&self) -> RuntimeConfig {
302        self.config.read().await.clone()
303    }
304
305    /// Update the runtime configuration
306    pub async fn update_config(&self, config: RuntimeConfig) -> Result<(), RuntimeError> {
307        *self.config.write().await = config;
308        Ok(())
309    }
310
311    /// Shutdown the runtime system gracefully
312    pub async fn shutdown(&self) -> Result<(), RuntimeError> {
313        tracing::info!("Starting Agent Runtime shutdown sequence");
314
315        // Shutdown components in reverse order of initialization
316        self.lifecycle
317            .shutdown()
318            .await
319            .map_err(RuntimeError::Lifecycle)?;
320        self.communication
321            .shutdown()
322            .await
323            .map_err(RuntimeError::Communication)?;
324        self.resource_manager
325            .shutdown()
326            .await
327            .map_err(RuntimeError::Resource)?;
328        self.scheduler
329            .shutdown()
330            .await
331            .map_err(RuntimeError::Scheduler)?;
332        self.error_handler
333            .shutdown()
334            .await
335            .map_err(RuntimeError::ErrorHandler)?;
336
337        // Shutdown context manager last to ensure all contexts are saved
338        self.context_manager.shutdown().await.map_err(|e| {
339            RuntimeError::Internal(format!("Context manager shutdown failed: {}", e))
340        })?;
341
342        tracing::info!("Agent Runtime shutdown completed successfully");
343        Ok(())
344    }
345
346    /// Get system status
347    pub async fn get_status(&self) -> SystemStatus {
348        self.scheduler.get_system_status().await
349    }
350}
351
352/// Runtime configuration
353#[derive(Debug, Clone, Default)]
354pub struct RuntimeConfig {
355    pub scheduler: scheduler::SchedulerConfig,
356    pub resource_manager: resource::ResourceManagerConfig,
357    pub communication: communication::CommunicationConfig,
358    pub context_manager: context::ContextManagerConfig,
359    pub security: SecurityConfig,
360    pub audit: AuditConfig,
361    pub error_handler: error_handler::ErrorHandlerConfig,
362    pub logging: logging::LoggingConfig,
363    pub slm: Option<config::Slm>,
364    pub routing: Option<routing::RoutingConfig>,
365}
366
367/// Implementation of RuntimeApiProvider for AgentRuntime
368#[cfg(feature = "http-api")]
369#[async_trait]
370#[allow(unused_variables)] // Params used inside #[cfg(feature = "cron")] blocks
371impl RuntimeApiProvider for AgentRuntime {
372    async fn execute_workflow(
373        &self,
374        request: WorkflowExecutionRequest,
375    ) -> Result<serde_json::Value, RuntimeError> {
376        tracing::info!("Executing workflow: {}", request.workflow_id);
377
378        // Step 1: Parse the workflow DSL and extract metadata (before any await)
379        let workflow_dsl = &request.workflow_id; // For now, treat workflow_id as DSL source
380        let (metadata, agent_config) = {
381            let parsed_tree = dsl::parse_dsl(workflow_dsl)
382                .map_err(|e| RuntimeError::Internal(format!("DSL parsing failed: {}", e)))?;
383
384            // Extract metadata from the parsed workflow
385            let metadata = dsl::extract_metadata(&parsed_tree, workflow_dsl);
386
387            // Check for parsing errors
388            let root_node = parsed_tree.root_node();
389            if root_node.has_error() {
390                return Err(RuntimeError::Internal(
391                    "DSL contains syntax errors".to_string(),
392                ));
393            }
394
395            // Create agent configuration from the workflow
396            let agent_id = request.agent_id.unwrap_or_default();
397            let agent_config = AgentConfig {
398                id: agent_id,
399                name: metadata
400                    .get("name")
401                    .cloned()
402                    .unwrap_or_else(|| "workflow_agent".to_string()),
403                dsl_source: workflow_dsl.to_string(),
404                execution_mode: ExecutionMode::Ephemeral,
405                security_tier: SecurityTier::Tier1,
406                resource_limits: ResourceLimits::default(),
407                capabilities: vec![Capability::Computation], // Basic capability for workflow execution
408                policies: vec![],
409                metadata: metadata.clone(),
410                priority: Priority::Normal,
411            };
412
413            (metadata, agent_config)
414        };
415
416        // Step 2: Schedule the agent for execution
417        let scheduled_agent_id = self
418            .scheduler
419            .schedule_agent(agent_config)
420            .await
421            .map_err(RuntimeError::Scheduler)?;
422
423        // Step 3: Wait briefly and check initial status (simple implementation)
424        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
425
426        // Step 4: Collect basic execution information
427        let system_status = self.scheduler.get_system_status().await;
428
429        // Step 5: Prepare and return result
430        let mut result = serde_json::json!({
431            "status": "success",
432            "workflow_id": request.workflow_id,
433            "agent_id": scheduled_agent_id.to_string(),
434            "execution_started": true,
435            "metadata": metadata,
436            "system_status": {
437                "total_agents": system_status.total_agents,
438                "running_agents": system_status.running_agents,
439                "resource_utilization": {
440                    "memory_used": system_status.resource_utilization.memory_used,
441                    "cpu_utilization": system_status.resource_utilization.cpu_utilization,
442                    "disk_io_rate": system_status.resource_utilization.disk_io_rate,
443                    "network_io_rate": system_status.resource_utilization.network_io_rate
444                }
445            }
446        });
447
448        // Add parameters if provided
449        if !request.parameters.is_null() {
450            result["parameters"] = request.parameters;
451        }
452
453        // Record execution in history log
454        self.execution_log.record(
455            scheduled_agent_id,
456            &scheduled_agent_id.to_string(),
457            "workflow_started",
458        );
459
460        tracing::info!(
461            "Workflow execution initiated for agent: {}",
462            scheduled_agent_id
463        );
464        Ok(result)
465    }
466
467    async fn get_agent_status(
468        &self,
469        agent_id: AgentId,
470    ) -> Result<AgentStatusResponse, RuntimeError> {
471        let external_state = self.scheduler.get_external_agent_state(agent_id);
472        let agent_config = self.scheduler.get_agent_config(agent_id);
473
474        match self.scheduler.get_agent_status(agent_id).await {
475            Ok(agent_status) => {
476                // Convert SystemTime to DateTime<Utc>
477                let last_activity =
478                    chrono::DateTime::<chrono::Utc>::from(agent_status.last_activity);
479
480                let execution_mode_label = agent_config.as_ref().map(|c| match &c.execution_mode {
481                    crate::types::agent::ExecutionMode::External { .. } => "External".to_string(),
482                    crate::types::agent::ExecutionMode::Persistent => "Persistent".to_string(),
483                    crate::types::agent::ExecutionMode::Ephemeral => "Ephemeral".to_string(),
484                    crate::types::agent::ExecutionMode::Scheduled { .. } => "Scheduled".to_string(),
485                    crate::types::agent::ExecutionMode::CronScheduled { .. } => {
486                        "CronScheduled".to_string()
487                    }
488                    crate::types::agent::ExecutionMode::EventDriven => "EventDriven".to_string(),
489                });
490
491                let (metadata, last_result, recent_events) = if let Some(ref ext) = external_state {
492                    (
493                        Some(ext.metadata.clone()),
494                        ext.last_result.clone(),
495                        Some(ext.events.iter().cloned().collect::<Vec<_>>()),
496                    )
497                } else {
498                    (None, None, None)
499                };
500
501                Ok(AgentStatusResponse {
502                    agent_id: agent_status.agent_id,
503                    state: agent_status.state,
504                    last_activity,
505                    resource_usage: api::types::ResourceUsage {
506                        memory_bytes: agent_status.memory_usage,
507                        cpu_percent: agent_status.cpu_usage,
508                        active_tasks: agent_status.active_tasks,
509                    },
510                    metadata,
511                    last_result,
512                    recent_events,
513                    execution_mode: execution_mode_label,
514                })
515            }
516            Err(scheduler_error) => {
517                tracing::warn!(
518                    "Failed to get agent status for {}: {}",
519                    agent_id,
520                    scheduler_error
521                );
522                Err(RuntimeError::Internal(format!(
523                    "Agent {} not found",
524                    agent_id
525                )))
526            }
527        }
528    }
529
530    async fn get_system_health(&self) -> Result<serde_json::Value, RuntimeError> {
531        // Check health of all components
532        let scheduler_health =
533            self.scheduler.check_health().await.map_err(|e| {
534                RuntimeError::Internal(format!("Scheduler health check failed: {}", e))
535            })?;
536
537        let lifecycle_health =
538            self.lifecycle.check_health().await.map_err(|e| {
539                RuntimeError::Internal(format!("Lifecycle health check failed: {}", e))
540            })?;
541
542        let resource_health = self.resource_manager.check_health().await.map_err(|e| {
543            RuntimeError::Internal(format!("Resource manager health check failed: {}", e))
544        })?;
545
546        let communication_health = self.communication.check_health().await.map_err(|e| {
547            RuntimeError::Internal(format!("Communication health check failed: {}", e))
548        })?;
549
550        // Determine overall system status
551        let component_healths = vec![
552            ("scheduler", &scheduler_health),
553            ("lifecycle", &lifecycle_health),
554            ("resource_manager", &resource_health),
555            ("communication", &communication_health),
556        ];
557
558        let overall_status = if component_healths
559            .iter()
560            .all(|(_, h)| h.status == HealthStatus::Healthy)
561        {
562            "healthy"
563        } else if component_healths
564            .iter()
565            .any(|(_, h)| h.status == HealthStatus::Unhealthy)
566        {
567            "unhealthy"
568        } else {
569            "degraded"
570        };
571
572        // Build response with detailed component information
573        let mut components = serde_json::Map::new();
574        for (name, health) in component_healths {
575            let component_info = serde_json::json!({
576                "status": match health.status {
577                    HealthStatus::Healthy => "healthy",
578                    HealthStatus::Degraded => "degraded",
579                    HealthStatus::Unhealthy => "unhealthy",
580                },
581                "message": health.message,
582                "last_check": health.last_check
583                    .duration_since(std::time::UNIX_EPOCH)
584                    .unwrap_or_default()
585                    .as_secs(),
586                "uptime_seconds": health.uptime.as_secs(),
587                "metrics": health.metrics
588            });
589            components.insert(name.to_string(), component_info);
590        }
591
592        Ok(serde_json::json!({
593            "status": overall_status,
594            "timestamp": std::time::SystemTime::now()
595                .duration_since(std::time::UNIX_EPOCH)
596                .unwrap_or_default()
597                .as_secs(),
598            "components": components
599        }))
600    }
601
602    async fn list_agents(&self) -> Result<Vec<AgentId>, RuntimeError> {
603        Ok(self.scheduler.list_agents().await)
604    }
605
606    async fn list_agents_detailed(
607        &self,
608    ) -> Result<Vec<crate::api::types::AgentSummary>, RuntimeError> {
609        let ids = self.scheduler.list_agents().await;
610        let mut summaries = Vec::new();
611        for id in ids {
612            let name = self
613                .scheduler
614                .get_agent_config(id)
615                .map(|c| c.name)
616                .unwrap_or_default();
617            let state = self
618                .scheduler
619                .get_agent_status(id)
620                .await
621                .map(|s| s.state)
622                .unwrap_or(AgentState::Created);
623            summaries.push(crate::api::types::AgentSummary { id, name, state });
624        }
625        Ok(summaries)
626    }
627
628    async fn shutdown_agent(&self, agent_id: AgentId) -> Result<(), RuntimeError> {
629        self.scheduler
630            .shutdown_agent(agent_id)
631            .await
632            .map_err(RuntimeError::Scheduler)
633    }
634
635    async fn get_metrics(&self) -> Result<serde_json::Value, RuntimeError> {
636        let status = self.get_status().await;
637
638        // Count agents in error/failed state by checking each agent's status
639        let agent_ids = self.scheduler.list_agents().await;
640        let mut error_count: usize = 0;
641        for aid in &agent_ids {
642            if let Ok(agent_status) = self.scheduler.get_agent_status(*aid).await {
643                if agent_status.state == AgentState::Failed {
644                    error_count += 1;
645                }
646            }
647        }
648
649        let idle = status
650            .total_agents
651            .saturating_sub(status.running_agents)
652            .saturating_sub(error_count);
653
654        Ok(serde_json::json!({
655            "agents": {
656                "total": status.total_agents,
657                "running": status.running_agents,
658                "idle": idle,
659                "error": error_count
660            },
661            "system": {
662                "uptime_seconds": status.uptime.as_secs(),
663                "memory_usage": status.resource_utilization.memory_used,
664                "cpu_usage": status.resource_utilization.cpu_utilization
665            }
666        }))
667    }
668
669    async fn create_agent(
670        &self,
671        request: CreateAgentRequest,
672    ) -> Result<CreateAgentResponse, RuntimeError> {
673        // Validate input
674        if request.name.is_empty() {
675            return Err(RuntimeError::Internal(
676                "Agent name cannot be empty".to_string(),
677            ));
678        }
679
680        let execution_mode = request.execution_mode.clone().unwrap_or_default();
681
682        let dsl_source = match &execution_mode {
683            crate::types::agent::ExecutionMode::External { .. } => {
684                request.dsl.clone().unwrap_or_default()
685            }
686            _ => {
687                let dsl = request.dsl.as_deref().unwrap_or("");
688                if dsl.is_empty() {
689                    return Err(RuntimeError::Internal(
690                        "Agent DSL cannot be empty for non-external agents".to_string(),
691                    ));
692                }
693                dsl.to_string()
694            }
695        };
696
697        // Create agent configuration
698        let agent_id = AgentId::new();
699        let agent_config = AgentConfig {
700            id: agent_id,
701            name: request.name,
702            dsl_source,
703            execution_mode,
704            security_tier: SecurityTier::Tier1,
705            resource_limits: ResourceLimits::default(),
706            capabilities: request
707                .capabilities
708                .unwrap_or_default()
709                .into_iter()
710                .map(Capability::Custom)
711                .collect(),
712            policies: vec![],
713            metadata: request.metadata.unwrap_or_default(),
714            priority: Priority::Normal,
715        };
716
717        // Schedule the agent for execution
718        let scheduled_agent_id = self
719            .scheduler
720            .schedule_agent(agent_config)
721            .await
722            .map_err(RuntimeError::Scheduler)?;
723
724        tracing::info!("Created and scheduled agent: {}", scheduled_agent_id);
725
726        // Record creation in execution log
727        self.execution_log.record(
728            scheduled_agent_id,
729            &scheduled_agent_id.to_string(),
730            "created",
731        );
732
733        Ok(CreateAgentResponse {
734            id: scheduled_agent_id.to_string(),
735            status: "scheduled".to_string(),
736        })
737    }
738
739    async fn update_agent(
740        &self,
741        agent_id: AgentId,
742        request: UpdateAgentRequest,
743    ) -> Result<UpdateAgentResponse, RuntimeError> {
744        // Validate that at least one field is provided for update
745        if request.name.is_none() && request.dsl.is_none() {
746            return Err(RuntimeError::Internal(
747                "At least one field (name or dsl) must be provided for update".to_string(),
748            ));
749        }
750
751        // Validate optional fields if provided
752        if let Some(ref name) = request.name {
753            if name.is_empty() {
754                return Err(RuntimeError::Internal(
755                    "Agent name cannot be empty".to_string(),
756                ));
757            }
758        }
759
760        if let Some(ref dsl) = request.dsl {
761            if dsl.is_empty() {
762                return Err(RuntimeError::Internal(
763                    "Agent DSL cannot be empty".to_string(),
764                ));
765            }
766        }
767
768        // Call the scheduler to update the agent
769        self.scheduler
770            .update_agent(agent_id, request)
771            .await
772            .map_err(RuntimeError::Scheduler)?;
773
774        tracing::info!("Successfully updated agent: {}", agent_id);
775
776        Ok(UpdateAgentResponse {
777            id: agent_id.to_string(),
778            status: "updated".to_string(),
779        })
780    }
781
782    async fn delete_agent(&self, agent_id: AgentId) -> Result<DeleteAgentResponse, RuntimeError> {
783        self.scheduler
784            .delete_agent(agent_id)
785            .await
786            .map_err(RuntimeError::Scheduler)?;
787
788        Ok(DeleteAgentResponse {
789            id: agent_id.to_string(),
790            status: "deleted".to_string(),
791        })
792    }
793
794    async fn execute_agent(
795        &self,
796        agent_id: AgentId,
797        request: ExecuteAgentRequest,
798    ) -> Result<ExecuteAgentResponse, RuntimeError> {
799        // Ensure the agent exists in the registry
800        if !self.scheduler.has_agent(agent_id) {
801            return Err(RuntimeError::Internal(format!(
802                "Agent {} not found",
803                agent_id
804            )));
805        }
806
807        // Re-schedule from stored config if the agent isn't currently active
808        let status = self.get_agent_status(agent_id).await?;
809        if status.state == AgentState::Completed {
810            if let Some(config) = self.scheduler.get_agent_config(agent_id) {
811                self.scheduler
812                    .schedule_agent(config)
813                    .await
814                    .map_err(RuntimeError::Scheduler)?;
815            }
816        } else if status.state != AgentState::Running {
817            self.lifecycle
818                .start_agent(agent_id)
819                .await
820                .map_err(RuntimeError::Lifecycle)?;
821        }
822        let execution_id = uuid::Uuid::new_v4().to_string();
823        let payload_data: bytes::Bytes = serde_json::to_vec(&request)
824            .map_err(|e| RuntimeError::Internal(e.to_string()))?
825            .into();
826        let message = self.communication.create_internal_message(
827            self.system_agent_id,
828            agent_id,
829            payload_data,
830            types::MessageType::Direct(agent_id),
831            std::time::Duration::from_secs(300),
832        );
833        self.communication
834            .send_message(message)
835            .await
836            .map_err(RuntimeError::Communication)?;
837
838        // Record execution in the history log
839        #[cfg(feature = "http-api")]
840        self.execution_log
841            .record(agent_id, &execution_id, "execution_started");
842
843        Ok(ExecuteAgentResponse {
844            execution_id,
845            status: "execution_started".to_string(),
846        })
847    }
848
849    async fn get_agent_history(
850        &self,
851        agent_id: AgentId,
852    ) -> Result<GetAgentHistoryResponse, RuntimeError> {
853        let entries = self.execution_log.get_history(agent_id, 100);
854        let history = entries
855            .into_iter()
856            .map(|e| AgentExecutionRecord {
857                execution_id: e.execution_id,
858                status: e.status,
859                timestamp: e.timestamp.to_rfc3339(),
860            })
861            .collect();
862        Ok(GetAgentHistoryResponse { history })
863    }
864
865    // ── Schedule endpoints ──────────────────────────────────────────
866
867    async fn list_schedules(&self) -> Result<Vec<ScheduleSummary>, RuntimeError> {
868        #[cfg(feature = "cron")]
869        if let Some(ref cron) = self.cron_scheduler {
870            let jobs = cron
871                .list_jobs()
872                .await
873                .map_err(|e| RuntimeError::Internal(e.to_string()))?;
874            return Ok(jobs
875                .into_iter()
876                .map(|j| ScheduleSummary {
877                    job_id: j.job_id.to_string(),
878                    name: j.name,
879                    cron_expression: j.cron_expression,
880                    timezone: j.timezone,
881                    status: format!("{:?}", j.status),
882                    enabled: j.enabled,
883                    next_run: j.next_run.map(|t| t.to_rfc3339()),
884                    run_count: j.run_count,
885                })
886                .collect());
887        }
888        Ok(vec![])
889    }
890
891    async fn create_schedule(
892        &self,
893        request: CreateScheduleRequest,
894    ) -> Result<CreateScheduleResponse, RuntimeError> {
895        #[cfg(feature = "cron")]
896        if let Some(ref cron) = self.cron_scheduler {
897            use scheduler::cron_types::{CronJobDefinition, CronJobId};
898            let now = chrono::Utc::now();
899            let tz = if request.timezone.is_empty() {
900                "UTC".to_string()
901            } else {
902                request.timezone
903            };
904            let agent_config = types::AgentConfig {
905                id: types::AgentId::new(),
906                name: request.agent_name,
907                dsl_source: String::new(),
908                execution_mode: Default::default(),
909                security_tier: Default::default(),
910                resource_limits: Default::default(),
911                capabilities: Vec::new(),
912                policies: Vec::new(),
913                metadata: Default::default(),
914                priority: Default::default(),
915            };
916            let job = CronJobDefinition {
917                job_id: CronJobId::new(),
918                name: request.name,
919                cron_expression: request.cron_expression,
920                timezone: tz,
921                agent_config,
922                policy_ids: request.policy_ids,
923                audit_level: Default::default(),
924                status: scheduler::cron_types::CronJobStatus::Active,
925                enabled: true,
926                one_shot: request.one_shot,
927                created_at: now,
928                updated_at: now,
929                last_run: None,
930                next_run: None,
931                run_count: 0,
932                failure_count: 0,
933                max_retries: 3,
934                max_concurrent: 1,
935                delivery_config: None,
936                jitter_max_secs: 0,
937                session_mode: Default::default(),
938                agentpin_jwt: None,
939            };
940            let job_id = cron
941                .add_job(job)
942                .await
943                .map_err(|e| RuntimeError::Internal(e.to_string()))?;
944            // Retrieve the saved job to get the computed next_run.
945            let saved = cron
946                .get_job(job_id)
947                .await
948                .map_err(|e| RuntimeError::Internal(e.to_string()))?;
949            return Ok(CreateScheduleResponse {
950                job_id: job_id.to_string(),
951                next_run: saved.next_run.map(|t| t.to_rfc3339()),
952                status: "created".to_string(),
953            });
954        }
955        Err(RuntimeError::Internal(
956            "Schedule API requires a running CronScheduler".to_string(),
957        ))
958    }
959
960    async fn get_schedule(&self, job_id: &str) -> Result<ScheduleDetail, RuntimeError> {
961        #[cfg(feature = "cron")]
962        if let Some(ref cron) = self.cron_scheduler {
963            let id: scheduler::cron_types::CronJobId = job_id
964                .parse()
965                .map_err(|_| RuntimeError::Internal(format!("Invalid job ID: {}", job_id)))?;
966            let j = cron
967                .get_job(id)
968                .await
969                .map_err(|e| RuntimeError::Internal(e.to_string()))?;
970            return Ok(ScheduleDetail {
971                job_id: j.job_id.to_string(),
972                name: j.name,
973                cron_expression: j.cron_expression,
974                timezone: j.timezone,
975                status: format!("{:?}", j.status),
976                enabled: j.enabled,
977                one_shot: j.one_shot,
978                next_run: j.next_run.map(|t| t.to_rfc3339()),
979                last_run: j.last_run.map(|t| t.to_rfc3339()),
980                run_count: j.run_count,
981                failure_count: j.failure_count,
982                created_at: j.created_at.to_rfc3339(),
983                updated_at: j.updated_at.to_rfc3339(),
984            });
985        }
986        Err(RuntimeError::Internal(
987            "Schedule API requires a running CronScheduler".to_string(),
988        ))
989    }
990
991    async fn update_schedule(
992        &self,
993        job_id: &str,
994        request: UpdateScheduleRequest,
995    ) -> Result<ScheduleDetail, RuntimeError> {
996        #[cfg(feature = "cron")]
997        if let Some(ref cron) = self.cron_scheduler {
998            let id: scheduler::cron_types::CronJobId = job_id
999                .parse()
1000                .map_err(|_| RuntimeError::Internal(format!("Invalid job ID: {}", job_id)))?;
1001            let mut job = cron
1002                .get_job(id)
1003                .await
1004                .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1005            if let Some(expr) = request.cron_expression {
1006                job.cron_expression = expr;
1007            }
1008            if let Some(tz) = request.timezone {
1009                job.timezone = tz;
1010            }
1011            if let Some(pids) = request.policy_ids {
1012                job.policy_ids = pids;
1013            }
1014            if let Some(one_shot) = request.one_shot {
1015                job.one_shot = one_shot;
1016            }
1017            cron.update_job(job)
1018                .await
1019                .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1020            return self.get_schedule(job_id).await;
1021        }
1022        Err(RuntimeError::Internal(
1023            "Schedule API requires a running CronScheduler".to_string(),
1024        ))
1025    }
1026
1027    async fn delete_schedule(&self, job_id: &str) -> Result<DeleteScheduleResponse, RuntimeError> {
1028        #[cfg(feature = "cron")]
1029        if let Some(ref cron) = self.cron_scheduler {
1030            let id: scheduler::cron_types::CronJobId = job_id
1031                .parse()
1032                .map_err(|_| RuntimeError::Internal(format!("Invalid job ID: {}", job_id)))?;
1033            cron.remove_job(id)
1034                .await
1035                .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1036            return Ok(DeleteScheduleResponse {
1037                job_id: job_id.to_string(),
1038                deleted: true,
1039            });
1040        }
1041        Err(RuntimeError::Internal(
1042            "Schedule API requires a running CronScheduler".to_string(),
1043        ))
1044    }
1045
1046    async fn pause_schedule(&self, job_id: &str) -> Result<ScheduleActionResponse, RuntimeError> {
1047        #[cfg(feature = "cron")]
1048        if let Some(ref cron) = self.cron_scheduler {
1049            let id: scheduler::cron_types::CronJobId = job_id
1050                .parse()
1051                .map_err(|_| RuntimeError::Internal(format!("Invalid job ID: {}", job_id)))?;
1052            cron.pause_job(id)
1053                .await
1054                .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1055            return Ok(ScheduleActionResponse {
1056                job_id: job_id.to_string(),
1057                action: "pause".to_string(),
1058                status: "paused".to_string(),
1059            });
1060        }
1061        Err(RuntimeError::Internal(
1062            "Schedule API requires a running CronScheduler".to_string(),
1063        ))
1064    }
1065
1066    async fn resume_schedule(&self, job_id: &str) -> Result<ScheduleActionResponse, RuntimeError> {
1067        #[cfg(feature = "cron")]
1068        if let Some(ref cron) = self.cron_scheduler {
1069            let id: scheduler::cron_types::CronJobId = job_id
1070                .parse()
1071                .map_err(|_| RuntimeError::Internal(format!("Invalid job ID: {}", job_id)))?;
1072            cron.resume_job(id)
1073                .await
1074                .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1075            return Ok(ScheduleActionResponse {
1076                job_id: job_id.to_string(),
1077                action: "resume".to_string(),
1078                status: "active".to_string(),
1079            });
1080        }
1081        Err(RuntimeError::Internal(
1082            "Schedule API requires a running CronScheduler".to_string(),
1083        ))
1084    }
1085
1086    async fn trigger_schedule(&self, job_id: &str) -> Result<ScheduleActionResponse, RuntimeError> {
1087        #[cfg(feature = "cron")]
1088        if let Some(ref cron) = self.cron_scheduler {
1089            let id: scheduler::cron_types::CronJobId = job_id
1090                .parse()
1091                .map_err(|_| RuntimeError::Internal(format!("Invalid job ID: {}", job_id)))?;
1092            cron.trigger_now(id)
1093                .await
1094                .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1095            return Ok(ScheduleActionResponse {
1096                job_id: job_id.to_string(),
1097                action: "trigger".to_string(),
1098                status: "triggered".to_string(),
1099            });
1100        }
1101        Err(RuntimeError::Internal(
1102            "Schedule API requires a running CronScheduler".to_string(),
1103        ))
1104    }
1105
1106    async fn get_schedule_history(
1107        &self,
1108        job_id: &str,
1109        limit: usize,
1110    ) -> Result<ScheduleHistoryResponse, RuntimeError> {
1111        #[cfg(feature = "cron")]
1112        if let Some(ref cron) = self.cron_scheduler {
1113            let id: scheduler::cron_types::CronJobId = job_id
1114                .parse()
1115                .map_err(|_| RuntimeError::Internal(format!("Invalid job ID: {}", job_id)))?;
1116            let runs = cron
1117                .get_run_history(id, limit)
1118                .await
1119                .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1120            return Ok(ScheduleHistoryResponse {
1121                job_id: job_id.to_string(),
1122                history: runs
1123                    .into_iter()
1124                    .map(|r| ScheduleRunEntry {
1125                        run_id: r.run_id.to_string(),
1126                        started_at: r.started_at.to_rfc3339(),
1127                        completed_at: r.completed_at.map(|t| t.to_rfc3339()),
1128                        status: format!("{:?}", r.status),
1129                        error: r.error,
1130                        execution_time_ms: r.execution_time_ms,
1131                    })
1132                    .collect(),
1133            });
1134        }
1135        Err(RuntimeError::Internal(
1136            "Schedule API requires a running CronScheduler".to_string(),
1137        ))
1138    }
1139
1140    async fn get_schedule_next_runs(
1141        &self,
1142        job_id: &str,
1143        count: usize,
1144    ) -> Result<NextRunsResponse, RuntimeError> {
1145        #[cfg(feature = "cron")]
1146        if let Some(ref cron) = self.cron_scheduler {
1147            let id: scheduler::cron_types::CronJobId = job_id
1148                .parse()
1149                .map_err(|_| RuntimeError::Internal(format!("Invalid job ID: {}", job_id)))?;
1150            let job = cron
1151                .get_job(id)
1152                .await
1153                .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1154            let runs = cron
1155                .get_next_runs(&job.cron_expression, &job.timezone, count)
1156                .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1157            return Ok(NextRunsResponse {
1158                job_id: job_id.to_string(),
1159                next_runs: runs.into_iter().map(|t| t.to_rfc3339()).collect(),
1160            });
1161        }
1162        Err(RuntimeError::Internal(
1163            "Schedule API requires a running CronScheduler".to_string(),
1164        ))
1165    }
1166
1167    async fn get_scheduler_health(
1168        &self,
1169    ) -> Result<api::types::SchedulerHealthResponse, RuntimeError> {
1170        #[cfg(feature = "cron")]
1171        if let Some(ref cron) = self.cron_scheduler {
1172            let h = cron
1173                .check_health()
1174                .await
1175                .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1176            let m = cron.metrics();
1177            return Ok(api::types::SchedulerHealthResponse {
1178                is_running: h.is_running,
1179                store_accessible: h.store_accessible,
1180                jobs_total: h.jobs_total,
1181                jobs_active: h.jobs_active,
1182                jobs_paused: h.jobs_paused,
1183                jobs_dead_letter: h.jobs_dead_letter,
1184                global_active_runs: h.global_active_runs,
1185                max_concurrent: h.max_concurrent,
1186                runs_total: m.runs_total,
1187                runs_succeeded: m.runs_succeeded,
1188                runs_failed: m.runs_failed,
1189                average_execution_time_ms: m.average_execution_time_ms,
1190                longest_run_ms: m.longest_run_ms,
1191            });
1192        }
1193        // No CronScheduler — return a minimal response.
1194        Ok(api::types::SchedulerHealthResponse {
1195            is_running: false,
1196            store_accessible: false,
1197            jobs_total: 0,
1198            jobs_active: 0,
1199            jobs_paused: 0,
1200            jobs_dead_letter: 0,
1201            global_active_runs: 0,
1202            max_concurrent: 0,
1203            runs_total: 0,
1204            runs_succeeded: 0,
1205            runs_failed: 0,
1206            average_execution_time_ms: 0.0,
1207            longest_run_ms: 0,
1208        })
1209    }
1210
1211    // ── Channel endpoints ──────────────────────────────────────────
1212
1213    async fn list_channels(&self) -> Result<Vec<ChannelSummary>, RuntimeError> {
1214        // No ChannelAdapterManager — return empty list so dashboard renders gracefully
1215        Ok(vec![])
1216    }
1217
1218    async fn register_channel(
1219        &self,
1220        _request: RegisterChannelRequest,
1221    ) -> Result<RegisterChannelResponse, RuntimeError> {
1222        Err(RuntimeError::Internal(
1223            "Channel API requires a running ChannelAdapterManager".to_string(),
1224        ))
1225    }
1226
1227    async fn get_channel(&self, _id: &str) -> Result<ChannelDetail, RuntimeError> {
1228        Err(RuntimeError::Internal(
1229            "Channel API requires a running ChannelAdapterManager".to_string(),
1230        ))
1231    }
1232
1233    async fn update_channel(
1234        &self,
1235        _id: &str,
1236        _request: UpdateChannelRequest,
1237    ) -> Result<ChannelDetail, RuntimeError> {
1238        Err(RuntimeError::Internal(
1239            "Channel API requires a running ChannelAdapterManager".to_string(),
1240        ))
1241    }
1242
1243    async fn delete_channel(&self, _id: &str) -> Result<DeleteChannelResponse, RuntimeError> {
1244        Err(RuntimeError::Internal(
1245            "Channel API requires a running ChannelAdapterManager".to_string(),
1246        ))
1247    }
1248
1249    async fn start_channel(&self, _id: &str) -> Result<ChannelActionResponse, RuntimeError> {
1250        Err(RuntimeError::Internal(
1251            "Channel API requires a running ChannelAdapterManager".to_string(),
1252        ))
1253    }
1254
1255    async fn stop_channel(&self, _id: &str) -> Result<ChannelActionResponse, RuntimeError> {
1256        Err(RuntimeError::Internal(
1257            "Channel API requires a running ChannelAdapterManager".to_string(),
1258        ))
1259    }
1260
1261    async fn get_channel_health(&self, _id: &str) -> Result<ChannelHealthResponse, RuntimeError> {
1262        Err(RuntimeError::Internal(
1263            "Channel API requires a running ChannelAdapterManager".to_string(),
1264        ))
1265    }
1266
1267    async fn list_channel_mappings(
1268        &self,
1269        _id: &str,
1270    ) -> Result<Vec<IdentityMappingEntry>, RuntimeError> {
1271        Err(RuntimeError::Internal(
1272            "Channel identity mappings require enterprise edition".to_string(),
1273        ))
1274    }
1275
1276    async fn add_channel_mapping(
1277        &self,
1278        _id: &str,
1279        _request: AddIdentityMappingRequest,
1280    ) -> Result<IdentityMappingEntry, RuntimeError> {
1281        Err(RuntimeError::Internal(
1282            "Channel identity mappings require enterprise edition".to_string(),
1283        ))
1284    }
1285
1286    async fn remove_channel_mapping(&self, _id: &str, _user_id: &str) -> Result<(), RuntimeError> {
1287        Err(RuntimeError::Internal(
1288            "Channel identity mappings require enterprise edition".to_string(),
1289        ))
1290    }
1291
1292    async fn get_channel_audit(
1293        &self,
1294        _id: &str,
1295        _limit: usize,
1296    ) -> Result<ChannelAuditResponse, RuntimeError> {
1297        Err(RuntimeError::Internal(
1298            "Channel audit log requires enterprise edition".to_string(),
1299        ))
1300    }
1301
1302    // ── External agent endpoints ─────────────────────────────────────
1303
1304    async fn update_agent_heartbeat(
1305        &self,
1306        agent_id: AgentId,
1307        heartbeat: api::types::HeartbeatRequest,
1308    ) -> Result<(), RuntimeError> {
1309        let ext_agents = self.scheduler.external_agents();
1310        let mut entry = ext_agents.get_mut(&agent_id).ok_or_else(|| {
1311            RuntimeError::Internal(format!("Agent {} is not an external agent", agent_id))
1312        })?;
1313
1314        entry.last_heartbeat = Some(chrono::Utc::now());
1315        entry.reported_state = heartbeat.state;
1316
1317        if let Some(metadata) = heartbeat.metadata {
1318            entry.metadata.extend(metadata);
1319        }
1320        if let Some(last_result) = heartbeat.last_result {
1321            entry.last_result = Some(last_result);
1322        }
1323
1324        Ok(())
1325    }
1326
1327    async fn push_agent_event(
1328        &self,
1329        agent_id: AgentId,
1330        event: api::types::PushEventRequest,
1331    ) -> Result<(), RuntimeError> {
1332        let ext_agents = self.scheduler.external_agents();
1333        let mut entry = ext_agents.get_mut(&agent_id).ok_or_else(|| {
1334            RuntimeError::Internal(format!("Agent {} is not an external agent", agent_id))
1335        })?;
1336
1337        entry.push_event(api::types::AgentEvent {
1338            event_type: event.event_type,
1339            payload: event.payload,
1340            timestamp: chrono::Utc::now(),
1341        });
1342
1343        Ok(())
1344    }
1345
1346    async fn check_unreachable_agents(&self) {
1347        self.scheduler.check_unreachable_agents();
1348    }
1349}