1pub mod communication;
7pub mod config;
8pub mod context;
9pub mod crypto;
10pub mod error_handler;
11pub mod integrations;
12pub mod lifecycle;
13pub mod logging;
14pub mod metrics;
15pub mod models;
16pub mod rag;
17pub mod reasoning;
18pub mod resource;
19pub mod routing;
20pub mod sandbox;
21pub mod scheduler;
22pub mod secrets;
23pub mod skills;
24pub mod toolclad;
25pub mod types;
26
27pub mod prelude;
28
29#[cfg(feature = "cli-executor")]
30pub mod cli_executor;
31
32#[cfg(feature = "http-api")]
33pub mod api;
34
35#[cfg(feature = "http-api")]
36use api::traits::RuntimeApiProvider;
37#[cfg(all(feature = "http-api", feature = "cron"))]
38use api::types::ScheduleRunEntry;
39#[cfg(feature = "http-api")]
40use api::types::{
41 AddIdentityMappingRequest, AgentExecutionRecord, AgentStatusResponse, ChannelActionResponse,
42 ChannelAuditResponse, ChannelDetail, ChannelHealthResponse, ChannelSummary, CreateAgentRequest,
43 CreateAgentResponse, CreateScheduleRequest, CreateScheduleResponse, DeleteAgentResponse,
44 DeleteChannelResponse, DeleteScheduleResponse, ExecuteAgentRequest, ExecuteAgentResponse,
45 GetAgentHistoryResponse, IdentityMappingEntry, NextRunsResponse, RegisterChannelRequest,
46 RegisterChannelResponse, ScheduleActionResponse, ScheduleDetail, ScheduleHistoryResponse,
47 ScheduleSummary, UpdateAgentRequest, UpdateAgentResponse, UpdateChannelRequest,
48 UpdateScheduleRequest, WorkflowExecutionRequest,
49};
50#[cfg(feature = "http-api")]
51use async_trait::async_trait;
52
53#[cfg(feature = "http-input")]
54pub mod http_input;
55
56pub use communication::{CommunicationBus, CommunicationConfig, DefaultCommunicationBus};
58pub use config::SecurityConfig;
59pub use context::{ContextManager, ContextManagerConfig, StandardContextManager};
60pub use error_handler::{DefaultErrorHandler, ErrorHandler, ErrorHandlerConfig};
61pub use lifecycle::{DefaultLifecycleController, LifecycleConfig, LifecycleController};
62pub use logging::{LoggingConfig, ModelInteractionType, ModelLogger, RequestData, ResponseData};
63pub use models::{ModelCatalog, ModelCatalogError, SlmRunner, SlmRunnerError};
64pub use resource::{DefaultResourceManager, ResourceManager, ResourceManagerConfig};
65pub use routing::{
66 DefaultRoutingEngine, RouteDecision, RoutingConfig, RoutingContext, RoutingEngine, TaskType,
67};
68pub use sandbox::{E2BSandbox, ExecutionResult, SandboxRunner, SandboxTier};
69#[cfg(feature = "cron")]
70pub use scheduler::{
71 cron_scheduler::{
72 CronMetrics, CronScheduler, CronSchedulerConfig, CronSchedulerError, CronSchedulerHealth,
73 },
74 cron_types::{
75 AuditLevel, CronJobDefinition, CronJobId, CronJobStatus, DeliveryChannel, DeliveryConfig,
76 DeliveryReceipt, JobRunRecord, JobRunStatus,
77 },
78 delivery::{CustomDeliveryHandler, DefaultDeliveryRouter, DeliveryResult, DeliveryRouter},
79 heartbeat::{
80 HeartbeatAssessment, HeartbeatConfig, HeartbeatContextMode, HeartbeatSeverity,
81 HeartbeatState,
82 },
83 job_store::{JobStore, JobStoreError, SqliteJobStore},
84 policy_gate::{
85 PolicyGate, ScheduleContext, SchedulePolicyCondition, SchedulePolicyDecision,
86 SchedulePolicyEffect, SchedulePolicyRule,
87 },
88};
89pub use scheduler::{AgentScheduler, DefaultAgentScheduler, SchedulerConfig};
90pub use secrets::{SecretStore, SecretsConfig};
91pub use types::*;
92
93use std::sync::Arc;
94use tokio::sync::RwLock;
95
/// Bounded, in-memory log of recent agent executions.
///
/// Entries are kept in a `VecDeque` behind a `parking_lot::RwLock`. The
/// `capacity` field is the limit `record` enforces by dropping the oldest
/// entry before appending a new one.
#[cfg(feature = "http-api")]
pub struct ExecutionLog {
    /// Recorded executions; new entries are pushed at the back.
    entries: parking_lot::RwLock<std::collections::VecDeque<ExecutionEntry>>,
    /// Entry limit checked on every `record` call.
    capacity: usize,
}
107
/// One row of the [`ExecutionLog`].
#[cfg(feature = "http-api")]
#[derive(Debug, Clone)]
struct ExecutionEntry {
    /// Agent the entry belongs to; `get_history` filters on this field.
    agent_id: AgentId,
    /// Identifier supplied by the caller of `record`.
    execution_id: String,
    /// Free-form status label supplied by the caller of `record`.
    status: String,
    /// UTC time at which the entry was created.
    timestamp: chrono::DateTime<chrono::Utc>,
}
116
#[cfg(feature = "http-api")]
impl ExecutionLog {
    /// Creates an empty log that retains at most `capacity` entries.
    ///
    /// The backing deque is pre-allocated for `capacity` elements.
    fn new(capacity: usize) -> Self {
        Self {
            entries: parking_lot::RwLock::new(std::collections::VecDeque::with_capacity(capacity)),
            capacity,
        }
    }

