1use super::agent::{Agent, QueueMode};
8use super::profile::AgentProfile;
9use crate::agent_loop::{
10 agent_loop, agent_loop_continue, AfterCompactionEndFn, AfterLoopFn, AfterToolExecutionFn,
11 AfterToolExecutionUpdateFn, AfterTurnFn, AgentLoopConfig, BeforeCompactionStartFn,
12 BeforeLoopFn, BeforeToolExecutionFn, BeforeToolExecutionUpdateFn, BeforeTurnFn, ConvertToLlmFn,
13 OnErrorFn, TransformContextFn,
14};
15use crate::context::{CompactionStrategy, ContextConfig, ExecutionLimits};
16use crate::mcp::{McpClient, McpError, McpToolAdapter};
17use crate::provider::context_translation::ContextTranslationStrategy;
18use crate::provider::{ModelConfig, StreamProvider};
19use crate::types::*;
20use std::collections::HashMap;
21use std::sync::{Arc, Mutex};
22use tokio::sync::mpsc;
23use tokio_util::sync::CancellationToken;
24
25fn lock_queue<T>(q: &Mutex<Vec<T>>) -> std::sync::MutexGuard<'_, Vec<T>> {
32 match q.lock() {
33 Ok(g) => g,
34 Err(poison) => {
35 tracing::warn!(
36 "BasicAgent: queue mutex was poisoned; recovering inner Vec. \
37 A prior hook or tool callback panicked while holding the lock."
38 );
39 poison.into_inner()
40 }
41 }
42}
43
44pub struct BasicAgent {
80 pub system_prompt: String,
82 pub model_config: ModelConfig, pub provider_override: Option<Arc<dyn StreamProvider>>,
86 pub thinking_level: ThinkingLevel,
87 pub max_tokens: Option<u32>, pub temperature: Option<f32>, messages: Vec<AgentMessage>, tools: Vec<Arc<dyn AgentTool>>,
109
110 steering_queue: Arc<Mutex<Vec<AgentMessage>>>,
134 follow_up_queue: Arc<Mutex<Vec<AgentMessage>>>,
135 steering_mode: QueueMode,
136 follow_up_mode: QueueMode,
137
138 pub context_config: Option<ContextConfig>,
140 pub execution_limits: Option<ExecutionLimits>,
141 pub cache_config: CacheConfig,
142 pub tool_execution: ToolExecutionStrategy,
143 pub tool_timeout: Option<std::time::Duration>,
144 pub response_format: crate::provider::ResponseFormat,
145 pub retry_config: crate::provider::retry::RetryConfig,
146
147 before_turn: Option<BeforeTurnFn>,
149 after_turn: Option<AfterTurnFn>,
150 on_error: Option<OnErrorFn>,
151
152 input_filters: Vec<Arc<dyn InputFilter>>,
154
155 before_loop: Option<BeforeLoopFn>,
157 after_loop: Option<AfterLoopFn>,
158 before_tool_execution: Option<BeforeToolExecutionFn>,
159 after_tool_execution: Option<AfterToolExecutionFn>,
160 before_tool_execution_update: Option<BeforeToolExecutionUpdateFn>,
161 after_tool_execution_update: Option<AfterToolExecutionUpdateFn>,
162 convert_to_llm: Option<ConvertToLlmFn>,
163 transform_context: Option<TransformContextFn>,
164 before_compaction_start: Option<BeforeCompactionStartFn>,
165 after_compaction_end: Option<AfterCompactionEndFn>,
166 context_translation: Option<Arc<dyn ContextTranslationStrategy>>,
167 prun_pending: Option<Arc<Mutex<Vec<crate::tools::prun::PrunRequest>>>>,
168 revert_pending: Option<Arc<Mutex<Vec<crate::tools::revert::RevertRequest>>>>,
169
170 revert_render_policy: RevertRenderPolicy,
174
175 current_tool: Arc<Mutex<Option<crate::context::CurrentToolExecution>>>,
179
180 config_id: Option<String>,
182 profile: Option<AgentProfile>,
183 workspace: Option<std::path::PathBuf>,
184
185 cancel: Option<CancellationToken>,
187 is_streaming: bool, agent_id: String,
215 session_id: String,
216 loop_counters: HashMap<String, usize>,
217 last_loop_id: Option<String>,
218 last_active_at: Option<chrono::DateTime<chrono::Utc>>,
221 session: Option<crate::session::Session>,
223}
224
225impl BasicAgent {
226 pub fn new(model_config: ModelConfig) -> Self {
227 Self {
228 model_config,
229 provider_override: None,
230 system_prompt: String::new(),
231 thinking_level: ThinkingLevel::Off,
232 max_tokens: None,
233 temperature: None,
234 messages: Vec::new(),
235 tools: Vec::new(),
236 steering_queue: Arc::new(Mutex::new(Vec::new())), follow_up_queue: Arc::new(Mutex::new(Vec::new())),
238 steering_mode: QueueMode::OneAtATime,
239 follow_up_mode: QueueMode::OneAtATime,
240 context_config: Some(ContextConfig::default()), execution_limits: Some(ExecutionLimits::default()), cache_config: CacheConfig::default(),
243 tool_execution: ToolExecutionStrategy::default(), tool_timeout: None,
245 response_format: crate::provider::ResponseFormat::Text,
246 retry_config: crate::provider::retry::RetryConfig::default(), before_turn: None,
248 after_turn: None,
249 on_error: None,
250 input_filters: Vec::new(),
251 before_loop: None,
252 after_loop: None,
253 before_tool_execution: None,
254 after_tool_execution: None,
255 before_tool_execution_update: None,
256 after_tool_execution_update: None,
257 convert_to_llm: None,
258 transform_context: None,
259 before_compaction_start: None,
260 after_compaction_end: None,
261 context_translation: None,
262 prun_pending: None,
263 revert_pending: None,
264 revert_render_policy: RevertRenderPolicy::default(),
265 current_tool: Arc::new(Mutex::new(None)),
266 config_id: None,
267 profile: None,
268 workspace: None,
269 cancel: None,
270 is_streaming: false,
271 agent_id: uuid::Uuid::new_v4().to_string(),
272 session_id: uuid::Uuid::new_v4().to_string(),
273 loop_counters: HashMap::new(),
274 last_loop_id: None,
275 last_active_at: None,
276 session: None,
277 }
278 }
279
280 pub fn with_session(mut self, session: crate::session::Session) -> Self {
282 self.session = Some(session);
283 self
284 }
285
286 pub fn take_session(&mut self) -> Option<crate::session::Session> {
288 self.session.take()
289 }
290
291 pub fn with_system_prompt(mut self, prompt: impl Into<String>) -> Self {
311 self.system_prompt = prompt.into();
312 self
313 }
314
315 pub fn with_thinking(mut self, level: ThinkingLevel) -> Self {
316 self.thinking_level = level;
317 self
318 }
319
320 pub fn with_tools(mut self, tools: Vec<Arc<dyn AgentTool>>) -> Self {
321 self.tools = tools;
322 self
323 }
324
325 pub fn tools(&self) -> &[Arc<dyn AgentTool>] {
330 &self.tools
331 }
332
333 pub fn current_tool_timeout(&self) -> Option<std::time::Duration> {
353 let guard = self.current_tool.lock().ok()?;
354 guard.as_ref().and_then(|t| t.timeout)
355 }
356
357 pub fn with_model_config(mut self, config: ModelConfig) -> Self {
358 self.model_config = config;
359 self
360 }
361
362 pub fn with_provider_override(mut self, provider: Arc<dyn StreamProvider>) -> Self {
365 self.provider_override = Some(provider);
366 self
367 }
368
369 pub fn with_max_tokens(mut self, max: u32) -> Self {
370 self.max_tokens = Some(max);
371 self
372 }
373
374 pub fn with_context_config(mut self, config: ContextConfig) -> Self {
375 self.context_config = Some(config);
376 self
377 }
378
379 pub fn with_cache_config(mut self, config: CacheConfig) -> Self {
380 self.cache_config = config;
381 self
382 }
383
384 pub fn with_tool_execution(mut self, strategy: ToolExecutionStrategy) -> Self {
385 self.tool_execution = strategy;
386 self
387 }
388
389 pub fn with_tool_timeout(mut self, timeout: std::time::Duration) -> Self {
391 self.tool_timeout = Some(timeout);
392 self
393 }
394
395 pub fn with_response_format(mut self, format: crate::provider::ResponseFormat) -> Self {
397 self.response_format = format;
398 self
399 }
400
401 pub fn with_retry_config(mut self, config: crate::provider::retry::RetryConfig) -> Self {
402 self.retry_config = config;
403 self
404 }
405
406 pub fn with_skills(mut self, skills: crate::context::skills::SkillSet) -> Self {
412 let prompt_fragment = skills.format_for_prompt();
413 if !prompt_fragment.is_empty() {
414 if self.system_prompt.is_empty() {
415 self.system_prompt = prompt_fragment;
416 } else {
417 self.system_prompt = format!("{}\n\n{}", self.system_prompt, prompt_fragment);
418 }
419 }
420 self
421 }
422
423 pub fn with_execution_limits(mut self, limits: ExecutionLimits) -> Self {
424 self.execution_limits = Some(limits);
425 self
426 }
427
428 pub fn with_messages(mut self, msgs: Vec<AgentMessage>) -> Self {
429 self.messages = msgs;
430 self
431 }
432
433 pub fn on_before_turn(
455 mut self,
456 f: impl Fn(&[AgentMessage], usize) -> bool + Send + Sync + 'static,
457 ) -> Self {
458 self.before_turn = Some(Arc::new(move |msgs, turn| {
459 let r = f(msgs, turn);
460 Box::pin(async move { r })
461 }));
462 self
463 }
464
465 pub fn on_after_turn(
466 mut self,
467 f: impl Fn(&[AgentMessage], &Usage) + Send + Sync + 'static,
468 ) -> Self {
469 self.after_turn = Some(Arc::new(move |msgs, usage| {
470 f(msgs, usage);
471 Box::pin(async move {})
472 }));
473 self
474 }
475
476 pub fn on_error(mut self, f: impl Fn(&str) + Send + Sync + 'static) -> Self {
477 self.on_error = Some(Arc::new(move |err| {
478 f(err);
479 Box::pin(async move {})
480 }));
481 self
482 }
483
484 pub fn with_input_filter(mut self, filter: impl InputFilter + 'static) -> Self {
486 self.input_filters.push(Arc::new(filter));
487 self
488 }
489
490 pub fn with_compaction_strategy(mut self, strategy: impl CompactionStrategy + 'static) -> Self {
494 if let Some(ref mut ctx) = self.context_config {
495 ctx.compaction.in_memory_strategy = Some(Arc::new(strategy));
496 }
497 self
498 }
499
500 pub fn with_profile(mut self, profile: AgentProfile) -> Self {
504 if let Some(ref prompt) = profile.system_prompt {
506 if self.system_prompt.is_empty() {
507 self.system_prompt = prompt.clone();
508 }
509 }
510 if let Some(level) = profile.thinking_level {
511 if self.thinking_level == ThinkingLevel::Off {
512 self.thinking_level = level;
513 }
514 }
515 if let Some(temp) = profile.temperature {
516 if self.temperature.is_none() {
517 self.temperature = Some(temp);
518 }
519 }
520 if let Some(max) = profile.max_tokens {
521 if self.max_tokens.is_none() {
522 self.max_tokens = Some(max);
523 }
524 }
525 if let Some(ref id) = profile.config_id {
526 if self.config_id.is_none() {
527 self.config_id = Some(id.clone());
528 }
529 }
530 self.profile = Some(profile);
531 self
532 }
533
534 pub fn with_temperature(mut self, temp: f32) -> Self {
536 self.temperature = Some(temp);
537 self
538 }
539
540 pub fn with_config_id(mut self, id: impl Into<String>) -> Self {
542 self.config_id = Some(id.into());
543 self
544 }
545
546 pub fn with_workspace(mut self, path: impl Into<std::path::PathBuf>) -> Self {
549 self.workspace = Some(path.into());
550 self
551 }
552
553 pub fn on_before_loop(
558 mut self,
559 f: impl Fn(&[AgentMessage], usize) -> bool + Send + Sync + 'static,
560 ) -> Self {
561 self.before_loop = Some(Arc::new(move |msgs, idx| {
562 let r = f(msgs, idx);
563 Box::pin(async move { r })
564 }));
565 self
566 }
567
568 pub fn on_after_loop(
570 mut self,
571 f: impl Fn(&[AgentMessage], &Usage) + Send + Sync + 'static,
572 ) -> Self {
573 self.after_loop = Some(Arc::new(move |msgs, usage| {
574 f(msgs, usage);
575 Box::pin(async move {})
576 }));
577 self
578 }
579
580 pub fn on_before_tool_execution(
582 mut self,
583 f: impl Fn(&str, &str, &serde_json::Value) -> bool + Send + Sync + 'static,
584 ) -> Self {
585 self.before_tool_execution = Some(Arc::new(move |name, id, args| {
586 let r = f(name, id, args);
587 Box::pin(async move { r })
588 }));
589 self
590 }
591
592 pub fn on_after_tool_execution(
594 mut self,
595 f: impl Fn(&str, &str, bool) + Send + Sync + 'static,
596 ) -> Self {
597 self.after_tool_execution = Some(Arc::new(move |name, id, is_error| {
598 f(name, id, is_error);
599 Box::pin(async move {})
600 }));
601 self
602 }
603
604 pub fn on_before_tool_execution_update(
611 mut self,
612 f: impl Fn(&str, &str, &str) -> bool + Send + Sync + 'static,
613 ) -> Self {
614 self.before_tool_execution_update = Some(Arc::new(move |name, id, text| {
615 let r = f(name, id, text);
616 Box::pin(async move { r })
617 }));
618 self
619 }
620
621 pub fn on_after_tool_execution_update(
627 mut self,
628 f: impl Fn(&str, &str, &str) + Send + Sync + 'static,
629 ) -> Self {
630 self.after_tool_execution_update = Some(Arc::new(move |name, id, text| {
631 f(name, id, text);
632 Box::pin(async move {})
633 }));
634 self
635 }
636
637 pub fn with_prun_tool(mut self) -> Self {
640 let pending = Arc::new(Mutex::new(Vec::new()));
641 self.tools.push(Arc::new(crate::tools::PrunTool::new(
642 pending.clone(),
643 crate::tools::PrunVariant::Prun,
644 )));
645 self.tools.push(Arc::new(crate::tools::PrunTool::new(
646 pending.clone(),
647 crate::tools::PrunVariant::PrunWithMemo,
648 )));
649 self.prun_pending = Some(pending);
650 self
651 }
652
653 pub fn with_revert_tool(mut self) -> Self {
661 let pending = Arc::new(Mutex::new(Vec::new()));
662 self.tools
663 .push(Arc::new(crate::tools::RevertTool::new(pending.clone())));
664 self.revert_pending = Some(pending);
665 self
666 }
667
668 pub fn with_revert_render_policy(mut self, policy: RevertRenderPolicy) -> Self {
684 self.revert_render_policy = policy;
685 self
686 }
687
688 pub fn with_convert_to_llm(
690 mut self,
691 f: impl Fn(&[AgentMessage]) -> Vec<Message> + Send + Sync + 'static,
692 ) -> Self {
693 self.convert_to_llm = Some(Arc::new(f));
694 self
695 }
696
697 pub fn with_transform_context(
699 mut self,
700 f: impl Fn(Vec<AgentMessage>) -> Vec<AgentMessage> + Send + Sync + 'static,
701 ) -> Self {
702 self.transform_context = Some(Arc::new(f));
703 self
704 }
705
706 pub fn with_block_compaction_strategy(
709 mut self,
710 strategy: impl crate::context::BlockCompactionStrategy + 'static,
711 ) -> Self {
712 if let Some(ref mut ctx) = self.context_config {
713 ctx.compaction.block_strategy = Some(Arc::new(strategy));
714 }
715 self
716 }
717
718 pub fn on_before_compaction_start(
720 mut self,
721 f: impl Fn(usize, usize) -> bool + Send + Sync + 'static,
722 ) -> Self {
723 self.before_compaction_start = Some(Arc::new(move |est, mc| {
724 let r = f(est, mc);
725 Box::pin(async move { r })
726 }));
727 self
728 }
729
730 pub fn on_after_compaction_end(
732 mut self,
733 f: impl Fn(usize, usize, usize, usize) + Send + Sync + 'static,
734 ) -> Self {
735 self.after_compaction_end = Some(Arc::new(move |mb, ma, tb, ta| {
736 f(mb, ma, tb, ta);
737 Box::pin(async move {})
738 }));
739 self
740 }
741
742 pub fn with_context_translation(
744 mut self,
745 strategy: Arc<dyn ContextTranslationStrategy>,
746 ) -> Self {
747 self.context_translation = Some(strategy);
748 self
749 }
750
751 pub fn with_sub_agent(mut self, sub: crate::agents::SubAgentTool) -> Self {
753 self.tools.push(Arc::new(sub));
754 self
755 }
756
757 pub fn without_context_management(mut self) -> Self {
759 self.context_config = None;
760 self.execution_limits = None;
761 self
762 }
763
/// Builder: registers one tool per adapter produced from an OpenAPI spec file.
///
/// # Errors
///
/// Returns the `OpenApiError` reported by `OpenApiToolAdapter::from_file`.
#[cfg(feature = "openapi")]
pub async fn with_openapi_file(
    mut self,
    path: impl AsRef<std::path::Path>,
    config: crate::openapi::OpenApiConfig,
    filter: &crate::openapi::OperationFilter,
) -> Result<Self, crate::openapi::OpenApiError> {
    let adapters = crate::openapi::OpenApiToolAdapter::from_file(path, config, filter).await?;
    self.tools.extend(
        adapters
            .into_iter()
            .map(|adapter| Arc::new(adapter) as Arc<dyn AgentTool>),
    );
    Ok(self)
}
780
/// Builder: registers one tool per adapter produced from an OpenAPI spec at `url`.
///
/// # Errors
///
/// Returns the `OpenApiError` reported by `OpenApiToolAdapter::from_url`.
#[cfg(feature = "openapi")]
pub async fn with_openapi_url(
    mut self,
    url: &str,
    config: crate::openapi::OpenApiConfig,
    filter: &crate::openapi::OperationFilter,
) -> Result<Self, crate::openapi::OpenApiError> {
    let adapters = crate::openapi::OpenApiToolAdapter::from_url(url, config, filter).await?;
    self.tools.extend(
        adapters
            .into_iter()
            .map(|adapter| Arc::new(adapter) as Arc<dyn AgentTool>),
    );
    Ok(self)
}
795
/// Builder: registers one tool per adapter produced from an in-memory OpenAPI spec.
///
/// # Errors
///
/// Returns the `OpenApiError` reported by `OpenApiToolAdapter::from_str`.
#[cfg(feature = "openapi")]
pub fn with_openapi_spec(
    mut self,
    spec_str: &str,
    config: crate::openapi::OpenApiConfig,
    filter: &crate::openapi::OperationFilter,
) -> Result<Self, crate::openapi::OpenApiError> {
    let adapters = crate::openapi::OpenApiToolAdapter::from_str(spec_str, config, filter)?;
    self.tools.extend(
        adapters
            .into_iter()
            .map(|adapter| Arc::new(adapter) as Arc<dyn AgentTool>),
    );
    Ok(self)
}
810
811 pub async fn with_mcp_server_stdio(
815 mut self,
816 command: &str, args: &[&str], env: Option<HashMap<String, String>>, ) -> Result<Self, McpError> {
820 let client = McpClient::connect_stdio(command, args, env).await?;
821 let client = Arc::new(tokio::sync::Mutex::new(client));
822 let adapters = McpToolAdapter::from_client(client).await?;
823 for adapter in adapters {
824 self.tools.push(Arc::new(adapter));
825 }
826 Ok(self)
827 }
828
829 pub async fn with_mcp_server_http(mut self, url: &str) -> Result<Self, McpError> {
831 let client = McpClient::connect_http(url).await?;
832 let client = Arc::new(tokio::sync::Mutex::new(client));
833 let adapters = McpToolAdapter::from_client(client).await?;
834 for adapter in adapters {
835 self.tools.push(Arc::new(adapter));
836 }
837 Ok(self)
838 }
839
840 pub async fn prompt(&mut self, text: impl Into<String>) -> mpsc::UnboundedReceiver<AgentEvent> {
851 let (tx, rx) = mpsc::unbounded_channel();
852 let msg = AgentMessage::Llm(LlmMessage::new(Message::user(text)));
853 self.prompt_messages_with_sender(vec![msg], tx).await;
854 rx
855 }
856
857 pub async fn prompt_with_sender(
861 &mut self,
862 text: impl Into<String>,
863 tx: mpsc::UnboundedSender<AgentEvent>,
864 ) {
865 let msg = AgentMessage::Llm(LlmMessage::new(Message::user(text)));
866 self.prompt_messages_with_sender(vec![msg], tx).await;
867 }
868
869 fn next_loop_id(&mut self, config: &AgentLoopConfig) -> String {
888 let effective_config_id = if let Some(ref id) = config.config_id {
889 id.clone()
890 } else {
891 let slugify = |s: &str| -> String {
892 s.chars()
893 .map(|c| {
894 if c.is_alphanumeric() || c == '-' {
895 c
896 } else {
897 '-'
898 }
899 })
900 .collect()
901 };
902 let thinking_part = if config.thinking_level != ThinkingLevel::Off {
903 ".thinking"
904 } else {
905 ""
906 };
907 format!(
908 "{}.{}{}",
909 config.model_config.provider,
910 slugify(&config.model_config.id),
911 thinking_part
912 )
913 };
914 let thread_key = format!("{}.{}", self.session_id, effective_config_id);
915 let n = self.loop_counters.entry(thread_key.clone()).or_insert(0);
916 *n += 1;
917 format!("{}.{}", thread_key, n)
918 }
919
920 pub async fn compact_context_with_sender(
959 &mut self,
960 tx: &tokio::sync::mpsc::UnboundedSender<AgentEvent>,
961 ) {
962 let (Some(session), Some(ctx_config)) =
963 (self.session.as_mut(), self.context_config.as_ref())
964 else {
965 return; };
967 let comp = &ctx_config.compaction;
968 let max_tokens = ctx_config.max_context_tokens;
969
970 let loop_id = self
971 .last_loop_id
972 .clone()
973 .unwrap_or_else(|| "compaction".to_string());
974
975 let _ = tx.send(AgentEvent::AgentStart {
976 agent_id: self.agent_id.clone(),
977 session_id: self.session_id.clone(),
978 loop_id: loop_id.clone(),
979 parent_loop_id: self.last_loop_id.clone(),
980 continuation_kind: ContinuationKind::Compaction,
981 timestamp: chrono::Utc::now(),
982 metadata: None,
983 config_snapshot: None, });
985
986 let msgs_before = self.messages.len();
987 let tokens_before = crate::context::total_tokens(&self.messages);
988
989 let _ = tx.send(AgentEvent::CompactionStarted {
990 loop_id: loop_id.clone(),
991 estimated_tokens: tokens_before,
992 message_count: msgs_before,
993 timestamp: chrono::Utc::now(),
994 });
995
996 let strategy: &dyn crate::context::BlockCompactionStrategy =
997 &crate::context::DefaultBlockCompaction;
998 let current_lid = self.last_loop_id.as_deref().unwrap_or("");
999
1000 if let Some(record) = session.get_loop_mut(current_lid) {
1002 record.messages = self.messages.clone();
1003 }
1004
1005 crate::context::compact_session_loops(
1006 session,
1007 current_lid,
1008 strategy,
1009 comp,
1010 max_tokens,
1011 None,
1012 )
1013 .await;
1014 self.messages = crate::context::build_context_from_session(
1015 session,
1016 current_lid,
1017 comp,
1018 max_tokens,
1019 None,
1020 );
1021
1022 let msgs_after = self.messages.len();
1023 let tokens_after = crate::context::total_tokens(&self.messages);
1024 let chain = session.loop_chain_to(current_lid);
1025 let loops_compacted = chain
1026 .iter()
1027 .filter(|lid| {
1028 session
1029 .get_loop(lid)
1030 .map(|r| r.compaction_block.is_some())
1031 .unwrap_or(false)
1032 })
1033 .count();
1034
1035 let _ = tx.send(AgentEvent::CompactionEnded {
1036 loop_id: loop_id.clone(),
1037 messages_before: msgs_before,
1038 messages_after: msgs_after,
1039 estimated_tokens_before: tokens_before,
1040 estimated_tokens_after: tokens_after,
1041 loops_compacted,
1042 timestamp: chrono::Utc::now(),
1043 });
1044
1045 let _ = tx.send(AgentEvent::AgentEnd {
1046 loop_id,
1047 messages: vec![],
1048 usage: Usage::default(),
1049 timestamp: chrono::Utc::now(),
1050 rejection: None,
1051 });
1052 }
1053
1054 pub async fn compact_context(&mut self) -> usize {
1062 let (Some(session), Some(ctx_config)) =
1063 (self.session.as_mut(), self.context_config.as_ref())
1064 else {
1065 return 0; };
1067 let comp = &ctx_config.compaction;
1068 let max_tokens = ctx_config.max_context_tokens;
1069
1070 let strategy: &dyn crate::context::BlockCompactionStrategy =
1071 &crate::context::DefaultBlockCompaction;
1072 let current_lid = self.last_loop_id.as_deref().unwrap_or("");
1073
1074 if let Some(record) = session.get_loop_mut(current_lid) {
1075 record.messages = self.messages.clone();
1076 }
1077
1078 crate::context::compact_session_loops(
1079 session,
1080 current_lid,
1081 strategy,
1082 comp,
1083 max_tokens,
1084 None,
1085 )
1086 .await;
1087 self.messages = crate::context::build_context_from_session(
1088 session,
1089 current_lid,
1090 comp,
1091 max_tokens,
1092 None,
1093 );
1094
1095 let chain = session.loop_chain_to(current_lid);
1096 chain
1097 .iter()
1098 .filter(|lid| {
1099 session
1100 .get_loop(lid)
1101 .map(|r| r.compaction_block.is_some())
1102 .unwrap_or(false)
1103 })
1104 .count()
1105 }
1106
1107 pub fn build_config(&self) -> Result<AgentLoopConfig, super::agent::AgentBuildError> {
1110 let steering_queue = self.steering_queue.clone(); let steering_mode = self.steering_mode; let follow_up_queue = self.follow_up_queue.clone();
1115 let follow_up_mode = self.follow_up_mode;
1116
1117 Ok(AgentLoopConfig {
1122 model_config: self.model_config.clone(),
1123 provider_override: self.provider_override.clone(),
1124 thinking_level: self.thinking_level,
1125 max_tokens: self.max_tokens,
1126 temperature: self.temperature,
1127 convert_to_llm: self.convert_to_llm.clone(),
1128 transform_context: self.transform_context.clone(),
1129 get_steering_messages: Some(Box::new(move || {
1130 let mut queue = lock_queue(&steering_queue); match steering_mode {
1134 QueueMode::OneAtATime => {
1135 if queue.is_empty() {
1136 vec![]
1137 } else {
1138 vec![queue.remove(0)] }
1140 }
1141 QueueMode::All => queue.drain(..).collect(), }
1143 })),
1144 context_config: self.context_config.clone(),
1145 execution_limits: self.execution_limits.clone(),
1146 cache_config: self.cache_config.clone(),
1147 tool_execution: self.tool_execution.clone(),
1148 tool_timeout: self.tool_timeout,
1149 response_format: self.response_format.clone(),
1150 retry_config: self.retry_config.clone(),
1151 get_follow_up_messages: Some(Box::new(move || {
1152 let mut queue = lock_queue(&follow_up_queue);
1153 match follow_up_mode {
1154 QueueMode::OneAtATime => {
1155 if queue.is_empty() {
1156 vec![]
1157 } else {
1158 vec![queue.remove(0)]
1159 }
1160 }
1161 QueueMode::All => queue.drain(..).collect(),
1162 }
1163 })),
1164 before_turn: self.before_turn.clone(),
1165 after_turn: self.after_turn.clone(),
1166 before_loop: self.before_loop.clone(),
1167 after_loop: self.after_loop.clone(),
1168 before_tool_execution: self.before_tool_execution.clone(),
1169 after_tool_execution: self.after_tool_execution.clone(),
1170 before_tool_execution_update: self.before_tool_execution_update.clone(),
1171 after_tool_execution_update: self.after_tool_execution_update.clone(),
1172 before_compaction_start: self.before_compaction_start.clone(),
1173 after_compaction_end: self.after_compaction_end.clone(),
1174 on_error: self.on_error.clone(),
1175 input_filters: self.input_filters.clone(),
1176 first_turn_trigger: TurnTrigger::User,
1177 config_id: self.config_id.clone(),
1178 context_translation: self.context_translation.clone(),
1179 prun_pending: self.prun_pending.clone(),
1180 revert_pending: self.revert_pending.clone(),
1181 current_tool: Some(self.current_tool.clone()),
1182 revert_render_policy: self.revert_render_policy,
1183 })
1184 }
1185
1186 pub fn new_session(&mut self) -> String {
1195 self.session_id = uuid::Uuid::new_v4().to_string();
1196 self.loop_counters.clear();
1197 self.last_loop_id = None;
1198 self.last_active_at = None;
1202 self.session_id.clone()
1203 }
1204
1205 pub fn check_and_rotate(&mut self, threshold: std::time::Duration) -> Option<String> {
1213 let last = self.last_active_at?;
1214 let elapsed = (chrono::Utc::now() - last)
1215 .to_std()
1216 .unwrap_or(std::time::Duration::ZERO);
1217 if elapsed > threshold {
1218 Some(self.new_session())
1219 } else {
1220 None
1221 }
1222 }
1223}
1224
1225#[async_trait::async_trait]
1228impl Agent for BasicAgent {
1229 async fn prompt_messages_with_sender(
1233 &mut self,
1234 messages: Vec<AgentMessage>,
1235 tx: mpsc::UnboundedSender<AgentEvent>,
1236 ) {
1237 assert!(
1249 !self.is_streaming,
1250 "Agent is already streaming. Use steer() or follow_up()."
1251 );
1252
1253 self.last_active_at = Some(chrono::Utc::now());
1254 let cancel = CancellationToken::new();
1255 self.cancel = Some(cancel.clone()); self.is_streaming = true;
1257
1258 let config = self
1283 .build_config()
1284 .expect("BasicAgent always provides a model_config");
1285 let loop_id = self.next_loop_id(&config);
1286 self.last_loop_id = Some(loop_id.clone());
1287
1288 let mut context = AgentContext {
1289 system_prompt: self.system_prompt.clone(),
1290 messages: self.messages.clone(),
1291 tools: std::mem::take(&mut self.tools), agent_id: Some(self.agent_id.clone()),
1293 session_id: Some(self.session_id.clone()),
1294 loop_id: Some(loop_id),
1295 parent_loop_id: None, continuation_kind: None,
1297 session: self.session.take(), user_context: Vec::new(),
1299 inrun_context: Vec::new(),
1300 active_node_id: None,
1301 next_node_id: 0,
1302 };
1303
1304 let _new_messages = agent_loop(messages, &mut context, &config, tx, cancel).await;
1305
1306 self.tools = context.tools;
1307 self.messages = context.messages;
1308 self.session = context.session; self.is_streaming = false;
1310 self.cancel = None;
1311 }
1312
1313 async fn continue_loop_with_sender(
1321 &mut self,
1322 tx: mpsc::UnboundedSender<AgentEvent>, kind: ContinuationKind, ) {
1325 assert!(!self.is_streaming, "Agent is already streaming.");
1326 assert!(!self.messages.is_empty(), "No messages to continue from.");
1327
1328 let cancel = CancellationToken::new();
1329 self.cancel = Some(cancel.clone());
1330 self.is_streaming = true;
1331
1332 let config = self
1335 .build_config()
1336 .expect("BasicAgent always provides a model_config");
1337 let loop_id = self.next_loop_id(&config);
1338 let parent_loop_id = self.last_loop_id.clone(); self.last_loop_id = Some(loop_id.clone());
1340
1341 let tag = chrono::Utc::now().to_rfc3339();
1343 let kind_with_tag = match kind {
1344 ContinuationKind::Initial => ContinuationKind::Default, ContinuationKind::Default => ContinuationKind::Default,
1346 ContinuationKind::Rerun { .. } => ContinuationKind::Rerun { tag },
1347 ContinuationKind::Branch { .. } => ContinuationKind::Branch { tag },
1348 ContinuationKind::Compaction => ContinuationKind::Compaction,
1349 };
1350
1351 let mut context = AgentContext {
1353 system_prompt: self.system_prompt.clone(),
1354 messages: self.messages.clone(),
1355 tools: std::mem::take(&mut self.tools),
1356 agent_id: Some(self.agent_id.clone()),
1357 session_id: Some(self.session_id.clone()),
1358 loop_id: Some(loop_id),
1359 parent_loop_id,
1360 continuation_kind: Some(kind_with_tag),
1361 session: self.session.take(),
1362 user_context: Vec::new(),
1363 inrun_context: Vec::new(),
1364 active_node_id: None,
1365 next_node_id: 0,
1366 };
1367
1368 let _new_messages = agent_loop_continue(&mut context, &config, tx, cancel).await;
1369
1370 self.tools = context.tools;
1371 self.messages = context.messages;
1372 self.session = context.session;
1373 self.is_streaming = false;
1374 self.cancel = None;
1375 }
1376
1377 fn messages(&self) -> &[AgentMessage] {
1380 &self.messages
1381 }
1382
1383 fn is_streaming(&self) -> bool {
1384 self.is_streaming
1385 }
1386
1387 fn agent_id(&self) -> &str {
1388 &self.agent_id
1389 }
1390
1391 fn session_id(&self) -> &str {
1392 &self.session_id
1393 }
1394
1395 fn last_loop_id(&self) -> Option<&str> {
1396 self.last_loop_id.as_deref()
1397 }
1398
1399 fn clear_messages(&mut self) {
1402 self.messages.clear();
1403 }
1404
1405 fn append_message(&mut self, msg: AgentMessage) {
1406 self.messages.push(msg);
1407 }
1408
1409 fn replace_messages(&mut self, msgs: Vec<AgentMessage>) {
1410 self.messages = msgs;
1411 }
1412
1413 fn save_messages(&self) -> Result<String, serde_json::Error> {
1414 serde_json::to_string(&self.messages)
1415 }
1416
1417 fn restore_messages(&mut self, json: &str) -> Result<(), serde_json::Error> {
1418 let msgs: Vec<AgentMessage> = serde_json::from_str(json)?;
1419 self.messages = msgs;
1420 Ok(())
1421 }
1422
1423 fn set_tools(&mut self, tools: Vec<Arc<dyn AgentTool>>) {
1424 self.tools = tools;
1425 }
1426
1427 fn abort(&self) {
1430 if let Some(ref cancel) = self.cancel {
1431 cancel.cancel();
1432 }
1433 }
1434
1435 fn reset(&mut self) {
1436 self.messages.clear();
1437 self.clear_all_queues();
1438 self.is_streaming = false;
1439 self.cancel = None;
1440 }
1441
1442 fn steer(&self, msg: AgentMessage) {
1463 lock_queue(&self.steering_queue).push(msg);
1464 }
1465
1466 fn follow_up(&self, msg: AgentMessage) {
1467 lock_queue(&self.follow_up_queue).push(msg);
1468 }
1469
1470 fn clear_steering_queue(&self) {
1471 lock_queue(&self.steering_queue).clear();
1472 }
1473
1474 fn clear_follow_up_queue(&self) {
1475 lock_queue(&self.follow_up_queue).clear();
1476 }
1477
1478 fn set_steering_mode(&mut self, mode: QueueMode) {
1479 self.steering_mode = mode;
1480 }
1481
1482 fn set_follow_up_mode(&mut self, mode: QueueMode) {
1483 self.follow_up_mode = mode;
1484 }
1485
1486 fn profile(&self) -> Option<&AgentProfile> {
1489 self.profile.as_ref()
1490 }
1491
1492 fn system_prompt(&self) -> &str {
1493 &self.system_prompt
1494 }
1495
1496 fn model_config(&self) -> Option<&ModelConfig> {
1497 Some(&self.model_config)
1498 }
1499
1500 fn thinking_level(&self) -> ThinkingLevel {
1501 self.thinking_level
1502 }
1503
1504 fn temperature(&self) -> Option<f32> {
1505 self.temperature
1506 }
1507
1508 fn max_tokens(&self) -> Option<u32> {
1509 self.max_tokens
1510 }
1511
1512 fn context_config(&self) -> Option<&ContextConfig> {
1513 self.context_config.as_ref()
1514 }
1515
1516 fn execution_limits(&self) -> Option<&ExecutionLimits> {
1517 self.execution_limits.as_ref()
1518 }
1519
1520 fn cache_config(&self) -> CacheConfig {
1521 self.cache_config.clone()
1522 }
1523
1524 fn tool_execution(&self) -> ToolExecutionStrategy {
1525 self.tool_execution.clone()
1526 }
1527
1528 fn tool_timeout(&self) -> Option<std::time::Duration> {
1529 self.tool_timeout
1530 }
1531
1532 fn response_format(&self) -> crate::provider::ResponseFormat {
1533 self.response_format.clone()
1534 }
1535
1536 fn retry_config(&self) -> crate::provider::retry::RetryConfig {
1537 self.retry_config.clone()
1538 }
1539
1540 fn session(&self) -> Option<&crate::session::Session> {
1543 self.session.as_ref()
1544 }
1545
1546 fn workspace(&self) -> Option<&std::path::Path> {
1547 self.workspace.as_deref()
1548 }
1549
1550 fn set_before_turn(&mut self, f: Option<BeforeTurnFn>) {
1553 self.before_turn = f;
1554 }
1555
1556 fn set_after_turn(&mut self, f: Option<AfterTurnFn>) {
1557 self.after_turn = f;
1558 }
1559
1560 fn set_before_loop(&mut self, f: Option<BeforeLoopFn>) {
1561 self.before_loop = f;
1562 }
1563
1564 fn set_after_loop(&mut self, f: Option<AfterLoopFn>) {
1565 self.after_loop = f;
1566 }
1567
1568 fn set_before_tool_execution(&mut self, f: Option<BeforeToolExecutionFn>) {
1569 self.before_tool_execution = f;
1570 }
1571
1572 fn set_after_tool_execution(&mut self, f: Option<AfterToolExecutionFn>) {
1573 self.after_tool_execution = f;
1574 }
1575
1576 fn set_before_tool_execution_update(&mut self, f: Option<BeforeToolExecutionUpdateFn>) {
1577 self.before_tool_execution_update = f;
1578 }
1579
1580 fn set_after_tool_execution_update(&mut self, f: Option<AfterToolExecutionUpdateFn>) {
1581 self.after_tool_execution_update = f;
1582 }
1583
1584 fn set_convert_to_llm(&mut self, f: Option<ConvertToLlmFn>) {
1585 self.convert_to_llm = f;
1586 }
1587
1588 fn set_transform_context(&mut self, f: Option<TransformContextFn>) {
1589 self.transform_context = f;
1590 }
1591
1592 fn set_block_compaction_strategy(
1593 &mut self,
1594 s: Option<Arc<dyn crate::context::BlockCompactionStrategy>>,
1595 ) {
1596 if let Some(ref mut ctx) = self.context_config {
1598 ctx.compaction.block_strategy = s;
1599 }
1600 }
1601
1602 fn set_before_compaction_start(&mut self, f: Option<BeforeCompactionStartFn>) {
1603 self.before_compaction_start = f;
1604 }
1605
1606 fn set_after_compaction_end(&mut self, f: Option<AfterCompactionEndFn>) {
1607 self.after_compaction_end = f;
1608 }
1609
1610 fn set_context_translation(&mut self, s: Option<Arc<dyn ContextTranslationStrategy>>) {
1611 self.context_translation = s;
1612 }
1613
1614 fn context_translation(&self) -> Option<Arc<dyn ContextTranslationStrategy>> {
1615 self.context_translation.clone()
1616 }
1617}
1618
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the `BasicAgent` used by every test in this module.
    fn mock_agent() -> BasicAgent {
        BasicAgent::new(ModelConfig::anthropic("mock", "mock-model", "test-key"))
    }

    /// Poisons `m` by panicking on a helper thread while its guard is held.
    ///
    /// The helper thread is joined before returning, so the mutex is already
    /// poisoned when the caller resumes.
    fn poison_mutex<T: Send + 'static>(m: Arc<Mutex<T>>) {
        // `m` is owned here, so it moves straight into the thread; no extra
        // clone is needed.
        let outcome = std::thread::spawn(move || {
            let _guard = m.lock().unwrap();
            panic!("intentional panic to poison the mutex");
        })
        .join();
        assert!(outcome.is_err(), "helper thread was expected to panic");
    }

    /// `lock_queue` must hand back the inner `Vec` even after poisoning.
    #[test]
    fn lock_queue_recovers_inner_vec_after_poison() {
        let q: Arc<Mutex<Vec<u32>>> = Arc::new(Mutex::new(vec![1, 2, 3]));
        poison_mutex(Arc::clone(&q));
        assert!(
            q.is_poisoned(),
            "test pre-condition: mutex should be poisoned"
        );

        let guard = lock_queue(&q);
        assert_eq!(*guard, vec![1, 2, 3]);
    }

    /// `steer` and `clear_steering_queue` keep working on a poisoned queue.
    #[test]
    fn basic_agent_steer_survives_queue_poison() {
        let agent = mock_agent();

        poison_mutex(Arc::clone(&agent.steering_queue));
        assert!(agent.steering_queue.is_poisoned());

        agent.steer(AgentMessage::Llm(LlmMessage::new(Message::user("hi"))));
        assert_eq!(lock_queue(&agent.steering_queue).len(), 1);
        agent.clear_steering_queue();
        assert_eq!(lock_queue(&agent.steering_queue).len(), 0);
    }

    /// `follow_up` and `clear_follow_up_queue` keep working on a poisoned
    /// queue.
    #[test]
    fn basic_agent_follow_up_survives_queue_poison() {
        let agent = mock_agent();

        poison_mutex(Arc::clone(&agent.follow_up_queue));
        assert!(agent.follow_up_queue.is_poisoned());

        agent.follow_up(AgentMessage::Llm(LlmMessage::new(Message::user("more"))));
        assert_eq!(lock_queue(&agent.follow_up_queue).len(), 1);
        agent.clear_follow_up_queue();
        assert_eq!(lock_queue(&agent.follow_up_queue).len(), 0);
    }

    /// The builder stores the policy on the agent and `build_config` carries
    /// it into the loop config.
    #[test]
    fn with_revert_render_policy_sets_config_field() {
        let custom = RevertRenderPolicy {
            lesson_window_turns: 1,
            lesson_window_count: 2,
        };
        let agent = mock_agent().with_revert_render_policy(custom);

        assert_eq!(agent.revert_render_policy.lesson_window_turns, 1);
        assert_eq!(agent.revert_render_policy.lesson_window_count, 2);

        let config = agent.build_config().expect("BasicAgent has a model_config");
        assert_eq!(config.revert_render_policy.lesson_window_turns, 1);
        assert_eq!(config.revert_render_policy.lesson_window_count, 2);
    }

    /// A fresh agent carries `RevertRenderPolicy::default()`.
    #[test]
    fn revert_render_policy_defaults_to_phi_core_defaults() {
        let agent = mock_agent();
        let default = RevertRenderPolicy::default();
        assert_eq!(
            agent.revert_render_policy.lesson_window_turns,
            default.lesson_window_turns
        );
        assert_eq!(
            agent.revert_render_policy.lesson_window_count,
            default.lesson_window_count
        );
    }

    /// With nothing written to the slot, no timeout is reported.
    #[test]
    fn current_tool_timeout_is_none_when_no_tool_in_flight() {
        let agent = mock_agent();
        assert!(agent.current_tool_timeout().is_none());
    }

    /// `current_tool_timeout` tracks whatever is currently in the
    /// `current_tool` slot.
    #[test]
    fn current_tool_timeout_reflects_shared_slot() {
        let agent = mock_agent();

        assert!(agent.current_tool_timeout().is_none());

        // Each write is a single statement, so its guard is dropped before
        // the following read takes the lock.
        *agent.current_tool.lock().unwrap() = Some(crate::context::CurrentToolExecution {
            name: "bash".to_string(),
            timeout: Some(std::time::Duration::from_secs(5)),
        });
        assert_eq!(
            agent.current_tool_timeout(),
            Some(std::time::Duration::from_secs(5))
        );

        // A tool entry without a timeout reports `None`.
        *agent.current_tool.lock().unwrap() = Some(crate::context::CurrentToolExecution {
            name: "bash".to_string(),
            timeout: None,
        });
        assert!(agent.current_tool_timeout().is_none());

        // An emptied slot reports `None` as well.
        *agent.current_tool.lock().unwrap() = None;
        assert!(agent.current_tool_timeout().is_none());
    }

    /// A write through the config's slot is visible through the agent.
    #[test]
    fn current_tool_slot_is_shared_between_agent_and_config() {
        let agent = mock_agent();
        let config = agent.build_config().expect("BasicAgent has a model_config");
        let slot = config
            .current_tool
            .as_ref()
            .expect("BasicAgent installs the slot");

        *slot.lock().unwrap() = Some(crate::context::CurrentToolExecution {
            name: "fs.read_file".to_string(),
            timeout: Some(std::time::Duration::from_millis(250)),
        });

        assert_eq!(
            agent.current_tool_timeout(),
            Some(std::time::Duration::from_millis(250))
        );
    }
}