Skip to main content

symbi_runtime/
lib.rs

1//! Symbiont Agent Runtime System
2//!
3//! The Agent Runtime System is the core orchestration layer of the Symbiont platform,
4//! responsible for managing the complete lifecycle of autonomous agents.
5
6pub mod communication;
7pub mod config;
8pub mod context;
9pub mod crypto;
10pub mod env;
11pub mod error_handler;
12pub mod integrations;
13pub mod lifecycle;
14pub mod logging;
15pub mod metrics;
16pub mod models;
17pub mod net_guard;
18pub mod rag;
19pub mod reasoning;
20pub mod resource;
21pub mod routing;
22pub mod sandbox;
23pub mod scheduler;
24pub mod secrets;
25pub mod skills;
26pub mod toolclad;
27pub mod types;
28
29pub mod prelude;
30
31#[cfg(feature = "cli-executor")]
32pub mod cli_executor;
33
34#[cfg(feature = "http-api")]
35pub mod api;
36
37#[cfg(feature = "http-api")]
38use api::traits::RuntimeApiProvider;
39#[cfg(all(feature = "http-api", feature = "cron"))]
40use api::types::ScheduleRunEntry;
41#[cfg(feature = "http-api")]
42use api::types::{
43    AddIdentityMappingRequest, AgentExecutionRecord, AgentStatusResponse, ChannelActionResponse,
44    ChannelAuditResponse, ChannelDetail, ChannelHealthResponse, ChannelSummary, CreateAgentRequest,
45    CreateAgentResponse, CreateScheduleRequest, CreateScheduleResponse, DeleteAgentResponse,
46    DeleteChannelResponse, DeleteScheduleResponse, ExecuteAgentRequest, ExecuteAgentResponse,
47    GetAgentHistoryResponse, IdentityMappingEntry, NextRunsResponse, RegisterChannelRequest,
48    RegisterChannelResponse, ScheduleActionResponse, ScheduleDetail, ScheduleHistoryResponse,
49    ScheduleSummary, UpdateAgentRequest, UpdateAgentResponse, UpdateChannelRequest,
50    UpdateScheduleRequest, WorkflowExecutionRequest,
51};
52#[cfg(feature = "http-api")]
53use async_trait::async_trait;
54
55#[cfg(feature = "http-input")]
56pub mod http_input;
57
58// Re-export commonly used types
59pub use communication::{CommunicationBus, CommunicationConfig, DefaultCommunicationBus};
60pub use config::SecurityConfig;
61pub use context::{ContextManager, ContextManagerConfig, StandardContextManager};
62pub use error_handler::{DefaultErrorHandler, ErrorHandler, ErrorHandlerConfig};
63pub use lifecycle::{DefaultLifecycleController, LifecycleConfig, LifecycleController};
64pub use logging::{LoggingConfig, ModelInteractionType, ModelLogger, RequestData, ResponseData};
65pub use models::{ModelCatalog, ModelCatalogError, SlmRunner, SlmRunnerError};
66pub use resource::{DefaultResourceManager, ResourceManager, ResourceManagerConfig};
67pub use routing::{
68    DefaultRoutingEngine, RouteDecision, RoutingConfig, RoutingContext, RoutingEngine, TaskType,
69};
70pub use sandbox::{E2BSandbox, ExecutionResult, SandboxRunner, SandboxTier};
71#[cfg(feature = "cron")]
72pub use scheduler::{
73    cron_scheduler::{
74        CronMetrics, CronScheduler, CronSchedulerConfig, CronSchedulerError, CronSchedulerHealth,
75    },
76    cron_types::{
77        AuditLevel, CronJobDefinition, CronJobId, CronJobStatus, DeliveryChannel, DeliveryConfig,
78        DeliveryReceipt, JobRunRecord, JobRunStatus,
79    },
80    delivery::{CustomDeliveryHandler, DefaultDeliveryRouter, DeliveryResult, DeliveryRouter},
81    heartbeat::{
82        HeartbeatAssessment, HeartbeatConfig, HeartbeatContextMode, HeartbeatSeverity,
83        HeartbeatState,
84    },
85    job_store::{JobStore, JobStoreError, SqliteJobStore},
86    policy_gate::{
87        PolicyGate, ScheduleContext, SchedulePolicyCondition, SchedulePolicyDecision,
88        SchedulePolicyEffect, SchedulePolicyRule,
89    },
90};
91pub use scheduler::{AgentScheduler, DefaultAgentScheduler, SchedulerConfig};
92pub use secrets::{SecretStore, SecretsConfig};
93pub use types::*;
94
95use std::sync::Arc;
96use tokio::sync::RwLock;
97
/// Bounded in-memory execution log for agent history tracking.
///
/// Records live in a `VecDeque` guarded by a `parking_lot::RwLock`. Once the
/// log holds `capacity` records, recording another one evicts the oldest.
/// A log created with capacity 0 retains nothing.
///
/// `record` is O(1) amortized. `get_history` walks the buffer from newest to
/// oldest, so it is O(n) in the total number of stored records in the worst
/// case (it stops early once `limit` matches are found).
#[cfg(feature = "http-api")]
pub struct ExecutionLog {
    entries: parking_lot::RwLock<std::collections::VecDeque<ExecutionEntry>>,
    capacity: usize,
}

/// One recorded execution event.
#[cfg(feature = "http-api")]
#[derive(Debug, Clone)]
struct ExecutionEntry {
    /// Agent the event belongs to.
    agent_id: AgentId,
    /// Caller-supplied execution identifier.
    execution_id: String,
    /// Free-form status label supplied by the caller.
    status: String,
    /// Time the event was recorded (`chrono::Utc::now()` at record time).
    timestamp: chrono::DateTime<chrono::Utc>,
}

#[cfg(feature = "http-api")]
impl ExecutionLog {
    /// Create an empty log that keeps at most `capacity` entries.
    fn new(capacity: usize) -> Self {
        Self {
            entries: parking_lot::RwLock::new(std::collections::VecDeque::with_capacity(capacity)),
            capacity,
        }
    }

    /// Record an execution event for an agent, evicting the oldest entry
    /// when the log is full.
    fn record(&self, agent_id: AgentId, execution_id: &str, status: &str) {
        // Without this guard a zero-capacity log would pop from an empty
        // deque and then push, retaining one entry despite its bound.
        if self.capacity == 0 {
            return;
        }
        let entry = ExecutionEntry {
            agent_id,
            execution_id: execution_id.to_string(),
            status: status.to_string(),
            timestamp: chrono::Utc::now(),
        };
        let mut entries = self.entries.write();
        if entries.len() >= self.capacity {
            entries.pop_front();
        }
        entries.push_back(entry);
    }

