1use async_trait::async_trait;
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::Arc;
11use tracing::{debug, info, warn};
12
13use crate::execution_monitor::{CorrectionStrategy, ExecutionMonitor};
14use crate::permission::types::{PermissionAction, PermissionRequest};
15use crate::permission::PermissionManager;
16use crate::planner::{DecompositionStrategy, ExecutionPlan, TaskDecomposer};
17use crate::session_manager::{ConcurrentSessionManager, SessionConfig, SessionManagerTrait};
18use crate::tool_registry::{ToolRegistry, ToolRegistryTrait};
19use crate::types::{
20 AgentId, AgentState, Layer2Error, Layer2Result, Message, MessageRole, SessionId, ToolCall,
21 ToolResult,
22};
23
/// Outcome of an agent run, returned by the `AgentRuntimeTrait` run methods and by
/// `AgentRuntime::run_with_plan`.
#[derive(Debug, Clone)]
pub struct AgentResult {
    /// Session the run executed in. For planned (multi-subtask) runs this is a freshly
    /// generated id rather than one of the subtask sessions.
    pub session_id: SessionId,
    /// State the run ended in.
    pub final_state: AgentState,
    /// Conversation messages collected by the run.
    pub messages: Vec<Message>,
    /// Tool calls reported for the run (exact contents depend on the producing method).
    pub tool_calls: Vec<ToolCall>,
    /// Tool results reported for the run (exact contents depend on the producing method).
    pub tool_results: Vec<ToolResult>,
    /// Number of loop iterations counted by the run.
    pub iterations: i32,
    /// Tokens recorded for the run.
    pub tokens_used: i64,
}
35
/// Per-run configuration for an agent.
#[derive(Debug, Clone)]
pub struct AgentConfig {
    /// Identifier stored on the session created for the run.
    pub agent_id: AgentId,
    /// Model name forwarded to the session config and to LLM requests.
    pub model: String,
    /// Sampling temperature forwarded to the session config and to LLM requests.
    pub temperature: f32,
    /// Iteration budget; the run loops fail with `Layer2Error::MaxIterations` once the
    /// iteration count exceeds it.
    pub max_iterations: i32,
    /// Optional system prompt; the run loops add it to the session as a system message and it
    /// is also passed as the LLM request's system prompt.
    pub system_prompt: Option<String>,
}
45
46impl Default for AgentConfig {
47 fn default() -> Self {
48 Self {
49 agent_id: AgentId::new(),
50 model: "claude-sonnet-4-6".to_string(),
51 temperature: 0.7,
52 max_iterations: 100,
53 system_prompt: None,
54 }
55 }
56}
57
58impl From<&AgentConfig> for SessionConfig {
59 fn from(config: &AgentConfig) -> Self {
60 SessionConfig {
61 model: config.model.clone(),
62 temperature: config.temperature,
63 max_iterations: config.max_iterations,
64 system_prompt: config.system_prompt.clone(),
65 ..Default::default()
66 }
67 }
68}
69
/// Entry points for driving an agent: complete runs (plain, with callbacks, abortable) plus
/// session-level controls.
#[async_trait]
pub trait AgentRuntimeTrait: Send + Sync {
    /// Runs `task` in a new session until the agent loop stops and returns the result.
    ///
    /// In `AgentRuntime` this fails with `Layer2Error::MaxIterations` when the loop exceeds
    /// `config.max_iterations`.
    async fn run(&self, task: &str, config: AgentConfig) -> Layer2Result<AgentResult>;

    /// Like `run`, but reports progress through `callback` (before/after each iteration and
    /// each tool call).
    async fn run_stream(
        &self,
        task: &str,
        config: AgentConfig,
        callback: &dyn AgentLoopCallback,
    ) -> Layer2Result<AgentResult>;

    /// Like `run_stream`, but stops the loop (state `Stopped`) once `abort_flag` is set.
    async fn run_stream_abortable(
        &self,
        task: &str,
        config: AgentConfig,
        callback: &dyn AgentLoopCallback,
        abort_flag: Arc<AtomicBool>,
    ) -> Layer2Result<AgentResult>;

    /// Starts `task` and returns its session id. NOTE(review): presumably does not wait for
    /// completion — confirm against the implementation.
    async fn start(&self, task: &str, config: AgentConfig) -> Layer2Result<SessionId>;

    /// Pauses the session identified by `session_id`.
    async fn pause(&self, session_id: &SessionId) -> Layer2Result<()>;

    /// Resumes the session identified by `session_id`.
    async fn resume(&self, session_id: &SessionId) -> Layer2Result<()>;

    /// Stops the session identified by `session_id`.
    async fn stop(&self, session_id: &SessionId) -> Layer2Result<()>;

    /// Returns the current state of the session identified by `session_id`.
    fn status(&self, session_id: &SessionId) -> Layer2Result<AgentState>;

    /// Sends `message` to the session identified by `session_id`.
    async fn send_message(&self, session_id: &SessionId, message: &str) -> Layer2Result<()>;

    /// Supplies `result` for the tool call `tool_call_id` of session `session_id`.
    async fn submit_tool_result(
        &self,
        session_id: &SessionId,
        tool_call_id: &str,
        result: ToolResult,
    ) -> Layer2Result<()>;
}
155
/// Hooks invoked by the streaming run loops around iterations and tool calls.
///
/// An error returned from any hook is propagated out of the run.
#[async_trait]
pub trait AgentLoopCallback: Send + Sync {
    /// Called at the start of each iteration; returning `Ok(false)` ends the loop.
    async fn before_iteration(&self, session_id: &SessionId, iteration: i32) -> Layer2Result<bool>;

    /// Called at the end of each iteration with a summary of what happened in it.
    async fn after_iteration(
        &self,
        session_id: &SessionId,
        iteration: i32,
        result: &IterationResult,
    ) -> Layer2Result<()>;

    /// Called for each tool call requested by the model; `Ok(false)` marks the call as
    /// rejected by the callback.
    async fn before_tool_call(
        &self,
        session_id: &SessionId,
        tool_call: &ToolCall,
    ) -> Layer2Result<bool>;