    /// Appends an entry for `agent_id`, stamped with the current UTC time.
    ///
    /// When the log is full the oldest entry is evicted first, so the log
    /// never holds more than `capacity` entries. A log created with a
    /// capacity of zero stores nothing.
    fn record(&self, agent_id: AgentId, execution_id: &str, status: &str) {
        // Without this guard a zero-capacity log would pop from an empty
        // deque and then push, ending up with one retained entry.
        if self.capacity == 0 {
            return;
        }

        // Build the entry before taking the write lock to keep the critical
        // section as short as possible.
        let entry = ExecutionEntry {
            agent_id,
            execution_id: execution_id.to_string(),
            status: status.to_string(),
            timestamp: chrono::Utc::now(),
        };

        let mut entries = self.entries.write();
        while entries.len() >= self.capacity {
            entries.pop_front();
        }
        entries.push_back(entry);
    }

    /// Returns up to `limit` entries recorded for `agent_id`, newest first.
    fn get_history(&self, agent_id: AgentId, limit: usize) -> Vec<ExecutionEntry> {
        let entries = self.entries.read();
        entries
            .iter()
            .rev()
            .filter(|entry| entry.agent_id == agent_id)
            .take(limit)
            .cloned()
            .collect()
    }
}
153
154#[derive(Clone)]
156pub struct AgentRuntime {
157 pub scheduler: Arc<dyn scheduler::AgentScheduler + Send + Sync>,
158 pub lifecycle: Arc<dyn lifecycle::LifecycleController + Send + Sync>,
159 pub resource_manager: Arc<dyn resource::ResourceManager + Send + Sync>,
160 pub communication: Arc<dyn communication::CommunicationBus + Send + Sync>,
161 pub error_handler: Arc<dyn error_handler::ErrorHandler + Send + Sync>,
162 pub context_manager: Arc<dyn context::ContextManager + Send + Sync>,
163 pub model_logger: Option<Arc<logging::ModelLogger>>,
164 pub model_catalog: Option<Arc<models::ModelCatalog>>,
165 pub system_agent_id: AgentId,
169 #[cfg(feature = "cron")]
170 cron_scheduler: Option<Arc<scheduler::cron_scheduler::CronScheduler>>,
171 config: Arc<RwLock<RuntimeConfig>>,
172 #[cfg(feature = "http-api")]
174 execution_log: Arc<ExecutionLog>,
175}
176
177impl AgentRuntime {
178 pub async fn new(config: RuntimeConfig) -> Result<Self, RuntimeError> {
180 let config = Arc::new(RwLock::new(config));
181
182 let scheduler = Arc::new(
184 scheduler::DefaultAgentScheduler::new(config.read().await.scheduler.clone()).await?,
185 );
186
187 let resource_manager = Arc::new(
188 resource::DefaultResourceManager::new(config.read().await.resource_manager.clone())
189 .await?,
190 );
191
192 let communication = Arc::new(
193 communication::DefaultCommunicationBus::new(config.read().await.communication.clone())
194 .await?,
195 );
196
197 let error_handler = Arc::new(
198 error_handler::DefaultErrorHandler::new(config.read().await.error_handler.clone())
199 .await?,
200 );
201
202 let lifecycle_config = lifecycle::LifecycleConfig {
203 max_agents: 1000,
204 initialization_timeout: std::time::Duration::from_secs(30),
205 termination_timeout: std::time::Duration::from_secs(30),
206 state_check_interval: std::time::Duration::from_secs(10),
207 enable_auto_recovery: true,
208 max_restart_attempts: 3,
209 };
210 let lifecycle =
211 Arc::new(lifecycle::DefaultLifecycleController::new(lifecycle_config).await?);
212
213 let context_manager = Arc::new(
214 context::StandardContextManager::new(
215 config.read().await.context_manager.clone(),
216 "runtime-system",
217 )
218 .await
219 .map_err(|e| {
220 RuntimeError::Internal(format!("Failed to create context manager: {}", e))
221 })?,
222 );
223
224 context_manager.initialize().await.map_err(|e| {
226 RuntimeError::Internal(format!("Failed to initialize context manager: {}", e))
227 })?;
228
229 let model_logger = if config.read().await.logging.enabled {
231 match logging::ModelLogger::new(config.read().await.logging.clone(), None) {
233 Ok(logger) => {
234 tracing::info!("Model logging initialized successfully");
235 Some(Arc::new(logger))
236 }
237 Err(e) => {
238 tracing::warn!("Failed to initialize model logger: {}", e);
239 None
240 }
241 }
242 } else {
243 tracing::info!("Model logging is disabled");
244 None
245 };
246
247 let model_catalog = if let Some(ref slm_config) = config.read().await.slm {
249 if slm_config.enabled {
250 match models::ModelCatalog::new(slm_config.clone()) {
251 Ok(catalog) => {
252 tracing::info!(
253 "Model catalog initialized with {} models",
254 catalog.list_models().len()
255 );
256 Some(Arc::new(catalog))
257 }
258 Err(e) => {
259 tracing::warn!("Failed to initialize model catalog: {}", e);
260 None
261 }
262 }
263 } else {
264 tracing::info!("SLM support is disabled");
265 None
266 }
267 } else {
268 tracing::info!("No SLM configuration provided");
269 None
270 };
271
272 Ok(Self {
273 scheduler,
274 lifecycle,
275 resource_manager,
276 communication,
277 error_handler,
278 context_manager,
279 model_logger,
280 model_catalog,
281 system_agent_id: AgentId::new(),
282 #[cfg(feature = "cron")]
283 cron_scheduler: None,
284 config,
285 #[cfg(feature = "http-api")]
286 execution_log: Arc::new(ExecutionLog::new(10_000)),
287 })
288 }
289
290 #[cfg(feature = "cron")]
292 pub fn with_cron_scheduler(
293 mut self,
294 cron: Arc<scheduler::cron_scheduler::CronScheduler>,
295 ) -> Self {
296 self.cron_scheduler = Some(cron);
297 self
298 }
299
300 pub async fn get_config(&self) -> RuntimeConfig {
302 self.config.read().await.clone()
303 }
304
305 pub async fn update_config(&self, config: RuntimeConfig) -> Result<(), RuntimeError> {
307 *self.config.write().await = config;
308 Ok(())
309 }
310
311 pub async fn shutdown(&self) -> Result<(), RuntimeError> {
313 tracing::info!("Starting Agent Runtime shutdown sequence");
314
315 self.lifecycle
317 .shutdown()
318 .await
319 .map_err(RuntimeError::Lifecycle)?;
320 self.communication
321 .shutdown()
322 .await
323 .map_err(RuntimeError::Communication)?;
324 self.resource_manager
325 .shutdown()
326 .await
327 .map_err(RuntimeError::Resource)?;
328 self.scheduler
329 .shutdown()
330 .await
331 .map_err(RuntimeError::Scheduler)?;
332 self.error_handler
333 .shutdown()
334 .await
335 .map_err(RuntimeError::ErrorHandler)?;
336
337 self.context_manager.shutdown().await.map_err(|e| {
339 RuntimeError::Internal(format!("Context manager shutdown failed: {}", e))
340 })?;
341
342 tracing::info!("Agent Runtime shutdown completed successfully");
343 Ok(())
344 }
345
346 pub async fn get_status(&self) -> SystemStatus {
348 self.scheduler.get_system_status().await
349 }
350}
351
352#[derive(Debug, Clone, Default)]
354pub struct RuntimeConfig {
355 pub scheduler: scheduler::SchedulerConfig,
356 pub resource_manager: resource::ResourceManagerConfig,
357 pub communication: communication::CommunicationConfig,
358 pub context_manager: context::ContextManagerConfig,
359 pub security: SecurityConfig,
360 pub audit: AuditConfig,
361 pub error_handler: error_handler::ErrorHandlerConfig,
362 pub logging: logging::LoggingConfig,
363 pub slm: Option<config::Slm>,
364 pub routing: Option<routing::RoutingConfig>,
365}
366
367#[cfg(feature = "http-api")]
369#[async_trait]
370#[allow(unused_variables)] impl RuntimeApiProvider for AgentRuntime {
372 async fn execute_workflow(
373 &self,
374 request: WorkflowExecutionRequest,
375 ) -> Result<serde_json::Value, RuntimeError> {
376 tracing::info!("Executing workflow: {}", request.workflow_id);
377
378 let workflow_dsl = &request.workflow_id; let (metadata, agent_config) = {
381 let parsed_tree = dsl::parse_dsl(workflow_dsl)
382 .map_err(|e| RuntimeError::Internal(format!("DSL parsing failed: {}", e)))?;
383
384 let metadata = dsl::extract_metadata(&parsed_tree, workflow_dsl);
386
387 let root_node = parsed_tree.root_node();
389 if root_node.has_error() {
390 return Err(RuntimeError::Internal(
391 "DSL contains syntax errors".to_string(),
392 ));
393 }
394
395 let agent_id = request.agent_id.unwrap_or_default();
397 let agent_config = AgentConfig {
398 id: agent_id,
399 name: metadata
400 .get("name")
401 .cloned()
402 .unwrap_or_else(|| "workflow_agent".to_string()),
403 dsl_source: workflow_dsl.to_string(),
404 execution_mode: ExecutionMode::Ephemeral,
405 security_tier: SecurityTier::Tier1,
406 resource_limits: ResourceLimits::default(),
407 capabilities: vec![Capability::Computation], policies: vec![],
409 metadata: metadata.clone(),
410 priority: Priority::Normal,
411 };
412
413 (metadata, agent_config)
414 };
415
416 let scheduled_agent_id = self
418 .scheduler
419 .schedule_agent(agent_config)
420 .await
421 .map_err(RuntimeError::Scheduler)?;
422
423 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
425
426 let system_status = self.scheduler.get_system_status().await;
428
429 let mut result = serde_json::json!({
431 "status": "success",
432 "workflow_id": request.workflow_id,
433 "agent_id": scheduled_agent_id.to_string(),
434 "execution_started": true,
435 "metadata": metadata,
436 "system_status": {
437 "total_agents": system_status.total_agents,
438 "running_agents": system_status.running_agents,
439 "resource_utilization": {
440 "memory_used": system_status.resource_utilization.memory_used,
441 "cpu_utilization": system_status.resource_utilization.cpu_utilization,
442 "disk_io_rate": system_status.resource_utilization.disk_io_rate,
443 "network_io_rate": system_status.resource_utilization.network_io_rate
444 }
445 }
446 });
447
448 if !request.parameters.is_null() {
450 result["parameters"] = request.parameters;
451 }
452
453 self.execution_log.record(
455 scheduled_agent_id,
456 &scheduled_agent_id.to_string(),
457 "workflow_started",
458 );
459
460 tracing::info!(
461 "Workflow execution initiated for agent: {}",
462 scheduled_agent_id
463 );
464 Ok(result)
465 }
466
467 async fn get_agent_status(
468 &self,
469 agent_id: AgentId,
470 ) -> Result<AgentStatusResponse, RuntimeError> {
471 let external_state = self.scheduler.get_external_agent_state(agent_id);
472 let agent_config = self.scheduler.get_agent_config(agent_id);
473
474 match self.scheduler.get_agent_status(agent_id).await {
475 Ok(agent_status) => {
476 let last_activity =
478 chrono::DateTime::<chrono::Utc>::from(agent_status.last_activity);
479
480 let execution_mode_label = agent_config.as_ref().map(|c| match &c.execution_mode {
481 crate::types::agent::ExecutionMode::External { .. } => "External".to_string(),
482 crate::types::agent::ExecutionMode::Persistent => "Persistent".to_string(),
483 crate::types::agent::ExecutionMode::Ephemeral => "Ephemeral".to_string(),
484 crate::types::agent::ExecutionMode::Scheduled { .. } => "Scheduled".to_string(),
485 crate::types::agent::ExecutionMode::CronScheduled { .. } => {
486 "CronScheduled".to_string()
487 }
488 crate::types::agent::ExecutionMode::EventDriven => "EventDriven".to_string(),
489 });
490
491 let (metadata, last_result, recent_events) = if let Some(ref ext) = external_state {
492 (
493 Some(ext.metadata.clone()),
494 ext.last_result.clone(),
495 Some(ext.events.iter().cloned().collect::<Vec<_>>()),
496 )
497 } else {
498 (None, None, None)
499 };
500
501 Ok(AgentStatusResponse {
502 agent_id: agent_status.agent_id,
503 state: agent_status.state,
504 last_activity,
505 resource_usage: api::types::ResourceUsage {
506 memory_bytes: agent_status.memory_usage,
507 cpu_percent: agent_status.cpu_usage,
508 active_tasks: agent_status.active_tasks,
509 },
510 metadata,
511 last_result,
512 recent_events,
513 execution_mode: execution_mode_label,
514 })
515 }
516 Err(scheduler_error) => {
517 tracing::warn!(
518 "Failed to get agent status for {}: {}",
519 agent_id,
520 scheduler_error
521 );
522 Err(RuntimeError::Internal(format!(
523 "Agent {} not found",
524 agent_id
525 )))
526 }
527 }
528 }
529
530 async fn get_system_health(&self) -> Result<serde_json::Value, RuntimeError> {
531 let scheduler_health =
533 self.scheduler.check_health().await.map_err(|e| {
534 RuntimeError::Internal(format!("Scheduler health check failed: {}", e))
535 })?;
536
537 let lifecycle_health =
538 self.lifecycle.check_health().await.map_err(|e| {
539 RuntimeError::Internal(format!("Lifecycle health check failed: {}", e))
540 })?;
541
542 let resource_health = self.resource_manager.check_health().await.map_err(|e| {
543 RuntimeError::Internal(format!("Resource manager health check failed: {}", e))
544 })?;
545
546 let communication_health = self.communication.check_health().await.map_err(|e| {
547 RuntimeError::Internal(format!("Communication health check failed: {}", e))
548 })?;
549
550 let component_healths = vec![
552 ("scheduler", &scheduler_health),
553 ("lifecycle", &lifecycle_health),
554 ("resource_manager", &resource_health),
555 ("communication", &communication_health),
556 ];
557
558 let overall_status = if component_healths
559 .iter()
560 .all(|(_, h)| h.status == HealthStatus::Healthy)
561 {
562 "healthy"
563 } else if component_healths
564 .iter()
565 .any(|(_, h)| h.status == HealthStatus::Unhealthy)
566 {
567 "unhealthy"
568 } else {
569 "degraded"
570 };
571
572 let mut components = serde_json::Map::new();
574 for (name, health) in component_healths {
575 let component_info = serde_json::json!({
576 "status": match health.status {
577 HealthStatus::Healthy => "healthy",
578 HealthStatus::Degraded => "degraded",
579 HealthStatus::Unhealthy => "unhealthy",
580 },
581 "message": health.message,
582 "last_check": health.last_check
583 .duration_since(std::time::UNIX_EPOCH)
584 .unwrap_or_default()
585 .as_secs(),
586 "uptime_seconds": health.uptime.as_secs(),
587 "metrics": health.metrics
588 });
589 components.insert(name.to_string(), component_info);
590 }
591
592 Ok(serde_json::json!({
593 "status": overall_status,
594 "timestamp": std::time::SystemTime::now()
595 .duration_since(std::time::UNIX_EPOCH)
596 .unwrap_or_default()
597 .as_secs(),
598 "components": components
599 }))
600 }
601
602 async fn list_agents(&self) -> Result<Vec<AgentId>, RuntimeError> {
603 Ok(self.scheduler.list_agents().await)
604 }
605
606 async fn list_agents_detailed(
607 &self,
608 ) -> Result<Vec<crate::api::types::AgentSummary>, RuntimeError> {
609 let ids = self.scheduler.list_agents().await;
610 let mut summaries = Vec::new();
611 for id in ids {
612 let name = self
613 .scheduler
614 .get_agent_config(id)
615 .map(|c| c.name)
616 .unwrap_or_default();
617 let state = self
618 .scheduler
619 .get_agent_status(id)
620 .await
621 .map(|s| s.state)
622 .unwrap_or(AgentState::Created);
623 summaries.push(crate::api::types::AgentSummary { id, name, state });
624 }
625 Ok(summaries)
626 }
627
628 async fn shutdown_agent(&self, agent_id: AgentId) -> Result<(), RuntimeError> {
629 self.scheduler
630 .shutdown_agent(agent_id)
631 .await
632 .map_err(RuntimeError::Scheduler)
633 }
634
635 async fn get_metrics(&self) -> Result<serde_json::Value, RuntimeError> {
636 let status = self.get_status().await;
637
638 let agent_ids = self.scheduler.list_agents().await;
640 let mut error_count: usize = 0;
641 for aid in &agent_ids {
642 if let Ok(agent_status) = self.scheduler.get_agent_status(*aid).await {
643 if agent_status.state == AgentState::Failed {
644 error_count += 1;
645 }
646 }
647 }
648
649 let idle = status
650 .total_agents
651 .saturating_sub(status.running_agents)
652 .saturating_sub(error_count);
653
654 Ok(serde_json::json!({
655 "agents": {
656 "total": status.total_agents,
657 "running": status.running_agents,
658 "idle": idle,
659 "error": error_count
660 },
661 "system": {
662 "uptime_seconds": status.uptime.as_secs(),
663 "memory_usage": status.resource_utilization.memory_used,
664 "cpu_usage": status.resource_utilization.cpu_utilization
665 }
666 }))
667 }
668
669 async fn create_agent(
670 &self,
671 request: CreateAgentRequest,
672 ) -> Result<CreateAgentResponse, RuntimeError> {
673 if request.name.is_empty() {
675 return Err(RuntimeError::Internal(
676 "Agent name cannot be empty".to_string(),
677 ));
678 }
679
680 let execution_mode = request.execution_mode.clone().unwrap_or_default();
681
682 let dsl_source = match &execution_mode {
683 crate::types::agent::ExecutionMode::External { .. } => {
684 request.dsl.clone().unwrap_or_default()
685 }
686 _ => {
687 let dsl = request.dsl.as_deref().unwrap_or("");
688 if dsl.is_empty() {
689 return Err(RuntimeError::Internal(
690 "Agent DSL cannot be empty for non-external agents".to_string(),
691 ));
692 }
693 dsl.to_string()
694 }
695 };
696
697 let agent_id = AgentId::new();
699 let agent_config = AgentConfig {
700 id: agent_id,
701 name: request.name,
702 dsl_source,
703 execution_mode,
704 security_tier: SecurityTier::Tier1,
705 resource_limits: ResourceLimits::default(),
706 capabilities: request
707 .capabilities
708 .unwrap_or_default()
709 .into_iter()
710 .map(Capability::Custom)
711 .collect(),
712 policies: vec![],
713 metadata: request.metadata.unwrap_or_default(),
714 priority: Priority::Normal,
715 };
716
717 let scheduled_agent_id = self
719 .scheduler
720 .schedule_agent(agent_config)
721 .await
722 .map_err(RuntimeError::Scheduler)?;
723
724 tracing::info!("Created and scheduled agent: {}", scheduled_agent_id);
725
726 self.execution_log.record(
728 scheduled_agent_id,
729 &scheduled_agent_id.to_string(),
730 "created",
731 );
732
733 Ok(CreateAgentResponse {
734 id: scheduled_agent_id.to_string(),
735 status: "scheduled".to_string(),
736 })
737 }
738
739 async fn update_agent(
740 &self,
741 agent_id: AgentId,
742 request: UpdateAgentRequest,
743 ) -> Result<UpdateAgentResponse, RuntimeError> {
744 if request.name.is_none() && request.dsl.is_none() {
746 return Err(RuntimeError::Internal(
747 "At least one field (name or dsl) must be provided for update".to_string(),
748 ));
749 }
750
751 if let Some(ref name) = request.name {
753 if name.is_empty() {
754 return Err(RuntimeError::Internal(
755 "Agent name cannot be empty".to_string(),
756 ));
757 }
758 }
759
760 if let Some(ref dsl) = request.dsl {
761 if dsl.is_empty() {
762 return Err(RuntimeError::Internal(
763 "Agent DSL cannot be empty".to_string(),
764 ));
765 }
766 }
767
768 self.scheduler
770 .update_agent(agent_id, request)
771 .await
772 .map_err(RuntimeError::Scheduler)?;
773
774 tracing::info!("Successfully updated agent: {}", agent_id);
775
776 Ok(UpdateAgentResponse {
777 id: agent_id.to_string(),
778 status: "updated".to_string(),
779 })
780 }
781
782 async fn delete_agent(&self, agent_id: AgentId) -> Result<DeleteAgentResponse, RuntimeError> {
783 self.scheduler
784 .delete_agent(agent_id)
785 .await
786 .map_err(RuntimeError::Scheduler)?;
787
788 Ok(DeleteAgentResponse {
789 id: agent_id.to_string(),
790 status: "deleted".to_string(),
791 })
792 }
793
794 async fn execute_agent(
795 &self,
796 agent_id: AgentId,
797 request: ExecuteAgentRequest,
798 ) -> Result<ExecuteAgentResponse, RuntimeError> {
799 if !self.scheduler.has_agent(agent_id) {
801 return Err(RuntimeError::Internal(format!(
802 "Agent {} not found",
803 agent_id
804 )));
805 }
806
807 let status = self.get_agent_status(agent_id).await?;
809 if status.state == AgentState::Completed {
810 if let Some(config) = self.scheduler.get_agent_config(agent_id) {
811 self.scheduler
812 .schedule_agent(config)
813 .await
814 .map_err(RuntimeError::Scheduler)?;
815 }
816 } else if status.state != AgentState::Running {
817 self.lifecycle
818 .start_agent(agent_id)
819 .await
820 .map_err(RuntimeError::Lifecycle)?;
821 }
822 let execution_id = uuid::Uuid::new_v4().to_string();
823 let payload_data: bytes::Bytes = serde_json::to_vec(&request)
824 .map_err(|e| RuntimeError::Internal(e.to_string()))?
825 .into();
826 let message = self.communication.create_internal_message(
827 self.system_agent_id,
828 agent_id,
829 payload_data,
830 types::MessageType::Direct(agent_id),
831 std::time::Duration::from_secs(300),
832 );
833 self.communication
834 .send_message(message)
835 .await
836 .map_err(RuntimeError::Communication)?;
837
838 #[cfg(feature = "http-api")]
840 self.execution_log
841 .record(agent_id, &execution_id, "execution_started");
842
843 Ok(ExecuteAgentResponse {
844 execution_id,
845 status: "execution_started".to_string(),
846 })
847 }
848
849 async fn get_agent_history(
850 &self,
851 agent_id: AgentId,
852 ) -> Result<GetAgentHistoryResponse, RuntimeError> {
853 let entries = self.execution_log.get_history(agent_id, 100);
854 let history = entries
855 .into_iter()
856 .map(|e| AgentExecutionRecord {
857 execution_id: e.execution_id,
858 status: e.status,
859 timestamp: e.timestamp.to_rfc3339(),
860 })
861 .collect();
862 Ok(GetAgentHistoryResponse { history })
863 }
864
865 async fn list_schedules(&self) -> Result<Vec<ScheduleSummary>, RuntimeError> {
868 #[cfg(feature = "cron")]
869 if let Some(ref cron) = self.cron_scheduler {
870 let jobs = cron
871 .list_jobs()
872 .await
873 .map_err(|e| RuntimeError::Internal(e.to_string()))?;
874 return Ok(jobs
875 .into_iter()
876 .map(|j| ScheduleSummary {
877 job_id: j.job_id.to_string(),
878 name: j.name,
879 cron_expression: j.cron_expression,
880 timezone: j.timezone,
881 status: format!("{:?}", j.status),
882 enabled: j.enabled,
883 next_run: j.next_run.map(|t| t.to_rfc3339()),
884 run_count: j.run_count,
885 })
886 .collect());
887 }
888 Ok(vec![])
889 }
890
891 async fn create_schedule(
892 &self,
893 request: CreateScheduleRequest,
894 ) -> Result<CreateScheduleResponse, RuntimeError> {
895 #[cfg(feature = "cron")]
896 if let Some(ref cron) = self.cron_scheduler {
897 use scheduler::cron_types::{CronJobDefinition, CronJobId};
898 let now = chrono::Utc::now();
899 let tz = if request.timezone.is_empty() {
900 "UTC".to_string()
901 } else {
902 request.timezone
903 };
904 let agent_config = types::AgentConfig {
905 id: types::AgentId::new(),
906 name: request.agent_name,
907 dsl_source: String::new(),
908 execution_mode: Default::default(),
909 security_tier: Default::default(),
910 resource_limits: Default::default(),
911 capabilities: Vec::new(),
912 policies: Vec::new(),
913 metadata: Default::default(),
914 priority: Default::default(),
915 };
916 let job = CronJobDefinition {
917 job_id: CronJobId::new(),
918 name: request.name,
919 cron_expression: request.cron_expression,
920 timezone: tz,
921 agent_config,
922 policy_ids: request.policy_ids,
923 audit_level: Default::default(),
924 status: scheduler::cron_types::CronJobStatus::Active,
925 enabled: true,
926 one_shot: request.one_shot,
927 created_at: now,
928 updated_at: now,
929 last_run: None,
930 next_run: None,
931 run_count: 0,
932 failure_count: 0,
933 max_retries: 3,
934 max_concurrent: 1,
935 delivery_config: None,
936 jitter_max_secs: 0,
937 session_mode: Default::default(),
938 agentpin_jwt: None,
939 };
940 let job_id = cron
941 .add_job(job)
942 .await
943 .map_err(|e| RuntimeError::Internal(e.to_string()))?;
944 let saved = cron
946 .get_job(job_id)
947 .await
948 .map_err(|e| RuntimeError::Internal(e.to_string()))?;
949 return Ok(CreateScheduleResponse {
950 job_id: job_id.to_string(),
951 next_run: saved.next_run.map(|t| t.to_rfc3339()),
952 status: "created".to_string(),
953 });
954 }
955 Err(RuntimeError::Internal(
956 "Schedule API requires a running CronScheduler".to_string(),
957 ))
958 }
959
960 async fn get_schedule(&self, job_id: &str) -> Result<ScheduleDetail, RuntimeError> {
961 #[cfg(feature = "cron")]
962 if let Some(ref cron) = self.cron_scheduler {
963 let id: scheduler::cron_types::CronJobId = job_id
964 .parse()
965 .map_err(|_| RuntimeError::Internal(format!("Invalid job ID: {}", job_id)))?;
966 let j = cron
967 .get_job(id)
968 .await
969 .map_err(|e| RuntimeError::Internal(e.to_string()))?;
970 return Ok(ScheduleDetail {
971 job_id: j.job_id.to_string(),
972 name: j.name,
973 cron_expression: j.cron_expression,
974 timezone: j.timezone,
975 status: format!("{:?}", j.status),
976 enabled: j.enabled,
977 one_shot: j.one_shot,
978 next_run: j.next_run.map(|t| t.to_rfc3339()),
979 last_run: j.last_run.map(|t| t.to_rfc3339()),
980 run_count: j.run_count,
981 failure_count: j.failure_count,
982 created_at: j.created_at.to_rfc3339(),
983 updated_at: j.updated_at.to_rfc3339(),
984 });
985 }
986 Err(RuntimeError::Internal(
987 "Schedule API requires a running CronScheduler".to_string(),
988 ))
989 }
990
991 async fn update_schedule(
992 &self,
993 job_id: &str,
994 request: UpdateScheduleRequest,
995 ) -> Result<ScheduleDetail, RuntimeError> {
996 #[cfg(feature = "cron")]
997 if let Some(ref cron) = self.cron_scheduler {
998 let id: scheduler::cron_types::CronJobId = job_id
999 .parse()
1000 .map_err(|_| RuntimeError::Internal(format!("Invalid job ID: {}", job_id)))?;
1001 let mut job = cron
1002 .get_job(id)
1003 .await
1004 .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1005 if let Some(expr) = request.cron_expression {
1006 job.cron_expression = expr;
1007 }
1008 if let Some(tz) = request.timezone {
1009 job.timezone = tz;
1010 }
1011 if let Some(pids) = request.policy_ids {
1012 job.policy_ids = pids;
1013 }
1014 if let Some(one_shot) = request.one_shot {
1015 job.one_shot = one_shot;
1016 }
1017 cron.update_job(job)
1018 .await
1019 .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1020 return self.get_schedule(job_id).await;
1021 }
1022 Err(RuntimeError::Internal(
1023 "Schedule API requires a running CronScheduler".to_string(),
1024 ))
1025 }
1026
1027 async fn delete_schedule(&self, job_id: &str) -> Result<DeleteScheduleResponse, RuntimeError> {
1028 #[cfg(feature = "cron")]
1029 if let Some(ref cron) = self.cron_scheduler {
1030 let id: scheduler::cron_types::CronJobId = job_id
1031 .parse()
1032 .map_err(|_| RuntimeError::Internal(format!("Invalid job ID: {}", job_id)))?;
1033 cron.remove_job(id)
1034 .await
1035 .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1036 return Ok(DeleteScheduleResponse {
1037 job_id: job_id.to_string(),
1038 deleted: true,
1039 });
1040 }
1041 Err(RuntimeError::Internal(
1042 "Schedule API requires a running CronScheduler".to_string(),
1043 ))
1044 }
1045
1046 async fn pause_schedule(&self, job_id: &str) -> Result<ScheduleActionResponse, RuntimeError> {
1047 #[cfg(feature = "cron")]
1048 if let Some(ref cron) = self.cron_scheduler {
1049 let id: scheduler::cron_types::CronJobId = job_id
1050 .parse()
1051 .map_err(|_| RuntimeError::Internal(format!("Invalid job ID: {}", job_id)))?;
1052 cron.pause_job(id)
1053 .await
1054 .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1055 return Ok(ScheduleActionResponse {
1056 job_id: job_id.to_string(),
1057 action: "pause".to_string(),
1058 status: "paused".to_string(),
1059 });
1060 }
1061 Err(RuntimeError::Internal(
1062 "Schedule API requires a running CronScheduler".to_string(),
1063 ))
1064 }
1065
1066 async fn resume_schedule(&self, job_id: &str) -> Result<ScheduleActionResponse, RuntimeError> {
1067 #[cfg(feature = "cron")]
1068 if let Some(ref cron) = self.cron_scheduler {
1069 let id: scheduler::cron_types::CronJobId = job_id
1070 .parse()
1071 .map_err(|_| RuntimeError::Internal(format!("Invalid job ID: {}", job_id)))?;
1072 cron.resume_job(id)
1073 .await
1074 .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1075 return Ok(ScheduleActionResponse {
1076 job_id: job_id.to_string(),
1077 action: "resume".to_string(),
1078 status: "active".to_string(),
1079 });
1080 }
1081 Err(RuntimeError::Internal(
1082 "Schedule API requires a running CronScheduler".to_string(),
1083 ))
1084 }
1085
1086 async fn trigger_schedule(&self, job_id: &str) -> Result<ScheduleActionResponse, RuntimeError> {
1087 #[cfg(feature = "cron")]
1088 if let Some(ref cron) = self.cron_scheduler {
1089 let id: scheduler::cron_types::CronJobId = job_id
1090 .parse()
1091 .map_err(|_| RuntimeError::Internal(format!("Invalid job ID: {}", job_id)))?;
1092 cron.trigger_now(id)
1093 .await
1094 .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1095 return Ok(ScheduleActionResponse {
1096 job_id: job_id.to_string(),
1097 action: "trigger".to_string(),
1098 status: "triggered".to_string(),
1099 });
1100 }
1101 Err(RuntimeError::Internal(
1102 "Schedule API requires a running CronScheduler".to_string(),
1103 ))
1104 }
1105
1106 async fn get_schedule_history(
1107 &self,
1108 job_id: &str,
1109 limit: usize,
1110 ) -> Result<ScheduleHistoryResponse, RuntimeError> {
1111 #[cfg(feature = "cron")]
1112 if let Some(ref cron) = self.cron_scheduler {
1113 let id: scheduler::cron_types::CronJobId = job_id
1114 .parse()
1115 .map_err(|_| RuntimeError::Internal(format!("Invalid job ID: {}", job_id)))?;
1116 let runs = cron
1117 .get_run_history(id, limit)
1118 .await
1119 .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1120 return Ok(ScheduleHistoryResponse {
1121 job_id: job_id.to_string(),
1122 history: runs
1123 .into_iter()
1124 .map(|r| ScheduleRunEntry {
1125 run_id: r.run_id.to_string(),
1126 started_at: r.started_at.to_rfc3339(),
1127 completed_at: r.completed_at.map(|t| t.to_rfc3339()),
1128 status: format!("{:?}", r.status),
1129 error: r.error,
1130 execution_time_ms: r.execution_time_ms,
1131 })
1132 .collect(),
1133 });
1134 }
1135 Err(RuntimeError::Internal(
1136 "Schedule API requires a running CronScheduler".to_string(),
1137 ))
1138 }
1139
1140 async fn get_schedule_next_runs(
1141 &self,
1142 job_id: &str,
1143 count: usize,
1144 ) -> Result<NextRunsResponse, RuntimeError> {
1145 #[cfg(feature = "cron")]
1146 if let Some(ref cron) = self.cron_scheduler {
1147 let id: scheduler::cron_types::CronJobId = job_id
1148 .parse()
1149 .map_err(|_| RuntimeError::Internal(format!("Invalid job ID: {}", job_id)))?;
1150 let job = cron
1151 .get_job(id)
1152 .await
1153 .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1154 let runs = cron
1155 .get_next_runs(&job.cron_expression, &job.timezone, count)
1156 .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1157 return Ok(NextRunsResponse {
1158 job_id: job_id.to_string(),
1159 next_runs: runs.into_iter().map(|t| t.to_rfc3339()).collect(),
1160 });
1161 }
1162 Err(RuntimeError::Internal(
1163 "Schedule API requires a running CronScheduler".to_string(),
1164 ))
1165 }
1166
1167 async fn get_scheduler_health(
1168 &self,
1169 ) -> Result<api::types::SchedulerHealthResponse, RuntimeError> {
1170 #[cfg(feature = "cron")]
1171 if let Some(ref cron) = self.cron_scheduler {
1172 let h = cron
1173 .check_health()
1174 .await
1175 .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1176 let m = cron.metrics();
1177 return Ok(api::types::SchedulerHealthResponse {
1178 is_running: h.is_running,
1179 store_accessible: h.store_accessible,
1180 jobs_total: h.jobs_total,
1181 jobs_active: h.jobs_active,
1182 jobs_paused: h.jobs_paused,
1183 jobs_dead_letter: h.jobs_dead_letter,
1184 global_active_runs: h.global_active_runs,
1185 max_concurrent: h.max_concurrent,
1186 runs_total: m.runs_total,
1187 runs_succeeded: m.runs_succeeded,
1188 runs_failed: m.runs_failed,
1189 average_execution_time_ms: m.average_execution_time_ms,
1190 longest_run_ms: m.longest_run_ms,
1191 });
1192 }
1193 Ok(api::types::SchedulerHealthResponse {
1195 is_running: false,
1196 store_accessible: false,
1197 jobs_total: 0,
1198 jobs_active: 0,
1199 jobs_paused: 0,
1200 jobs_dead_letter: 0,
1201 global_active_runs: 0,
1202 max_concurrent: 0,
1203 runs_total: 0,
1204 runs_succeeded: 0,
1205 runs_failed: 0,
1206 average_execution_time_ms: 0.0,
1207 longest_run_ms: 0,
1208 })
1209 }
1210
1211 async fn list_channels(&self) -> Result<Vec<ChannelSummary>, RuntimeError> {
1214 Ok(vec![])
1216 }
1217
1218 async fn register_channel(
1219 &self,
1220 _request: RegisterChannelRequest,
1221 ) -> Result<RegisterChannelResponse, RuntimeError> {
1222 Err(RuntimeError::Internal(
1223 "Channel API requires a running ChannelAdapterManager".to_string(),
1224 ))
1225 }
1226
1227 async fn get_channel(&self, _id: &str) -> Result<ChannelDetail, RuntimeError> {
1228 Err(RuntimeError::Internal(
1229 "Channel API requires a running ChannelAdapterManager".to_string(),
1230 ))
1231 }
1232
1233 async fn update_channel(
1234 &self,
1235 _id: &str,
1236 _request: UpdateChannelRequest,
1237 ) -> Result<ChannelDetail, RuntimeError> {
1238 Err(RuntimeError::Internal(
1239 "Channel API requires a running ChannelAdapterManager".to_string(),
1240 ))
1241 }
1242
1243 async fn delete_channel(&self, _id: &str) -> Result<DeleteChannelResponse, RuntimeError> {
1244 Err(RuntimeError::Internal(
1245 "Channel API requires a running ChannelAdapterManager".to_string(),
1246 ))
1247 }
1248
1249 async fn start_channel(&self, _id: &str) -> Result<ChannelActionResponse, RuntimeError> {
1250 Err(RuntimeError::Internal(
1251 "Channel API requires a running ChannelAdapterManager".to_string(),
1252 ))
1253 }
1254
1255 async fn stop_channel(&self, _id: &str) -> Result<ChannelActionResponse, RuntimeError> {
1256 Err(RuntimeError::Internal(
1257 "Channel API requires a running ChannelAdapterManager".to_string(),
1258 ))
1259 }
1260
1261 async fn get_channel_health(&self, _id: &str) -> Result<ChannelHealthResponse, RuntimeError> {
1262 Err(RuntimeError::Internal(
1263 "Channel API requires a running ChannelAdapterManager".to_string(),
1264 ))
1265 }
1266
1267 async fn list_channel_mappings(
1268 &self,
1269 _id: &str,
1270 ) -> Result<Vec<IdentityMappingEntry>, RuntimeError> {
1271 Err(RuntimeError::Internal(
1272 "Channel identity mappings require enterprise edition".to_string(),
1273 ))
1274 }
1275
1276 async fn add_channel_mapping(
1277 &self,
1278 _id: &str,
1279 _request: AddIdentityMappingRequest,
1280 ) -> Result<IdentityMappingEntry, RuntimeError> {
1281 Err(RuntimeError::Internal(
1282 "Channel identity mappings require enterprise edition".to_string(),
1283 ))
1284 }
1285
1286 async fn remove_channel_mapping(&self, _id: &str, _user_id: &str) -> Result<(), RuntimeError> {
1287 Err(RuntimeError::Internal(
1288 "Channel identity mappings require enterprise edition".to_string(),
1289 ))
1290 }
1291
1292 async fn get_channel_audit(
1293 &self,
1294 _id: &str,
1295 _limit: usize,
1296 ) -> Result<ChannelAuditResponse, RuntimeError> {
1297 Err(RuntimeError::Internal(
1298 "Channel audit log requires enterprise edition".to_string(),
1299 ))
1300 }
1301
1302 async fn update_agent_heartbeat(
1305 &self,
1306 agent_id: AgentId,
1307 heartbeat: api::types::HeartbeatRequest,
1308 ) -> Result<(), RuntimeError> {
1309 let ext_agents = self.scheduler.external_agents();
1310 let mut entry = ext_agents.get_mut(&agent_id).ok_or_else(|| {
1311 RuntimeError::Internal(format!("Agent {} is not an external agent", agent_id))
1312 })?;
1313
1314 entry.last_heartbeat = Some(chrono::Utc::now());
1315 entry.reported_state = heartbeat.state;
1316
1317 if let Some(metadata) = heartbeat.metadata {
1318 entry.metadata.extend(metadata);
1319 }
1320 if let Some(last_result) = heartbeat.last_result {
1321 entry.last_result = Some(last_result);
1322 }
1323
1324 Ok(())
1325 }
1326
1327 async fn push_agent_event(
1328 &self,
1329 agent_id: AgentId,
1330 event: api::types::PushEventRequest,
1331 ) -> Result<(), RuntimeError> {
1332 let ext_agents = self.scheduler.external_agents();
1333 let mut entry = ext_agents.get_mut(&agent_id).ok_or_else(|| {
1334 RuntimeError::Internal(format!("Agent {} is not an external agent", agent_id))
1335 })?;
1336
1337 entry.push_event(api::types::AgentEvent {
1338 event_type: event.event_type,
1339 payload: event.payload,
1340 timestamp: chrono::Utc::now(),
1341 });
1342
1343 Ok(())
1344 }
1345
1346 async fn check_unreachable_agents(&self) {
1347 self.scheduler.check_unreachable_agents();
1348 }
1349}