    /// Retrieve up to `limit` entries for `agent_id`, most recent first.
    fn get_history(&self, agent_id: AgentId, limit: usize) -> Vec<ExecutionEntry> {
        let entries = self.entries.read();
        entries
            .iter()
            .rev()
            .filter(|e| e.agent_id == agent_id)
            .take(limit)
            .cloned()
            .collect()
    }
}
155
156/// Main Agent Runtime System
157#[derive(Clone)]
158pub struct AgentRuntime {
159    pub scheduler: Arc<dyn scheduler::AgentScheduler + Send + Sync>,
160    pub lifecycle: Arc<dyn lifecycle::LifecycleController + Send + Sync>,
161    pub resource_manager: Arc<dyn resource::ResourceManager + Send + Sync>,
162    pub communication: Arc<dyn communication::CommunicationBus + Send + Sync>,
163    pub error_handler: Arc<dyn error_handler::ErrorHandler + Send + Sync>,
164    pub context_manager: Arc<dyn context::ContextManager + Send + Sync>,
165    pub model_logger: Option<Arc<logging::ModelLogger>>,
166    pub model_catalog: Option<Arc<models::ModelCatalog>>,
167    /// Stable identity for system-originated messages (API calls, HTTP input).
168    /// Created once at runtime startup and reused for all internal messages
169    /// so audit trails can consistently attribute system actions.
170    pub system_agent_id: AgentId,
171    /// Optional AgentPin verifier, constructed when
172    /// `RuntimeConfig.agentpin.enabled == true`. When present, the HTTP
173    /// ingress paths (inter-agent messaging, heartbeat, push-event) and
174    /// the cron scheduler require a valid AgentPin JWT whose `sub`/agent
175    /// claims cover the acting agent.
176    pub agentpin_verifier: Option<Arc<dyn integrations::AgentPinVerifier>>,
177    #[cfg(feature = "cron")]
178    cron_scheduler: Option<Arc<scheduler::cron_scheduler::CronScheduler>>,
179    config: Arc<RwLock<RuntimeConfig>>,
180    /// In-memory execution log for agent history tracking.
181    #[cfg(feature = "http-api")]
182    execution_log: Arc<ExecutionLog>,
183}
184
185impl AgentRuntime {
186    /// Create a new Agent Runtime System instance
187    pub async fn new(config: RuntimeConfig) -> Result<Self, RuntimeError> {
188        let config = Arc::new(RwLock::new(config));
189
190        // Initialize components
191        let scheduler = Arc::new(
192            scheduler::DefaultAgentScheduler::new(config.read().await.scheduler.clone()).await?,
193        );
194
195        let resource_manager = Arc::new(
196            resource::DefaultResourceManager::new(config.read().await.resource_manager.clone())
197                .await?,
198        );
199
200        let communication = Arc::new(
201            communication::DefaultCommunicationBus::new(config.read().await.communication.clone())
202                .await?,
203        );
204
205        let error_handler = Arc::new(
206            error_handler::DefaultErrorHandler::new(config.read().await.error_handler.clone())
207                .await?,
208        );
209
210        let lifecycle_config = lifecycle::LifecycleConfig {
211            max_agents: 1000,
212            initialization_timeout: std::time::Duration::from_secs(30),
213            termination_timeout: std::time::Duration::from_secs(30),
214            state_check_interval: std::time::Duration::from_secs(10),
215            enable_auto_recovery: true,
216            max_restart_attempts: 3,
217        };
218        let lifecycle =
219            Arc::new(lifecycle::DefaultLifecycleController::new(lifecycle_config).await?);
220
221        let context_manager = Arc::new(
222            context::StandardContextManager::new(
223                config.read().await.context_manager.clone(),
224                "runtime-system",
225            )
226            .await
227            .map_err(|e| {
228                RuntimeError::Internal(format!("Failed to create context manager: {}", e))
229            })?,
230        );
231
232        // Initialize context manager
233        context_manager.initialize().await.map_err(|e| {
234            RuntimeError::Internal(format!("Failed to initialize context manager: {}", e))
235        })?;
236
237        // Initialize model logger if enabled
238        let model_logger = if config.read().await.logging.enabled {
239            // For now, initialize without secret store to avoid type conversion issues
240            match logging::ModelLogger::new(config.read().await.logging.clone(), None) {
241                Ok(logger) => {
242                    tracing::info!("Model logging initialized successfully");
243                    Some(Arc::new(logger))
244                }
245                Err(e) => {
246                    tracing::warn!("Failed to initialize model logger: {}", e);
247                    None
248                }
249            }
250        } else {
251            tracing::info!("Model logging is disabled");
252            None
253        };
254
255        // Initialize model catalog if SLM is enabled
256        let model_catalog = if let Some(ref slm_config) = config.read().await.slm {
257            if slm_config.enabled {
258                match models::ModelCatalog::new(slm_config.clone()) {
259                    Ok(catalog) => {
260                        tracing::info!(
261                            "Model catalog initialized with {} models",
262                            catalog.list_models().len()
263                        );
264                        Some(Arc::new(catalog))
265                    }
266                    Err(e) => {
267                        tracing::warn!("Failed to initialize model catalog: {}", e);
268                        None
269                    }
270                }
271            } else {
272                tracing::info!("SLM support is disabled");
273                None
274            }
275        } else {
276            tracing::info!("No SLM configuration provided");
277            None
278        };
279
280        // Initialize AgentPin verifier if enabled in config. Failing to
281        // build the verifier is NOT fail-open: we return the error so the
282        // operator sees the misconfiguration at startup rather than
283        // silently running without identity checks.
284        let agentpin_verifier: Option<Arc<dyn integrations::AgentPinVerifier>> = {
285            let cfg = config.read().await;
286            match cfg.agentpin.as_ref() {
287                Some(ap_cfg) if ap_cfg.enabled => {
288                    let verifier = integrations::DefaultAgentPinVerifier::new(ap_cfg.clone())
289                        .map_err(|e| {
290                            RuntimeError::Internal(format!(
291                                "Failed to construct AgentPin verifier: {}",
292                                e
293                            ))
294                        })?;
295                    tracing::info!(
296                        "AgentPin verifier enabled (discovery_mode={:?}, audience={:?})",
297                        ap_cfg.discovery_mode,
298                        ap_cfg.audience
299                    );
300                    Some(Arc::new(verifier) as Arc<dyn integrations::AgentPinVerifier>)
301                }
302                Some(_) => {
303                    tracing::info!("AgentPin configured but disabled; identity checks skipped");
304                    None
305                }
306                None => None,
307            }
308        };
309
310        Ok(Self {
311            scheduler,
312            lifecycle,
313            resource_manager,
314            communication,
315            error_handler,
316            context_manager,
317            model_logger,
318            model_catalog,
319            system_agent_id: AgentId::new(),
320            agentpin_verifier,
321            #[cfg(feature = "cron")]
322            cron_scheduler: None,
323            config,
324            #[cfg(feature = "http-api")]
325            execution_log: Arc::new(ExecutionLog::new(10_000)),
326        })
327    }
328
329    /// Attach a CronScheduler to the runtime so schedule APIs become functional.
330    #[cfg(feature = "cron")]
331    pub fn with_cron_scheduler(
332        mut self,
333        cron: Arc<scheduler::cron_scheduler::CronScheduler>,
334    ) -> Self {
335        self.cron_scheduler = Some(cron);
336        self
337    }
338
339    /// Get the current runtime configuration
340    pub async fn get_config(&self) -> RuntimeConfig {
341        self.config.read().await.clone()
342    }
343
344    /// Update the runtime configuration
345    pub async fn update_config(&self, config: RuntimeConfig) -> Result<(), RuntimeError> {
346        *self.config.write().await = config;
347        Ok(())
348    }
349
350    /// Verify that `jwt` is a valid AgentPin credential covering `agent_id`.
351    ///
352    /// Returns `Ok(())` when no AgentPin verifier is configured on this
353    /// runtime. When a verifier IS configured, `jwt` must be present and
354    /// its `sub` must equal the target agent's UUID string. Used by
355    /// heartbeat, push-event, and any other per-agent ingress path.
356    pub async fn verify_agentpin_for_agent(
357        &self,
358        jwt: Option<&str>,
359        agent_id: AgentId,
360    ) -> Result<(), RuntimeError> {
361        let Some(verifier) = self.agentpin_verifier.as_ref() else {
362            if jwt.is_some() {
363                tracing::warn!("AgentPin JWT supplied but verifier is disabled on this runtime");
364            }
365            return Ok(());
366        };
367        let jwt = jwt.ok_or_else(|| {
368            RuntimeError::Authentication(
369                "AgentPin verification is enabled; agentpin_jwt is required".to_string(),
370            )
371        })?;
372        let result = verifier
373            .verify_credential(jwt)
374            .await
375            .map_err(|e| RuntimeError::Authentication(format!("AgentPin: {}", e)))?;
376        if !result.valid {
377            return Err(RuntimeError::Authentication(format!(
378                "AgentPin credential invalid: {}",
379                result
380                    .error_message
381                    .unwrap_or_else(|| "no reason".to_string())
382            )));
383        }
384        let expected = agent_id.0.to_string();
385        let sub_matches = result
386            .agent_id
387            .as_deref()
388            .map(|sub| sub == expected)
389            .unwrap_or(false);
390        if !sub_matches {
391            return Err(RuntimeError::Authentication(format!(
392                "AgentPin JWT does not cover agent {}: sub={:?}",
393                expected, result.agent_id
394            )));
395        }
396        Ok(())
397    }
398
399    /// Shutdown the runtime system gracefully
400    pub async fn shutdown(&self) -> Result<(), RuntimeError> {
401        tracing::info!("Starting Agent Runtime shutdown sequence");
402
403        // Shutdown components in reverse order of initialization
404        self.lifecycle
405            .shutdown()
406            .await
407            .map_err(RuntimeError::Lifecycle)?;
408        self.communication
409            .shutdown()
410            .await
411            .map_err(RuntimeError::Communication)?;
412        self.resource_manager
413            .shutdown()
414            .await
415            .map_err(RuntimeError::Resource)?;
416        self.scheduler
417            .shutdown()
418            .await
419            .map_err(RuntimeError::Scheduler)?;
420        self.error_handler
421            .shutdown()
422            .await
423            .map_err(RuntimeError::ErrorHandler)?;
424
425        // Shutdown context manager last to ensure all contexts are saved
426        self.context_manager.shutdown().await.map_err(|e| {
427            RuntimeError::Internal(format!("Context manager shutdown failed: {}", e))
428        })?;
429
430        tracing::info!("Agent Runtime shutdown completed successfully");
431        Ok(())
432    }
433
434    /// Get system status
435    pub async fn get_status(&self) -> SystemStatus {
436        self.scheduler.get_system_status().await
437    }
438}
439
440/// Runtime configuration
441#[derive(Debug, Clone, Default)]
442pub struct RuntimeConfig {
443    pub scheduler: scheduler::SchedulerConfig,
444    pub resource_manager: resource::ResourceManagerConfig,
445    pub communication: communication::CommunicationConfig,
446    pub context_manager: context::ContextManagerConfig,
447    pub security: SecurityConfig,
448    pub audit: AuditConfig,
449    pub error_handler: error_handler::ErrorHandlerConfig,
450    pub logging: logging::LoggingConfig,
451    pub slm: Option<config::Slm>,
452    pub routing: Option<routing::RoutingConfig>,
453    /// Optional AgentPin configuration. When `enabled = true`, every
454    /// inter-agent messaging / heartbeat / cron-trigger path requires a
455    /// valid AgentPin JWT whose `sub` covers the acting agent.
456    pub agentpin: Option<crate::integrations::AgentPinConfig>,
457}
458
459/// Implementation of RuntimeApiProvider for AgentRuntime
460#[cfg(feature = "http-api")]
461#[async_trait]
462#[allow(unused_variables)] // Params used inside #[cfg(feature = "cron")] blocks
463impl RuntimeApiProvider for AgentRuntime {
464    async fn execute_workflow(
465        &self,
466        request: WorkflowExecutionRequest,
467    ) -> Result<serde_json::Value, RuntimeError> {
468        tracing::info!("Executing workflow: {}", request.workflow_id);
469
470        // Step 1: Parse the workflow DSL and extract metadata (before any await)
471        let workflow_dsl = &request.workflow_id; // For now, treat workflow_id as DSL source
472        let (metadata, agent_config) = {
473            let parsed_tree = dsl::parse_dsl(workflow_dsl)
474                .map_err(|e| RuntimeError::Internal(format!("DSL parsing failed: {}", e)))?;
475
476            // Extract metadata from the parsed workflow
477            let metadata = dsl::extract_metadata(&parsed_tree, workflow_dsl);
478
479            // Check for parsing errors
480            let root_node = parsed_tree.root_node();
481            if root_node.has_error() {
482                return Err(RuntimeError::Internal(
483                    "DSL contains syntax errors".to_string(),
484                ));
485            }
486
487            // Create agent configuration from the workflow
488            let agent_id = request.agent_id.unwrap_or_default();
489            let agent_config = AgentConfig {
490                id: agent_id,
491                name: metadata
492                    .get("name")
493                    .cloned()
494                    .unwrap_or_else(|| "workflow_agent".to_string()),
495                dsl_source: workflow_dsl.to_string(),
496                execution_mode: ExecutionMode::Ephemeral,
497                security_tier: SecurityTier::Tier1,
498                resource_limits: ResourceLimits::default(),
499                capabilities: vec![Capability::Computation], // Basic capability for workflow execution
500                policies: vec![],
501                metadata: metadata.clone(),
502                priority: Priority::Normal,
503            };
504
505            (metadata, agent_config)
506        };
507
508        // Step 2: Schedule the agent for execution
509        let scheduled_agent_id = self
510            .scheduler
511            .schedule_agent(agent_config)
512            .await
513            .map_err(RuntimeError::Scheduler)?;
514
515        // Step 3: Wait briefly and check initial status (simple implementation)
516        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
517
518        // Step 4: Collect basic execution information
519        let system_status = self.scheduler.get_system_status().await;
520
521        // Step 5: Prepare and return result
522        let mut result = serde_json::json!({
523            "status": "success",
524            "workflow_id": request.workflow_id,
525            "agent_id": scheduled_agent_id.to_string(),
526            "execution_started": true,
527            "metadata": metadata,
528            "system_status": {
529                "total_agents": system_status.total_agents,
530                "running_agents": system_status.running_agents,
531                "resource_utilization": {
532                    "memory_used": system_status.resource_utilization.memory_used,
533                    "cpu_utilization": system_status.resource_utilization.cpu_utilization,
534                    "disk_io_rate": system_status.resource_utilization.disk_io_rate,
535                    "network_io_rate": system_status.resource_utilization.network_io_rate
536                }
537            }
538        });
539
540        // Add parameters if provided
541        if !request.parameters.is_null() {
542            result["parameters"] = request.parameters;
543        }
544
545        // Record execution in history log
546        self.execution_log.record(
547            scheduled_agent_id,
548            &scheduled_agent_id.to_string(),
549            "workflow_started",
550        );
551
552        tracing::info!(
553            "Workflow execution initiated for agent: {}",
554            scheduled_agent_id
555        );
556        Ok(result)
557    }
558
559    async fn get_agent_status(
560        &self,
561        agent_id: AgentId,
562    ) -> Result<AgentStatusResponse, RuntimeError> {
563        let external_state = self.scheduler.get_external_agent_state(agent_id);
564        let agent_config = self.scheduler.get_agent_config(agent_id);
565
566        match self.scheduler.get_agent_status(agent_id).await {
567            Ok(agent_status) => {
568                // Convert SystemTime to DateTime<Utc>
569                let last_activity =
570                    chrono::DateTime::<chrono::Utc>::from(agent_status.last_activity);
571
572                let execution_mode_label = agent_config.as_ref().map(|c| match &c.execution_mode {
573                    crate::types::agent::ExecutionMode::External { .. } => "External".to_string(),
574                    crate::types::agent::ExecutionMode::Persistent => "Persistent".to_string(),
575                    crate::types::agent::ExecutionMode::Ephemeral => "Ephemeral".to_string(),
576                    crate::types::agent::ExecutionMode::Scheduled { .. } => "Scheduled".to_string(),
577                    crate::types::agent::ExecutionMode::CronScheduled { .. } => {
578                        "CronScheduled".to_string()
579                    }
580                    crate::types::agent::ExecutionMode::EventDriven => "EventDriven".to_string(),
581                });
582
583                let (metadata, last_result, recent_events) = if let Some(ref ext) = external_state {
584                    (
585                        Some(ext.metadata.clone()),
586                        ext.last_result.clone(),
587                        Some(ext.events.iter().cloned().collect::<Vec<_>>()),
588                    )
589                } else {
590                    (None, None, None)
591                };
592
593                Ok(AgentStatusResponse {
594                    agent_id: agent_status.agent_id,
595                    state: agent_status.state,
596                    last_activity,
597                    resource_usage: api::types::ResourceUsage {
598                        memory_bytes: agent_status.memory_usage,
599                        cpu_percent: agent_status.cpu_usage,
600                        active_tasks: agent_status.active_tasks,
601                    },
602                    metadata,
603                    last_result,
604                    recent_events,
605                    execution_mode: execution_mode_label,
606                })
607            }
608            Err(scheduler_error) => {
609                tracing::warn!(
610                    "Failed to get agent status for {}: {}",
611                    agent_id,
612                    scheduler_error
613                );
614                Err(RuntimeError::Internal(format!(
615                    "Agent {} not found",
616                    agent_id
617                )))
618            }
619        }
620    }
621
622    async fn get_system_health(&self) -> Result<serde_json::Value, RuntimeError> {
623        // Check health of all components
624        let scheduler_health =
625            self.scheduler.check_health().await.map_err(|e| {
626                RuntimeError::Internal(format!("Scheduler health check failed: {}", e))
627            })?;
628
629        let lifecycle_health =
630            self.lifecycle.check_health().await.map_err(|e| {
631                RuntimeError::Internal(format!("Lifecycle health check failed: {}", e))
632            })?;
633
634        let resource_health = self.resource_manager.check_health().await.map_err(|e| {
635            RuntimeError::Internal(format!("Resource manager health check failed: {}", e))
636        })?;
637
638        let communication_health = self.communication.check_health().await.map_err(|e| {
639            RuntimeError::Internal(format!("Communication health check failed: {}", e))
640        })?;
641
642        // Determine overall system status
643        let component_healths = vec![
644            ("scheduler", &scheduler_health),
645            ("lifecycle", &lifecycle_health),
646            ("resource_manager", &resource_health),
647            ("communication", &communication_health),
648        ];
649
650        let overall_status = if component_healths
651            .iter()
652            .all(|(_, h)| h.status == HealthStatus::Healthy)
653        {
654            "healthy"
655        } else if component_healths
656            .iter()
657            .any(|(_, h)| h.status == HealthStatus::Unhealthy)
658        {
659            "unhealthy"
660        } else {
661            "degraded"
662        };
663
664        // Build response with detailed component information
665        let mut components = serde_json::Map::new();
666        for (name, health) in component_healths {
667            let component_info = serde_json::json!({
668                "status": match health.status {
669                    HealthStatus::Healthy => "healthy",
670                    HealthStatus::Degraded => "degraded",
671                    HealthStatus::Unhealthy => "unhealthy",
672                },
673                "message": health.message,
674                "last_check": health.last_check
675                    .duration_since(std::time::UNIX_EPOCH)
676                    .unwrap_or_default()
677                    .as_secs(),
678                "uptime_seconds": health.uptime.as_secs(),
679                "metrics": health.metrics
680            });
681            components.insert(name.to_string(), component_info);
682        }
683
684        Ok(serde_json::json!({
685            "status": overall_status,
686            "timestamp": std::time::SystemTime::now()
687                .duration_since(std::time::UNIX_EPOCH)
688                .unwrap_or_default()
689                .as_secs(),
690            "components": components
691        }))
692    }
693
694    async fn list_agents(&self) -> Result<Vec<AgentId>, RuntimeError> {
695        Ok(self.scheduler.list_agents().await)
696    }
697
698    async fn list_agents_detailed(
699        &self,
700    ) -> Result<Vec<crate::api::types::AgentSummary>, RuntimeError> {
701        let ids = self.scheduler.list_agents().await;
702        let mut summaries = Vec::new();
703        for id in ids {
704            let name = self
705                .scheduler
706                .get_agent_config(id)
707                .map(|c| c.name)
708                .unwrap_or_default();
709            let state = self
710                .scheduler
711                .get_agent_status(id)
712                .await
713                .map(|s| s.state)
714                .unwrap_or(AgentState::Created);
715            summaries.push(crate::api::types::AgentSummary { id, name, state });
716        }
717        Ok(summaries)
718    }
719
720    async fn shutdown_agent(&self, agent_id: AgentId) -> Result<(), RuntimeError> {
721        self.scheduler
722            .shutdown_agent(agent_id)
723            .await
724            .map_err(RuntimeError::Scheduler)
725    }
726
727    async fn get_metrics(&self) -> Result<serde_json::Value, RuntimeError> {
728        let status = self.get_status().await;
729
730        // Count agents in error/failed state by checking each agent's status
731        let agent_ids = self.scheduler.list_agents().await;
732        let mut error_count: usize = 0;
733        for aid in &agent_ids {
734            if let Ok(agent_status) = self.scheduler.get_agent_status(*aid).await {
735                if agent_status.state == AgentState::Failed {
736                    error_count += 1;
737                }
738            }
739        }
740
741        let idle = status
742            .total_agents
743            .saturating_sub(status.running_agents)
744            .saturating_sub(error_count);
745
746        Ok(serde_json::json!({
747            "agents": {
748                "total": status.total_agents,
749                "running": status.running_agents,
750                "idle": idle,
751                "error": error_count
752            },
753            "system": {
754                "uptime_seconds": status.uptime.as_secs(),
755                "memory_usage": status.resource_utilization.memory_used,
756                "cpu_usage": status.resource_utilization.cpu_utilization
757            }
758        }))
759    }
760
761    async fn create_agent(
762        &self,
763        request: CreateAgentRequest,
764    ) -> Result<CreateAgentResponse, RuntimeError> {
765        // Validate input
766        if request.name.is_empty() {
767            return Err(RuntimeError::Internal(
768                "Agent name cannot be empty".to_string(),
769            ));
770        }
771
772        let execution_mode = request.execution_mode.clone().unwrap_or_default();
773
774        let dsl_source = match &execution_mode {
775            crate::types::agent::ExecutionMode::External { .. } => {
776                request.dsl.clone().unwrap_or_default()
777            }
778            _ => {
779                let dsl = request.dsl.as_deref().unwrap_or("");
780                if dsl.is_empty() {
781                    return Err(RuntimeError::Internal(
782                        "Agent DSL cannot be empty for non-external agents".to_string(),
783                    ));
784                }
785                dsl.to_string()
786            }
787        };
788
789        // Create agent configuration
790        let agent_id = AgentId::new();
791        let agent_config = AgentConfig {
792            id: agent_id,
793            name: request.name,
794            dsl_source,
795            execution_mode,
796            security_tier: SecurityTier::Tier1,
797            resource_limits: ResourceLimits::default(),
798            capabilities: request
799                .capabilities
800                .unwrap_or_default()
801                .into_iter()
802                .map(Capability::Custom)
803                .collect(),
804            policies: vec![],
805            metadata: request.metadata.unwrap_or_default(),
806            priority: Priority::Normal,
807        };
808
809        // Schedule the agent for execution
810        let scheduled_agent_id = self
811            .scheduler
812            .schedule_agent(agent_config)
813            .await
814            .map_err(RuntimeError::Scheduler)?;
815
816        tracing::info!("Created and scheduled agent: {}", scheduled_agent_id);
817
818        // Record creation in execution log
819        self.execution_log.record(
820            scheduled_agent_id,
821            &scheduled_agent_id.to_string(),
822            "created",
823        );
824
825        Ok(CreateAgentResponse {
826            id: scheduled_agent_id.to_string(),
827            status: "scheduled".to_string(),
828        })
829    }
830
831    async fn update_agent(
832        &self,
833        agent_id: AgentId,
834        request: UpdateAgentRequest,
835    ) -> Result<UpdateAgentResponse, RuntimeError> {
836        // Validate that at least one field is provided for update
837        if request.name.is_none() && request.dsl.is_none() {
838            return Err(RuntimeError::Internal(
839                "At least one field (name or dsl) must be provided for update".to_string(),
840            ));
841        }
842
843        // Validate optional fields if provided
844        if let Some(ref name) = request.name {
845            if name.is_empty() {
846                return Err(RuntimeError::Internal(
847                    "Agent name cannot be empty".to_string(),
848                ));
849            }
850        }
851
852        if let Some(ref dsl) = request.dsl {
853            if dsl.is_empty() {
854                return Err(RuntimeError::Internal(
855                    "Agent DSL cannot be empty".to_string(),
856                ));
857            }
858        }
859
860        // Call the scheduler to update the agent
861        self.scheduler
862            .update_agent(agent_id, request)
863            .await
864            .map_err(RuntimeError::Scheduler)?;
865
866        tracing::info!("Successfully updated agent: {}", agent_id);
867
868        Ok(UpdateAgentResponse {
869            id: agent_id.to_string(),
870            status: "updated".to_string(),
871        })
872    }
873
874    async fn delete_agent(&self, agent_id: AgentId) -> Result<DeleteAgentResponse, RuntimeError> {
875        self.scheduler
876            .delete_agent(agent_id)
877            .await
878            .map_err(RuntimeError::Scheduler)?;
879
880        Ok(DeleteAgentResponse {
881            id: agent_id.to_string(),
882            status: "deleted".to_string(),
883        })
884    }
885
886    async fn execute_agent(
887        &self,
888        agent_id: AgentId,
889        request: ExecuteAgentRequest,
890    ) -> Result<ExecuteAgentResponse, RuntimeError> {
891        // Ensure the agent exists in the registry
892        if !self.scheduler.has_agent(agent_id) {
893            return Err(RuntimeError::Internal(format!(
894                "Agent {} not found",
895                agent_id
896            )));
897        }
898
899        // Re-schedule from stored config if the agent isn't currently active
900        let status = self.get_agent_status(agent_id).await?;
901        if status.state == AgentState::Completed {
902            if let Some(config) = self.scheduler.get_agent_config(agent_id) {
903                self.scheduler
904                    .schedule_agent(config)
905                    .await
906                    .map_err(RuntimeError::Scheduler)?;
907            }
908        } else if status.state != AgentState::Running {
909            self.lifecycle
910                .start_agent(agent_id)
911                .await
912                .map_err(RuntimeError::Lifecycle)?;
913        }
914        let execution_id = uuid::Uuid::new_v4().to_string();
915        let payload_data: bytes::Bytes = serde_json::to_vec(&request)
916            .map_err(|e| RuntimeError::Internal(e.to_string()))?
917            .into();
918        let message = self.communication.create_internal_message(
919            self.system_agent_id,
920            agent_id,
921            payload_data,
922            types::MessageType::Direct(agent_id),
923            std::time::Duration::from_secs(300),
924        );
925        self.communication
926            .send_message(message)
927            .await
928            .map_err(RuntimeError::Communication)?;
929
930        // Record execution in the history log
931        #[cfg(feature = "http-api")]
932        self.execution_log
933            .record(agent_id, &execution_id, "execution_started");
934
935        Ok(ExecuteAgentResponse {
936            execution_id,
937            status: "execution_started".to_string(),
938        })
939    }
940
941    async fn get_agent_history(
942        &self,
943        agent_id: AgentId,
944    ) -> Result<GetAgentHistoryResponse, RuntimeError> {
945        let entries = self.execution_log.get_history(agent_id, 100);
946        let history = entries
947            .into_iter()
948            .map(|e| AgentExecutionRecord {
949                execution_id: e.execution_id,
950                status: e.status,
951                timestamp: e.timestamp.to_rfc3339(),
952            })
953            .collect();
954        Ok(GetAgentHistoryResponse { history })
955    }
956
957    // ── Schedule endpoints ──────────────────────────────────────────
958
959    async fn list_schedules(&self) -> Result<Vec<ScheduleSummary>, RuntimeError> {
960        #[cfg(feature = "cron")]
961        if let Some(ref cron) = self.cron_scheduler {
962            let jobs = cron
963                .list_jobs()
964                .await
965                .map_err(|e| RuntimeError::Internal(e.to_string()))?;
966            return Ok(jobs
967                .into_iter()
968                .map(|j| ScheduleSummary {
969                    job_id: j.job_id.to_string(),
970                    name: j.name,
971                    cron_expression: j.cron_expression,
972                    timezone: j.timezone,
973                    status: format!("{:?}", j.status),
974                    enabled: j.enabled,
975                    next_run: j.next_run.map(|t| t.to_rfc3339()),
976                    run_count: j.run_count,
977                })
978                .collect());
979        }
980        Ok(vec![])
981    }
982
983    async fn create_schedule(
984        &self,
985        request: CreateScheduleRequest,
986    ) -> Result<CreateScheduleResponse, RuntimeError> {
987        #[cfg(feature = "cron")]
988        if let Some(ref cron) = self.cron_scheduler {
989            use scheduler::cron_types::{CronJobDefinition, CronJobId};
990            let now = chrono::Utc::now();
991            let tz = if request.timezone.is_empty() {
992                "UTC".to_string()
993            } else {
994                request.timezone
995            };
996            let agent_config = types::AgentConfig {
997                id: types::AgentId::new(),
998                name: request.agent_name,
999                dsl_source: String::new(),
1000                execution_mode: Default::default(),
1001                security_tier: Default::default(),
1002                resource_limits: Default::default(),
1003                capabilities: Vec::new(),
1004                policies: Vec::new(),
1005                metadata: Default::default(),
1006                priority: Default::default(),
1007            };
1008            let job = CronJobDefinition {
1009                job_id: CronJobId::new(),
1010                name: request.name,
1011                cron_expression: request.cron_expression,
1012                timezone: tz,
1013                agent_config,
1014                policy_ids: request.policy_ids,
1015                audit_level: Default::default(),
1016                status: scheduler::cron_types::CronJobStatus::Active,
1017                enabled: true,
1018                one_shot: request.one_shot,
1019                created_at: now,
1020                updated_at: now,
1021                last_run: None,
1022                next_run: None,
1023                run_count: 0,
1024                failure_count: 0,
1025                max_retries: 3,
1026                max_concurrent: 1,
1027                delivery_config: None,
1028                jitter_max_secs: 0,
1029                session_mode: Default::default(),
1030                agentpin_jwt: None,
1031            };
1032            let job_id = cron
1033                .add_job(job)
1034                .await
1035                .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1036            // Retrieve the saved job to get the computed next_run.
1037            let saved = cron
1038                .get_job(job_id)
1039                .await
1040                .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1041            return Ok(CreateScheduleResponse {
1042                job_id: job_id.to_string(),
1043                next_run: saved.next_run.map(|t| t.to_rfc3339()),
1044                status: "created".to_string(),
1045            });
1046        }
1047        Err(RuntimeError::Internal(
1048            "Schedule API requires a running CronScheduler".to_string(),
1049        ))
1050    }
1051
1052    async fn get_schedule(&self, job_id: &str) -> Result<ScheduleDetail, RuntimeError> {
1053        #[cfg(feature = "cron")]
1054        if let Some(ref cron) = self.cron_scheduler {
1055            let id: scheduler::cron_types::CronJobId = job_id
1056                .parse()
1057                .map_err(|_| RuntimeError::Internal(format!("Invalid job ID: {}", job_id)))?;
1058            let j = cron
1059                .get_job(id)
1060                .await
1061                .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1062            return Ok(ScheduleDetail {
1063                job_id: j.job_id.to_string(),
1064                name: j.name,
1065                cron_expression: j.cron_expression,
1066                timezone: j.timezone,
1067                status: format!("{:?}", j.status),
1068                enabled: j.enabled,
1069                one_shot: j.one_shot,
1070                next_run: j.next_run.map(|t| t.to_rfc3339()),
1071                last_run: j.last_run.map(|t| t.to_rfc3339()),
1072                run_count: j.run_count,
1073                failure_count: j.failure_count,
1074                created_at: j.created_at.to_rfc3339(),
1075                updated_at: j.updated_at.to_rfc3339(),
1076            });
1077        }
1078        Err(RuntimeError::Internal(
1079            "Schedule API requires a running CronScheduler".to_string(),
1080        ))
1081    }
1082
1083    async fn update_schedule(
1084        &self,
1085        job_id: &str,
1086        request: UpdateScheduleRequest,
1087    ) -> Result<ScheduleDetail, RuntimeError> {
1088        #[cfg(feature = "cron")]
1089        if let Some(ref cron) = self.cron_scheduler {
1090            let id: scheduler::cron_types::CronJobId = job_id
1091                .parse()
1092                .map_err(|_| RuntimeError::Internal(format!("Invalid job ID: {}", job_id)))?;
1093            let mut job = cron
1094                .get_job(id)
1095                .await
1096                .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1097            if let Some(expr) = request.cron_expression {
1098                job.cron_expression = expr;
1099            }
1100            if let Some(tz) = request.timezone {
1101                job.timezone = tz;
1102            }
1103            if let Some(pids) = request.policy_ids {
1104                job.policy_ids = pids;
1105            }
1106            if let Some(one_shot) = request.one_shot {
1107                job.one_shot = one_shot;
1108            }
1109            cron.update_job(job)
1110                .await
1111                .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1112            return self.get_schedule(job_id).await;
1113        }
1114        Err(RuntimeError::Internal(
1115            "Schedule API requires a running CronScheduler".to_string(),
1116        ))
1117    }
1118
1119    async fn delete_schedule(&self, job_id: &str) -> Result<DeleteScheduleResponse, RuntimeError> {
1120        #[cfg(feature = "cron")]
1121        if let Some(ref cron) = self.cron_scheduler {
1122            let id: scheduler::cron_types::CronJobId = job_id
1123                .parse()
1124                .map_err(|_| RuntimeError::Internal(format!("Invalid job ID: {}", job_id)))?;
1125            cron.remove_job(id)
1126                .await
1127                .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1128            return Ok(DeleteScheduleResponse {
1129                job_id: job_id.to_string(),
1130                deleted: true,
1131            });
1132        }
1133        Err(RuntimeError::Internal(
1134            "Schedule API requires a running CronScheduler".to_string(),
1135        ))
1136    }
1137
1138    async fn pause_schedule(&self, job_id: &str) -> Result<ScheduleActionResponse, RuntimeError> {
1139        #[cfg(feature = "cron")]
1140        if let Some(ref cron) = self.cron_scheduler {
1141            let id: scheduler::cron_types::CronJobId = job_id
1142                .parse()
1143                .map_err(|_| RuntimeError::Internal(format!("Invalid job ID: {}", job_id)))?;
1144            cron.pause_job(id)
1145                .await
1146                .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1147            return Ok(ScheduleActionResponse {
1148                job_id: job_id.to_string(),
1149                action: "pause".to_string(),
1150                status: "paused".to_string(),
1151            });
1152        }
1153        Err(RuntimeError::Internal(
1154            "Schedule API requires a running CronScheduler".to_string(),
1155        ))
1156    }
1157
1158    async fn resume_schedule(&self, job_id: &str) -> Result<ScheduleActionResponse, RuntimeError> {
1159        #[cfg(feature = "cron")]
1160        if let Some(ref cron) = self.cron_scheduler {
1161            let id: scheduler::cron_types::CronJobId = job_id
1162                .parse()
1163                .map_err(|_| RuntimeError::Internal(format!("Invalid job ID: {}", job_id)))?;
1164            cron.resume_job(id)
1165                .await
1166                .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1167            return Ok(ScheduleActionResponse {
1168                job_id: job_id.to_string(),
1169                action: "resume".to_string(),
1170                status: "active".to_string(),
1171            });
1172        }
1173        Err(RuntimeError::Internal(
1174            "Schedule API requires a running CronScheduler".to_string(),
1175        ))
1176    }
1177
1178    async fn trigger_schedule(&self, job_id: &str) -> Result<ScheduleActionResponse, RuntimeError> {
1179        #[cfg(feature = "cron")]
1180        if let Some(ref cron) = self.cron_scheduler {
1181            let id: scheduler::cron_types::CronJobId = job_id
1182                .parse()
1183                .map_err(|_| RuntimeError::Internal(format!("Invalid job ID: {}", job_id)))?;
1184            cron.trigger_now(id)
1185                .await
1186                .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1187            return Ok(ScheduleActionResponse {
1188                job_id: job_id.to_string(),
1189                action: "trigger".to_string(),
1190                status: "triggered".to_string(),
1191            });
1192        }
1193        Err(RuntimeError::Internal(
1194            "Schedule API requires a running CronScheduler".to_string(),
1195        ))
1196    }
1197
1198    async fn get_schedule_history(
1199        &self,
1200        job_id: &str,
1201        limit: usize,
1202    ) -> Result<ScheduleHistoryResponse, RuntimeError> {
1203        #[cfg(feature = "cron")]
1204        if let Some(ref cron) = self.cron_scheduler {
1205            let id: scheduler::cron_types::CronJobId = job_id
1206                .parse()
1207                .map_err(|_| RuntimeError::Internal(format!("Invalid job ID: {}", job_id)))?;
1208            let runs = cron
1209                .get_run_history(id, limit)
1210                .await
1211                .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1212            return Ok(ScheduleHistoryResponse {
1213                job_id: job_id.to_string(),
1214                history: runs
1215                    .into_iter()
1216                    .map(|r| ScheduleRunEntry {
1217                        run_id: r.run_id.to_string(),
1218                        started_at: r.started_at.to_rfc3339(),
1219                        completed_at: r.completed_at.map(|t| t.to_rfc3339()),
1220                        status: format!("{:?}", r.status),
1221                        error: r.error,
1222                        execution_time_ms: r.execution_time_ms,
1223                    })
1224                    .collect(),
1225            });
1226        }
1227        Err(RuntimeError::Internal(
1228            "Schedule API requires a running CronScheduler".to_string(),
1229        ))
1230    }
1231
1232    async fn get_schedule_next_runs(
1233        &self,
1234        job_id: &str,
1235        count: usize,
1236    ) -> Result<NextRunsResponse, RuntimeError> {
1237        #[cfg(feature = "cron")]
1238        if let Some(ref cron) = self.cron_scheduler {
1239            let id: scheduler::cron_types::CronJobId = job_id
1240                .parse()
1241                .map_err(|_| RuntimeError::Internal(format!("Invalid job ID: {}", job_id)))?;
1242            let job = cron
1243                .get_job(id)
1244                .await
1245                .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1246            let runs = cron
1247                .get_next_runs(&job.cron_expression, &job.timezone, count)
1248                .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1249            return Ok(NextRunsResponse {
1250                job_id: job_id.to_string(),
1251                next_runs: runs.into_iter().map(|t| t.to_rfc3339()).collect(),
1252            });
1253        }
1254        Err(RuntimeError::Internal(
1255            "Schedule API requires a running CronScheduler".to_string(),
1256        ))
1257    }
1258
1259    async fn get_scheduler_health(
1260        &self,
1261    ) -> Result<api::types::SchedulerHealthResponse, RuntimeError> {
1262        #[cfg(feature = "cron")]
1263        if let Some(ref cron) = self.cron_scheduler {
1264            let h = cron
1265                .check_health()
1266                .await
1267                .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1268            let m = cron.metrics();
1269            return Ok(api::types::SchedulerHealthResponse {
1270                is_running: h.is_running,
1271                store_accessible: h.store_accessible,
1272                jobs_total: h.jobs_total,
1273                jobs_active: h.jobs_active,
1274                jobs_paused: h.jobs_paused,
1275                jobs_dead_letter: h.jobs_dead_letter,
1276                global_active_runs: h.global_active_runs,
1277                max_concurrent: h.max_concurrent,
1278                runs_total: m.runs_total,
1279                runs_succeeded: m.runs_succeeded,
1280                runs_failed: m.runs_failed,
1281                average_execution_time_ms: m.average_execution_time_ms,
1282                longest_run_ms: m.longest_run_ms,
1283            });
1284        }
1285        // No CronScheduler — return a minimal response.
1286        Ok(api::types::SchedulerHealthResponse {
1287            is_running: false,
1288            store_accessible: false,
1289            jobs_total: 0,
1290            jobs_active: 0,
1291            jobs_paused: 0,
1292            jobs_dead_letter: 0,
1293            global_active_runs: 0,
1294            max_concurrent: 0,
1295            runs_total: 0,
1296            runs_succeeded: 0,
1297            runs_failed: 0,
1298            average_execution_time_ms: 0.0,
1299            longest_run_ms: 0,
1300        })
1301    }
1302
1303    // ── Channel endpoints ──────────────────────────────────────────
1304
1305    async fn list_channels(&self) -> Result<Vec<ChannelSummary>, RuntimeError> {
1306        // No ChannelAdapterManager — return empty list so dashboard renders gracefully
1307        Ok(vec![])
1308    }
1309
1310    async fn register_channel(
1311        &self,
1312        _request: RegisterChannelRequest,
1313    ) -> Result<RegisterChannelResponse, RuntimeError> {
1314        Err(RuntimeError::Internal(
1315            "Channel API requires a running ChannelAdapterManager".to_string(),
1316        ))
1317    }
1318
1319    async fn get_channel(&self, _id: &str) -> Result<ChannelDetail, RuntimeError> {
1320        Err(RuntimeError::Internal(
1321            "Channel API requires a running ChannelAdapterManager".to_string(),
1322        ))
1323    }
1324
1325    async fn update_channel(
1326        &self,
1327        _id: &str,
1328        _request: UpdateChannelRequest,
1329    ) -> Result<ChannelDetail, RuntimeError> {
1330        Err(RuntimeError::Internal(
1331            "Channel API requires a running ChannelAdapterManager".to_string(),
1332        ))
1333    }
1334
1335    async fn delete_channel(&self, _id: &str) -> Result<DeleteChannelResponse, RuntimeError> {
1336        Err(RuntimeError::Internal(
1337            "Channel API requires a running ChannelAdapterManager".to_string(),
1338        ))
1339    }
1340
1341    async fn start_channel(&self, _id: &str) -> Result<ChannelActionResponse, RuntimeError> {
1342        Err(RuntimeError::Internal(
1343            "Channel API requires a running ChannelAdapterManager".to_string(),
1344        ))
1345    }
1346
1347    async fn stop_channel(&self, _id: &str) -> Result<ChannelActionResponse, RuntimeError> {
1348        Err(RuntimeError::Internal(
1349            "Channel API requires a running ChannelAdapterManager".to_string(),
1350        ))
1351    }
1352
1353    async fn get_channel_health(&self, _id: &str) -> Result<ChannelHealthResponse, RuntimeError> {
1354        Err(RuntimeError::Internal(
1355            "Channel API requires a running ChannelAdapterManager".to_string(),
1356        ))
1357    }
1358
1359    async fn list_channel_mappings(
1360        &self,
1361        _id: &str,
1362    ) -> Result<Vec<IdentityMappingEntry>, RuntimeError> {
1363        Err(RuntimeError::Internal(
1364            "Channel identity mappings require enterprise edition".to_string(),
1365        ))
1366    }
1367
1368    async fn add_channel_mapping(
1369        &self,
1370        _id: &str,
1371        _request: AddIdentityMappingRequest,
1372    ) -> Result<IdentityMappingEntry, RuntimeError> {
1373        Err(RuntimeError::Internal(
1374            "Channel identity mappings require enterprise edition".to_string(),
1375        ))
1376    }
1377
1378    async fn remove_channel_mapping(&self, _id: &str, _user_id: &str) -> Result<(), RuntimeError> {
1379        Err(RuntimeError::Internal(
1380            "Channel identity mappings require enterprise edition".to_string(),
1381        ))
1382    }
1383
1384    async fn get_channel_audit(
1385        &self,
1386        _id: &str,
1387        _limit: usize,
1388    ) -> Result<ChannelAuditResponse, RuntimeError> {
1389        Err(RuntimeError::Internal(
1390            "Channel audit log requires enterprise edition".to_string(),
1391        ))
1392    }
1393
1394    // ── External agent endpoints ─────────────────────────────────────
1395
1396    async fn update_agent_heartbeat(
1397        &self,
1398        agent_id: AgentId,
1399        heartbeat: api::types::HeartbeatRequest,
1400    ) -> Result<(), RuntimeError> {
1401        // AP-4: gate external heartbeats on AgentPin when enabled.
1402        self.verify_agentpin_for_agent(heartbeat.agentpin_jwt.as_deref(), agent_id)
1403            .await?;
1404
1405        let ext_agents = self.scheduler.external_agents();
1406        let mut entry = ext_agents.get_mut(&agent_id).ok_or_else(|| {
1407            RuntimeError::Internal(format!("Agent {} is not an external agent", agent_id))
1408        })?;
1409
1410        entry.last_heartbeat = Some(chrono::Utc::now());
1411        entry.reported_state = heartbeat.state;
1412
1413        if let Some(metadata) = heartbeat.metadata {
1414            entry.metadata.extend(metadata);
1415        }
1416        if let Some(last_result) = heartbeat.last_result {
1417            entry.last_result = Some(last_result);
1418        }
1419
1420        Ok(())
1421    }
1422
1423    async fn push_agent_event(
1424        &self,
1425        agent_id: AgentId,
1426        event: api::types::PushEventRequest,
1427    ) -> Result<(), RuntimeError> {
1428        // AP-4: gate external agent events on AgentPin when enabled.
1429        self.verify_agentpin_for_agent(event.agentpin_jwt.as_deref(), agent_id)
1430            .await?;
1431
1432        let ext_agents = self.scheduler.external_agents();
1433        let mut entry = ext_agents.get_mut(&agent_id).ok_or_else(|| {
1434            RuntimeError::Internal(format!("Agent {} is not an external agent", agent_id))
1435        })?;
1436
1437        entry.push_event(api::types::AgentEvent {
1438            event_type: event.event_type,
1439            payload: event.payload,
1440            timestamp: chrono::Utc::now(),
1441        });
1442
1443        Ok(())
1444    }
1445
1446    async fn check_unreachable_agents(&self) {
1447        self.scheduler.check_unreachable_agents();
1448    }
1449
1450    async fn send_agent_message(
1451        &self,
1452        recipient: crate::types::AgentId,
1453        request: api::types::SendMessageRequest,
1454    ) -> Result<api::types::SendMessageResponse, crate::types::RuntimeError> {
1455        // AgentPin identity check (AP-1): reuse the shared helper so
1456        // every per-agent ingress path (messaging, heartbeat, push_event)
1457        // enforces the same rule.
1458        self.verify_agentpin_for_agent(request.agentpin_jwt.as_deref(), request.sender)
1459            .await?;
1460
1461        // Trust-boundary note (H-5 / M-1):
1462        // The `request` arrives from an HTTP caller (potentially another
1463        // runtime via RemoteCommunicationBus). We deliberately build a fresh
1464        // SecureMessage here via `create_internal_message` so it is signed
1465        // by THIS bus's Ed25519 key. The local bus then rejects any message
1466        // whose signature doesn't verify against its own key (see
1467        // DefaultCommunicationBus::verify_message_signature), which means
1468        // untrusted wire bytes — including SignatureAlgorithm::None or
1469        // EncryptionAlgorithm::None from a remote shim — can never survive
1470        // ingress without being re-wrapped here.
1471        //
1472        // Clamp TTL: bound by the bus's configured `message_ttl` so a caller
1473        // cannot pin queued messages beyond operator intent, and by a hard
1474        // safety cap so a misconfigured config can't allow forever-TTLs.
1475        const MAX_TTL_SECS: u64 = 24 * 3600;
1476        const DEFAULT_TTL_SECS: u64 = 300;
1477        let requested_secs = request.ttl_seconds.unwrap_or(DEFAULT_TTL_SECS);
1478        let configured_secs = self.config.read().await.communication.message_ttl.as_secs();
1479        let ttl_secs = requested_secs.min(configured_secs).clamp(1, MAX_TTL_SECS);
1480        let ttl = std::time::Duration::from_secs(ttl_secs);
1481
1482        // Decide message type: topic = publish, otherwise direct
1483        if let Some(ref topic) = request.topic {
1484            let msg = self.communication.create_internal_message(
1485                request.sender,
1486                recipient,
1487                bytes::Bytes::from(request.payload.into_bytes()),
1488                crate::types::communication::MessageType::Publish(topic.clone()),
1489                ttl,
1490            );
1491            let message_id = msg.id;
1492            self.communication
1493                .publish(topic.clone(), msg)
1494                .await
1495                .map_err(crate::types::RuntimeError::Communication)?;
1496            Ok(api::types::SendMessageResponse {
1497                message_id: message_id.0.to_string(),
1498                status: "pending".to_string(),
1499            })
1500        } else {
1501            let msg = self.communication.create_internal_message(
1502                request.sender,
1503                recipient,
1504                bytes::Bytes::from(request.payload.into_bytes()),
1505                crate::types::communication::MessageType::Direct(recipient),
1506                ttl,
1507            );
1508            let message_id = self
1509                .communication
1510                .send_message(msg)
1511                .await
1512                .map_err(crate::types::RuntimeError::Communication)?;
1513            Ok(api::types::SendMessageResponse {
1514                message_id: message_id.0.to_string(),
1515                status: "pending".to_string(),
1516            })
1517        }
1518    }
1519
1520    async fn receive_agent_messages(
1521        &self,
1522        agent_id: crate::types::AgentId,
1523    ) -> Result<api::types::ReceiveMessagesResponse, crate::types::RuntimeError> {
1524        let messages = self
1525            .communication
1526            .receive_messages(agent_id)
1527            .await
1528            .map_err(crate::types::RuntimeError::Communication)?;
1529
1530        let envelopes = messages
1531            .into_iter()
1532            .map(|m| {
1533                let timestamp_secs = m
1534                    .timestamp
1535                    .duration_since(std::time::UNIX_EPOCH)
1536                    .map(|d: std::time::Duration| d.as_secs())
1537                    .unwrap_or(0);
1538                let ttl_seconds = m.ttl.as_secs();
1539                let (message_type, topic) = match &m.message_type {
1540                    crate::types::communication::MessageType::Direct(_) => {
1541                        ("direct".to_string(), None)
1542                    }
1543                    crate::types::communication::MessageType::Publish(t) => {
1544                        ("publish".to_string(), Some(t.clone()))
1545                    }
1546                    crate::types::communication::MessageType::Subscribe(t) => {
1547                        ("subscribe".to_string(), Some(t.clone()))
1548                    }
1549                    crate::types::communication::MessageType::Broadcast => {
1550                        ("broadcast".to_string(), None)
1551                    }
1552                    crate::types::communication::MessageType::Request(_) => {
1553                        ("request".to_string(), None)
1554                    }
1555                    crate::types::communication::MessageType::Response(_) => {
1556                        ("response".to_string(), None)
1557                    }
1558                };
1559                // Payload is encrypted in-flight; for HTTP API return decrypted UTF-8 bytes.
1560                // The bus uses AES-256-GCM internally — the HTTP layer treats the payload
1561                // as opaque bytes that the receiver should decrypt if needed. For the
1562                // default bus (in-process), encryption is symmetric so callers see the
1563                // raw bytes they put in.
1564                let payload = String::from_utf8_lossy(&m.payload.data).to_string();
1565                api::types::MessageEnvelope {
1566                    message_id: m.id.0.to_string(),
1567                    sender: m.sender,
1568                    recipient: m.recipient,
1569                    topic,
1570                    payload,
1571                    message_type,
1572                    timestamp_secs,
1573                    ttl_seconds,
1574                }
1575            })
1576            .collect();
1577
1578        Ok(api::types::ReceiveMessagesResponse {
1579            messages: envelopes,
1580        })
1581    }
1582
1583    async fn get_message_status(
1584        &self,
1585        message_id: &str,
1586    ) -> Result<api::types::MessageStatusResponse, crate::types::RuntimeError> {
1587        let uuid = uuid::Uuid::parse_str(message_id).map_err(|_| {
1588            crate::types::RuntimeError::Communication(
1589                crate::types::CommunicationError::InvalidFormat(format!(
1590                    "Invalid message ID: {}",
1591                    message_id
1592                )),
1593            )
1594        })?;
1595        let mid = crate::types::MessageId(uuid);
1596        let status = self
1597            .communication
1598            .get_delivery_status(mid)
1599            .await
1600            .map_err(crate::types::RuntimeError::Communication)?;
1601        let status_str = match status {
1602            crate::communication::DeliveryStatus::Pending => "pending",
1603            crate::communication::DeliveryStatus::Delivered => "delivered",
1604            crate::communication::DeliveryStatus::Failed => "failed",
1605            crate::communication::DeliveryStatus::Expired => "expired",
1606        };
1607        Ok(api::types::MessageStatusResponse {
1608            message_id: message_id.to_string(),
1609            status: status_str.to_string(),
1610        })
1611    }
1612}