    /// Called with a tool call and the result it produced.
    async fn after_tool_call(
        &self,
        session_id: &SessionId,
        tool_call: &ToolCall,
        result: &ToolResult,
    ) -> Layer2Result<()>;
}
187
/// Outcome of a single agent-loop iteration (one LLM step).
#[derive(Debug, Clone)]
pub struct IterationResult {
    /// 1-based iteration number.
    pub iteration: i32,
    /// State the step ended in.
    pub state: AgentState,
    /// Assistant message produced by the step, appended to the session by the run loops.
    pub message: Option<Message>,
    /// Tool calls the step requested.
    pub tool_calls: Vec<ToolCall>,
    /// `false` makes the run loop stop after this iteration.
    pub should_continue: bool,
}
197
/// Default `AgentRuntimeTrait` implementation: runs the agent loop on top of a session
/// manager and a tool registry.
pub struct AgentRuntime {
    /// Owns session state (messages, pending tool calls, cached tool results, tokens).
    session_manager: Arc<ConcurrentSessionManager>,
    /// Tools available to the agent.
    tool_registry: Arc<ToolRegistry>,
    /// When set, every tool call is permission-checked before it runs.
    permission_manager: Option<Arc<PermissionManager>>,
    /// When set, `run_with_plan` splits tasks into subtasks first.
    task_decomposer: Option<TaskDecomposer>,
    /// When set, LLM steps call the model; otherwise a built-in simulation is used.
    llm_client: Option<Arc<sh_layer1::LlmClient>>,
}
213
214impl AgentRuntime {
215 pub fn new(
217 session_manager: Arc<ConcurrentSessionManager>,
218 tool_registry: Arc<ToolRegistry>,
219 ) -> Self {
220 Self {
221 session_manager,
222 tool_registry,
223 permission_manager: None,
224 task_decomposer: None,
225 llm_client: None,
226 }
227 }
228
229 pub fn with_permissions(
231 session_manager: Arc<ConcurrentSessionManager>,
232 tool_registry: Arc<ToolRegistry>,
233 permission_manager: Arc<PermissionManager>,
234 ) -> Self {
235 Self {
236 session_manager,
237 tool_registry,
238 permission_manager: Some(permission_manager),
239 task_decomposer: None,
240 llm_client: None,
241 }
242 }
243
244 pub fn with_decomposer(
246 session_manager: Arc<ConcurrentSessionManager>,
247 tool_registry: Arc<ToolRegistry>,
248 strategy: DecompositionStrategy,
249 ) -> Self {
250 Self {
251 session_manager,
252 tool_registry,
253 permission_manager: None,
254 task_decomposer: Some(TaskDecomposer::new().with_strategy(strategy)),
255 llm_client: None,
256 }
257 }
258
259 pub fn with_defaults() -> Self {
261 Self {
262 session_manager: Arc::new(ConcurrentSessionManager::default_config()),
263 tool_registry: Arc::new(ToolRegistry::new()),
264 permission_manager: None,
265 task_decomposer: Some(TaskDecomposer::new()),
266 llm_client: None,
267 }
268 }
269
270 pub fn with_llm_client(
272 session_manager: Arc<ConcurrentSessionManager>,
273 tool_registry: Arc<ToolRegistry>,
274 llm_client: Arc<sh_layer1::LlmClient>,
275 ) -> Self {
276 Self {
277 session_manager,
278 tool_registry,
279 permission_manager: None,
280 task_decomposer: Some(TaskDecomposer::new()),
281 llm_client: Some(llm_client),
282 }
283 }
284
285 pub fn set_permission_manager(&mut self, manager: Arc<PermissionManager>) {
287 self.permission_manager = Some(manager);
288 }
289
290 pub fn set_decomposition_strategy(&mut self, strategy: DecompositionStrategy) {
292 self.task_decomposer = Some(TaskDecomposer::new().with_strategy(strategy));
293 }
294
295 pub fn set_llm_client(&mut self, client: Arc<sh_layer1::LlmClient>) {
297 self.llm_client = Some(client);
298 }
299
300 pub fn permission_manager(&self) -> Option<&Arc<PermissionManager>> {
302 self.permission_manager.as_ref()
303 }
304
305 pub fn task_decomposer(&self) -> Option<&TaskDecomposer> {
307 self.task_decomposer.as_ref()
308 }
309
310 pub fn llm_client(&self) -> Option<&Arc<sh_layer1::LlmClient>> {
312 self.llm_client.as_ref()
313 }
314
315 pub fn decompose_task(&self, task: &str) -> Layer2Result<Option<ExecutionPlan>> {
320 if let Some(decomposer) = &self.task_decomposer {
321 let plan = decomposer.decompose(task)?;
322 info!(
323 task = %task,
324 subtasks = plan.subtasks.len(),
325 strategy = ?plan.strategy,
326 risk = ?plan.risk_level,
327 "Task decomposed into execution plan"
328 );
329 Ok(Some(plan))
330 } else {
331 Ok(None)
332 }
333 }
334
335 pub fn create_monitor(&self, plan: ExecutionPlan) -> ExecutionMonitor {
339 ExecutionMonitor::new(plan)
340 }
341
342 pub async fn run_with_plan(
347 &self,
348 task: &str,
349 config: AgentConfig,
350 ) -> Layer2Result<AgentResult> {
351 let plan_option = self.decompose_task(task)?;
353
354 if let Some(plan) = plan_option {
356 self.run_with_execution_plan(plan, config).await
357 } else {
358 self.run(task, config).await
360 }
361 }
362
363 async fn run_with_execution_plan(
365 &self,
366 plan: ExecutionPlan,
367 config: AgentConfig,
368 ) -> Layer2Result<AgentResult> {
369 let monitor = self.create_monitor(plan.clone());
370 monitor.start().await?;
371
372 info!(
373 plan_id = %plan.id,
374 steps = plan.subtasks.len(),
375 "Starting planned execution"
376 );
377
378 let mut all_messages = Vec::new();
380 let mut all_tool_calls = Vec::new();
381 let mut all_tool_results = Vec::new();
382 let mut total_iterations = 0;
383 let mut total_tokens = 0i64;
384
385 for subtask_id in &plan.execution_order {
386 if let Some(subtask) = plan.subtasks.iter().find(|s| &s.id == subtask_id) {
387 let subtask_result = self.run(&subtask.description, config.clone()).await;
392
393 match subtask_result {
394 Ok(result) => {
395 monitor
396 .report_step_completed(subtask_id, result.final_state.to_string())
397 .await?;
398 all_messages.extend(result.messages);
399 all_tool_calls.extend(result.tool_calls);
400 all_tool_results.extend(result.tool_results);
401 total_iterations += result.iterations;
402 total_tokens += result.tokens_used;
403 }
404 Err(e) => {
405 let error_msg = e.to_string();
406 let decision = monitor
407 .report_step_failed(subtask_id, error_msg.clone())
408 .await?;
409
410 if decision.should_continue {
412 match &decision.strategy {
413 CorrectionStrategy::Retry { max_attempts } => {
414 for attempt in 1..=*max_attempts {
416 warn!(
417 subtask_id = %subtask_id,
418 attempt = attempt,
419 max = max_attempts,
420 "Retrying subtask"
421 );
422 let retry_result =
423 self.run(&subtask.description, config.clone()).await;
424 if retry_result.is_ok() {
425 let result = retry_result.unwrap();
426 monitor
427 .report_step_completed(
428 subtask_id,
429 format!("Retry {} succeeded", attempt),
430 )
431 .await?;
432 all_messages.extend(result.messages);
433 all_tool_calls.extend(result.tool_calls);
434 all_tool_results.extend(result.tool_results);
435 total_iterations += result.iterations;
436 total_tokens += result.tokens_used;
437 break;
438 }
439 }
440 }
441 CorrectionStrategy::Skip => {
442 monitor
443 .report_step_completed(subtask_id, "[SKIPPED]".to_string())
444 .await?;
445 }
446 _ => {
447 monitor
449 .report_step_completed(
450 subtask_id,
451 format!(
452 "[HANDLED] {}",
453 decision.strategy.clone().debug_name()
454 ),
455 )
456 .await?;
457 }
458 }
459 } else {
460 return Err(e);
462 }
463 }
464 }
465 }
466 }
467
468 let summary = monitor.complete().await?;
469 info!(
470 plan_id = %plan.id,
471 completed = summary.completed_steps,
472 failed = summary.failed_steps,
473 corrections = summary.correction_count,
474 duration_ms = summary.duration.as_millis(),
475 "Planned execution completed"
476 );
477
478 Ok(AgentResult {
479 session_id: SessionId::new(),
480 final_state: if summary.failed_steps > 0 && summary.completed_steps == 0 {
481 AgentState::Error
482 } else {
483 AgentState::Completed
484 },
485 messages: all_messages,
486 tool_calls: all_tool_calls,
487 tool_results: all_tool_results,
488 iterations: total_iterations,
489 tokens_used: total_tokens,
490 })
491 }
492
493 pub fn session_manager(&self) -> &Arc<ConcurrentSessionManager> {
495 &self.session_manager
496 }
497
498 pub fn tool_registry(&self) -> &Arc<ToolRegistry> {
500 &self.tool_registry
501 }
502
503 fn validate_transition(current: AgentState, target: AgentState) -> Layer2Result<()> {
505 let valid = match (current, target) {
506 (AgentState::Idle, AgentState::Running) => true,
508 (AgentState::Running, AgentState::ToolCalling) => true,
510 (AgentState::Running, AgentState::WaitingTool) => true,
512 (AgentState::Running, AgentState::Completed) => true,
514 (AgentState::Running, AgentState::Stopped) => true,
516 (AgentState::Running, AgentState::Error) => true,
518 (AgentState::ToolCalling, AgentState::WaitingTool) => true,
520 (AgentState::ToolCalling, AgentState::Running) => true,
522 (AgentState::ToolCalling, AgentState::Error) => true,
524 (AgentState::WaitingTool, AgentState::Running) => true,
526 (AgentState::WaitingTool, AgentState::Stopped) => true,
528 (AgentState::WaitingTool, AgentState::Error) => true,
530 (AgentState::Stopped, AgentState::Running) => true,
532 (AgentState::Completed, AgentState::Idle) => true,
534 (_, _) if current == target => true,
536 _ => false,
537 };
538
539 if valid {
540 Ok(())
541 } else {
542 Err(Layer2Error::InvalidStateTransition {
543 from: current,
544 to: target,
545 }
546 .into())
547 }
548 }
549
550 async fn require_session(&self, session_id: &SessionId) -> Layer2Result<()> {
552 let session = self.session_manager.get(session_id).await?;
553 if session.is_some() {
554 Ok(())
555 } else {
556 Err(Layer2Error::SessionNotFound(session_id.clone()).into())
557 }
558 }
559
560 async fn execute_pending_tool_calls(&self, session_id: &SessionId) -> Layer2Result<()> {
563 let pending: Vec<ToolCall> = self
565 .session_manager
566 .read(session_id, |s| s.tool_calls_pending.clone())
567 .await?
568 .unwrap_or_default();
569
570 if pending.is_empty() {
571 return Ok(());
572 }
573
574 debug!(
575 session_id = %session_id,
576 count = pending.len(),
577 "Executing pending tool calls"
578 );
579
580 let mut results = Vec::with_capacity(pending.len());
582 for tc in &pending {
583 if let Some(pm) = &self.permission_manager {
585 let request = PermissionRequest::new(PermissionAction::Custom {
586 description: format!("Execute tool: {} with args: {}", tc.name, tc.arguments),
587 });
588
589 match pm.check_permission(request) {
590 Ok(response) => {
591 if !response.decision.is_allowed() {
592 warn!(
593 tool = %tc.name,
594 tool_call_id = %tc.id,
595 "Tool execution denied by permission system"
596 );
597 results.push(ToolResult {
598 tool_call_id: tc.id.clone(),
599 name: tc.name.clone(),
600 content: "Tool execution denied by permission system".to_string(),
601 is_error: true,
602 });
603 continue;
604 }
605 }
606 Err(e) => {
607 warn!(
608 tool = %tc.name,
609 tool_call_id = %tc.id,
610 error = %e,
611 "Permission check failed"
612 );
613 results.push(ToolResult {
614 tool_call_id: tc.id.clone(),
615 name: tc.name.clone(),
616 content: format!("Permission check failed: {}", e),
617 is_error: true,
618 });
619 continue;
620 }
621 }
622 }
623
624 let result = match self.tool_registry.execute(&tc.name, &tc.arguments).await {
625 Ok(tool_result) => tool_result,
626 Err(e) => {
627 warn!(
628 tool = %tc.name,
629 tool_call_id = %tc.id,
630 error = %e,
631 "Tool execution failed"
632 );
633 ToolResult {
634 tool_call_id: tc.id.clone(),
635 name: tc.name.clone(),
636 content: format!("Tool execution error: {}", e),
637 is_error: true,
638 }
639 }
640 };
641 results.push(result);
642 }
643
644 self.session_manager
646 .update(session_id, |s| {
647 s.tool_results_cache.extend(results);
648 s.tool_calls_pending.clear();
649 })
650 .await?;
651
652 Ok(())
653 }
654
655 async fn simulate_llm_step(
660 &self,
661 session_id: &SessionId,
662 task: &str,
663 iteration: i32,
664 max_iterations: i32,
665 ) -> Layer2Result<IterationResult> {
666 let tools = self.tool_registry.list();
667
668 let has_pending_results: bool = self
670 .session_manager
671 .read(session_id, |s| !s.tool_results_cache.is_empty())
672 .await?
673 .unwrap_or(false);
674
675 let should_continue = iteration < max_iterations;
676
677 if has_pending_results {
679 let tool_results: Vec<ToolResult> = self
680 .session_manager
681 .read(session_id, |s| s.tool_results_cache.clone())
682 .await?
683 .unwrap_or_default();
684
685 let summary: Vec<String> = tool_results
687 .iter()
688 .map(|r| {
689 if r.is_error {
690 format!("Tool {} failed: {}", r.name, r.content)
691 } else {
692 format!("Tool {} succeeded: {}", r.name, r.content)
693 }
694 })
695 .collect();
696
697 let response = if !should_continue {
698 format!(
699 "I've processed the tool results. Task '{}' is now complete.\n{}",
700 task,
701 summary.join("\n")
702 )
703 } else {
704 format!(
705 "Processing tool results, continuing...\n{}",
706 summary.join("\n")
707 )
708 };
709
710 self.session_manager
712 .update(session_id, |s| {
713 s.tool_results_cache.clear();
714 })
715 .await?;
716
717 return Ok(IterationResult {
718 iteration,
719 state: if should_continue {
720 AgentState::Running
721 } else {
722 AgentState::Completed
723 },
724 message: Some(Message::assistant(&response)),
725 tool_calls: Vec::new(),
726 should_continue,
727 });
728 }
729
730 if iteration == 1 {
732 let response = format!("Starting task: {}", task);
733 return Ok(IterationResult {
734 iteration,
735 state: AgentState::Running,
736 message: Some(Message::assistant(&response)),
737 tool_calls: Vec::new(),
738 should_continue: true,
739 });
740 }
741
742 if !tools.is_empty() && iteration <= 2 {
744 let tool_name = &tools[0];
746 let tool_call = ToolCall {
747 id: sh_layer1::generate_prefixed_id("tc"),
748 name: tool_name.clone(),
749 arguments: serde_json::json!({"task": task}).to_string(),
750 };
751
752 return Ok(IterationResult {
753 iteration,
754 state: AgentState::ToolCalling,
755 message: Some(Message::assistant(format!(
756 "I'll use the {} tool to help with this task.",
757 tool_name
758 ))),
759 tool_calls: vec![tool_call],
760 should_continue: true,
761 });
762 }
763
764 let response = format!("Task '{}' has been completed.", task);
766 Ok(IterationResult {
767 iteration,
768 state: AgentState::Completed,
769 message: Some(Message::assistant(&response)),
770 tool_calls: Vec::new(),
771 should_continue: false,
772 })
773 }
774
775 async fn real_llm_step(
779 &self,
780 session_id: &SessionId,
781 task: &str,
782 iteration: i32,
783 max_iterations: i32,
784 config: &AgentConfig,
785 abort_flag: Option<Arc<AtomicBool>>,
786 ) -> Layer2Result<IterationResult> {
787 use sh_layer1::{LlmClientTrait, LlmRequestConfig};
788
789 let llm_client = self
790 .llm_client
791 .as_ref()
792 .ok_or_else(|| anyhow::anyhow!("LLM client not configured"))?;
793
794 let session_messages: Vec<Message> = self
796 .session_manager
797 .read(session_id, |s| s.messages.clone())
798 .await?
799 .unwrap_or_default();
800
801 let mut llm_messages: Vec<sh_layer1::Message> = session_messages
803 .iter()
804 .map(|m| sh_layer1::Message {
805 role: match m.role {
806 MessageRole::System => sh_layer1::MessageRole::System,
807 MessageRole::User => sh_layer1::MessageRole::User,
808 MessageRole::Assistant => sh_layer1::MessageRole::Assistant,
809 MessageRole::Tool => sh_layer1::MessageRole::User, },
811 content: m.content.clone(),
812 })
813 .collect();
814
815 if iteration == 1 {
817 llm_messages.push(sh_layer1::Message {
818 role: sh_layer1::MessageRole::User,
819 content: task.to_string(),
820 });
821 }
822
823 let request_config = LlmRequestConfig {
825 model: config.model.clone(),
826 max_tokens: 4096,
827 temperature: config.temperature,
828 system_prompt: config.system_prompt.clone(),
829 stop_sequences: vec!["\n\n\n".to_string()],
830 };
831
832 let response = if let Some(flag) = abort_flag {
834 llm_client
835 .send_stream_abortable(llm_messages, &request_config, flag)
836 .await
837 .map_err(|e| anyhow::anyhow!("LLM stream error: {}", e))?
838 } else {
839 llm_client
840 .send(llm_messages, &request_config)
841 .await
842 .map_err(|e| anyhow::anyhow!("LLM error: {}", e))?
843 };
844
845 let tokens_used = response.usage.input_tokens as i64 + response.usage.output_tokens as i64;
847 self.session_manager
848 .update(session_id, |s| {
849 s.tokens_total += tokens_used;
850 })
851 .await?;
852
853 let tool_calls = self.parse_tool_calls_from_response(&response.content);
855
856 let state = if !tool_calls.is_empty() {
858 AgentState::ToolCalling
859 } else if iteration >= max_iterations {
860 AgentState::Completed
861 } else {
862 AgentState::Running
863 };
864
865 let should_continue = iteration < max_iterations && state != AgentState::Completed;
866
867 Ok(IterationResult {
868 iteration,
869 state,
870 message: Some(Message::assistant(&response.content)),
871 tool_calls,
872 should_continue,
873 })
874 }
875
876 fn parse_tool_calls_from_response(&self, content: &str) -> Vec<ToolCall> {
880 let mut tool_calls = Vec::new();
881
882 if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(content) {
885 if let Some(content_array) = json_value.get("content").and_then(|c| c.as_array()) {
887 for block in content_array {
888 if block.get("type").and_then(|t| t.as_str()) == Some("tool_use") {
889 if let (Some(name), Some(id), Some(input)) = (
890 block.get("name").and_then(|n| n.as_str()),
891 block.get("id").and_then(|i| i.as_str()),
892 block.get("input"),
893 ) {
894 tool_calls.push(ToolCall {
895 id: id.to_string(),
896 name: name.to_string(),
897 arguments: input.to_string(),
898 });
899 }
900 }
901 }
902 }
903 }
904
905 if tool_calls.is_empty() {
908 if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(content) {
909 if let Some(func_call) = json_value.get("function_call") {
910 if let (Some(name), Some(args)) = (
911 func_call.get("name").and_then(|n| n.as_str()),
912 func_call.get("arguments").and_then(|a| a.as_str()),
913 ) {
914 tool_calls.push(ToolCall {
915 id: sh_layer1::generate_prefixed_id("tc"),
916 name: name.to_string(),
917 arguments: args.to_string(),
918 });
919 }
920 }
921 }
922 }
923
924 if tool_calls.is_empty() {
927 let re = regex::Regex::new(r"```tool\n(\{.*?\})\n```").unwrap();
928 for cap in re.captures_iter(content) {
929 if let Ok(tool_json) = serde_json::from_str::<serde_json::Value>(&cap[1]) {
930 if let Some(name) = tool_json.get("name").and_then(|n| n.as_str()) {
931 let args = tool_json
932 .get("arguments")
933 .cloned()
934 .unwrap_or(serde_json::Value::Null);
935 tool_calls.push(ToolCall {
936 id: sh_layer1::generate_prefixed_id("tc"),
937 name: name.to_string(),
938 arguments: args.to_string(),
939 });
940 }
941 }
942 }
943 }
944
945 tool_calls
946 }
947
948 async fn llm_step(
952 &self,
953 session_id: &SessionId,
954 task: &str,
955 iteration: i32,
956 max_iterations: i32,
957 config: &AgentConfig,
958 abort_flag: Option<Arc<AtomicBool>>,
959 ) -> Layer2Result<IterationResult> {
960 if self.llm_client.is_some() {
961 self.real_llm_step(
962 session_id,
963 task,
964 iteration,
965 max_iterations,
966 config,
967 abort_flag,
968 )
969 .await
970 } else {
971 self.simulate_llm_step(session_id, task, iteration, max_iterations)
972 .await
973 }
974 }
975}
976
977impl Default for AgentRuntime {
978 fn default() -> Self {
979 Self::with_defaults()
980 }
981}
982
983#[async_trait]
984impl AgentRuntimeTrait for AgentRuntime {
985 async fn run(&self, task: &str, config: AgentConfig) -> Layer2Result<AgentResult> {
989 info!(task = %task, agent_id = %config.agent_id, "Starting agent run");
990
991 let session_config = SessionConfig::from(&config);
993 let session_id = self.session_manager.create(session_config).await?;
994
995 let agent_id = config.agent_id.clone();
997 self.session_manager
998 .update(&session_id, |s| {
999 s.agent_id = agent_id;
1000 })
1001 .await?;
1002
1003 if let Some(ref prompt) = config.system_prompt {
1005 self.session_manager
1006 .add_message(&session_id, Message::system(prompt))
1007 .await?;
1008 }
1009
1010 self.session_manager
1012 .add_message(&session_id, Message::user(task))
1013 .await?;
1014
1015 self.session_manager
1017 .set_state(&session_id, AgentState::Running)
1018 .await?;
1019
1020 let mut iterations = 0;
1022 let max_iterations = config.max_iterations;
1023
1024 loop {
1025 iterations += 1;
1026
1027 if iterations > max_iterations {
1028 warn!(
1029 session_id = %session_id,
1030 max = max_iterations,
1031 "Max iterations reached"
1032 );
1033 self.session_manager
1034 .set_state(&session_id, AgentState::Error)
1035 .await?;
1036 return Err(Layer2Error::MaxIterations(max_iterations).into());
1037 }
1038
1039 let can_continue: bool = self
1041 .session_manager
1042 .read(&session_id, |s| s.can_continue())
1043 .await?
1044 .unwrap_or(false);
1045
1046 if !can_continue {
1047 let current_state: AgentState = self
1048 .session_manager
1049 .read(&session_id, |s| s.state)
1050 .await?
1051 .unwrap_or(AgentState::Stopped);
1052
1053 if current_state == AgentState::Stopped {
1054 info!(session_id = %session_id, "Agent stopped by user");
1055 break;
1056 }
1057 break;
1059 }
1060
1061 let step_result = self
1063 .llm_step(&session_id, task, iterations, max_iterations, &config, None)
1064 .await?;
1065
1066 if let Some(msg) = step_result.message {
1068 self.session_manager.add_message(&session_id, msg).await?;
1069 }
1070
1071 if !step_result.tool_calls.is_empty() {
1073 let tool_calls = step_result.tool_calls.clone();
1075 self.session_manager
1076 .update(&session_id, |s| {
1077 s.tool_calls_pending = tool_calls;
1078 s.state = AgentState::ToolCalling;
1079 })
1080 .await?;
1081
1082 self.execute_pending_tool_calls(&session_id).await?;
1084
1085 self.session_manager
1087 .set_state(&session_id, AgentState::WaitingTool)
1088 .await?;
1089 self.session_manager
1090 .set_state(&session_id, AgentState::Running)
1091 .await?;
1092 } else {
1093 self.session_manager
1095 .set_state(&session_id, step_result.state)
1096 .await?;
1097 }
1098
1099 if !step_result.should_continue {
1101 break;
1102 }
1103 }
1104
1105 let session = self
1107 .session_manager
1108 .get(&session_id)
1109 .await?
1110 .ok_or_else(|| Layer2Error::SessionNotFound(session_id.clone()))?;
1111
1112 let tokens_used = session.tokens_total;
1113
1114 Ok(AgentResult {
1115 session_id: session.session_id.clone(),
1116 final_state: session.state,
1117 messages: session.messages,
1118 tool_calls: session.tool_calls_pending,
1119 tool_results: session.tool_results_cache,
1120 iterations,
1121 tokens_used,
1122 })
1123 }
1124
1125 async fn run_stream(
1129 &self,
1130 task: &str,
1131 config: AgentConfig,
1132 callback: &dyn AgentLoopCallback,
1133 ) -> Layer2Result<AgentResult> {
1134 info!(task = %task, agent_id = %config.agent_id, "Starting agent run_stream");
1135
1136 let session_config = SessionConfig::from(&config);
1138 let session_id = self.session_manager.create(session_config).await?;
1139
1140 let agent_id = config.agent_id.clone();
1142 self.session_manager
1143 .update(&session_id, |s| {
1144 s.agent_id = agent_id;
1145 })
1146 .await?;
1147
1148 if let Some(ref prompt) = config.system_prompt {
1150 self.session_manager
1151 .add_message(&session_id, Message::system(prompt))
1152 .await?;
1153 }
1154
1155 self.session_manager
1157 .add_message(&session_id, Message::user(task))
1158 .await?;
1159
1160 self.session_manager
1162 .set_state(&session_id, AgentState::Running)
1163 .await?;
1164
1165 let mut iterations = 0;
1167 let max_iterations = config.max_iterations;
1168
1169 loop {
1170 iterations += 1;
1171
1172 if iterations > max_iterations {
1173 warn!(
1174 session_id = %session_id,
1175 max = max_iterations,
1176 "Max iterations reached"
1177 );
1178 self.session_manager
1179 .set_state(&session_id, AgentState::Error)
1180 .await?;
1181 return Err(Layer2Error::MaxIterations(max_iterations).into());
1182 }
1183
1184 let should_continue_iter = callback.before_iteration(&session_id, iterations).await?;
1186 if !should_continue_iter {
1187 info!(session_id = %session_id, "Callback requested stop");
1188 break;
1189 }
1190
1191 let can_continue: bool = self
1193 .session_manager
1194 .read(&session_id, |s| s.can_continue())
1195 .await?
1196 .unwrap_or(false);
1197
1198 if !can_continue {
1199 let current_state: AgentState = self
1200 .session_manager
1201 .read(&session_id, |s| s.state)
1202 .await?
1203 .unwrap_or(AgentState::Stopped);
1204
1205 if current_state == AgentState::Stopped {
1206 info!(session_id = %session_id, "Agent stopped by user");
1207 break;
1208 }
1209 break;
1210 }
1211
1212 let step_result = self
1214 .llm_step(&session_id, task, iterations, max_iterations, &config, None)
1215 .await?;
1216
1217 if let Some(msg) = step_result.message.clone() {
1219 self.session_manager.add_message(&session_id, msg).await?;
1220 }
1221
1222 if !step_result.tool_calls.is_empty() {
1224 let tool_calls = step_result.tool_calls.clone();
1225
1226 for tc in &tool_calls {
1228 let should_execute = callback.before_tool_call(&session_id, tc).await?;
1229 if !should_execute {
1230 info!(tool_call_id = %tc.id, "Callback rejected tool call");
1231 continue;
1232 }
1233 }
1234
1235 self.session_manager
1237 .update(&session_id, |s| {
1238 s.tool_calls_pending = tool_calls;
1239 s.state = AgentState::ToolCalling;
1240 })
1241 .await?;
1242
1243 self.execute_pending_tool_calls(&session_id).await?;
1245
1246 let results: Vec<ToolResult> = self
1248 .session_manager
1249 .read(&session_id, |s| s.tool_results_cache.clone())
1250 .await?
1251 .unwrap_or_default();
1252
1253 for tc in &step_result.tool_calls {
1255 if let Some(result) = results.iter().find(|r| r.tool_call_id == tc.id) {
1256 callback.after_tool_call(&session_id, tc, result).await?;
1257 }
1258 }
1259
1260 self.session_manager
1262 .set_state(&session_id, AgentState::WaitingTool)
1263 .await?;
1264 self.session_manager
1265 .set_state(&session_id, AgentState::Running)
1266 .await?;
1267 } else {
1268 self.session_manager
1269 .set_state(&session_id, step_result.state)
1270 .await?;
1271 }
1272
1273 let iter_result = IterationResult {
1275 iteration: iterations,
1276 state: self
1277 .session_manager
1278 .read(&session_id, |s| s.state)
1279 .await?
1280 .unwrap_or(AgentState::Running),
1281 message: step_result.message,
1282 tool_calls: step_result.tool_calls,
1283 should_continue: step_result.should_continue,
1284 };
1285
1286 callback
1288 .after_iteration(&session_id, iterations, &iter_result)
1289 .await?;
1290
1291 if !iter_result.should_continue {
1292 break;
1293 }
1294 }
1295
1296 let session = self
1298 .session_manager
1299 .get(&session_id)
1300 .await?
1301 .ok_or_else(|| Layer2Error::SessionNotFound(session_id.clone()))?;
1302
1303 let tokens_used = session.tokens_total;
1304
1305 Ok(AgentResult {
1306 session_id: session.session_id.clone(),
1307 final_state: session.state,
1308 messages: session.messages,
1309 tool_calls: session.tool_calls_pending,
1310 tool_results: session.tool_results_cache,
1311 iterations,
1312 tokens_used,
1313 })
1314 }
1315
1316 async fn run_stream_abortable(
1320 &self,
1321 task: &str,
1322 config: AgentConfig,
1323 callback: &dyn AgentLoopCallback,
1324 abort_flag: Arc<AtomicBool>,
1325 ) -> Layer2Result<AgentResult> {
1326 info!(task = %task, agent_id = %config.agent_id, "Starting agent run_stream_abortable");
1327
1328 let session_config = SessionConfig::from(&config);
1330 let session_id = self.session_manager.create(session_config).await?;
1331
1332 let agent_id = config.agent_id.clone();
1334 self.session_manager
1335 .update(&session_id, |s| {
1336 s.agent_id = agent_id;
1337 })
1338 .await?;
1339
1340 if let Some(ref prompt) = config.system_prompt {
1342 self.session_manager
1343 .add_message(&session_id, Message::system(prompt))
1344 .await?;
1345 }
1346
1347 self.session_manager
1349 .add_message(&session_id, Message::user(task))
1350 .await?;
1351
1352 self.session_manager
1354 .set_state(&session_id, AgentState::Running)
1355 .await?;
1356
1357 let mut iterations = 0;
1359 let max_iterations = config.max_iterations;
1360
1361 loop {
1362 if abort_flag.load(Ordering::Relaxed) {
1364 info!(session_id = %session_id, "Abort flag set, stopping agent");
1365 self.session_manager
1366 .set_state(&session_id, AgentState::Stopped)
1367 .await?;
1368 break;
1369 }
1370
1371 iterations += 1;
1372
1373 if iterations > max_iterations {
1374 warn!(
1375 session_id = %session_id,
1376 max = max_iterations,
1377 "Max iterations reached"
1378 );
1379 self.session_manager
1380 .set_state(&session_id, AgentState::Error)
1381 .await?;
1382 return Err(Layer2Error::MaxIterations(max_iterations).into());
1383 }
1384
1385 let should_continue_iter = callback.before_iteration(&session_id, iterations).await?;
1387 if !should_continue_iter {
1388 info!(session_id = %session_id, "Callback requested stop");
1389 break;
1390 }
1391
1392 if abort_flag.load(Ordering::Relaxed) {
1394 info!(session_id = %session_id, "Abort flag set after callback, stopping agent");
1395 self.session_manager
1396 .set_state(&session_id, AgentState::Stopped)
1397 .await?;
1398 break;
1399 }
1400
1401 let can_continue: bool = self
1403 .session_manager
1404 .read(&session_id, |s| s.can_continue())
1405 .await?
1406 .unwrap_or(false);
1407
1408 if !can_continue {
1409 let current_state: AgentState = self
1410 .session_manager
1411 .read(&session_id, |s| s.state)
1412 .await?
1413 .unwrap_or(AgentState::Stopped);
1414
1415 if current_state == AgentState::Stopped {
1416 info!(session_id = %session_id, "Agent stopped by user");
1417 break;
1418 }
1419 break;
1420 }
1421
1422 let step_result = self
1424 .llm_step(
1425 &session_id,
1426 task,
1427 iterations,
1428 max_iterations,
1429 &config,
1430 Some(abort_flag.clone()),
1431 )
1432 .await?;
1433
1434 if let Some(msg) = step_result.message.clone() {
1436 self.session_manager.add_message(&session_id, msg).await?;
1437 }
1438
1439 if !step_result.tool_calls.is_empty() {
1441 let tool_calls = step_result.tool_calls.clone();
1442
1443 if abort_flag.load(Ordering::Relaxed) {
1445 info!(session_id = %session_id, "Abort flag set before tool calls");
1446 self.session_manager
1447 .set_state(&session_id, AgentState::Stopped)
1448 .await?;
1449 break;
1450 }
1451
1452 for tc in &tool_calls {
1454 let should_execute = callback.before_tool_call(&session_id, tc).await?;
1455 if !should_execute {
1456 info!(tool_call_id = %tc.id, "Callback rejected tool call");
1457 continue;
1458 }
1459 }
1460
1461 self.session_manager
1463 .update(&session_id, |s| {
1464 s.tool_calls_pending = tool_calls;
1465 s.state = AgentState::ToolCalling;
1466 })
1467 .await?;
1468
1469 self.execute_pending_tool_calls(&session_id).await?;
1471
1472 let results: Vec<ToolResult> = self
1474 .session_manager
1475 .read(&session_id, |s| s.tool_results_cache.clone())
1476 .await?
1477 .unwrap_or_default();
1478
1479 for tc in &step_result.tool_calls {
1481 if let Some(result) = results.iter().find(|r| r.tool_call_id == tc.id) {
1482 callback.after_tool_call(&session_id, tc, result).await?;
1483 }
1484 }
1485
1486 self.session_manager
1488 .set_state(&session_id, AgentState::WaitingTool)
1489 .await?;
1490 self.session_manager
1491 .set_state(&session_id, AgentState::Running)
1492 .await?;
1493 } else {
1494 self.session_manager
1495 .set_state(&session_id, step_result.state)
1496 .await?;
1497 }
1498
1499 let iter_result = IterationResult {
1501 iteration: iterations,
1502 state: self
1503 .session_manager
1504 .read(&session_id, |s| s.state)
1505 .await?
1506 .unwrap_or(AgentState::Running),
1507 message: step_result.message,
1508 tool_calls: step_result.tool_calls,
1509 should_continue: step_result.should_continue,
1510 };
1511
1512 callback
1514 .after_iteration(&session_id, iterations, &iter_result)
1515 .await?;
1516
1517 if !iter_result.should_continue {
1518 break;
1519 }
1520 }
1521
1522 let session = self
1524 .session_manager
1525 .get(&session_id)
1526 .await?
1527 .ok_or_else(|| Layer2Error::SessionNotFound(session_id.clone()))?;
1528
1529 let tokens_used = session.tokens_total;
1530
1531 Ok(AgentResult {
1532 session_id: session.session_id.clone(),
1533 final_state: session.state,
1534 messages: session.messages,
1535 tool_calls: session.tool_calls_pending,
1536 tool_results: session.tool_results_cache,
1537 iterations,
1538 tokens_used,
1539 })
1540 }
1541
1542 async fn start(&self, task: &str, config: AgentConfig) -> Layer2Result<SessionId> {
1547 info!(task = %task, agent_id = %config.agent_id, "Starting agent session");
1548
1549 let session_config = SessionConfig::from(&config);
1550 let session_id = self.session_manager.create(session_config).await?;
1551
1552 let agent_id = config.agent_id.clone();
1554 self.session_manager
1555 .update(&session_id, |s| {
1556 s.agent_id = agent_id;
1557 })
1558 .await?;
1559
1560 if let Some(ref prompt) = config.system_prompt {
1562 self.session_manager
1563 .add_message(&session_id, Message::system(prompt))
1564 .await?;
1565 }
1566
1567 self.session_manager
1569 .add_message(&session_id, Message::user(task))
1570 .await?;
1571
1572 self.session_manager
1574 .set_state(&session_id, AgentState::Running)
1575 .await?;
1576
1577 Ok(session_id)
1578 }
1579
1580 async fn pause(&self, session_id: &SessionId) -> Layer2Result<()> {
1585 self.require_session(session_id).await?;
1586
1587 let current_state: AgentState =
1588 self.session_manager
1589 .read(session_id, |s| s.state)
1590 .await?
1591 .ok_or_else(|| Layer2Error::SessionNotFound(session_id.clone()))?;
1592
1593 match current_state {
1594 AgentState::Running | AgentState::ToolCalling | AgentState::WaitingTool => {
1595 AgentRuntime::validate_transition(current_state, AgentState::Stopped)?;
1596 self.session_manager
1597 .set_state(session_id, AgentState::Stopped)
1598 .await?;
1599 info!(session_id = %session_id, "Agent paused");
1600 Ok(())
1601 }
1602 AgentState::Stopped => {
1603 debug!(session_id = %session_id, "Agent already paused");
1605 Ok(())
1606 }
1607 other => Err(Layer2Error::InvalidStateTransition {
1608 from: other,
1609 to: AgentState::Stopped,
1610 }
1611 .into()),
1612 }
1613 }
1614
1615 async fn resume(&self, session_id: &SessionId) -> Layer2Result<()> {
1619 self.require_session(session_id).await?;
1620
1621 let current_state: AgentState =
1622 self.session_manager
1623 .read(session_id, |s| s.state)
1624 .await?
1625 .ok_or_else(|| Layer2Error::SessionNotFound(session_id.clone()))?;
1626
1627 match current_state {
1628 AgentState::Stopped => {
1629 AgentRuntime::validate_transition(current_state, AgentState::Running)?;
1630 self.session_manager
1631 .set_state(session_id, AgentState::Running)
1632 .await?;
1633 info!(session_id = %session_id, "Agent resumed");
1634 Ok(())
1635 }
1636 AgentState::Running => {
1637 debug!(session_id = %session_id, "Agent already running");
1639 Ok(())
1640 }
1641 other => Err(Layer2Error::InvalidStateTransition {
1642 from: other,
1643 to: AgentState::Running,
1644 }
1645 .into()),
1646 }
1647 }
1648
1649 async fn stop(&self, session_id: &SessionId) -> Layer2Result<()> {
1654 self.require_session(session_id).await?;
1655
1656 let current_state: AgentState =
1657 self.session_manager
1658 .read(session_id, |s| s.state)
1659 .await?
1660 .ok_or_else(|| Layer2Error::SessionNotFound(session_id.clone()))?;
1661
1662 match current_state {
1663 AgentState::Running
1664 | AgentState::ToolCalling
1665 | AgentState::WaitingTool
1666 | AgentState::Stopped => {
1667 self.session_manager
1668 .set_state(session_id, AgentState::Stopped)
1669 .await?;
1670 info!(session_id = %session_id, "Agent stopped");
1671 Ok(())
1672 }
1673 AgentState::Idle | AgentState::Completed | AgentState::Error => {
1674 Err(Layer2Error::InvalidStateTransition {
1675 from: current_state,
1676 to: AgentState::Stopped,
1677 }
1678 .into())
1679 }
1680 }
1681 }
1682
1683 fn status(&self, session_id: &SessionId) -> Layer2Result<AgentState> {
1685 self.session_manager
1689 .get_state_sync(session_id)
1690 .ok_or_else(|| Layer2Error::SessionNotFound(session_id.clone()).into())
1691 }
1692
1693 async fn send_message(&self, session_id: &SessionId, message: &str) -> Layer2Result<()> {
1697 self.require_session(session_id).await?;
1698
1699 let current_state: AgentState =
1700 self.session_manager
1701 .read(session_id, |s| s.state)
1702 .await?
1703 .ok_or_else(|| Layer2Error::SessionNotFound(session_id.clone()))?;
1704
1705 match current_state {
1707 AgentState::Running
1708 | AgentState::WaitingTool
1709 | AgentState::Stopped
1710 | AgentState::Idle
1711 | AgentState::ToolCalling => {
1712 self.session_manager
1713 .add_message(session_id, Message::user(message))
1714 .await?;
1715 debug!(
1716 session_id = %session_id,
1717 msg_len = message.len(),
1718 "Message sent to agent"
1719 );
1720 Ok(())
1721 }
1722 AgentState::Completed | AgentState::Error => {
1723 Err(Layer2Error::InvalidStateTransition {
1724 from: current_state,
1725 to: current_state, }
1727 .into())
1728 }
1729 }
1730 }
1731
1732 async fn submit_tool_result(
1737 &self,
1738 session_id: &SessionId,
1739 tool_call_id: &str,
1740 result: ToolResult,
1741 ) -> Layer2Result<()> {
1742 self.require_session(session_id).await?;
1743
1744 let current_state: AgentState =
1745 self.session_manager
1746 .read(session_id, |s| s.state)
1747 .await?
1748 .ok_or_else(|| Layer2Error::SessionNotFound(session_id.clone()))?;
1749
1750 match current_state {
1751 AgentState::WaitingTool | AgentState::ToolCalling | AgentState::Running => {
1752 let _pending_ids: Vec<String> = self
1754 .session_manager
1755 .read(session_id, |s| {
1756 s.tool_calls_pending
1757 .iter()
1758 .map(|tc| tc.id.clone())
1759 .collect()
1760 })
1761 .await?
1762 .unwrap_or_default();
1763
1764 self.session_manager
1766 .update(session_id, |s| {
1767 s.tool_calls_pending.retain(|tc| tc.id != tool_call_id);
1769 s.tool_results_cache.push(result);
1771
1772 if s.tool_calls_pending.is_empty() {
1774 s.state = AgentState::Running;
1775 }
1776 })
1777 .await?;
1778
1779 debug!(
1780 session_id = %session_id,
1781 tool_call_id = %tool_call_id,
1782 "Tool result submitted"
1783 );
1784 Ok(())
1785 }
1786 other => Err(Layer2Error::InvalidStateTransition {
1787 from: other,
1788 to: AgentState::Running,
1789 }
1790 .into()),
1791 }
1792 }
1793}
1794
#[cfg(test)]
mod tests {
    // Unit tests for `AgentRuntime`: config defaults and conversion, state-transition
    // validation, the session lifecycle (start / pause / resume / stop / status),
    // message and tool-result submission, permission-manager wiring, and task
    // decomposition.
    use super::*;
    use crate::tool_registry::Tool;
    use crate::types::MessageRole;

