1use super::agent::{Agent, QueueMode};
8use super::profile::AgentProfile;
9use crate::agent_loop::{
10 agent_loop, agent_loop_continue, AfterCompactionEndFn, AfterLoopFn, AfterToolExecutionFn,
11 AfterToolExecutionUpdateFn, AfterTurnFn, AgentLoopConfig, BeforeCompactionStartFn,
12 BeforeLoopFn, BeforeToolExecutionFn, BeforeToolExecutionUpdateFn, BeforeTurnFn, ConvertToLlmFn,
13 OnErrorFn, TransformContextFn,
14};
15use crate::context::{CompactionStrategy, ContextConfig, ExecutionLimits};
16use crate::mcp::{McpClient, McpError, McpToolAdapter};
17use crate::provider::context_translation::ContextTranslationStrategy;
18use crate::provider::{ModelConfig, StreamProvider};
19use crate::types::*;
20use std::collections::HashMap;
21use std::sync::{Arc, Mutex};
22use tokio::sync::mpsc;
23use tokio_util::sync::CancellationToken;
24
25fn lock_queue<T>(q: &Mutex<Vec<T>>) -> std::sync::MutexGuard<'_, Vec<T>> {
32 match q.lock() {
33 Ok(g) => g,
34 Err(poison) => {
35 tracing::warn!(
36 "BasicAgent: queue mutex was poisoned; recovering inner Vec. \
37 A prior hook or tool callback panicked while holding the lock."
38 );
39 poison.into_inner()
40 }
41 }
42}
43
44pub struct BasicAgent {
80 pub system_prompt: String,
82 pub model_config: ModelConfig, pub provider_override: Option<Arc<dyn StreamProvider>>,
86 pub thinking_level: ThinkingLevel,
87 pub max_tokens: Option<u32>, pub temperature: Option<f32>, messages: Vec<AgentMessage>, tools: Vec<Arc<dyn AgentTool>>,
109
110 steering_queue: Arc<Mutex<Vec<AgentMessage>>>,
134 follow_up_queue: Arc<Mutex<Vec<AgentMessage>>>,
135 steering_mode: QueueMode,
136 follow_up_mode: QueueMode,
137
138 pub context_config: Option<ContextConfig>,
140 pub execution_limits: Option<ExecutionLimits>,
141 pub cache_config: CacheConfig,
142 pub tool_execution: ToolExecutionStrategy,
143 pub tool_timeout: Option<std::time::Duration>,
144 pub response_format: crate::provider::ResponseFormat,
145 pub retry_config: crate::provider::retry::RetryConfig,
146
147 before_turn: Option<BeforeTurnFn>,
149 after_turn: Option<AfterTurnFn>,
150 on_error: Option<OnErrorFn>,
151
152 input_filters: Vec<Arc<dyn InputFilter>>,
154
155 before_loop: Option<BeforeLoopFn>,
157 after_loop: Option<AfterLoopFn>,
158 before_tool_execution: Option<BeforeToolExecutionFn>,
159 after_tool_execution: Option<AfterToolExecutionFn>,
160 before_tool_execution_update: Option<BeforeToolExecutionUpdateFn>,
161 after_tool_execution_update: Option<AfterToolExecutionUpdateFn>,
162 convert_to_llm: Option<ConvertToLlmFn>,
163 transform_context: Option<TransformContextFn>,
164 before_compaction_start: Option<BeforeCompactionStartFn>,
165 after_compaction_end: Option<AfterCompactionEndFn>,
166 context_translation: Option<Arc<dyn ContextTranslationStrategy>>,
167 prun_pending: Option<Arc<Mutex<Vec<crate::tools::prun::PrunRequest>>>>,
168 revert_pending: Option<Arc<Mutex<Vec<crate::tools::revert::RevertRequest>>>>,
169
170 config_id: Option<String>,
172 profile: Option<AgentProfile>,
173 workspace: Option<std::path::PathBuf>,
174
175 cancel: Option<CancellationToken>,
177 is_streaming: bool, agent_id: String,
205 session_id: String,
206 loop_counters: HashMap<String, usize>,
207 last_loop_id: Option<String>,
208 last_active_at: Option<chrono::DateTime<chrono::Utc>>,
211 session: Option<crate::session::Session>,
213}
214
215impl BasicAgent {
216 pub fn new(model_config: ModelConfig) -> Self {
217 Self {
218 model_config,
219 provider_override: None,
220 system_prompt: String::new(),
221 thinking_level: ThinkingLevel::Off,
222 max_tokens: None,
223 temperature: None,
224 messages: Vec::new(),
225 tools: Vec::new(),
226 steering_queue: Arc::new(Mutex::new(Vec::new())), follow_up_queue: Arc::new(Mutex::new(Vec::new())),
228 steering_mode: QueueMode::OneAtATime,
229 follow_up_mode: QueueMode::OneAtATime,
230 context_config: Some(ContextConfig::default()), execution_limits: Some(ExecutionLimits::default()), cache_config: CacheConfig::default(),
233 tool_execution: ToolExecutionStrategy::default(), tool_timeout: None,
235 response_format: crate::provider::ResponseFormat::Text,
236 retry_config: crate::provider::retry::RetryConfig::default(), before_turn: None,
238 after_turn: None,
239 on_error: None,
240 input_filters: Vec::new(),
241 before_loop: None,
242 after_loop: None,
243 before_tool_execution: None,
244 after_tool_execution: None,
245 before_tool_execution_update: None,
246 after_tool_execution_update: None,
247 convert_to_llm: None,
248 transform_context: None,
249 before_compaction_start: None,
250 after_compaction_end: None,
251 context_translation: None,
252 prun_pending: None,
253 revert_pending: None,
254 config_id: None,
255 profile: None,
256 workspace: None,
257 cancel: None,
258 is_streaming: false,
259 agent_id: uuid::Uuid::new_v4().to_string(),
260 session_id: uuid::Uuid::new_v4().to_string(),
261 loop_counters: HashMap::new(),
262 last_loop_id: None,
263 last_active_at: None,
264 session: None,
265 }
266 }
267
268 pub fn with_session(mut self, session: crate::session::Session) -> Self {
270 self.session = Some(session);
271 self
272 }
273
274 pub fn take_session(&mut self) -> Option<crate::session::Session> {
276 self.session.take()
277 }
278
279 pub fn with_system_prompt(mut self, prompt: impl Into<String>) -> Self {
299 self.system_prompt = prompt.into();
300 self
301 }
302
303 pub fn with_thinking(mut self, level: ThinkingLevel) -> Self {
304 self.thinking_level = level;
305 self
306 }
307
308 pub fn with_tools(mut self, tools: Vec<Arc<dyn AgentTool>>) -> Self {
309 self.tools = tools;
310 self
311 }
312
313 pub fn tools(&self) -> &[Arc<dyn AgentTool>] {
318 &self.tools
319 }
320
321 pub fn with_model_config(mut self, config: ModelConfig) -> Self {
322 self.model_config = config;
323 self
324 }
325
326 pub fn with_provider_override(mut self, provider: Arc<dyn StreamProvider>) -> Self {
329 self.provider_override = Some(provider);
330 self
331 }
332
333 pub fn with_max_tokens(mut self, max: u32) -> Self {
334 self.max_tokens = Some(max);
335 self
336 }
337
338 pub fn with_context_config(mut self, config: ContextConfig) -> Self {
339 self.context_config = Some(config);
340 self
341 }
342
343 pub fn with_cache_config(mut self, config: CacheConfig) -> Self {
344 self.cache_config = config;
345 self
346 }
347
348 pub fn with_tool_execution(mut self, strategy: ToolExecutionStrategy) -> Self {
349 self.tool_execution = strategy;
350 self
351 }
352
353 pub fn with_tool_timeout(mut self, timeout: std::time::Duration) -> Self {
355 self.tool_timeout = Some(timeout);
356 self
357 }
358
359 pub fn with_response_format(mut self, format: crate::provider::ResponseFormat) -> Self {
361 self.response_format = format;
362 self
363 }
364
365 pub fn with_retry_config(mut self, config: crate::provider::retry::RetryConfig) -> Self {
366 self.retry_config = config;
367 self
368 }
369
370 pub fn with_skills(mut self, skills: crate::context::skills::SkillSet) -> Self {
376 let prompt_fragment = skills.format_for_prompt();
377 if !prompt_fragment.is_empty() {
378 if self.system_prompt.is_empty() {
379 self.system_prompt = prompt_fragment;
380 } else {
381 self.system_prompt = format!("{}\n\n{}", self.system_prompt, prompt_fragment);
382 }
383 }
384 self
385 }
386
387 pub fn with_execution_limits(mut self, limits: ExecutionLimits) -> Self {
388 self.execution_limits = Some(limits);
389 self
390 }
391
392 pub fn with_messages(mut self, msgs: Vec<AgentMessage>) -> Self {
393 self.messages = msgs;
394 self
395 }
396
397 pub fn on_before_turn(
413 mut self,
414 f: impl Fn(&[AgentMessage], usize) -> bool + Send + Sync + 'static,
415 ) -> Self {
416 self.before_turn = Some(Arc::new(f));
417 self
418 }
419
420 pub fn on_after_turn(
421 mut self,
422 f: impl Fn(&[AgentMessage], &Usage) + Send + Sync + 'static,
423 ) -> Self {
424 self.after_turn = Some(Arc::new(f));
425 self
426 }
427
428 pub fn on_error(mut self, f: impl Fn(&str) + Send + Sync + 'static) -> Self {
429 self.on_error = Some(Arc::new(f));
430 self
431 }
432
433 pub fn with_input_filter(mut self, filter: impl InputFilter + 'static) -> Self {
435 self.input_filters.push(Arc::new(filter));
436 self
437 }
438
439 pub fn with_compaction_strategy(mut self, strategy: impl CompactionStrategy + 'static) -> Self {
443 if let Some(ref mut ctx) = self.context_config {
444 ctx.compaction.in_memory_strategy = Some(Arc::new(strategy));
445 }
446 self
447 }
448
449 pub fn with_profile(mut self, profile: AgentProfile) -> Self {
453 if let Some(ref prompt) = profile.system_prompt {
455 if self.system_prompt.is_empty() {
456 self.system_prompt = prompt.clone();
457 }
458 }
459 if let Some(level) = profile.thinking_level {
460 if self.thinking_level == ThinkingLevel::Off {
461 self.thinking_level = level;
462 }
463 }
464 if let Some(temp) = profile.temperature {
465 if self.temperature.is_none() {
466 self.temperature = Some(temp);
467 }
468 }
469 if let Some(max) = profile.max_tokens {
470 if self.max_tokens.is_none() {
471 self.max_tokens = Some(max);
472 }
473 }
474 if let Some(ref id) = profile.config_id {
475 if self.config_id.is_none() {
476 self.config_id = Some(id.clone());
477 }
478 }
479 self.profile = Some(profile);
480 self
481 }
482
483 pub fn with_temperature(mut self, temp: f32) -> Self {
485 self.temperature = Some(temp);
486 self
487 }
488
489 pub fn with_config_id(mut self, id: impl Into<String>) -> Self {
491 self.config_id = Some(id.into());
492 self
493 }
494
495 pub fn with_workspace(mut self, path: impl Into<std::path::PathBuf>) -> Self {
498 self.workspace = Some(path.into());
499 self
500 }
501
502 pub fn on_before_loop(
504 mut self,
505 f: impl Fn(&[AgentMessage], usize) -> bool + Send + Sync + 'static,
506 ) -> Self {
507 self.before_loop = Some(Arc::new(f));
508 self
509 }
510
511 pub fn on_after_loop(
513 mut self,
514 f: impl Fn(&[AgentMessage], &Usage) + Send + Sync + 'static,
515 ) -> Self {
516 self.after_loop = Some(Arc::new(f));
517 self
518 }
519
520 pub fn on_before_tool_execution(
522 mut self,
523 f: impl Fn(&str, &str, &serde_json::Value) -> bool + Send + Sync + 'static,
524 ) -> Self {
525 self.before_tool_execution = Some(Arc::new(f));
526 self
527 }
528
529 pub fn on_after_tool_execution(
531 mut self,
532 f: impl Fn(&str, &str, bool) + Send + Sync + 'static,
533 ) -> Self {
534 self.after_tool_execution = Some(Arc::new(f));
535 self
536 }
537
538 pub fn on_before_tool_execution_update(
540 mut self,
541 f: impl Fn(&str, &str, &str) -> bool + Send + Sync + 'static,
542 ) -> Self {
543 self.before_tool_execution_update = Some(Arc::new(f));
544 self
545 }
546
547 pub fn on_after_tool_execution_update(
549 mut self,
550 f: impl Fn(&str, &str, &str) + Send + Sync + 'static,
551 ) -> Self {
552 self.after_tool_execution_update = Some(Arc::new(f));
553 self
554 }
555
556 pub fn with_prun_tool(mut self) -> Self {
559 let pending = Arc::new(Mutex::new(Vec::new()));
560 self.tools.push(Arc::new(crate::tools::PrunTool::new(
561 pending.clone(),
562 crate::tools::PrunVariant::Prun,
563 )));
564 self.tools.push(Arc::new(crate::tools::PrunTool::new(
565 pending.clone(),
566 crate::tools::PrunVariant::PrunWithMemo,
567 )));
568 self.prun_pending = Some(pending);
569 self
570 }
571
572 pub fn with_revert_tool(mut self) -> Self {
580 let pending = Arc::new(Mutex::new(Vec::new()));
581 self.tools
582 .push(Arc::new(crate::tools::RevertTool::new(pending.clone())));
583 self.revert_pending = Some(pending);
584 self
585 }
586
587 pub fn with_convert_to_llm(
589 mut self,
590 f: impl Fn(&[AgentMessage]) -> Vec<Message> + Send + Sync + 'static,
591 ) -> Self {
592 self.convert_to_llm = Some(Arc::new(f));
593 self
594 }
595
596 pub fn with_transform_context(
598 mut self,
599 f: impl Fn(Vec<AgentMessage>) -> Vec<AgentMessage> + Send + Sync + 'static,
600 ) -> Self {
601 self.transform_context = Some(Arc::new(f));
602 self
603 }
604
605 pub fn with_block_compaction_strategy(
608 mut self,
609 strategy: impl crate::context::BlockCompactionStrategy + 'static,
610 ) -> Self {
611 if let Some(ref mut ctx) = self.context_config {
612 ctx.compaction.block_strategy = Some(Arc::new(strategy));
613 }
614 self
615 }
616
617 pub fn on_before_compaction_start(
619 mut self,
620 f: impl Fn(usize, usize) -> bool + Send + Sync + 'static,
621 ) -> Self {
622 self.before_compaction_start = Some(Arc::new(f));
623 self
624 }
625
626 pub fn on_after_compaction_end(
628 mut self,
629 f: impl Fn(usize, usize, usize, usize) + Send + Sync + 'static,
630 ) -> Self {
631 self.after_compaction_end = Some(Arc::new(f));
632 self
633 }
634
635 pub fn with_context_translation(
637 mut self,
638 strategy: Arc<dyn ContextTranslationStrategy>,
639 ) -> Self {
640 self.context_translation = Some(strategy);
641 self
642 }
643
644 pub fn with_sub_agent(mut self, sub: crate::agents::SubAgentTool) -> Self {
646 self.tools.push(Arc::new(sub));
647 self
648 }
649
650 pub fn without_context_management(mut self) -> Self {
652 self.context_config = None;
653 self.execution_limits = None;
654 self
655 }
656
657 #[cfg(feature = "openapi")]
661 pub async fn with_openapi_file(
662 mut self,
663 path: impl AsRef<std::path::Path>,
664 config: crate::openapi::OpenApiConfig,
665 filter: &crate::openapi::OperationFilter,
666 ) -> Result<Self, crate::openapi::OpenApiError> {
667 let adapters = crate::openapi::OpenApiToolAdapter::from_file(path, config, filter).await?;
668 for adapter in adapters {
669 self.tools.push(Arc::new(adapter));
670 }
671 Ok(self)
672 }
673
674 #[cfg(feature = "openapi")]
676 pub async fn with_openapi_url(
677 mut self,
678 url: &str,
679 config: crate::openapi::OpenApiConfig,
680 filter: &crate::openapi::OperationFilter,
681 ) -> Result<Self, crate::openapi::OpenApiError> {
682 let adapters = crate::openapi::OpenApiToolAdapter::from_url(url, config, filter).await?;
683 for adapter in adapters {
684 self.tools.push(Arc::new(adapter));
685 }
686 Ok(self)
687 }
688
689 #[cfg(feature = "openapi")]
691 pub fn with_openapi_spec(
692 mut self,
693 spec_str: &str,
694 config: crate::openapi::OpenApiConfig,
695 filter: &crate::openapi::OperationFilter,
696 ) -> Result<Self, crate::openapi::OpenApiError> {
697 let adapters = crate::openapi::OpenApiToolAdapter::from_str(spec_str, config, filter)?;
698 for adapter in adapters {
699 self.tools.push(Arc::new(adapter));
700 }
701 Ok(self)
702 }
703
704 pub async fn with_mcp_server_stdio(
708 mut self,
709 command: &str, args: &[&str], env: Option<HashMap<String, String>>, ) -> Result<Self, McpError> {
713 let client = McpClient::connect_stdio(command, args, env).await?;
714 let client = Arc::new(tokio::sync::Mutex::new(client));
715 let adapters = McpToolAdapter::from_client(client).await?;
716 for adapter in adapters {
717 self.tools.push(Arc::new(adapter));
718 }
719 Ok(self)
720 }
721
722 pub async fn with_mcp_server_http(mut self, url: &str) -> Result<Self, McpError> {
724 let client = McpClient::connect_http(url).await?;
725 let client = Arc::new(tokio::sync::Mutex::new(client));
726 let adapters = McpToolAdapter::from_client(client).await?;
727 for adapter in adapters {
728 self.tools.push(Arc::new(adapter));
729 }
730 Ok(self)
731 }
732
733 pub async fn prompt(&mut self, text: impl Into<String>) -> mpsc::UnboundedReceiver<AgentEvent> {
744 let (tx, rx) = mpsc::unbounded_channel();
745 let msg = AgentMessage::Llm(LlmMessage::new(Message::user(text)));
746 self.prompt_messages_with_sender(vec![msg], tx).await;
747 rx
748 }
749
750 pub async fn prompt_with_sender(
754 &mut self,
755 text: impl Into<String>,
756 tx: mpsc::UnboundedSender<AgentEvent>,
757 ) {
758 let msg = AgentMessage::Llm(LlmMessage::new(Message::user(text)));
759 self.prompt_messages_with_sender(vec![msg], tx).await;
760 }
761
762 fn next_loop_id(&mut self, config: &AgentLoopConfig) -> String {
781 let effective_config_id = if let Some(ref id) = config.config_id {
782 id.clone()
783 } else {
784 let slugify = |s: &str| -> String {
785 s.chars()
786 .map(|c| {
787 if c.is_alphanumeric() || c == '-' {
788 c
789 } else {
790 '-'
791 }
792 })
793 .collect()
794 };
795 let thinking_part = if config.thinking_level != ThinkingLevel::Off {
796 ".thinking"
797 } else {
798 ""
799 };
800 format!(
801 "{}.{}{}",
802 config.model_config.provider,
803 slugify(&config.model_config.id),
804 thinking_part
805 )
806 };
807 let thread_key = format!("{}.{}", self.session_id, effective_config_id);
808 let n = self.loop_counters.entry(thread_key.clone()).or_insert(0);
809 *n += 1;
810 format!("{}.{}", thread_key, n)
811 }
812
813 pub fn compact_context_with_sender(
850 &mut self,
851 tx: &tokio::sync::mpsc::UnboundedSender<AgentEvent>,
852 ) {
853 let (Some(session), Some(ctx_config)) =
854 (self.session.as_mut(), self.context_config.as_ref())
855 else {
856 return; };
858 let comp = &ctx_config.compaction;
859 let max_tokens = ctx_config.max_context_tokens;
860
861 let loop_id = self
862 .last_loop_id
863 .clone()
864 .unwrap_or_else(|| "compaction".to_string());
865
866 let _ = tx.send(AgentEvent::AgentStart {
867 agent_id: self.agent_id.clone(),
868 session_id: self.session_id.clone(),
869 loop_id: loop_id.clone(),
870 parent_loop_id: self.last_loop_id.clone(),
871 continuation_kind: ContinuationKind::Compaction,
872 timestamp: chrono::Utc::now(),
873 metadata: None,
874 config_snapshot: None, });
876
877 let msgs_before = self.messages.len();
878 let tokens_before = crate::context::total_tokens(&self.messages);
879
880 let _ = tx.send(AgentEvent::CompactionStarted {
881 loop_id: loop_id.clone(),
882 estimated_tokens: tokens_before,
883 message_count: msgs_before,
884 timestamp: chrono::Utc::now(),
885 });
886
887 let strategy: &dyn crate::context::BlockCompactionStrategy =
888 &crate::context::DefaultBlockCompaction;
889 let current_lid = self.last_loop_id.as_deref().unwrap_or("");
890
891 if let Some(record) = session.get_loop_mut(current_lid) {
893 record.messages = self.messages.clone();
894 }
895
896 crate::context::compact_session_loops(
897 session,
898 current_lid,
899 strategy,
900 comp,
901 max_tokens,
902 None,
903 );
904 self.messages = crate::context::build_context_from_session(
905 session,
906 current_lid,
907 comp,
908 max_tokens,
909 None,
910 );
911
912 let msgs_after = self.messages.len();
913 let tokens_after = crate::context::total_tokens(&self.messages);
914 let chain = session.loop_chain_to(current_lid);
915 let loops_compacted = chain
916 .iter()
917 .filter(|lid| {
918 session
919 .get_loop(lid)
920 .map(|r| r.compaction_block.is_some())
921 .unwrap_or(false)
922 })
923 .count();
924
925 let _ = tx.send(AgentEvent::CompactionEnded {
926 loop_id: loop_id.clone(),
927 messages_before: msgs_before,
928 messages_after: msgs_after,
929 estimated_tokens_before: tokens_before,
930 estimated_tokens_after: tokens_after,
931 loops_compacted,
932 timestamp: chrono::Utc::now(),
933 });
934
935 let _ = tx.send(AgentEvent::AgentEnd {
936 loop_id,
937 messages: vec![],
938 usage: Usage::default(),
939 timestamp: chrono::Utc::now(),
940 rejection: None,
941 });
942 }
943
944 pub fn compact_context(&mut self) -> usize {
950 let (Some(session), Some(ctx_config)) =
951 (self.session.as_mut(), self.context_config.as_ref())
952 else {
953 return 0; };
955 let comp = &ctx_config.compaction;
956 let max_tokens = ctx_config.max_context_tokens;
957
958 let strategy: &dyn crate::context::BlockCompactionStrategy =
959 &crate::context::DefaultBlockCompaction;
960 let current_lid = self.last_loop_id.as_deref().unwrap_or("");
961
962 if let Some(record) = session.get_loop_mut(current_lid) {
963 record.messages = self.messages.clone();
964 }
965
966 crate::context::compact_session_loops(
967 session,
968 current_lid,
969 strategy,
970 comp,
971 max_tokens,
972 None,
973 );
974 self.messages = crate::context::build_context_from_session(
975 session,
976 current_lid,
977 comp,
978 max_tokens,
979 None,
980 );
981
982 let chain = session.loop_chain_to(current_lid);
983 chain
984 .iter()
985 .filter(|lid| {
986 session
987 .get_loop(lid)
988 .map(|r| r.compaction_block.is_some())
989 .unwrap_or(false)
990 })
991 .count()
992 }
993
994 pub fn build_config(&self) -> Result<AgentLoopConfig, super::agent::AgentBuildError> {
997 let steering_queue = self.steering_queue.clone(); let steering_mode = self.steering_mode; let follow_up_queue = self.follow_up_queue.clone();
1002 let follow_up_mode = self.follow_up_mode;
1003
1004 Ok(AgentLoopConfig {
1009 model_config: self.model_config.clone(),
1010 provider_override: self.provider_override.clone(),
1011 thinking_level: self.thinking_level,
1012 max_tokens: self.max_tokens,
1013 temperature: self.temperature,
1014 convert_to_llm: self.convert_to_llm.clone(),
1015 transform_context: self.transform_context.clone(),
1016 get_steering_messages: Some(Box::new(move || {
1017 let mut queue = lock_queue(&steering_queue); match steering_mode {
1021 QueueMode::OneAtATime => {
1022 if queue.is_empty() {
1023 vec![]
1024 } else {
1025 vec![queue.remove(0)] }
1027 }
1028 QueueMode::All => queue.drain(..).collect(), }
1030 })),
1031 context_config: self.context_config.clone(),
1032 execution_limits: self.execution_limits.clone(),
1033 cache_config: self.cache_config.clone(),
1034 tool_execution: self.tool_execution.clone(),
1035 tool_timeout: self.tool_timeout,
1036 response_format: self.response_format.clone(),
1037 retry_config: self.retry_config.clone(),
1038 get_follow_up_messages: Some(Box::new(move || {
1039 let mut queue = lock_queue(&follow_up_queue);
1040 match follow_up_mode {
1041 QueueMode::OneAtATime => {
1042 if queue.is_empty() {
1043 vec![]
1044 } else {
1045 vec![queue.remove(0)]
1046 }
1047 }
1048 QueueMode::All => queue.drain(..).collect(),
1049 }
1050 })),
1051 before_turn: self.before_turn.clone(),
1052 after_turn: self.after_turn.clone(),
1053 before_loop: self.before_loop.clone(),
1054 after_loop: self.after_loop.clone(),
1055 before_tool_execution: self.before_tool_execution.clone(),
1056 after_tool_execution: self.after_tool_execution.clone(),
1057 before_tool_execution_update: self.before_tool_execution_update.clone(),
1058 after_tool_execution_update: self.after_tool_execution_update.clone(),
1059 before_compaction_start: self.before_compaction_start.clone(),
1060 after_compaction_end: self.after_compaction_end.clone(),
1061 on_error: self.on_error.clone(),
1062 input_filters: self.input_filters.clone(),
1063 first_turn_trigger: TurnTrigger::User,
1064 config_id: self.config_id.clone(),
1065 context_translation: self.context_translation.clone(),
1066 prun_pending: self.prun_pending.clone(),
1067 revert_pending: self.revert_pending.clone(),
1068 })
1069 }
1070
1071 pub fn new_session(&mut self) -> String {
1080 self.session_id = uuid::Uuid::new_v4().to_string();
1081 self.loop_counters.clear();
1082 self.last_loop_id = None;
1083 self.last_active_at = None;
1087 self.session_id.clone()
1088 }
1089
1090 pub fn check_and_rotate(&mut self, threshold: std::time::Duration) -> Option<String> {
1098 let last = self.last_active_at?;
1099 let elapsed = (chrono::Utc::now() - last)
1100 .to_std()
1101 .unwrap_or(std::time::Duration::ZERO);
1102 if elapsed > threshold {
1103 Some(self.new_session())
1104 } else {
1105 None
1106 }
1107 }
1108}
1109
1110#[async_trait::async_trait]
1113impl Agent for BasicAgent {
1114 async fn prompt_messages_with_sender(
1118 &mut self,
1119 messages: Vec<AgentMessage>,
1120 tx: mpsc::UnboundedSender<AgentEvent>,
1121 ) {
1122 assert!(
1134 !self.is_streaming,
1135 "Agent is already streaming. Use steer() or follow_up()."
1136 );
1137
1138 self.last_active_at = Some(chrono::Utc::now());
1139 let cancel = CancellationToken::new();
1140 self.cancel = Some(cancel.clone()); self.is_streaming = true;
1142
1143 let config = self
1168 .build_config()
1169 .expect("BasicAgent always provides a model_config");
1170 let loop_id = self.next_loop_id(&config);
1171 self.last_loop_id = Some(loop_id.clone());
1172
1173 let mut context = AgentContext {
1174 system_prompt: self.system_prompt.clone(),
1175 messages: self.messages.clone(),
1176 tools: std::mem::take(&mut self.tools), agent_id: Some(self.agent_id.clone()),
1178 session_id: Some(self.session_id.clone()),
1179 loop_id: Some(loop_id),
1180 parent_loop_id: None, continuation_kind: None,
1182 session: self.session.take(), user_context: Vec::new(),
1184 inrun_context: Vec::new(),
1185 active_node_id: None,
1186 next_node_id: 0,
1187 };
1188
1189 let _new_messages = agent_loop(messages, &mut context, &config, tx, cancel).await;
1190
1191 self.tools = context.tools;
1192 self.messages = context.messages;
1193 self.session = context.session; self.is_streaming = false;
1195 self.cancel = None;
1196 }
1197
1198 async fn continue_loop_with_sender(
1206 &mut self,
1207 tx: mpsc::UnboundedSender<AgentEvent>, kind: ContinuationKind, ) {
1210 assert!(!self.is_streaming, "Agent is already streaming.");
1211 assert!(!self.messages.is_empty(), "No messages to continue from.");
1212
1213 let cancel = CancellationToken::new();
1214 self.cancel = Some(cancel.clone());
1215 self.is_streaming = true;
1216
1217 let config = self
1220 .build_config()
1221 .expect("BasicAgent always provides a model_config");
1222 let loop_id = self.next_loop_id(&config);
1223 let parent_loop_id = self.last_loop_id.clone(); self.last_loop_id = Some(loop_id.clone());
1225
1226 let tag = chrono::Utc::now().to_rfc3339();
1228 let kind_with_tag = match kind {
1229 ContinuationKind::Initial => ContinuationKind::Default, ContinuationKind::Default => ContinuationKind::Default,
1231 ContinuationKind::Rerun { .. } => ContinuationKind::Rerun { tag },
1232 ContinuationKind::Branch { .. } => ContinuationKind::Branch { tag },
1233 ContinuationKind::Compaction => ContinuationKind::Compaction,
1234 };
1235
1236 let mut context = AgentContext {
1238 system_prompt: self.system_prompt.clone(),
1239 messages: self.messages.clone(),
1240 tools: std::mem::take(&mut self.tools),
1241 agent_id: Some(self.agent_id.clone()),
1242 session_id: Some(self.session_id.clone()),
1243 loop_id: Some(loop_id),
1244 parent_loop_id,
1245 continuation_kind: Some(kind_with_tag),
1246 session: self.session.take(),
1247 user_context: Vec::new(),
1248 inrun_context: Vec::new(),
1249 active_node_id: None,
1250 next_node_id: 0,
1251 };
1252
1253 let _new_messages = agent_loop_continue(&mut context, &config, tx, cancel).await;
1254
1255 self.tools = context.tools;
1256 self.messages = context.messages;
1257 self.session = context.session;
1258 self.is_streaming = false;
1259 self.cancel = None;
1260 }
1261
1262 fn messages(&self) -> &[AgentMessage] {
1265 &self.messages
1266 }
1267
1268 fn is_streaming(&self) -> bool {
1269 self.is_streaming
1270 }
1271
1272 fn agent_id(&self) -> &str {
1273 &self.agent_id
1274 }
1275
1276 fn session_id(&self) -> &str {
1277 &self.session_id
1278 }
1279
1280 fn last_loop_id(&self) -> Option<&str> {
1281 self.last_loop_id.as_deref()
1282 }
1283
1284 fn clear_messages(&mut self) {
1287 self.messages.clear();
1288 }
1289
1290 fn append_message(&mut self, msg: AgentMessage) {
1291 self.messages.push(msg);
1292 }
1293
1294 fn replace_messages(&mut self, msgs: Vec<AgentMessage>) {
1295 self.messages = msgs;
1296 }
1297
1298 fn save_messages(&self) -> Result<String, serde_json::Error> {
1299 serde_json::to_string(&self.messages)
1300 }
1301
1302 fn restore_messages(&mut self, json: &str) -> Result<(), serde_json::Error> {
1303 let msgs: Vec<AgentMessage> = serde_json::from_str(json)?;
1304 self.messages = msgs;
1305 Ok(())
1306 }
1307
1308 fn set_tools(&mut self, tools: Vec<Arc<dyn AgentTool>>) {
1309 self.tools = tools;
1310 }
1311
1312 fn abort(&self) {
1315 if let Some(ref cancel) = self.cancel {
1316 cancel.cancel();
1317 }
1318 }
1319
1320 fn reset(&mut self) {
1321 self.messages.clear();
1322 self.clear_all_queues();
1323 self.is_streaming = false;
1324 self.cancel = None;
1325 }
1326
1327 fn steer(&self, msg: AgentMessage) {
1348 lock_queue(&self.steering_queue).push(msg);
1349 }
1350
1351 fn follow_up(&self, msg: AgentMessage) {
1352 lock_queue(&self.follow_up_queue).push(msg);
1353 }
1354
1355 fn clear_steering_queue(&self) {
1356 lock_queue(&self.steering_queue).clear();
1357 }
1358
1359 fn clear_follow_up_queue(&self) {
1360 lock_queue(&self.follow_up_queue).clear();
1361 }
1362
1363 fn set_steering_mode(&mut self, mode: QueueMode) {
1364 self.steering_mode = mode;
1365 }
1366
1367 fn set_follow_up_mode(&mut self, mode: QueueMode) {
1368 self.follow_up_mode = mode;
1369 }
1370
1371 fn profile(&self) -> Option<&AgentProfile> {
1374 self.profile.as_ref()
1375 }
1376
1377 fn system_prompt(&self) -> &str {
1378 &self.system_prompt
1379 }
1380
1381 fn model_config(&self) -> Option<&ModelConfig> {
1382 Some(&self.model_config)
1383 }
1384
1385 fn thinking_level(&self) -> ThinkingLevel {
1386 self.thinking_level
1387 }
1388
1389 fn temperature(&self) -> Option<f32> {
1390 self.temperature
1391 }
1392
1393 fn max_tokens(&self) -> Option<u32> {
1394 self.max_tokens
1395 }
1396
1397 fn context_config(&self) -> Option<&ContextConfig> {
1398 self.context_config.as_ref()
1399 }
1400
1401 fn execution_limits(&self) -> Option<&ExecutionLimits> {
1402 self.execution_limits.as_ref()
1403 }
1404
1405 fn cache_config(&self) -> CacheConfig {
1406 self.cache_config.clone()
1407 }
1408
1409 fn tool_execution(&self) -> ToolExecutionStrategy {
1410 self.tool_execution.clone()
1411 }
1412
1413 fn tool_timeout(&self) -> Option<std::time::Duration> {
1414 self.tool_timeout
1415 }
1416
1417 fn response_format(&self) -> crate::provider::ResponseFormat {
1418 self.response_format.clone()
1419 }
1420
1421 fn retry_config(&self) -> crate::provider::retry::RetryConfig {
1422 self.retry_config.clone()
1423 }
1424
1425 fn session(&self) -> Option<&crate::session::Session> {
1428 self.session.as_ref()
1429 }
1430
1431 fn workspace(&self) -> Option<&std::path::Path> {
1432 self.workspace.as_deref()
1433 }
1434
1435 fn set_before_turn(&mut self, f: Option<BeforeTurnFn>) {
1438 self.before_turn = f;
1439 }
1440
1441 fn set_after_turn(&mut self, f: Option<AfterTurnFn>) {
1442 self.after_turn = f;
1443 }
1444
1445 fn set_before_loop(&mut self, f: Option<BeforeLoopFn>) {
1446 self.before_loop = f;
1447 }
1448
1449 fn set_after_loop(&mut self, f: Option<AfterLoopFn>) {
1450 self.after_loop = f;
1451 }
1452
1453 fn set_before_tool_execution(&mut self, f: Option<BeforeToolExecutionFn>) {
1454 self.before_tool_execution = f;
1455 }
1456
1457 fn set_after_tool_execution(&mut self, f: Option<AfterToolExecutionFn>) {
1458 self.after_tool_execution = f;
1459 }
1460
1461 fn set_before_tool_execution_update(&mut self, f: Option<BeforeToolExecutionUpdateFn>) {
1462 self.before_tool_execution_update = f;
1463 }
1464
1465 fn set_after_tool_execution_update(&mut self, f: Option<AfterToolExecutionUpdateFn>) {
1466 self.after_tool_execution_update = f;
1467 }
1468
1469 fn set_convert_to_llm(&mut self, f: Option<ConvertToLlmFn>) {
1470 self.convert_to_llm = f;
1471 }
1472
1473 fn set_transform_context(&mut self, f: Option<TransformContextFn>) {
1474 self.transform_context = f;
1475 }
1476
1477 fn set_block_compaction_strategy(
1478 &mut self,
1479 s: Option<Arc<dyn crate::context::BlockCompactionStrategy>>,
1480 ) {
1481 if let Some(ref mut ctx) = self.context_config {
1483 ctx.compaction.block_strategy = s;
1484 }
1485 }
1486
1487 fn set_before_compaction_start(&mut self, f: Option<BeforeCompactionStartFn>) {
1488 self.before_compaction_start = f;
1489 }
1490
1491 fn set_after_compaction_end(&mut self, f: Option<AfterCompactionEndFn>) {
1492 self.after_compaction_end = f;
1493 }
1494
1495 fn set_context_translation(&mut self, s: Option<Arc<dyn ContextTranslationStrategy>>) {
1496 self.context_translation = s;
1497 }
1498
1499 fn context_translation(&self) -> Option<Arc<dyn ContextTranslationStrategy>> {
1500 self.context_translation.clone()
1501 }
1502}
1503
#[cfg(test)]
mod tests {
    use super::*;

