1pub mod communication;
7pub mod config;
8pub mod context;
9pub mod crypto;
10pub mod env;
11pub mod error_handler;
12pub mod integrations;
13pub mod lifecycle;
14pub mod logging;
15pub mod metrics;
16pub mod models;
17pub mod net_guard;
18pub mod rag;
19pub mod reasoning;
20pub mod resource;
21pub mod routing;
22pub mod sandbox;
23pub mod scheduler;
24pub mod secrets;
25pub mod skills;
26pub mod toolclad;
27pub mod types;
28
29pub mod prelude;
30
31#[cfg(feature = "cli-executor")]
32pub mod cli_executor;
33
34#[cfg(feature = "http-api")]
35pub mod api;
36
37#[cfg(feature = "http-api")]
38use api::traits::RuntimeApiProvider;
39#[cfg(all(feature = "http-api", feature = "cron"))]
40use api::types::ScheduleRunEntry;
41#[cfg(feature = "http-api")]
42use api::types::{
43 AddIdentityMappingRequest, AgentExecutionRecord, AgentStatusResponse, ChannelActionResponse,
44 ChannelAuditResponse, ChannelDetail, ChannelHealthResponse, ChannelSummary, CreateAgentRequest,
45 CreateAgentResponse, CreateScheduleRequest, CreateScheduleResponse, DeleteAgentResponse,
46 DeleteChannelResponse, DeleteScheduleResponse, ExecuteAgentRequest, ExecuteAgentResponse,
47 GetAgentHistoryResponse, IdentityMappingEntry, NextRunsResponse, RegisterChannelRequest,
48 RegisterChannelResponse, ScheduleActionResponse, ScheduleDetail, ScheduleHistoryResponse,
49 ScheduleSummary, UpdateAgentRequest, UpdateAgentResponse, UpdateChannelRequest,
50 UpdateScheduleRequest, WorkflowExecutionRequest,
51};
52#[cfg(feature = "http-api")]
53use async_trait::async_trait;
54
55#[cfg(feature = "http-input")]
56pub mod http_input;
57
58pub use communication::{CommunicationBus, CommunicationConfig, DefaultCommunicationBus};
60pub use config::SecurityConfig;
61pub use context::{ContextManager, ContextManagerConfig, StandardContextManager};
62pub use error_handler::{DefaultErrorHandler, ErrorHandler, ErrorHandlerConfig};
63pub use lifecycle::{DefaultLifecycleController, LifecycleConfig, LifecycleController};
64pub use logging::{LoggingConfig, ModelInteractionType, ModelLogger, RequestData, ResponseData};
65pub use models::{ModelCatalog, ModelCatalogError, SlmRunner, SlmRunnerError};
66pub use resource::{DefaultResourceManager, ResourceManager, ResourceManagerConfig};
67pub use routing::{
68 DefaultRoutingEngine, RouteDecision, RoutingConfig, RoutingContext, RoutingEngine, TaskType,
69};
70pub use sandbox::{E2BSandbox, ExecutionResult, SandboxRunner, SandboxTier};
71#[cfg(feature = "cron")]
72pub use scheduler::{
73 cron_scheduler::{
74 CronMetrics, CronScheduler, CronSchedulerConfig, CronSchedulerError, CronSchedulerHealth,
75 },
76 cron_types::{
77 AuditLevel, CronJobDefinition, CronJobId, CronJobStatus, DeliveryChannel, DeliveryConfig,
78 DeliveryReceipt, JobRunRecord, JobRunStatus,
79 },
80 delivery::{CustomDeliveryHandler, DefaultDeliveryRouter, DeliveryResult, DeliveryRouter},
81 heartbeat::{
82 HeartbeatAssessment, HeartbeatConfig, HeartbeatContextMode, HeartbeatSeverity,
83 HeartbeatState,
84 },
85 job_store::{JobStore, JobStoreError, SqliteJobStore},
86 policy_gate::{
87 PolicyGate, ScheduleContext, SchedulePolicyCondition, SchedulePolicyDecision,
88 SchedulePolicyEffect, SchedulePolicyRule,
89 },
90};
91pub use scheduler::{AgentScheduler, DefaultAgentScheduler, SchedulerConfig};
92pub use secrets::{SecretStore, SecretsConfig};
93pub use types::*;
94
95use std::sync::Arc;
96use tokio::sync::RwLock;
97
98#[cfg(feature = "http-api")]
105pub struct ExecutionLog {
106 entries: parking_lot::RwLock<std::collections::VecDeque<ExecutionEntry>>,
107 capacity: usize,
108}
109
110#[cfg(feature = "http-api")]
111#[derive(Debug, Clone)]
112struct ExecutionEntry {
113 agent_id: AgentId,
114 execution_id: String,
115 status: String,
116 timestamp: chrono::DateTime<chrono::Utc>,
117}
118
119#[cfg(feature = "http-api")]
120impl ExecutionLog {
121 fn new(capacity: usize) -> Self {
122 Self {
123 entries: parking_lot::RwLock::new(std::collections::VecDeque::with_capacity(capacity)),
124 capacity,
125 }
126 }
127
128 fn record(&self, agent_id: AgentId, execution_id: &str, status: &str) {
130 let entry = ExecutionEntry {
131 agent_id,
132 execution_id: execution_id.to_string(),
133 status: status.to_string(),
134 timestamp: chrono::Utc::now(),
135 };
136 let mut entries = self.entries.write();
137 if entries.len() >= self.capacity {
138 entries.pop_front();
139 }
140 entries.push_back(entry);
141 }
142
143 fn get_history(&self, agent_id: AgentId, limit: usize) -> Vec<ExecutionEntry> {
145 let entries = self.entries.read();
146 entries
147 .iter()
148 .rev()
149 .filter(|e| e.agent_id == agent_id)
150 .take(limit)
151 .cloned()
152 .collect()
153 }
154}
155
156#[derive(Clone)]
158pub struct AgentRuntime {
159 pub scheduler: Arc<dyn scheduler::AgentScheduler + Send + Sync>,
160 pub lifecycle: Arc<dyn lifecycle::LifecycleController + Send + Sync>,
161 pub resource_manager: Arc<dyn resource::ResourceManager + Send + Sync>,
162 pub communication: Arc<dyn communication::CommunicationBus + Send + Sync>,
163 pub error_handler: Arc<dyn error_handler::ErrorHandler + Send + Sync>,
164 pub context_manager: Arc<dyn context::ContextManager + Send + Sync>,
165 pub model_logger: Option<Arc<logging::ModelLogger>>,
166 pub model_catalog: Option<Arc<models::ModelCatalog>>,
167 pub system_agent_id: AgentId,
171 pub agentpin_verifier: Option<Arc<dyn integrations::AgentPinVerifier>>,
177 #[cfg(feature = "cron")]
178 cron_scheduler: Option<Arc<scheduler::cron_scheduler::CronScheduler>>,
179 config: Arc<RwLock<RuntimeConfig>>,
180 #[cfg(feature = "http-api")]
182 execution_log: Arc<ExecutionLog>,
183}
184
185impl AgentRuntime {
186 pub async fn new(config: RuntimeConfig) -> Result<Self, RuntimeError> {
188 let config = Arc::new(RwLock::new(config));
189
190 let scheduler = Arc::new(
192 scheduler::DefaultAgentScheduler::new(config.read().await.scheduler.clone()).await?,
193 );
194
195 let resource_manager = Arc::new(
196 resource::DefaultResourceManager::new(config.read().await.resource_manager.clone())
197 .await?,
198 );
199
200 let communication = Arc::new(
201 communication::DefaultCommunicationBus::new(config.read().await.communication.clone())
202 .await?,
203 );
204
205 let error_handler = Arc::new(
206 error_handler::DefaultErrorHandler::new(config.read().await.error_handler.clone())
207 .await?,
208 );
209
210 let lifecycle_config = lifecycle::LifecycleConfig {
211 max_agents: 1000,
212 initialization_timeout: std::time::Duration::from_secs(30),
213 termination_timeout: std::time::Duration::from_secs(30),
214 state_check_interval: std::time::Duration::from_secs(10),
215 enable_auto_recovery: true,
216 max_restart_attempts: 3,
217 };
218 let lifecycle =
219 Arc::new(lifecycle::DefaultLifecycleController::new(lifecycle_config).await?);
220
221 let context_manager = Arc::new(
222 context::StandardContextManager::new(
223 config.read().await.context_manager.clone(),
224 "runtime-system",
225 )
226 .await
227 .map_err(|e| {
228 RuntimeError::Internal(format!("Failed to create context manager: {}", e))
229 })?,
230 );
231
232 context_manager.initialize().await.map_err(|e| {
234 RuntimeError::Internal(format!("Failed to initialize context manager: {}", e))
235 })?;
236
237 let model_logger = if config.read().await.logging.enabled {
239 match logging::ModelLogger::new(config.read().await.logging.clone(), None) {
241 Ok(logger) => {
242 tracing::info!("Model logging initialized successfully");
243 Some(Arc::new(logger))
244 }
245 Err(e) => {
246 tracing::warn!("Failed to initialize model logger: {}", e);
247 None
248 }
249 }
250 } else {
251 tracing::info!("Model logging is disabled");
252 None
253 };
254
255 let model_catalog = if let Some(ref slm_config) = config.read().await.slm {
257 if slm_config.enabled {
258 match models::ModelCatalog::new(slm_config.clone()) {
259 Ok(catalog) => {
260 tracing::info!(
261 "Model catalog initialized with {} models",
262 catalog.list_models().len()
263 );
264 Some(Arc::new(catalog))
265 }
266 Err(e) => {
267 tracing::warn!("Failed to initialize model catalog: {}", e);
268 None
269 }
270 }
271 } else {
272 tracing::info!("SLM support is disabled");
273 None
274 }
275 } else {
276 tracing::info!("No SLM configuration provided");
277 None
278 };
279
280 let agentpin_verifier: Option<Arc<dyn integrations::AgentPinVerifier>> = {
285 let cfg = config.read().await;
286 match cfg.agentpin.as_ref() {
287 Some(ap_cfg) if ap_cfg.enabled => {
288 let verifier = integrations::DefaultAgentPinVerifier::new(ap_cfg.clone())
289 .map_err(|e| {
290 RuntimeError::Internal(format!(
291 "Failed to construct AgentPin verifier: {}",
292 e
293 ))
294 })?;
295 tracing::info!(
296 "AgentPin verifier enabled (discovery_mode={:?}, audience={:?})",
297 ap_cfg.discovery_mode,
298 ap_cfg.audience
299 );
300 Some(Arc::new(verifier) as Arc<dyn integrations::AgentPinVerifier>)
301 }
302 Some(_) => {
303 tracing::info!("AgentPin configured but disabled; identity checks skipped");
304 None
305 }
306 None => None,
307 }
308 };
309
310 Ok(Self {
311 scheduler,
312 lifecycle,
313 resource_manager,
314 communication,
315 error_handler,
316 context_manager,
317 model_logger,
318 model_catalog,
319 system_agent_id: AgentId::new(),
320 agentpin_verifier,
321 #[cfg(feature = "cron")]
322 cron_scheduler: None,
323 config,
324 #[cfg(feature = "http-api")]
325 execution_log: Arc::new(ExecutionLog::new(10_000)),
326 })
327 }
328
329 #[cfg(feature = "cron")]
331 pub fn with_cron_scheduler(
332 mut self,
333 cron: Arc<scheduler::cron_scheduler::CronScheduler>,
334 ) -> Self {
335 self.cron_scheduler = Some(cron);
336 self
337 }
338
339 pub async fn get_config(&self) -> RuntimeConfig {
341 self.config.read().await.clone()
342 }
343
344 pub async fn update_config(&self, config: RuntimeConfig) -> Result<(), RuntimeError> {
346 *self.config.write().await = config;
347 Ok(())
348 }
349
350 pub async fn verify_agentpin_for_agent(
357 &self,
358 jwt: Option<&str>,
359 agent_id: AgentId,
360 ) -> Result<(), RuntimeError> {
361 let Some(verifier) = self.agentpin_verifier.as_ref() else {
362 if jwt.is_some() {
363 tracing::warn!("AgentPin JWT supplied but verifier is disabled on this runtime");
364 }
365 return Ok(());
366 };
367 let jwt = jwt.ok_or_else(|| {
368 RuntimeError::Authentication(
369 "AgentPin verification is enabled; agentpin_jwt is required".to_string(),
370 )
371 })?;
372 let result = verifier
373 .verify_credential(jwt)
374 .await
375 .map_err(|e| RuntimeError::Authentication(format!("AgentPin: {}", e)))?;
376 if !result.valid {
377 return Err(RuntimeError::Authentication(format!(
378 "AgentPin credential invalid: {}",
379 result
380 .error_message
381 .unwrap_or_else(|| "no reason".to_string())
382 )));
383 }
384 let expected = agent_id.0.to_string();
385 let sub_matches = result
386 .agent_id
387 .as_deref()
388 .map(|sub| sub == expected)
389 .unwrap_or(false);
390 if !sub_matches {
391 return Err(RuntimeError::Authentication(format!(
392 "AgentPin JWT does not cover agent {}: sub={:?}",
393 expected, result.agent_id
394 )));
395 }
396 Ok(())
397 }
398
399 pub async fn shutdown(&self) -> Result<(), RuntimeError> {
401 tracing::info!("Starting Agent Runtime shutdown sequence");
402
403 self.lifecycle
405 .shutdown()
406 .await
407 .map_err(RuntimeError::Lifecycle)?;
408 self.communication
409 .shutdown()
410 .await
411 .map_err(RuntimeError::Communication)?;
412 self.resource_manager
413 .shutdown()
414 .await
415 .map_err(RuntimeError::Resource)?;
416 self.scheduler
417 .shutdown()
418 .await
419 .map_err(RuntimeError::Scheduler)?;
420 self.error_handler
421 .shutdown()
422 .await
423 .map_err(RuntimeError::ErrorHandler)?;
424
425 self.context_manager.shutdown().await.map_err(|e| {
427 RuntimeError::Internal(format!("Context manager shutdown failed: {}", e))
428 })?;
429
430 tracing::info!("Agent Runtime shutdown completed successfully");
431 Ok(())
432 }
433
434 pub async fn get_status(&self) -> SystemStatus {
436 self.scheduler.get_system_status().await
437 }
438}
439
440#[derive(Debug, Clone, Default)]
442pub struct RuntimeConfig {
443 pub scheduler: scheduler::SchedulerConfig,
444 pub resource_manager: resource::ResourceManagerConfig,
445 pub communication: communication::CommunicationConfig,
446 pub context_manager: context::ContextManagerConfig,
447 pub security: SecurityConfig,
448 pub audit: AuditConfig,
449 pub error_handler: error_handler::ErrorHandlerConfig,
450 pub logging: logging::LoggingConfig,
451 pub slm: Option<config::Slm>,
452 pub routing: Option<routing::RoutingConfig>,
453 pub agentpin: Option<crate::integrations::AgentPinConfig>,
457}
458
459#[cfg(feature = "http-api")]
461#[async_trait]
462#[allow(unused_variables)] impl RuntimeApiProvider for AgentRuntime {
464 async fn execute_workflow(
465 &self,
466 request: WorkflowExecutionRequest,
467 ) -> Result<serde_json::Value, RuntimeError> {
468 tracing::info!("Executing workflow: {}", request.workflow_id);
469
470 let workflow_dsl = &request.workflow_id; let (metadata, agent_config) = {
473 let parsed_tree = dsl::parse_dsl(workflow_dsl)
474 .map_err(|e| RuntimeError::Internal(format!("DSL parsing failed: {}", e)))?;
475
476 let metadata = dsl::extract_metadata(&parsed_tree, workflow_dsl);
478
479 let root_node = parsed_tree.root_node();
481 if root_node.has_error() {
482 return Err(RuntimeError::Internal(
483 "DSL contains syntax errors".to_string(),
484 ));
485 }
486
487 let agent_id = request.agent_id.unwrap_or_default();
489 let agent_config = AgentConfig {
490 id: agent_id,
491 name: metadata
492 .get("name")
493 .cloned()
494 .unwrap_or_else(|| "workflow_agent".to_string()),
495 dsl_source: workflow_dsl.to_string(),
496 execution_mode: ExecutionMode::Ephemeral,
497 security_tier: SecurityTier::Tier1,
498 resource_limits: ResourceLimits::default(),
499 capabilities: vec![Capability::Computation], policies: vec![],
501 metadata: metadata.clone(),
502 priority: Priority::Normal,
503 };
504
505 (metadata, agent_config)
506 };
507
508 let scheduled_agent_id = self
510 .scheduler
511 .schedule_agent(agent_config)
512 .await
513 .map_err(RuntimeError::Scheduler)?;
514
515 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
517
518 let system_status = self.scheduler.get_system_status().await;
520
521 let mut result = serde_json::json!({
523 "status": "success",
524 "workflow_id": request.workflow_id,
525 "agent_id": scheduled_agent_id.to_string(),
526 "execution_started": true,
527 "metadata": metadata,
528 "system_status": {
529 "total_agents": system_status.total_agents,
530 "running_agents": system_status.running_agents,
531 "resource_utilization": {
532 "memory_used": system_status.resource_utilization.memory_used,
533 "cpu_utilization": system_status.resource_utilization.cpu_utilization,
534 "disk_io_rate": system_status.resource_utilization.disk_io_rate,
535 "network_io_rate": system_status.resource_utilization.network_io_rate
536 }
537 }
538 });
539
540 if !request.parameters.is_null() {
542 result["parameters"] = request.parameters;
543 }
544
545 self.execution_log.record(
547 scheduled_agent_id,
548 &scheduled_agent_id.to_string(),
549 "workflow_started",
550 );
551
552 tracing::info!(
553 "Workflow execution initiated for agent: {}",
554 scheduled_agent_id
555 );
556 Ok(result)
557 }
558
559 async fn get_agent_status(
560 &self,
561 agent_id: AgentId,
562 ) -> Result<AgentStatusResponse, RuntimeError> {
563 let external_state = self.scheduler.get_external_agent_state(agent_id);
564 let agent_config = self.scheduler.get_agent_config(agent_id);
565
566 match self.scheduler.get_agent_status(agent_id).await {
567 Ok(agent_status) => {
568 let last_activity =
570 chrono::DateTime::<chrono::Utc>::from(agent_status.last_activity);
571
572 let execution_mode_label = agent_config.as_ref().map(|c| match &c.execution_mode {
573 crate::types::agent::ExecutionMode::External { .. } => "External".to_string(),
574 crate::types::agent::ExecutionMode::Persistent => "Persistent".to_string(),
575 crate::types::agent::ExecutionMode::Ephemeral => "Ephemeral".to_string(),
576 crate::types::agent::ExecutionMode::Scheduled { .. } => "Scheduled".to_string(),
577 crate::types::agent::ExecutionMode::CronScheduled { .. } => {
578 "CronScheduled".to_string()
579 }
580 crate::types::agent::ExecutionMode::EventDriven => "EventDriven".to_string(),
581 });
582
583 let (metadata, last_result, recent_events) = if let Some(ref ext) = external_state {
584 (
585 Some(ext.metadata.clone()),
586 ext.last_result.clone(),
587 Some(ext.events.iter().cloned().collect::<Vec<_>>()),
588 )
589 } else {
590 (None, None, None)
591 };
592
593 Ok(AgentStatusResponse {
594 agent_id: agent_status.agent_id,
595 state: agent_status.state,
596 last_activity,
597 resource_usage: api::types::ResourceUsage {
598 memory_bytes: agent_status.memory_usage,
599 cpu_percent: agent_status.cpu_usage,
600 active_tasks: agent_status.active_tasks,
601 },
602 metadata,
603 last_result,
604 recent_events,
605 execution_mode: execution_mode_label,
606 })
607 }
608 Err(scheduler_error) => {
609 tracing::warn!(
610 "Failed to get agent status for {}: {}",
611 agent_id,
612 scheduler_error
613 );
614 Err(RuntimeError::Internal(format!(
615 "Agent {} not found",
616 agent_id
617 )))
618 }
619 }
620 }
621
622 async fn get_system_health(&self) -> Result<serde_json::Value, RuntimeError> {
623 let scheduler_health =
625 self.scheduler.check_health().await.map_err(|e| {
626 RuntimeError::Internal(format!("Scheduler health check failed: {}", e))
627 })?;
628
629 let lifecycle_health =
630 self.lifecycle.check_health().await.map_err(|e| {
631 RuntimeError::Internal(format!("Lifecycle health check failed: {}", e))
632 })?;
633
634 let resource_health = self.resource_manager.check_health().await.map_err(|e| {
635 RuntimeError::Internal(format!("Resource manager health check failed: {}", e))
636 })?;
637
638 let communication_health = self.communication.check_health().await.map_err(|e| {
639 RuntimeError::Internal(format!("Communication health check failed: {}", e))
640 })?;
641
642 let component_healths = vec![
644 ("scheduler", &scheduler_health),
645 ("lifecycle", &lifecycle_health),
646 ("resource_manager", &resource_health),
647 ("communication", &communication_health),
648 ];
649
650 let overall_status = if component_healths
651 .iter()
652 .all(|(_, h)| h.status == HealthStatus::Healthy)
653 {
654 "healthy"
655 } else if component_healths
656 .iter()
657 .any(|(_, h)| h.status == HealthStatus::Unhealthy)
658 {
659 "unhealthy"
660 } else {
661 "degraded"
662 };
663
664 let mut components = serde_json::Map::new();
666 for (name, health) in component_healths {
667 let component_info = serde_json::json!({
668 "status": match health.status {
669 HealthStatus::Healthy => "healthy",
670 HealthStatus::Degraded => "degraded",
671 HealthStatus::Unhealthy => "unhealthy",
672 },
673 "message": health.message,
674 "last_check": health.last_check
675 .duration_since(std::time::UNIX_EPOCH)
676 .unwrap_or_default()
677 .as_secs(),
678 "uptime_seconds": health.uptime.as_secs(),
679 "metrics": health.metrics
680 });
681 components.insert(name.to_string(), component_info);
682 }
683
684 Ok(serde_json::json!({
685 "status": overall_status,
686 "timestamp": std::time::SystemTime::now()
687 .duration_since(std::time::UNIX_EPOCH)
688 .unwrap_or_default()
689 .as_secs(),
690 "components": components
691 }))
692 }
693
694 async fn list_agents(&self) -> Result<Vec<AgentId>, RuntimeError> {
695 Ok(self.scheduler.list_agents().await)
696 }
697
698 async fn list_agents_detailed(
699 &self,
700 ) -> Result<Vec<crate::api::types::AgentSummary>, RuntimeError> {
701 let ids = self.scheduler.list_agents().await;
702 let mut summaries = Vec::new();
703 for id in ids {
704 let name = self
705 .scheduler
706 .get_agent_config(id)
707 .map(|c| c.name)
708 .unwrap_or_default();
709 let state = self
710 .scheduler
711 .get_agent_status(id)
712 .await
713 .map(|s| s.state)
714 .unwrap_or(AgentState::Created);
715 summaries.push(crate::api::types::AgentSummary { id, name, state });
716 }
717 Ok(summaries)
718 }
719
720 async fn shutdown_agent(&self, agent_id: AgentId) -> Result<(), RuntimeError> {
721 self.scheduler
722 .shutdown_agent(agent_id)
723 .await
724 .map_err(RuntimeError::Scheduler)
725 }
726
727 async fn get_metrics(&self) -> Result<serde_json::Value, RuntimeError> {
728 let status = self.get_status().await;
729
730 let agent_ids = self.scheduler.list_agents().await;
732 let mut error_count: usize = 0;
733 for aid in &agent_ids {
734 if let Ok(agent_status) = self.scheduler.get_agent_status(*aid).await {
735 if agent_status.state == AgentState::Failed {
736 error_count += 1;
737 }
738 }
739 }
740
741 let idle = status
742 .total_agents
743 .saturating_sub(status.running_agents)
744 .saturating_sub(error_count);
745
746 Ok(serde_json::json!({
747 "agents": {
748 "total": status.total_agents,
749 "running": status.running_agents,
750 "idle": idle,
751 "error": error_count
752 },
753 "system": {
754 "uptime_seconds": status.uptime.as_secs(),
755 "memory_usage": status.resource_utilization.memory_used,
756 "cpu_usage": status.resource_utilization.cpu_utilization
757 }
758 }))
759 }
760
761 async fn create_agent(
762 &self,
763 request: CreateAgentRequest,
764 ) -> Result<CreateAgentResponse, RuntimeError> {
765 if request.name.is_empty() {
767 return Err(RuntimeError::Internal(
768 "Agent name cannot be empty".to_string(),
769 ));
770 }
771
772 let execution_mode = request.execution_mode.clone().unwrap_or_default();
773
774 let dsl_source = match &execution_mode {
775 crate::types::agent::ExecutionMode::External { .. } => {
776 request.dsl.clone().unwrap_or_default()
777 }
778 _ => {
779 let dsl = request.dsl.as_deref().unwrap_or("");
780 if dsl.is_empty() {
781 return Err(RuntimeError::Internal(
782 "Agent DSL cannot be empty for non-external agents".to_string(),
783 ));
784 }
785 dsl.to_string()
786 }
787 };
788
789 let agent_id = AgentId::new();
791 let agent_config = AgentConfig {
792 id: agent_id,
793 name: request.name,
794 dsl_source,
795 execution_mode,
796 security_tier: SecurityTier::Tier1,
797 resource_limits: ResourceLimits::default(),
798 capabilities: request
799 .capabilities
800 .unwrap_or_default()
801 .into_iter()
802 .map(Capability::Custom)
803 .collect(),
804 policies: vec![],
805 metadata: request.metadata.unwrap_or_default(),
806 priority: Priority::Normal,
807 };
808
809 let scheduled_agent_id = self
811 .scheduler
812 .schedule_agent(agent_config)
813 .await
814 .map_err(RuntimeError::Scheduler)?;
815
816 tracing::info!("Created and scheduled agent: {}", scheduled_agent_id);
817
818 self.execution_log.record(
820 scheduled_agent_id,
821 &scheduled_agent_id.to_string(),
822 "created",
823 );
824
825 Ok(CreateAgentResponse {
826 id: scheduled_agent_id.to_string(),
827 status: "scheduled".to_string(),
828 })
829 }
830
831 async fn update_agent(
832 &self,
833 agent_id: AgentId,
834 request: UpdateAgentRequest,
835 ) -> Result<UpdateAgentResponse, RuntimeError> {
836 if request.name.is_none() && request.dsl.is_none() {
838 return Err(RuntimeError::Internal(
839 "At least one field (name or dsl) must be provided for update".to_string(),
840 ));
841 }
842
843 if let Some(ref name) = request.name {
845 if name.is_empty() {
846 return Err(RuntimeError::Internal(
847 "Agent name cannot be empty".to_string(),
848 ));
849 }
850 }
851
852 if let Some(ref dsl) = request.dsl {
853 if dsl.is_empty() {
854 return Err(RuntimeError::Internal(
855 "Agent DSL cannot be empty".to_string(),
856 ));
857 }
858 }
859
860 self.scheduler
862 .update_agent(agent_id, request)
863 .await
864 .map_err(RuntimeError::Scheduler)?;
865
866 tracing::info!("Successfully updated agent: {}", agent_id);
867
868 Ok(UpdateAgentResponse {
869 id: agent_id.to_string(),
870 status: "updated".to_string(),
871 })
872 }
873
874 async fn delete_agent(&self, agent_id: AgentId) -> Result<DeleteAgentResponse, RuntimeError> {
875 self.scheduler
876 .delete_agent(agent_id)
877 .await
878 .map_err(RuntimeError::Scheduler)?;
879
880 Ok(DeleteAgentResponse {
881 id: agent_id.to_string(),
882 status: "deleted".to_string(),
883 })
884 }
885
886 async fn execute_agent(
887 &self,
888 agent_id: AgentId,
889 request: ExecuteAgentRequest,
890 ) -> Result<ExecuteAgentResponse, RuntimeError> {
891 if !self.scheduler.has_agent(agent_id) {
893 return Err(RuntimeError::Internal(format!(
894 "Agent {} not found",
895 agent_id
896 )));
897 }
898
899 let status = self.get_agent_status(agent_id).await?;
901 if status.state == AgentState::Completed {
902 if let Some(config) = self.scheduler.get_agent_config(agent_id) {
903 self.scheduler
904 .schedule_agent(config)
905 .await
906 .map_err(RuntimeError::Scheduler)?;
907 }
908 } else if status.state != AgentState::Running {
909 self.lifecycle
910 .start_agent(agent_id)
911 .await
912 .map_err(RuntimeError::Lifecycle)?;
913 }
914 let execution_id = uuid::Uuid::new_v4().to_string();
915 let payload_data: bytes::Bytes = serde_json::to_vec(&request)
916 .map_err(|e| RuntimeError::Internal(e.to_string()))?
917 .into();
918 let message = self.communication.create_internal_message(
919 self.system_agent_id,
920 agent_id,
921 payload_data,
922 types::MessageType::Direct(agent_id),
923 std::time::Duration::from_secs(300),
924 );
925 self.communication
926 .send_message(message)
927 .await
928 .map_err(RuntimeError::Communication)?;
929
930 #[cfg(feature = "http-api")]
932 self.execution_log
933 .record(agent_id, &execution_id, "execution_started");
934
935 Ok(ExecuteAgentResponse {
936 execution_id,
937 status: "execution_started".to_string(),
938 })
939 }
940
941 async fn get_agent_history(
942 &self,
943 agent_id: AgentId,
944 ) -> Result<GetAgentHistoryResponse, RuntimeError> {
945 let entries = self.execution_log.get_history(agent_id, 100);
946 let history = entries
947 .into_iter()
948 .map(|e| AgentExecutionRecord {
949 execution_id: e.execution_id,
950 status: e.status,
951 timestamp: e.timestamp.to_rfc3339(),
952 })
953 .collect();
954 Ok(GetAgentHistoryResponse { history })
955 }
956
957 async fn list_schedules(&self) -> Result<Vec<ScheduleSummary>, RuntimeError> {
960 #[cfg(feature = "cron")]
961 if let Some(ref cron) = self.cron_scheduler {
962 let jobs = cron
963 .list_jobs()
964 .await
965 .map_err(|e| RuntimeError::Internal(e.to_string()))?;
966 return Ok(jobs
967 .into_iter()
968 .map(|j| ScheduleSummary {
969 job_id: j.job_id.to_string(),
970 name: j.name,
971 cron_expression: j.cron_expression,
972 timezone: j.timezone,
973 status: format!("{:?}", j.status),
974 enabled: j.enabled,
975 next_run: j.next_run.map(|t| t.to_rfc3339()),
976 run_count: j.run_count,
977 })
978 .collect());
979 }
980 Ok(vec![])
981 }
982
983 async fn create_schedule(
984 &self,
985 request: CreateScheduleRequest,
986 ) -> Result<CreateScheduleResponse, RuntimeError> {
987 #[cfg(feature = "cron")]
988 if let Some(ref cron) = self.cron_scheduler {
989 use scheduler::cron_types::{CronJobDefinition, CronJobId};
990 let now = chrono::Utc::now();
991 let tz = if request.timezone.is_empty() {
992 "UTC".to_string()
993 } else {
994 request.timezone
995 };
996 let agent_config = types::AgentConfig {
997 id: types::AgentId::new(),
998 name: request.agent_name,
999 dsl_source: String::new(),
1000 execution_mode: Default::default(),
1001 security_tier: Default::default(),
1002 resource_limits: Default::default(),
1003 capabilities: Vec::new(),
1004 policies: Vec::new(),
1005 metadata: Default::default(),
1006 priority: Default::default(),
1007 };
1008 let job = CronJobDefinition {
1009 job_id: CronJobId::new(),
1010 name: request.name,
1011 cron_expression: request.cron_expression,
1012 timezone: tz,
1013 agent_config,
1014 policy_ids: request.policy_ids,
1015 audit_level: Default::default(),
1016 status: scheduler::cron_types::CronJobStatus::Active,
1017 enabled: true,
1018 one_shot: request.one_shot,
1019 created_at: now,
1020 updated_at: now,
1021 last_run: None,
1022 next_run: None,
1023 run_count: 0,
1024 failure_count: 0,
1025 max_retries: 3,
1026 max_concurrent: 1,
1027 delivery_config: None,
1028 jitter_max_secs: 0,
1029 session_mode: Default::default(),
1030 agentpin_jwt: None,
1031 };
1032 let job_id = cron
1033 .add_job(job)
1034 .await
1035 .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1036 let saved = cron
1038 .get_job(job_id)
1039 .await
1040 .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1041 return Ok(CreateScheduleResponse {
1042 job_id: job_id.to_string(),
1043 next_run: saved.next_run.map(|t| t.to_rfc3339()),
1044 status: "created".to_string(),
1045 });
1046 }
1047 Err(RuntimeError::Internal(
1048 "Schedule API requires a running CronScheduler".to_string(),
1049 ))
1050 }
1051
1052 async fn get_schedule(&self, job_id: &str) -> Result<ScheduleDetail, RuntimeError> {
1053 #[cfg(feature = "cron")]
1054 if let Some(ref cron) = self.cron_scheduler {
1055 let id: scheduler::cron_types::CronJobId = job_id
1056 .parse()
1057 .map_err(|_| RuntimeError::Internal(format!("Invalid job ID: {}", job_id)))?;
1058 let j = cron
1059 .get_job(id)
1060 .await
1061 .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1062 return Ok(ScheduleDetail {
1063 job_id: j.job_id.to_string(),
1064 name: j.name,
1065 cron_expression: j.cron_expression,
1066 timezone: j.timezone,
1067 status: format!("{:?}", j.status),
1068 enabled: j.enabled,
1069 one_shot: j.one_shot,
1070 next_run: j.next_run.map(|t| t.to_rfc3339()),
1071 last_run: j.last_run.map(|t| t.to_rfc3339()),
1072 run_count: j.run_count,
1073 failure_count: j.failure_count,
1074 created_at: j.created_at.to_rfc3339(),
1075 updated_at: j.updated_at.to_rfc3339(),
1076 });
1077 }
1078 Err(RuntimeError::Internal(
1079 "Schedule API requires a running CronScheduler".to_string(),
1080 ))
1081 }
1082
1083 async fn update_schedule(
1084 &self,
1085 job_id: &str,
1086 request: UpdateScheduleRequest,
1087 ) -> Result<ScheduleDetail, RuntimeError> {
1088 #[cfg(feature = "cron")]
1089 if let Some(ref cron) = self.cron_scheduler {
1090 let id: scheduler::cron_types::CronJobId = job_id
1091 .parse()
1092 .map_err(|_| RuntimeError::Internal(format!("Invalid job ID: {}", job_id)))?;
1093 let mut job = cron
1094 .get_job(id)
1095 .await
1096 .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1097 if let Some(expr) = request.cron_expression {
1098 job.cron_expression = expr;
1099 }
1100 if let Some(tz) = request.timezone {
1101 job.timezone = tz;
1102 }
1103 if let Some(pids) = request.policy_ids {
1104 job.policy_ids = pids;
1105 }
1106 if let Some(one_shot) = request.one_shot {
1107 job.one_shot = one_shot;
1108 }
1109 cron.update_job(job)
1110 .await
1111 .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1112 return self.get_schedule(job_id).await;
1113 }
1114 Err(RuntimeError::Internal(
1115 "Schedule API requires a running CronScheduler".to_string(),
1116 ))
1117 }
1118
1119 async fn delete_schedule(&self, job_id: &str) -> Result<DeleteScheduleResponse, RuntimeError> {
1120 #[cfg(feature = "cron")]
1121 if let Some(ref cron) = self.cron_scheduler {
1122 let id: scheduler::cron_types::CronJobId = job_id
1123 .parse()
1124 .map_err(|_| RuntimeError::Internal(format!("Invalid job ID: {}", job_id)))?;
1125 cron.remove_job(id)
1126 .await
1127 .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1128 return Ok(DeleteScheduleResponse {
1129 job_id: job_id.to_string(),
1130 deleted: true,
1131 });
1132 }
1133 Err(RuntimeError::Internal(
1134 "Schedule API requires a running CronScheduler".to_string(),
1135 ))
1136 }
1137
1138 async fn pause_schedule(&self, job_id: &str) -> Result<ScheduleActionResponse, RuntimeError> {
1139 #[cfg(feature = "cron")]
1140 if let Some(ref cron) = self.cron_scheduler {
1141 let id: scheduler::cron_types::CronJobId = job_id
1142 .parse()
1143 .map_err(|_| RuntimeError::Internal(format!("Invalid job ID: {}", job_id)))?;
1144 cron.pause_job(id)
1145 .await
1146 .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1147 return Ok(ScheduleActionResponse {
1148 job_id: job_id.to_string(),
1149 action: "pause".to_string(),
1150 status: "paused".to_string(),
1151 });
1152 }
1153 Err(RuntimeError::Internal(
1154 "Schedule API requires a running CronScheduler".to_string(),
1155 ))
1156 }
1157
1158 async fn resume_schedule(&self, job_id: &str) -> Result<ScheduleActionResponse, RuntimeError> {
1159 #[cfg(feature = "cron")]
1160 if let Some(ref cron) = self.cron_scheduler {
1161 let id: scheduler::cron_types::CronJobId = job_id
1162 .parse()
1163 .map_err(|_| RuntimeError::Internal(format!("Invalid job ID: {}", job_id)))?;
1164 cron.resume_job(id)
1165 .await
1166 .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1167 return Ok(ScheduleActionResponse {
1168 job_id: job_id.to_string(),
1169 action: "resume".to_string(),
1170 status: "active".to_string(),
1171 });
1172 }
1173 Err(RuntimeError::Internal(
1174 "Schedule API requires a running CronScheduler".to_string(),
1175 ))
1176 }
1177
1178 async fn trigger_schedule(&self, job_id: &str) -> Result<ScheduleActionResponse, RuntimeError> {
1179 #[cfg(feature = "cron")]
1180 if let Some(ref cron) = self.cron_scheduler {
1181 let id: scheduler::cron_types::CronJobId = job_id
1182 .parse()
1183 .map_err(|_| RuntimeError::Internal(format!("Invalid job ID: {}", job_id)))?;
1184 cron.trigger_now(id)
1185 .await
1186 .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1187 return Ok(ScheduleActionResponse {
1188 job_id: job_id.to_string(),
1189 action: "trigger".to_string(),
1190 status: "triggered".to_string(),
1191 });
1192 }
1193 Err(RuntimeError::Internal(
1194 "Schedule API requires a running CronScheduler".to_string(),
1195 ))
1196 }
1197
1198 async fn get_schedule_history(
1199 &self,
1200 job_id: &str,
1201 limit: usize,
1202 ) -> Result<ScheduleHistoryResponse, RuntimeError> {
1203 #[cfg(feature = "cron")]
1204 if let Some(ref cron) = self.cron_scheduler {
1205 let id: scheduler::cron_types::CronJobId = job_id
1206 .parse()
1207 .map_err(|_| RuntimeError::Internal(format!("Invalid job ID: {}", job_id)))?;
1208 let runs = cron
1209 .get_run_history(id, limit)
1210 .await
1211 .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1212 return Ok(ScheduleHistoryResponse {
1213 job_id: job_id.to_string(),
1214 history: runs
1215 .into_iter()
1216 .map(|r| ScheduleRunEntry {
1217 run_id: r.run_id.to_string(),
1218 started_at: r.started_at.to_rfc3339(),
1219 completed_at: r.completed_at.map(|t| t.to_rfc3339()),
1220 status: format!("{:?}", r.status),
1221 error: r.error,
1222 execution_time_ms: r.execution_time_ms,
1223 })
1224 .collect(),
1225 });
1226 }
1227 Err(RuntimeError::Internal(
1228 "Schedule API requires a running CronScheduler".to_string(),
1229 ))
1230 }
1231
1232 async fn get_schedule_next_runs(
1233 &self,
1234 job_id: &str,
1235 count: usize,
1236 ) -> Result<NextRunsResponse, RuntimeError> {
1237 #[cfg(feature = "cron")]
1238 if let Some(ref cron) = self.cron_scheduler {
1239 let id: scheduler::cron_types::CronJobId = job_id
1240 .parse()
1241 .map_err(|_| RuntimeError::Internal(format!("Invalid job ID: {}", job_id)))?;
1242 let job = cron
1243 .get_job(id)
1244 .await
1245 .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1246 let runs = cron
1247 .get_next_runs(&job.cron_expression, &job.timezone, count)
1248 .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1249 return Ok(NextRunsResponse {
1250 job_id: job_id.to_string(),
1251 next_runs: runs.into_iter().map(|t| t.to_rfc3339()).collect(),
1252 });
1253 }
1254 Err(RuntimeError::Internal(
1255 "Schedule API requires a running CronScheduler".to_string(),
1256 ))
1257 }
1258
1259 async fn get_scheduler_health(
1260 &self,
1261 ) -> Result<api::types::SchedulerHealthResponse, RuntimeError> {
1262 #[cfg(feature = "cron")]
1263 if let Some(ref cron) = self.cron_scheduler {
1264 let h = cron
1265 .check_health()
1266 .await
1267 .map_err(|e| RuntimeError::Internal(e.to_string()))?;
1268 let m = cron.metrics();
1269 return Ok(api::types::SchedulerHealthResponse {
1270 is_running: h.is_running,
1271 store_accessible: h.store_accessible,
1272 jobs_total: h.jobs_total,
1273 jobs_active: h.jobs_active,
1274 jobs_paused: h.jobs_paused,
1275 jobs_dead_letter: h.jobs_dead_letter,
1276 global_active_runs: h.global_active_runs,
1277 max_concurrent: h.max_concurrent,
1278 runs_total: m.runs_total,
1279 runs_succeeded: m.runs_succeeded,
1280 runs_failed: m.runs_failed,
1281 average_execution_time_ms: m.average_execution_time_ms,
1282 longest_run_ms: m.longest_run_ms,
1283 });
1284 }
1285 Ok(api::types::SchedulerHealthResponse {
1287 is_running: false,
1288 store_accessible: false,
1289 jobs_total: 0,
1290 jobs_active: 0,
1291 jobs_paused: 0,
1292 jobs_dead_letter: 0,
1293 global_active_runs: 0,
1294 max_concurrent: 0,
1295 runs_total: 0,
1296 runs_succeeded: 0,
1297 runs_failed: 0,
1298 average_execution_time_ms: 0.0,
1299 longest_run_ms: 0,
1300 })
1301 }
1302
1303 async fn list_channels(&self) -> Result<Vec<ChannelSummary>, RuntimeError> {
1306 Ok(vec![])
1308 }
1309
1310 async fn register_channel(
1311 &self,
1312 _request: RegisterChannelRequest,
1313 ) -> Result<RegisterChannelResponse, RuntimeError> {
1314 Err(RuntimeError::Internal(
1315 "Channel API requires a running ChannelAdapterManager".to_string(),
1316 ))
1317 }
1318
1319 async fn get_channel(&self, _id: &str) -> Result<ChannelDetail, RuntimeError> {
1320 Err(RuntimeError::Internal(
1321 "Channel API requires a running ChannelAdapterManager".to_string(),
1322 ))
1323 }
1324
1325 async fn update_channel(
1326 &self,
1327 _id: &str,
1328 _request: UpdateChannelRequest,
1329 ) -> Result<ChannelDetail, RuntimeError> {
1330 Err(RuntimeError::Internal(
1331 "Channel API requires a running ChannelAdapterManager".to_string(),
1332 ))
1333 }
1334
1335 async fn delete_channel(&self, _id: &str) -> Result<DeleteChannelResponse, RuntimeError> {
1336 Err(RuntimeError::Internal(
1337 "Channel API requires a running ChannelAdapterManager".to_string(),
1338 ))
1339 }
1340
1341 async fn start_channel(&self, _id: &str) -> Result<ChannelActionResponse, RuntimeError> {
1342 Err(RuntimeError::Internal(
1343 "Channel API requires a running ChannelAdapterManager".to_string(),
1344 ))
1345 }
1346
1347 async fn stop_channel(&self, _id: &str) -> Result<ChannelActionResponse, RuntimeError> {
1348 Err(RuntimeError::Internal(
1349 "Channel API requires a running ChannelAdapterManager".to_string(),
1350 ))
1351 }
1352
1353 async fn get_channel_health(&self, _id: &str) -> Result<ChannelHealthResponse, RuntimeError> {
1354 Err(RuntimeError::Internal(
1355 "Channel API requires a running ChannelAdapterManager".to_string(),
1356 ))
1357 }
1358
1359 async fn list_channel_mappings(
1360 &self,
1361 _id: &str,
1362 ) -> Result<Vec<IdentityMappingEntry>, RuntimeError> {
1363 Err(RuntimeError::Internal(
1364 "Channel identity mappings require enterprise edition".to_string(),
1365 ))
1366 }
1367
1368 async fn add_channel_mapping(
1369 &self,
1370 _id: &str,
1371 _request: AddIdentityMappingRequest,
1372 ) -> Result<IdentityMappingEntry, RuntimeError> {
1373 Err(RuntimeError::Internal(
1374 "Channel identity mappings require enterprise edition".to_string(),
1375 ))
1376 }
1377
1378 async fn remove_channel_mapping(&self, _id: &str, _user_id: &str) -> Result<(), RuntimeError> {
1379 Err(RuntimeError::Internal(
1380 "Channel identity mappings require enterprise edition".to_string(),
1381 ))
1382 }
1383
1384 async fn get_channel_audit(
1385 &self,
1386 _id: &str,
1387 _limit: usize,
1388 ) -> Result<ChannelAuditResponse, RuntimeError> {
1389 Err(RuntimeError::Internal(
1390 "Channel audit log requires enterprise edition".to_string(),
1391 ))
1392 }
1393
1394 async fn update_agent_heartbeat(
1397 &self,
1398 agent_id: AgentId,
1399 heartbeat: api::types::HeartbeatRequest,
1400 ) -> Result<(), RuntimeError> {
1401 self.verify_agentpin_for_agent(heartbeat.agentpin_jwt.as_deref(), agent_id)
1403 .await?;
1404
1405 let ext_agents = self.scheduler.external_agents();
1406 let mut entry = ext_agents.get_mut(&agent_id).ok_or_else(|| {
1407 RuntimeError::Internal(format!("Agent {} is not an external agent", agent_id))
1408 })?;
1409
1410 entry.last_heartbeat = Some(chrono::Utc::now());
1411 entry.reported_state = heartbeat.state;
1412
1413 if let Some(metadata) = heartbeat.metadata {
1414 entry.metadata.extend(metadata);
1415 }
1416 if let Some(last_result) = heartbeat.last_result {
1417 entry.last_result = Some(last_result);
1418 }
1419
1420 Ok(())
1421 }
1422
1423 async fn push_agent_event(
1424 &self,
1425 agent_id: AgentId,
1426 event: api::types::PushEventRequest,
1427 ) -> Result<(), RuntimeError> {
1428 self.verify_agentpin_for_agent(event.agentpin_jwt.as_deref(), agent_id)
1430 .await?;
1431
1432 let ext_agents = self.scheduler.external_agents();
1433 let mut entry = ext_agents.get_mut(&agent_id).ok_or_else(|| {
1434 RuntimeError::Internal(format!("Agent {} is not an external agent", agent_id))
1435 })?;
1436
1437 entry.push_event(api::types::AgentEvent {
1438 event_type: event.event_type,
1439 payload: event.payload,
1440 timestamp: chrono::Utc::now(),
1441 });
1442
1443 Ok(())
1444 }
1445
1446 async fn check_unreachable_agents(&self) {
1447 self.scheduler.check_unreachable_agents();
1448 }
1449
1450 async fn send_agent_message(
1451 &self,
1452 recipient: crate::types::AgentId,
1453 request: api::types::SendMessageRequest,
1454 ) -> Result<api::types::SendMessageResponse, crate::types::RuntimeError> {
1455 self.verify_agentpin_for_agent(request.agentpin_jwt.as_deref(), request.sender)
1459 .await?;
1460
1461 const MAX_TTL_SECS: u64 = 24 * 3600;
1476 const DEFAULT_TTL_SECS: u64 = 300;
1477 let requested_secs = request.ttl_seconds.unwrap_or(DEFAULT_TTL_SECS);
1478 let configured_secs = self.config.read().await.communication.message_ttl.as_secs();
1479 let ttl_secs = requested_secs.min(configured_secs).clamp(1, MAX_TTL_SECS);
1480 let ttl = std::time::Duration::from_secs(ttl_secs);
1481
1482 if let Some(ref topic) = request.topic {
1484 let msg = self.communication.create_internal_message(
1485 request.sender,
1486 recipient,
1487 bytes::Bytes::from(request.payload.into_bytes()),
1488 crate::types::communication::MessageType::Publish(topic.clone()),
1489 ttl,
1490 );
1491 let message_id = msg.id;
1492 self.communication
1493 .publish(topic.clone(), msg)
1494 .await
1495 .map_err(crate::types::RuntimeError::Communication)?;
1496 Ok(api::types::SendMessageResponse {
1497 message_id: message_id.0.to_string(),
1498 status: "pending".to_string(),
1499 })
1500 } else {
1501 let msg = self.communication.create_internal_message(
1502 request.sender,
1503 recipient,
1504 bytes::Bytes::from(request.payload.into_bytes()),
1505 crate::types::communication::MessageType::Direct(recipient),
1506 ttl,
1507 );
1508 let message_id = self
1509 .communication
1510 .send_message(msg)
1511 .await
1512 .map_err(crate::types::RuntimeError::Communication)?;
1513 Ok(api::types::SendMessageResponse {
1514 message_id: message_id.0.to_string(),
1515 status: "pending".to_string(),
1516 })
1517 }
1518 }
1519
1520 async fn receive_agent_messages(
1521 &self,
1522 agent_id: crate::types::AgentId,
1523 ) -> Result<api::types::ReceiveMessagesResponse, crate::types::RuntimeError> {
1524 let messages = self
1525 .communication
1526 .receive_messages(agent_id)
1527 .await
1528 .map_err(crate::types::RuntimeError::Communication)?;
1529
1530 let envelopes = messages
1531 .into_iter()
1532 .map(|m| {
1533 let timestamp_secs = m
1534 .timestamp
1535 .duration_since(std::time::UNIX_EPOCH)
1536 .map(|d: std::time::Duration| d.as_secs())
1537 .unwrap_or(0);
1538 let ttl_seconds = m.ttl.as_secs();
1539 let (message_type, topic) = match &m.message_type {
1540 crate::types::communication::MessageType::Direct(_) => {
1541 ("direct".to_string(), None)
1542 }
1543 crate::types::communication::MessageType::Publish(t) => {
1544 ("publish".to_string(), Some(t.clone()))
1545 }
1546 crate::types::communication::MessageType::Subscribe(t) => {
1547 ("subscribe".to_string(), Some(t.clone()))
1548 }
1549 crate::types::communication::MessageType::Broadcast => {
1550 ("broadcast".to_string(), None)
1551 }
1552 crate::types::communication::MessageType::Request(_) => {
1553 ("request".to_string(), None)
1554 }
1555 crate::types::communication::MessageType::Response(_) => {
1556 ("response".to_string(), None)
1557 }
1558 };
1559 let payload = String::from_utf8_lossy(&m.payload.data).to_string();
1565 api::types::MessageEnvelope {
1566 message_id: m.id.0.to_string(),
1567 sender: m.sender,
1568 recipient: m.recipient,
1569 topic,
1570 payload,
1571 message_type,
1572 timestamp_secs,
1573 ttl_seconds,
1574 }
1575 })
1576 .collect();
1577
1578 Ok(api::types::ReceiveMessagesResponse {
1579 messages: envelopes,
1580 })
1581 }
1582
1583 async fn get_message_status(
1584 &self,
1585 message_id: &str,
1586 ) -> Result<api::types::MessageStatusResponse, crate::types::RuntimeError> {
1587 let uuid = uuid::Uuid::parse_str(message_id).map_err(|_| {
1588 crate::types::RuntimeError::Communication(
1589 crate::types::CommunicationError::InvalidFormat(format!(
1590 "Invalid message ID: {}",
1591 message_id
1592 )),
1593 )
1594 })?;
1595 let mid = crate::types::MessageId(uuid);
1596 let status = self
1597 .communication
1598 .get_delivery_status(mid)
1599 .await
1600 .map_err(crate::types::RuntimeError::Communication)?;
1601 let status_str = match status {
1602 crate::communication::DeliveryStatus::Pending => "pending",
1603 crate::communication::DeliveryStatus::Delivered => "delivered",
1604 crate::communication::DeliveryStatus::Failed => "failed",
1605 crate::communication::DeliveryStatus::Expired => "expired",
1606 };
1607 Ok(api::types::MessageStatusResponse {
1608 message_id: message_id.to_string(),
1609 status: status_str.to_string(),
1610 })
1611 }
1612}