    /// Minimal `Tool` implementation used to populate the registry in tests.
    struct MockTool {
        // Name reported by `Tool::name`.
        name: String,
        // Description built from the name in `MockTool::new`.
        description: String,
    }

    impl MockTool {
        /// Builds a mock tool named `name` with description `"Mock tool: <name>"`.
        fn new(name: &str) -> Self {
            Self {
                name: name.to_string(),
                description: format!("Mock tool: {}", name),
            }
        }
    }

    #[async_trait]
    impl Tool for MockTool {
        fn name(&self) -> &str {
            &self.name
        }

        fn description(&self) -> &str {
            &self.description
        }

        /// Parameter schema (JSON Schema style): an object with a single string
        /// property named `input`.
        fn parameters(&self) -> serde_json::Value {
            serde_json::json!({
                "type": "object",
                "properties": {
                    "input": {
                        "type": "string"
                    }
                }
            })
        }

        /// Always succeeds and echoes `args` in the content. The returned
        /// `tool_call_id` is the fixed placeholder `"mock_id"`, not the id of the
        /// invoking call.
        async fn execute(&self, args: &str) -> Layer2Result<ToolResult> {
            Ok(ToolResult {
                tool_call_id: "mock_id".to_string(),
                name: self.name.clone(),
                content: format!("Executed with args: {}", args),
                is_error: false,
            })
        }
    }