    /// Poisons `m` by panicking on a helper thread while that thread holds the lock.
    fn poison_mutex<T: Send + 'static>(m: Arc<Mutex<T>>) {
        // `m` is already owned, so it moves straight into the thread. The join result is the
        // expected panic and is deliberately discarded.
        let _ = std::thread::spawn(move || {
            let _guard = m.lock().unwrap();
            panic!("intentional panic to poison the mutex");
        })
        .join();
    }

    /// `lock_queue` must hand back the original contents of a poisoned mutex.
    #[test]
    fn lock_queue_recovers_inner_vec_after_poison() {
        let q: Arc<Mutex<Vec<u32>>> = Arc::new(Mutex::new(vec![1, 2, 3]));
        poison_mutex(Arc::clone(&q));
        assert!(
            q.is_poisoned(),
            "test pre-condition: mutex should be poisoned"
        );

        let guard = lock_queue(&q);
        assert_eq!(*guard, vec![1, 2, 3]);
    }

    /// `steer` and `clear_steering_queue` keep working after the steering queue is poisoned.
    #[test]
    fn basic_agent_steer_survives_queue_poison() {
        let agent = BasicAgent::new(ModelConfig::anthropic("mock", "mock-model", "test-key"));

        poison_mutex(Arc::clone(&agent.steering_queue));
        assert!(agent.steering_queue.is_poisoned());

        agent.steer(AgentMessage::Llm(LlmMessage::new(Message::user("hi"))));
        assert_eq!(lock_queue(&agent.steering_queue).len(), 1);
        agent.clear_steering_queue();
        assert_eq!(lock_queue(&agent.steering_queue).len(), 0);
    }

    /// `follow_up` and `clear_follow_up_queue` keep working after the follow-up queue is
    /// poisoned.
    #[test]
    fn basic_agent_follow_up_survives_queue_poison() {
        let agent = BasicAgent::new(ModelConfig::anthropic("mock", "mock-model", "test-key"));

        poison_mutex(Arc::clone(&agent.follow_up_queue));
        assert!(agent.follow_up_queue.is_poisoned());

        agent.follow_up(AgentMessage::Llm(LlmMessage::new(Message::user("more"))));
        assert_eq!(lock_queue(&agent.follow_up_queue).len(), 1);
        agent.clear_follow_up_queue();
        assert_eq!(lock_queue(&agent.follow_up_queue).len(), 0);
    }
}