    /// `AgentConfig::default` uses the documented model, iteration cap and temperature.
    #[test]
    fn test_agent_config_default() {
        let config = AgentConfig::default();
        assert_eq!(config.model, "claude-sonnet-4-6");
        assert_eq!(config.max_iterations, 100);
        assert_eq!(config.temperature, 0.7);
    }

    /// A default runtime starts with no sessions and no registered tools.
    #[test]
    fn test_agent_runtime_creation() {
        let runtime = AgentRuntime::with_defaults();
        assert!(runtime.session_manager().stats().total_sessions == 0);
        assert!(runtime.tool_registry().count() == 0);
    }

    /// `From<&AgentConfig> for SessionConfig` copies model, temperature,
    /// max_iterations and system_prompt.
    #[test]
    fn test_agent_config_to_session_config() {
        let agent_config = AgentConfig {
            agent_id: AgentId::new(),
            model: "custom-model".to_string(),
            temperature: 0.5,
            max_iterations: 50,
            system_prompt: Some("Custom prompt".to_string()),
        };

        let session_config = SessionConfig::from(&agent_config);
        assert_eq!(session_config.model, "custom-model");
        assert_eq!(session_config.temperature, 0.5);
        assert_eq!(session_config.max_iterations, 50);
        assert_eq!(
            session_config.system_prompt,
            Some("Custom prompt".to_string())
        );
    }

    /// Pins which transitions `AgentRuntime::validate_transition` accepts and rejects.
    #[test]
    fn test_state_transition_validation() {
        // Allowed transitions (including the Running -> Running self-transition).
        assert!(AgentRuntime::validate_transition(AgentState::Idle, AgentState::Running).is_ok());
        assert!(
            AgentRuntime::validate_transition(AgentState::Running, AgentState::ToolCalling).is_ok()
        );
        assert!(
            AgentRuntime::validate_transition(AgentState::Running, AgentState::Stopped).is_ok()
        );
        assert!(
            AgentRuntime::validate_transition(AgentState::Stopped, AgentState::Running).is_ok()
        );
        assert!(
            AgentRuntime::validate_transition(AgentState::Running, AgentState::Completed).is_ok()
        );
        assert!(
            AgentRuntime::validate_transition(AgentState::Running, AgentState::Running).is_ok()
        );

        // Rejected transitions: Idle cannot jump to ToolCalling, and terminal
        // states (Completed, Error) cannot be restarted.
        assert!(
            AgentRuntime::validate_transition(AgentState::Idle, AgentState::ToolCalling).is_err()
        );
        assert!(
            AgentRuntime::validate_transition(AgentState::Completed, AgentState::Running).is_err()
        );
        assert!(AgentRuntime::validate_transition(AgentState::Error, AgentState::Running).is_err());
    }

    /// `run` completes within the iteration budget and returns a populated result.
    /// The LLM step used by `with_defaults` is not visible here.
    #[tokio::test]
    async fn test_agent_run_basic() {
        let runtime = AgentRuntime::with_defaults();
        let config = AgentConfig {
            max_iterations: 5,
            ..Default::default()
        };

        let result = runtime.run("Test task", config).await;
        assert!(result.is_ok());

        let agent_result = result.unwrap();
        assert!(!agent_result.session_id.0.is_empty());
        assert!(agent_result.iterations > 0);
        assert!(agent_result.iterations <= 5);
        // At least the seeded user task message must be present.
        assert!(!agent_result.messages.is_empty());
    }

    /// `run` succeeds with a tool registered in the registry.
    #[tokio::test]
    async fn test_agent_run_with_tools() {
        let runtime = AgentRuntime::with_defaults();

        runtime
            .tool_registry()
            .register(Box::new(MockTool::new("test_tool")))
            .unwrap();

        assert!(runtime.tool_registry().count() == 1);

        let config = AgentConfig {
            max_iterations: 10,
            ..Default::default()
        };

        let result = runtime.run("Test task with tools", config).await;
        assert!(result.is_ok());

        let agent_result = result.unwrap();
        // NOTE(review): weak check. It passes whenever no tool calls are left in the
        // result, even if no tool was ever executed.
        assert!(!agent_result.tool_results.is_empty() || agent_result.tool_calls.is_empty());
    }

    /// `start` creates a session in `Running` with the task message recorded.
    #[tokio::test]
    async fn test_agent_start_creates_session() {
        let runtime = AgentRuntime::with_defaults();
        let config = AgentConfig::default();

        let session_id = runtime.start("Test task", config).await.unwrap();

        let session = runtime.session_manager().get(&session_id).await.unwrap();
        assert!(session.is_some());

        let session = session.unwrap();
        assert_eq!(session.state, AgentState::Running);
        assert!(!session.messages.is_empty());
    }

    /// A configured system prompt is stored as the first message, ahead of the task.
    #[tokio::test]
    async fn test_agent_start_with_system_prompt() {
        let runtime = AgentRuntime::with_defaults();
        let config = AgentConfig {
            system_prompt: Some("You are a helpful assistant".to_string()),
            ..Default::default()
        };

        let session_id = runtime.start("Test task", config).await.unwrap();

        let messages = runtime
            .session_manager()
            .get_messages(&session_id)
            .await
            .unwrap()
            .unwrap();

        // System prompt + user task.
        assert!(messages.len() >= 2);
        assert_eq!(messages[0].role, MessageRole::System);
        assert_eq!(messages[0].content, "You are a helpful assistant");
    }

    /// Running -> pause -> Stopped -> resume -> Running. A second pause is a no-op.
    #[tokio::test]
    async fn test_agent_pause_resume() {
        let runtime = AgentRuntime::with_defaults();
        let config = AgentConfig::default();

        let session_id = runtime.start("Test task", config).await.unwrap();

        let pause_result = runtime.pause(&session_id).await;
        assert!(pause_result.is_ok());

        let state = runtime
            .session_manager()
            .get_state(&session_id)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(state, AgentState::Stopped);

        let resume_result = runtime.resume(&session_id).await;
        assert!(resume_result.is_ok());

        let state = runtime
            .session_manager()
            .get_state(&session_id)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(state, AgentState::Running);

        // Pausing an already-paused session must also succeed.
        runtime.pause(&session_id).await.unwrap();
        runtime.pause(&session_id).await.unwrap();
        let state = runtime
            .session_manager()
            .get_state(&session_id)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(state, AgentState::Stopped);
    }

    /// `stop` moves a running session to `Stopped`.
    #[tokio::test]
    async fn test_agent_stop() {
        let runtime = AgentRuntime::with_defaults();
        let config = AgentConfig::default();

        let session_id = runtime.start("Test task", config).await.unwrap();

        runtime.stop(&session_id).await.unwrap();

        let state = runtime
            .session_manager()
            .get_state(&session_id)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(state, AgentState::Stopped);
    }

    /// Pausing an unknown session id fails with a "Session not found" error.
    #[tokio::test]
    async fn test_agent_pause_nonexistent_session() {
        let runtime = AgentRuntime::with_defaults();
        let fake_id = SessionId::new();

        let result = runtime.pause(&fake_id).await;
        assert!(result.is_err());
        let err = result.unwrap_err();
        // Presumably the Display text of `Layer2Error::SessionNotFound`; confirm.
        let err_str = err.to_string();
        assert!(err_str.contains("Session not found"));
    }

    /// The synchronous `status` reflects state changes made through the async API.
    #[tokio::test]
    async fn test_agent_status() {
        let runtime = AgentRuntime::with_defaults();
        let config = AgentConfig::default();

        let session_id = runtime.start("Test task", config).await.unwrap();

        let status = runtime.status(&session_id);
        assert!(status.is_ok());
        assert_eq!(status.unwrap(), AgentState::Running);

        runtime.pause(&session_id).await.unwrap();
        let status = runtime.status(&session_id);
        assert!(status.is_ok());
        assert_eq!(status.unwrap(), AgentState::Stopped);
    }

    /// `send_message` appends a user message to a running session.
    #[tokio::test]
    async fn test_agent_send_message() {
        let runtime = AgentRuntime::with_defaults();
        let config = AgentConfig::default();

        let session_id = runtime.start("Test task", config).await.unwrap();

        runtime
            .send_message(&session_id, "Additional message")
            .await
            .unwrap();

        let messages = runtime
            .session_manager()
            .get_messages(&session_id)
            .await
            .unwrap()
            .unwrap();

        // Original task + the additional message; the newest user message is the new one.
        assert!(messages.len() >= 2);
        let last_user_msg = messages.iter().rev().find(|m| m.role == MessageRole::User);
        assert!(last_user_msg.is_some());
        assert_eq!(last_user_msg.unwrap().content, "Additional message");
    }

    /// Submitting the only pending tool result clears the pending list, caches the
    /// result, and returns the session to `Running`.
    #[tokio::test]
    async fn test_agent_submit_tool_result() {
        let runtime = AgentRuntime::with_defaults();
        let config = AgentConfig::default();

        let session_id = runtime.start("Test task", config).await.unwrap();

        // Simulate the runtime waiting on one external tool call.
        runtime
            .session_manager()
            .update(&session_id, |s| {
                s.tool_calls_pending.push(ToolCall {
                    id: "tc_123".to_string(),
                    name: "test_tool".to_string(),
                    arguments: "{}".to_string(),
                });
                s.state = AgentState::WaitingTool;
            })
            .await
            .unwrap();

        let tool_result = ToolResult {
            tool_call_id: "tc_123".to_string(),
            name: "test_tool".to_string(),
            content: "Tool executed successfully".to_string(),
            is_error: false,
        };

        runtime
            .submit_tool_result(&session_id, "tc_123", tool_result)
            .await
            .unwrap();

        let pending_count: usize = runtime
            .session_manager()
            .read(&session_id, |s| s.tool_calls_pending.len())
            .await
            .unwrap()
            .unwrap_or(0);
        assert_eq!(pending_count, 0);

        let cached_results: Vec<ToolResult> = runtime
            .session_manager()
            .read(&session_id, |s| s.tool_results_cache.clone())
            .await
            .unwrap()
            .unwrap_or_default();
        assert_eq!(cached_results.len(), 1);
        assert_eq!(cached_results[0].tool_call_id, "tc_123");

        let state = runtime
            .session_manager()
            .get_state(&session_id)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(state, AgentState::Running);
    }

    /// NOTE(review): despite its name, this test never calls `run`. It only checks
    /// that `stop` leaves a started session in `Stopped`.
    #[tokio::test]
    async fn test_agent_run_respects_stopped_state() {
        let runtime = AgentRuntime::with_defaults();
        let config = AgentConfig {
            max_iterations: 100,
            ..Default::default()
        };

        let session_id = runtime.start("Test task", config.clone()).await.unwrap();

        runtime.stop(&session_id).await.unwrap();

        let state = runtime
            .session_manager()
            .get_state(&session_id)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(state, AgentState::Stopped);
    }

    /// With a small iteration cap, `run` still finishes within the cap.
    #[tokio::test]
    async fn test_agent_run_max_iterations() {
        let runtime = AgentRuntime::with_defaults();
        let config = AgentConfig {
            max_iterations: 3,
            ..Default::default()
        };

        // NOTE(review): relies on the default LLM step reaching Completed within 3
        // iterations. If `run` behaves like the streaming loop, exceeding the cap
        // returns `Err(MaxIterations)` and this `unwrap()` would panic.
        let result = runtime.run("Test task", config).await.unwrap();

        assert!(result.iterations <= 3);
        assert_eq!(result.final_state, AgentState::Completed);
    }

    /// `IterationResult` is a plain data carrier; fields read back as constructed.
    #[test]
    fn test_iteration_result_creation() {
        let result = IterationResult {
            iteration: 1,
            state: AgentState::Running,
            message: Some(Message::assistant("Test")),
            tool_calls: vec![ToolCall {
                id: "tc_1".to_string(),
                name: "test_tool".to_string(),
                arguments: "{}".to_string(),
            }],
            should_continue: true,
        };

        assert_eq!(result.iteration, 1);
        assert_eq!(result.state, AgentState::Running);
        assert!(result.message.is_some());
        assert_eq!(result.tool_calls.len(), 1);
        assert!(result.should_continue);
    }

    /// `with_permissions` installs the given manager; a trusted policy reports `Trusted`.
    #[test]
    fn test_agent_runtime_with_permission_manager() {
        use crate::permission::policy::PermissionPolicy;

        let policy = PermissionPolicy::trusted();
        let pm = Arc::new(PermissionManager::new(policy));

        let session_manager = Arc::new(ConcurrentSessionManager::default_config());
        let tool_registry = Arc::new(ToolRegistry::new());

        let runtime = AgentRuntime::with_permissions(session_manager, tool_registry, pm.clone());

        assert!(runtime.permission_manager().is_some());
        assert_eq!(
            runtime.permission_manager().unwrap().security_level(),
            crate::permission::policy::SecurityLevel::Trusted
        );
    }

    /// A default runtime has no permission manager until `set_permission_manager`
    /// installs one. The default policy reports `Standard`.
    #[test]
    fn test_agent_runtime_set_permission_manager() {
        use crate::permission::policy::PermissionPolicy;

        let mut runtime = AgentRuntime::with_defaults();
        assert!(runtime.permission_manager().is_none());

        let policy = PermissionPolicy::default();
        let pm = Arc::new(PermissionManager::new(policy));
        runtime.set_permission_manager(pm);

        assert!(runtime.permission_manager().is_some());
        assert_eq!(
            runtime.permission_manager().unwrap().security_level(),
            crate::permission::policy::SecurityLevel::Standard
        );
    }

    /// `with_defaults` wires in a task decomposer.
    #[test]
    fn test_agent_runtime_has_decomposer() {
        let runtime = AgentRuntime::with_defaults();
        assert!(runtime.task_decomposer().is_some());
    }

    /// Decomposing a compound task yields a plan with subtasks and an execution order.
    #[test]
    fn test_agent_decompose_task() {
        let runtime = AgentRuntime::with_defaults();

        // Result<Option<plan>>: Ok, and Some because a decomposer is configured.
        let plan = runtime.decompose_task("Read a file and write output");
        assert!(plan.is_ok());
        let plan = plan.unwrap();
        assert!(plan.is_some());

        let plan = plan.unwrap();
        assert!(!plan.subtasks.is_empty());
        assert!(!plan.execution_order.is_empty());
    }

    /// The configured decomposition strategy is carried into produced plans.
    #[test]
    fn test_agent_set_decomposition_strategy() {
        let mut runtime = AgentRuntime::with_defaults();
        runtime.set_decomposition_strategy(DecompositionStrategy::Parallel);

        let decomposer = runtime.task_decomposer().unwrap();
        let plan = decomposer.decompose("Task A and Task B").unwrap();
        assert_eq!(plan.strategy, DecompositionStrategy::Parallel);
    }

    /// `run_with_plan` on a simple task ends in `Completed`.
    #[tokio::test]
    async fn test_agent_run_with_plan_simple() {
        let runtime = AgentRuntime::with_defaults();
        let config = AgentConfig {
            max_iterations: 5,
            ..Default::default()
        };

        let result = runtime.run_with_plan("Simple task", config).await;
        assert!(result.is_ok());

        let agent_result = result.unwrap();
        assert_eq!(agent_result.final_state, AgentState::Completed);
    }
}