1use super::agent::{Agent, QueueMode};
8use super::profile::AgentProfile;
9use crate::agent_loop::{
10 agent_loop, agent_loop_continue, AfterCompactionEndFn, AfterLoopFn, AfterToolExecutionFn,
11 AfterToolExecutionUpdateFn, AfterTurnFn, AgentLoopConfig, BeforeCompactionStartFn,
12 BeforeLoopFn, BeforeToolExecutionFn, BeforeToolExecutionUpdateFn, BeforeTurnFn, ConvertToLlmFn,
13 OnErrorFn, TransformContextFn,
14};
15use crate::context::{CompactionStrategy, ContextConfig, ExecutionLimits};
16use crate::mcp::{McpClient, McpError, McpToolAdapter};
17use crate::provider::context_translation::ContextTranslationStrategy;
18use crate::provider::{ModelConfig, StreamProvider};
19use crate::types::*;
20use std::collections::HashMap;
21use std::sync::{Arc, Mutex};
22use tokio::sync::mpsc;
23use tokio_util::sync::CancellationToken;
24
25fn lock_queue<T>(q: &Mutex<Vec<T>>) -> std::sync::MutexGuard<'_, Vec<T>> {
32 match q.lock() {
33 Ok(g) => g,
34 Err(poison) => {
35 tracing::warn!(
36 "BasicAgent: queue mutex was poisoned; recovering inner Vec. \
37 A prior hook or tool callback panicked while holding the lock."
38 );
39 poison.into_inner()
40 }
41 }
42}
43
/// An agent that owns its model configuration, conversation history, tools,
/// message queues, lifecycle hooks and session bookkeeping.
///
/// Built with [`BasicAgent::new`] and customised through the `with_*` / `on_*`
/// builder methods; most fields are copied into an `AgentLoopConfig` or
/// `AgentContext` each time a loop is started.
pub struct BasicAgent {
    /// System prompt cloned into the `AgentContext` of every loop.
    pub system_prompt: String,
    /// Model configuration cloned into every loop config.
    pub model_config: ModelConfig,
    /// Optional provider forwarded to the loop config as `provider_override`.
    pub provider_override: Option<Arc<dyn StreamProvider>>,
    /// Thinking level forwarded to the loop config; `Off` by default.
    pub thinking_level: ThinkingLevel,
    /// Optional `max_tokens` value forwarded to the loop config.
    pub max_tokens: Option<u32>,
    /// Optional temperature forwarded to the loop config.
    pub temperature: Option<f32>,
    /// Conversation history; replaced by the context's messages after each loop.
    messages: Vec<AgentMessage>,
    /// Tools handed to the loop context and taken back when the loop returns.
    tools: Vec<Arc<dyn AgentTool>>,

    /// Messages queued by `steer`; drained by the loop's `get_steering_messages` callback.
    steering_queue: Arc<Mutex<Vec<AgentMessage>>>,
    /// Messages queued by `follow_up`; drained by the loop's `get_follow_up_messages` callback.
    follow_up_queue: Arc<Mutex<Vec<AgentMessage>>>,
    /// Whether the steering callback yields one queued message or all of them.
    steering_mode: QueueMode,
    /// Whether the follow-up callback yields one queued message or all of them.
    follow_up_mode: QueueMode,

    /// Context management settings; `None` after `without_context_management`.
    pub context_config: Option<ContextConfig>,
    /// Execution limits; `None` after `without_context_management`.
    pub execution_limits: Option<ExecutionLimits>,
    /// Cache configuration forwarded to the loop config.
    pub cache_config: CacheConfig,
    /// Tool execution strategy forwarded to the loop config.
    pub tool_execution: ToolExecutionStrategy,
    /// Optional tool timeout forwarded to the loop config.
    pub tool_timeout: Option<std::time::Duration>,
    /// Response format forwarded to the loop config; `Text` by default.
    pub response_format: crate::provider::ResponseFormat,
    /// Retry configuration forwarded to the loop config.
    pub retry_config: crate::provider::retry::RetryConfig,

    // Optional hooks; each is cloned into the loop config under the same name.
    before_turn: Option<BeforeTurnFn>,
    after_turn: Option<AfterTurnFn>,
    on_error: Option<OnErrorFn>,

    /// Input filters forwarded to the loop config, in registration order.
    input_filters: Vec<Arc<dyn InputFilter>>,

    before_loop: Option<BeforeLoopFn>,
    after_loop: Option<AfterLoopFn>,
    before_tool_execution: Option<BeforeToolExecutionFn>,
    after_tool_execution: Option<AfterToolExecutionFn>,
    before_tool_execution_update: Option<BeforeToolExecutionUpdateFn>,
    after_tool_execution_update: Option<AfterToolExecutionUpdateFn>,
    convert_to_llm: Option<ConvertToLlmFn>,
    transform_context: Option<TransformContextFn>,
    before_compaction_start: Option<BeforeCompactionStartFn>,
    after_compaction_end: Option<AfterCompactionEndFn>,
    /// Optional context translation strategy forwarded to the loop config.
    context_translation: Option<Arc<dyn ContextTranslationStrategy>>,
    /// Request list shared with the tools registered by `with_prun_tool`.
    prun_pending: Option<Arc<Mutex<Vec<crate::tools::prun::PrunRequest>>>>,

    /// Explicit config id; when set it replaces the derived id in loop ids.
    config_id: Option<String>,
    /// Profile last applied through `with_profile`.
    profile: Option<AgentProfile>,
    /// Workspace path set through `with_workspace`.
    workspace: Option<std::path::PathBuf>,

    /// Cancellation token of the loop currently running, if any.
    cancel: Option<CancellationToken>,
    /// `true` while a loop started by this agent is running.
    is_streaming: bool,
    /// Identifier generated once in `new` (v4 UUID).
    agent_id: String,
    /// Identifier of the current session; regenerated by `new_session`.
    session_id: String,
    /// Per-thread-key counters used to number loop ids.
    loop_counters: HashMap<String, usize>,
    /// Id of the most recently started loop.
    last_loop_id: Option<String>,
    /// Time of the most recent prompt; read by `check_and_rotate`.
    last_active_at: Option<chrono::DateTime<chrono::Utc>>,
    /// Attached session; moved into the loop context while a loop runs.
    session: Option<crate::session::Session>,
}
213
214impl BasicAgent {
215 pub fn new(model_config: ModelConfig) -> Self {
216 Self {
217 model_config,
218 provider_override: None,
219 system_prompt: String::new(),
220 thinking_level: ThinkingLevel::Off,
221 max_tokens: None,
222 temperature: None,
223 messages: Vec::new(),
224 tools: Vec::new(),
225 steering_queue: Arc::new(Mutex::new(Vec::new())), follow_up_queue: Arc::new(Mutex::new(Vec::new())),
227 steering_mode: QueueMode::OneAtATime,
228 follow_up_mode: QueueMode::OneAtATime,
229 context_config: Some(ContextConfig::default()), execution_limits: Some(ExecutionLimits::default()), cache_config: CacheConfig::default(),
232 tool_execution: ToolExecutionStrategy::default(), tool_timeout: None,
234 response_format: crate::provider::ResponseFormat::Text,
235 retry_config: crate::provider::retry::RetryConfig::default(), before_turn: None,
237 after_turn: None,
238 on_error: None,
239 input_filters: Vec::new(),
240 before_loop: None,
241 after_loop: None,
242 before_tool_execution: None,
243 after_tool_execution: None,
244 before_tool_execution_update: None,
245 after_tool_execution_update: None,
246 convert_to_llm: None,
247 transform_context: None,
248 before_compaction_start: None,
249 after_compaction_end: None,
250 context_translation: None,
251 prun_pending: None,
252 config_id: None,
253 profile: None,
254 workspace: None,
255 cancel: None,
256 is_streaming: false,
257 agent_id: uuid::Uuid::new_v4().to_string(),
258 session_id: uuid::Uuid::new_v4().to_string(),
259 loop_counters: HashMap::new(),
260 last_loop_id: None,
261 last_active_at: None,
262 session: None,
263 }
264 }
265
/// Attaches `session` to the agent, replacing any session already attached.
pub fn with_session(mut self, session: crate::session::Session) -> Self {
    self.session = Some(session);
    self
}
271
/// Removes and returns the attached session, leaving `None` in its place.
pub fn take_session(&mut self) -> Option<crate::session::Session> {
    self.session.take()
}
276
/// Sets the system prompt, replacing any previous value.
pub fn with_system_prompt(mut self, prompt: impl Into<String>) -> Self {
    self.system_prompt = prompt.into();
    self
}
300
/// Sets the thinking level forwarded to the loop config.
pub fn with_thinking(mut self, level: ThinkingLevel) -> Self {
    self.thinking_level = level;
    self
}
305
/// Replaces the whole tool list with `tools` (tools added earlier are dropped).
pub fn with_tools(mut self, tools: Vec<Arc<dyn AgentTool>>) -> Self {
    self.tools = tools;
    self
}
310
/// Replaces the model configuration passed to `new`.
pub fn with_model_config(mut self, config: ModelConfig) -> Self {
    self.model_config = config;
    self
}
315
/// Stores `provider` as the provider override forwarded to the loop config.
pub fn with_provider_override(mut self, provider: Arc<dyn StreamProvider>) -> Self {
    self.provider_override = Some(provider);
    self
}
322
/// Sets the `max_tokens` value forwarded to the loop config.
pub fn with_max_tokens(mut self, max: u32) -> Self {
    self.max_tokens = Some(max);
    self
}
327
/// Replaces the context configuration (also re-enables it if it was `None`).
pub fn with_context_config(mut self, config: ContextConfig) -> Self {
    self.context_config = Some(config);
    self
}
332
/// Replaces the cache configuration forwarded to the loop config.
pub fn with_cache_config(mut self, config: CacheConfig) -> Self {
    self.cache_config = config;
    self
}
337
/// Replaces the tool execution strategy forwarded to the loop config.
pub fn with_tool_execution(mut self, strategy: ToolExecutionStrategy) -> Self {
    self.tool_execution = strategy;
    self
}
342
/// Sets the tool timeout forwarded to the loop config.
pub fn with_tool_timeout(mut self, timeout: std::time::Duration) -> Self {
    self.tool_timeout = Some(timeout);
    self
}
348
/// Replaces the response format forwarded to the loop config.
pub fn with_response_format(mut self, format: crate::provider::ResponseFormat) -> Self {
    self.response_format = format;
    self
}
354
/// Replaces the retry configuration forwarded to the loop config.
pub fn with_retry_config(mut self, config: crate::provider::retry::RetryConfig) -> Self {
    self.retry_config = config;
    self
}
359
360 pub fn with_skills(mut self, skills: crate::context::skills::SkillSet) -> Self {
366 let prompt_fragment = skills.format_for_prompt();
367 if !prompt_fragment.is_empty() {
368 if self.system_prompt.is_empty() {
369 self.system_prompt = prompt_fragment;
370 } else {
371 self.system_prompt = format!("{}\n\n{}", self.system_prompt, prompt_fragment);
372 }
373 }
374 self
375 }
376
/// Replaces the execution limits (also re-enables them if they were `None`).
pub fn with_execution_limits(mut self, limits: ExecutionLimits) -> Self {
    self.execution_limits = Some(limits);
    self
}
381
/// Replaces the conversation history with `msgs`.
pub fn with_messages(mut self, msgs: Vec<AgentMessage>) -> Self {
    self.messages = msgs;
    self
}
386
387 pub fn on_before_turn(
403 mut self,
404 f: impl Fn(&[AgentMessage], usize) -> bool + Send + Sync + 'static,
405 ) -> Self {
406 self.before_turn = Some(Arc::new(f));
407 self
408 }
409
410 pub fn on_after_turn(
411 mut self,
412 f: impl Fn(&[AgentMessage], &Usage) + Send + Sync + 'static,
413 ) -> Self {
414 self.after_turn = Some(Arc::new(f));
415 self
416 }
417
418 pub fn on_error(mut self, f: impl Fn(&str) + Send + Sync + 'static) -> Self {
419 self.on_error = Some(Arc::new(f));
420 self
421 }
422
423 pub fn with_input_filter(mut self, filter: impl InputFilter + 'static) -> Self {
425 self.input_filters.push(Arc::new(filter));
426 self
427 }
428
429 pub fn with_compaction_strategy(mut self, strategy: impl CompactionStrategy + 'static) -> Self {
433 if let Some(ref mut ctx) = self.context_config {
434 ctx.compaction.in_memory_strategy = Some(Arc::new(strategy));
435 }
436 self
437 }
438
439 pub fn with_profile(mut self, profile: AgentProfile) -> Self {
443 if let Some(ref prompt) = profile.system_prompt {
445 if self.system_prompt.is_empty() {
446 self.system_prompt = prompt.clone();
447 }
448 }
449 if let Some(level) = profile.thinking_level {
450 if self.thinking_level == ThinkingLevel::Off {
451 self.thinking_level = level;
452 }
453 }
454 if let Some(temp) = profile.temperature {
455 if self.temperature.is_none() {
456 self.temperature = Some(temp);
457 }
458 }
459 if let Some(max) = profile.max_tokens {
460 if self.max_tokens.is_none() {
461 self.max_tokens = Some(max);
462 }
463 }
464 if let Some(ref id) = profile.config_id {
465 if self.config_id.is_none() {
466 self.config_id = Some(id.clone());
467 }
468 }
469 self.profile = Some(profile);
470 self
471 }
472
/// Sets the temperature forwarded to the loop config.
pub fn with_temperature(mut self, temp: f32) -> Self {
    self.temperature = Some(temp);
    self
}
478
/// Sets an explicit config id; when present it is used in loop ids instead of
/// the id derived from provider, model and thinking level.
pub fn with_config_id(mut self, id: impl Into<String>) -> Self {
    self.config_id = Some(id.into());
    self
}
484
/// Sets the workspace path returned by `workspace()`.
pub fn with_workspace(mut self, path: impl Into<std::path::PathBuf>) -> Self {
    self.workspace = Some(path.into());
    self
}
491
492 pub fn on_before_loop(
494 mut self,
495 f: impl Fn(&[AgentMessage], usize) -> bool + Send + Sync + 'static,
496 ) -> Self {
497 self.before_loop = Some(Arc::new(f));
498 self
499 }
500
501 pub fn on_after_loop(
503 mut self,
504 f: impl Fn(&[AgentMessage], &Usage) + Send + Sync + 'static,
505 ) -> Self {
506 self.after_loop = Some(Arc::new(f));
507 self
508 }
509
510 pub fn on_before_tool_execution(
512 mut self,
513 f: impl Fn(&str, &str, &serde_json::Value) -> bool + Send + Sync + 'static,
514 ) -> Self {
515 self.before_tool_execution = Some(Arc::new(f));
516 self
517 }
518
519 pub fn on_after_tool_execution(
521 mut self,
522 f: impl Fn(&str, &str, bool) + Send + Sync + 'static,
523 ) -> Self {
524 self.after_tool_execution = Some(Arc::new(f));
525 self
526 }
527
528 pub fn on_before_tool_execution_update(
530 mut self,
531 f: impl Fn(&str, &str, &str) -> bool + Send + Sync + 'static,
532 ) -> Self {
533 self.before_tool_execution_update = Some(Arc::new(f));
534 self
535 }
536
537 pub fn on_after_tool_execution_update(
539 mut self,
540 f: impl Fn(&str, &str, &str) + Send + Sync + 'static,
541 ) -> Self {
542 self.after_tool_execution_update = Some(Arc::new(f));
543 self
544 }
545
546 pub fn with_prun_tool(mut self) -> Self {
549 let pending = Arc::new(Mutex::new(Vec::new()));
550 self.tools.push(Arc::new(crate::tools::PrunTool::new(
551 pending.clone(),
552 crate::tools::PrunVariant::Prun,
553 )));
554 self.tools.push(Arc::new(crate::tools::PrunTool::new(
555 pending.clone(),
556 crate::tools::PrunVariant::PrunWithMemo,
557 )));
558 self.prun_pending = Some(pending);
559 self
560 }
561
562 pub fn with_convert_to_llm(
564 mut self,
565 f: impl Fn(&[AgentMessage]) -> Vec<Message> + Send + Sync + 'static,
566 ) -> Self {
567 self.convert_to_llm = Some(Arc::new(f));
568 self
569 }
570
571 pub fn with_transform_context(
573 mut self,
574 f: impl Fn(Vec<AgentMessage>) -> Vec<AgentMessage> + Send + Sync + 'static,
575 ) -> Self {
576 self.transform_context = Some(Arc::new(f));
577 self
578 }
579
580 pub fn with_block_compaction_strategy(
583 mut self,
584 strategy: impl crate::context::BlockCompactionStrategy + 'static,
585 ) -> Self {
586 if let Some(ref mut ctx) = self.context_config {
587 ctx.compaction.block_strategy = Some(Arc::new(strategy));
588 }
589 self
590 }
591
592 pub fn on_before_compaction_start(
594 mut self,
595 f: impl Fn(usize, usize) -> bool + Send + Sync + 'static,
596 ) -> Self {
597 self.before_compaction_start = Some(Arc::new(f));
598 self
599 }
600
601 pub fn on_after_compaction_end(
603 mut self,
604 f: impl Fn(usize, usize, usize, usize) + Send + Sync + 'static,
605 ) -> Self {
606 self.after_compaction_end = Some(Arc::new(f));
607 self
608 }
609
/// Sets the context translation strategy forwarded to the loop config.
pub fn with_context_translation(
    mut self,
    strategy: Arc<dyn ContextTranslationStrategy>,
) -> Self {
    self.context_translation = Some(strategy);
    self
}
618
619 pub fn with_sub_agent(mut self, sub: crate::agents::SubAgentTool) -> Self {
621 self.tools.push(Arc::new(sub));
622 self
623 }
624
/// Disables context management by clearing both `context_config` and
/// `execution_limits`.
///
/// Compaction-strategy builders called afterwards become no-ops because they
/// only act on an existing `context_config`.
pub fn without_context_management(mut self) -> Self {
    self.context_config = None;
    self.execution_limits = None;
    self
}
631
/// Builds tool adapters with `OpenApiToolAdapter::from_file` and appends them
/// to the tool list.
///
/// # Errors
/// Returns the `OpenApiError` reported by `from_file`; the agent is consumed
/// in that case.
#[cfg(feature = "openapi")]
pub async fn with_openapi_file(
    mut self,
    path: impl AsRef<std::path::Path>,
    config: crate::openapi::OpenApiConfig,
    filter: &crate::openapi::OperationFilter,
) -> Result<Self, crate::openapi::OpenApiError> {
    let adapters = crate::openapi::OpenApiToolAdapter::from_file(path, config, filter).await?;
    self.tools.extend(
        adapters
            .into_iter()
            .map(|adapter| Arc::new(adapter) as Arc<dyn AgentTool>),
    );
    Ok(self)
}
648
/// Builds tool adapters with `OpenApiToolAdapter::from_url` and appends them
/// to the tool list.
///
/// # Errors
/// Returns the `OpenApiError` reported by `from_url`; the agent is consumed
/// in that case.
#[cfg(feature = "openapi")]
pub async fn with_openapi_url(
    mut self,
    url: &str,
    config: crate::openapi::OpenApiConfig,
    filter: &crate::openapi::OperationFilter,
) -> Result<Self, crate::openapi::OpenApiError> {
    let adapters = crate::openapi::OpenApiToolAdapter::from_url(url, config, filter).await?;
    self.tools.extend(
        adapters
            .into_iter()
            .map(|adapter| Arc::new(adapter) as Arc<dyn AgentTool>),
    );
    Ok(self)
}
663
/// Builds tool adapters with `OpenApiToolAdapter::from_str` from an in-memory
/// spec and appends them to the tool list.
///
/// # Errors
/// Returns the `OpenApiError` reported by `from_str`; the agent is consumed
/// in that case.
#[cfg(feature = "openapi")]
pub fn with_openapi_spec(
    mut self,
    spec_str: &str,
    config: crate::openapi::OpenApiConfig,
    filter: &crate::openapi::OperationFilter,
) -> Result<Self, crate::openapi::OpenApiError> {
    let adapters = crate::openapi::OpenApiToolAdapter::from_str(spec_str, config, filter)?;
    self.tools.extend(
        adapters
            .into_iter()
            .map(|adapter| Arc::new(adapter) as Arc<dyn AgentTool>),
    );
    Ok(self)
}
678
679 pub async fn with_mcp_server_stdio(
683 mut self,
684 command: &str, args: &[&str], env: Option<HashMap<String, String>>, ) -> Result<Self, McpError> {
688 let client = McpClient::connect_stdio(command, args, env).await?;
689 let client = Arc::new(tokio::sync::Mutex::new(client));
690 let adapters = McpToolAdapter::from_client(client).await?;
691 for adapter in adapters {
692 self.tools.push(Arc::new(adapter));
693 }
694 Ok(self)
695 }
696
697 pub async fn with_mcp_server_http(mut self, url: &str) -> Result<Self, McpError> {
699 let client = McpClient::connect_http(url).await?;
700 let client = Arc::new(tokio::sync::Mutex::new(client));
701 let adapters = McpToolAdapter::from_client(client).await?;
702 for adapter in adapters {
703 self.tools.push(Arc::new(adapter));
704 }
705 Ok(self)
706 }
707
708 pub async fn prompt(&mut self, text: impl Into<String>) -> mpsc::UnboundedReceiver<AgentEvent> {
719 let (tx, rx) = mpsc::unbounded_channel();
720 let msg = AgentMessage::Llm(LlmMessage::new(Message::user(text)));
721 self.prompt_messages_with_sender(vec![msg], tx).await;
722 rx
723 }
724
725 pub async fn prompt_with_sender(
729 &mut self,
730 text: impl Into<String>,
731 tx: mpsc::UnboundedSender<AgentEvent>,
732 ) {
733 let msg = AgentMessage::Llm(LlmMessage::new(Message::user(text)));
734 self.prompt_messages_with_sender(vec![msg], tx).await;
735 }
736
737 fn next_loop_id(&mut self, config: &AgentLoopConfig) -> String {
756 let effective_config_id = if let Some(ref id) = config.config_id {
757 id.clone()
758 } else {
759 let slugify = |s: &str| -> String {
760 s.chars()
761 .map(|c| {
762 if c.is_alphanumeric() || c == '-' {
763 c
764 } else {
765 '-'
766 }
767 })
768 .collect()
769 };
770 let thinking_part = if config.thinking_level != ThinkingLevel::Off {
771 ".thinking"
772 } else {
773 ""
774 };
775 format!(
776 "{}.{}{}",
777 config.model_config.provider,
778 slugify(&config.model_config.id),
779 thinking_part
780 )
781 };
782 let thread_key = format!("{}.{}", self.session_id, effective_config_id);
783 let n = self.loop_counters.entry(thread_key.clone()).or_insert(0);
784 *n += 1;
785 format!("{}.{}", thread_key, n)
786 }
787
788 pub fn compact_context_with_sender(
825 &mut self,
826 tx: &tokio::sync::mpsc::UnboundedSender<AgentEvent>,
827 ) {
828 let (Some(session), Some(ctx_config)) =
829 (self.session.as_mut(), self.context_config.as_ref())
830 else {
831 return; };
833 let comp = &ctx_config.compaction;
834 let max_tokens = ctx_config.max_context_tokens;
835
836 let loop_id = self
837 .last_loop_id
838 .clone()
839 .unwrap_or_else(|| "compaction".to_string());
840
841 let _ = tx.send(AgentEvent::AgentStart {
842 agent_id: self.agent_id.clone(),
843 session_id: self.session_id.clone(),
844 loop_id: loop_id.clone(),
845 parent_loop_id: self.last_loop_id.clone(),
846 continuation_kind: ContinuationKind::Compaction,
847 timestamp: chrono::Utc::now(),
848 metadata: None,
849 config_snapshot: None, });
851
852 let msgs_before = self.messages.len();
853 let tokens_before = crate::context::total_tokens(&self.messages);
854
855 let _ = tx.send(AgentEvent::CompactionStarted {
856 loop_id: loop_id.clone(),
857 estimated_tokens: tokens_before,
858 message_count: msgs_before,
859 timestamp: chrono::Utc::now(),
860 });
861
862 let strategy: &dyn crate::context::BlockCompactionStrategy =
863 &crate::context::DefaultBlockCompaction;
864 let current_lid = self.last_loop_id.as_deref().unwrap_or("");
865
866 if let Some(record) = session.get_loop_mut(current_lid) {
868 record.messages = self.messages.clone();
869 }
870
871 crate::context::compact_session_loops(
872 session,
873 current_lid,
874 strategy,
875 comp,
876 max_tokens,
877 None,
878 );
879 self.messages = crate::context::build_context_from_session(
880 session,
881 current_lid,
882 comp,
883 max_tokens,
884 None,
885 );
886
887 let msgs_after = self.messages.len();
888 let tokens_after = crate::context::total_tokens(&self.messages);
889 let chain = session.loop_chain_to(current_lid);
890 let loops_compacted = chain
891 .iter()
892 .filter(|lid| {
893 session
894 .get_loop(lid)
895 .map(|r| r.compaction_block.is_some())
896 .unwrap_or(false)
897 })
898 .count();
899
900 let _ = tx.send(AgentEvent::CompactionEnded {
901 loop_id: loop_id.clone(),
902 messages_before: msgs_before,
903 messages_after: msgs_after,
904 estimated_tokens_before: tokens_before,
905 estimated_tokens_after: tokens_after,
906 loops_compacted,
907 timestamp: chrono::Utc::now(),
908 });
909
910 let _ = tx.send(AgentEvent::AgentEnd {
911 loop_id,
912 messages: vec![],
913 usage: Usage::default(),
914 timestamp: chrono::Utc::now(),
915 rejection: None,
916 });
917 }
918
919 pub fn compact_context(&mut self) -> usize {
925 let (Some(session), Some(ctx_config)) =
926 (self.session.as_mut(), self.context_config.as_ref())
927 else {
928 return 0; };
930 let comp = &ctx_config.compaction;
931 let max_tokens = ctx_config.max_context_tokens;
932
933 let strategy: &dyn crate::context::BlockCompactionStrategy =
934 &crate::context::DefaultBlockCompaction;
935 let current_lid = self.last_loop_id.as_deref().unwrap_or("");
936
937 if let Some(record) = session.get_loop_mut(current_lid) {
938 record.messages = self.messages.clone();
939 }
940
941 crate::context::compact_session_loops(
942 session,
943 current_lid,
944 strategy,
945 comp,
946 max_tokens,
947 None,
948 );
949 self.messages = crate::context::build_context_from_session(
950 session,
951 current_lid,
952 comp,
953 max_tokens,
954 None,
955 );
956
957 let chain = session.loop_chain_to(current_lid);
958 chain
959 .iter()
960 .filter(|lid| {
961 session
962 .get_loop(lid)
963 .map(|r| r.compaction_block.is_some())
964 .unwrap_or(false)
965 })
966 .count()
967 }
968
969 fn build_config(&self) -> Result<AgentLoopConfig, super::agent::AgentBuildError> {
972 let steering_queue = self.steering_queue.clone(); let steering_mode = self.steering_mode; let follow_up_queue = self.follow_up_queue.clone();
977 let follow_up_mode = self.follow_up_mode;
978
979 Ok(AgentLoopConfig {
984 model_config: self.model_config.clone(),
985 provider_override: self.provider_override.clone(),
986 thinking_level: self.thinking_level,
987 max_tokens: self.max_tokens,
988 temperature: self.temperature,
989 convert_to_llm: self.convert_to_llm.clone(),
990 transform_context: self.transform_context.clone(),
991 get_steering_messages: Some(Box::new(move || {
992 let mut queue = lock_queue(&steering_queue); match steering_mode {
996 QueueMode::OneAtATime => {
997 if queue.is_empty() {
998 vec![]
999 } else {
1000 vec![queue.remove(0)] }
1002 }
1003 QueueMode::All => queue.drain(..).collect(), }
1005 })),
1006 context_config: self.context_config.clone(),
1007 execution_limits: self.execution_limits.clone(),
1008 cache_config: self.cache_config.clone(),
1009 tool_execution: self.tool_execution.clone(),
1010 tool_timeout: self.tool_timeout,
1011 response_format: self.response_format.clone(),
1012 retry_config: self.retry_config.clone(),
1013 get_follow_up_messages: Some(Box::new(move || {
1014 let mut queue = lock_queue(&follow_up_queue);
1015 match follow_up_mode {
1016 QueueMode::OneAtATime => {
1017 if queue.is_empty() {
1018 vec![]
1019 } else {
1020 vec![queue.remove(0)]
1021 }
1022 }
1023 QueueMode::All => queue.drain(..).collect(),
1024 }
1025 })),
1026 before_turn: self.before_turn.clone(),
1027 after_turn: self.after_turn.clone(),
1028 before_loop: self.before_loop.clone(),
1029 after_loop: self.after_loop.clone(),
1030 before_tool_execution: self.before_tool_execution.clone(),
1031 after_tool_execution: self.after_tool_execution.clone(),
1032 before_tool_execution_update: self.before_tool_execution_update.clone(),
1033 after_tool_execution_update: self.after_tool_execution_update.clone(),
1034 before_compaction_start: self.before_compaction_start.clone(),
1035 after_compaction_end: self.after_compaction_end.clone(),
1036 on_error: self.on_error.clone(),
1037 input_filters: self.input_filters.clone(),
1038 first_turn_trigger: TurnTrigger::User,
1039 config_id: self.config_id.clone(),
1040 context_translation: self.context_translation.clone(),
1041 prun_pending: self.prun_pending.clone(),
1042 })
1043 }
1044
1045 pub fn new_session(&mut self) -> String {
1054 self.session_id = uuid::Uuid::new_v4().to_string();
1055 self.loop_counters.clear();
1056 self.last_loop_id = None;
1057 self.last_active_at = None;
1061 self.session_id.clone()
1062 }
1063
1064 pub fn check_and_rotate(&mut self, threshold: std::time::Duration) -> Option<String> {
1072 let last = self.last_active_at?;
1073 let elapsed = (chrono::Utc::now() - last)
1074 .to_std()
1075 .unwrap_or(std::time::Duration::ZERO);
1076 if elapsed > threshold {
1077 Some(self.new_session())
1078 } else {
1079 None
1080 }
1081 }
1082}
1083
1084#[async_trait::async_trait]
1087impl Agent for BasicAgent {
/// Runs one agent loop for `messages`, sending the loop's events to `tx`.
///
/// Records the activity time, installs a fresh cancellation token, allocates
/// a loop id, and lends the agent's tools and session to the loop context.
/// When the loop returns, tools, messages and session are taken back from the
/// context and the streaming flag and token are cleared.
///
/// # Panics
/// Panics if a loop is already running (`is_streaming` is `true`).
async fn prompt_messages_with_sender(
    &mut self,
    messages: Vec<AgentMessage>,
    tx: mpsc::UnboundedSender<AgentEvent>,
) {
    assert!(
        !self.is_streaming,
        "Agent is already streaming. Use steer() or follow_up()."
    );

    self.last_active_at = Some(chrono::Utc::now());
    let cancel = CancellationToken::new();
    // Keep a clone so `abort()` can cancel the running loop.
    self.cancel = Some(cancel.clone());
    self.is_streaming = true;

    let config = self
        .build_config()
        .expect("BasicAgent always provides a model_config");
    let loop_id = self.next_loop_id(&config);
    self.last_loop_id = Some(loop_id.clone());

    // Tools and session are moved (not cloned) into the context for the run.
    // NOTE(review): if this future is dropped before `agent_loop` completes,
    // the restore statements below never run, leaving `tools` empty, `session`
    // as `None` and `is_streaming` as `true` — confirm callers never drop it.
    let mut context = AgentContext {
        system_prompt: self.system_prompt.clone(),
        messages: self.messages.clone(),
        tools: std::mem::take(&mut self.tools),
        agent_id: Some(self.agent_id.clone()),
        session_id: Some(self.session_id.clone()),
        loop_id: Some(loop_id),
        parent_loop_id: None,
        continuation_kind: None,
        session: self.session.take(),
        user_context: Vec::new(),
        inrun_context: Vec::new(),
    };

    let _new_messages = agent_loop(messages, &mut context, &config, tx, cancel).await;

    // Take everything back from the context.
    self.tools = context.tools;
    self.messages = context.messages;
    self.session = context.session;
    self.is_streaming = false;
    self.cancel = None;
}
1169
1170 async fn continue_loop_with_sender(
1178 &mut self,
1179 tx: mpsc::UnboundedSender<AgentEvent>, kind: ContinuationKind, ) {
1182 assert!(!self.is_streaming, "Agent is already streaming.");
1183 assert!(!self.messages.is_empty(), "No messages to continue from.");
1184
1185 let cancel = CancellationToken::new();
1186 self.cancel = Some(cancel.clone());
1187 self.is_streaming = true;
1188
1189 let config = self
1192 .build_config()
1193 .expect("BasicAgent always provides a model_config");
1194 let loop_id = self.next_loop_id(&config);
1195 let parent_loop_id = self.last_loop_id.clone(); self.last_loop_id = Some(loop_id.clone());
1197
1198 let tag = chrono::Utc::now().to_rfc3339();
1200 let kind_with_tag = match kind {
1201 ContinuationKind::Initial => ContinuationKind::Default, ContinuationKind::Default => ContinuationKind::Default,
1203 ContinuationKind::Rerun { .. } => ContinuationKind::Rerun { tag },
1204 ContinuationKind::Branch { .. } => ContinuationKind::Branch { tag },
1205 ContinuationKind::Compaction => ContinuationKind::Compaction,
1206 };
1207
1208 let mut context = AgentContext {
1210 system_prompt: self.system_prompt.clone(),
1211 messages: self.messages.clone(),
1212 tools: std::mem::take(&mut self.tools),
1213 agent_id: Some(self.agent_id.clone()),
1214 session_id: Some(self.session_id.clone()),
1215 loop_id: Some(loop_id),
1216 parent_loop_id,
1217 continuation_kind: Some(kind_with_tag),
1218 session: self.session.take(),
1219 user_context: Vec::new(),
1220 inrun_context: Vec::new(),
1221 };
1222
1223 let _new_messages = agent_loop_continue(&mut context, &config, tx, cancel).await;
1224
1225 self.tools = context.tools;
1226 self.messages = context.messages;
1227 self.session = context.session;
1228 self.is_streaming = false;
1229 self.cancel = None;
1230 }
1231
1232 fn messages(&self) -> &[AgentMessage] {
1235 &self.messages
1236 }
1237
/// Returns `true` while a loop started by this agent is running.
fn is_streaming(&self) -> bool {
    self.is_streaming
}
1241
1242 fn agent_id(&self) -> &str {
1243 &self.agent_id
1244 }
1245
1246 fn session_id(&self) -> &str {
1247 &self.session_id
1248 }
1249
/// Returns the id of the most recently started loop, if any.
fn last_loop_id(&self) -> Option<&str> {
    self.last_loop_id.as_deref()
}
1253
/// Removes every message from the conversation history.
fn clear_messages(&mut self) {
    self.messages.clear();
}
1259
/// Appends `msg` to the end of the conversation history.
fn append_message(&mut self, msg: AgentMessage) {
    self.messages.push(msg);
}
1263
/// Replaces the whole conversation history with `msgs`.
fn replace_messages(&mut self, msgs: Vec<AgentMessage>) {
    self.messages = msgs;
}
1267
/// Serialises the conversation history to a JSON string.
///
/// # Errors
/// Returns the `serde_json::Error` raised during serialisation.
fn save_messages(&self) -> Result<String, serde_json::Error> {
    serde_json::to_string(&self.messages)
}
1271
1272 fn restore_messages(&mut self, json: &str) -> Result<(), serde_json::Error> {
1273 let msgs: Vec<AgentMessage> = serde_json::from_str(json)?;
1274 self.messages = msgs;
1275 Ok(())
1276 }
1277
/// Replaces the whole tool list with `tools`.
fn set_tools(&mut self, tools: Vec<Arc<dyn AgentTool>>) {
    self.tools = tools;
}
1281
1282 fn abort(&self) {
1285 if let Some(ref cancel) = self.cancel {
1286 cancel.cancel();
1287 }
1288 }
1289
/// Clears the conversation history and all queues, and resets the streaming
/// flag and cancellation token.
///
/// The stored token is dropped without being cancelled. Tools, hooks,
/// configuration and session identifiers are left untouched.
fn reset(&mut self) {
    self.messages.clear();
    self.clear_all_queues();
    self.is_streaming = false;
    self.cancel = None;
}
1296
1297 fn steer(&self, msg: AgentMessage) {
1318 lock_queue(&self.steering_queue).push(msg);
1319 }
1320
1321 fn follow_up(&self, msg: AgentMessage) {
1322 lock_queue(&self.follow_up_queue).push(msg);
1323 }
1324
1325 fn clear_steering_queue(&self) {
1326 lock_queue(&self.steering_queue).clear();
1327 }
1328
1329 fn clear_follow_up_queue(&self) {
1330 lock_queue(&self.follow_up_queue).clear();
1331 }
1332
/// Sets how many steering messages a single poll yields; applies to loop
/// configs built after this call.
fn set_steering_mode(&mut self, mode: QueueMode) {
    self.steering_mode = mode;
}
1336
/// Sets how many follow-up messages a single poll yields; applies to loop
/// configs built after this call.
fn set_follow_up_mode(&mut self, mode: QueueMode) {
    self.follow_up_mode = mode;
}
1340
/// Returns the profile stored by `with_profile`, if any.
fn profile(&self) -> Option<&AgentProfile> {
    self.profile.as_ref()
}
1346
1347 fn system_prompt(&self) -> &str {
1348 &self.system_prompt
1349 }
1350
/// Returns the model configuration; always `Some` for `BasicAgent`.
fn model_config(&self) -> Option<&ModelConfig> {
    Some(&self.model_config)
}
1354
/// Returns the configured thinking level.
fn thinking_level(&self) -> ThinkingLevel {
    self.thinking_level
}
1358
/// Returns the configured temperature, if any.
fn temperature(&self) -> Option<f32> {
    self.temperature
}
1362
/// Returns the configured `max_tokens` value, if any.
fn max_tokens(&self) -> Option<u32> {
    self.max_tokens
}
1366
/// Returns the context configuration, or `None` when context management is disabled.
fn context_config(&self) -> Option<&ContextConfig> {
    self.context_config.as_ref()
}
1370
/// Returns the execution limits, or `None` when they are disabled.
fn execution_limits(&self) -> Option<&ExecutionLimits> {
    self.execution_limits.as_ref()
}
1374
/// Returns a clone of the cache configuration.
fn cache_config(&self) -> CacheConfig {
    self.cache_config.clone()
}
1378
/// Returns a clone of the tool execution strategy.
fn tool_execution(&self) -> ToolExecutionStrategy {
    self.tool_execution.clone()
}
1382
/// Returns the configured tool timeout, if any.
fn tool_timeout(&self) -> Option<std::time::Duration> {
    self.tool_timeout
}
1386
/// Returns a clone of the response format.
fn response_format(&self) -> crate::provider::ResponseFormat {
    self.response_format.clone()
}
1390
/// Returns a clone of the retry configuration.
fn retry_config(&self) -> crate::provider::retry::RetryConfig {
    self.retry_config.clone()
}
1394
/// Returns the attached session, if any.
///
/// While a loop runs the session is held by the loop context, so this field
/// is `None` during that time.
fn session(&self) -> Option<&crate::session::Session> {
    self.session.as_ref()
}
1400
/// Returns the workspace path set by `with_workspace`, if any.
fn workspace(&self) -> Option<&std::path::Path> {
    self.workspace.as_deref()
}
1404
/// Replaces the `before_turn` hook; `None` removes it.
fn set_before_turn(&mut self, f: Option<BeforeTurnFn>) {
    self.before_turn = f;
}
1410
/// Replaces the `after_turn` hook; `None` removes it.
fn set_after_turn(&mut self, f: Option<AfterTurnFn>) {
    self.after_turn = f;
}
1414
/// Replaces the `before_loop` hook; `None` removes it.
fn set_before_loop(&mut self, f: Option<BeforeLoopFn>) {
    self.before_loop = f;
}
1418
/// Replaces the `after_loop` hook; `None` removes it.
fn set_after_loop(&mut self, f: Option<AfterLoopFn>) {
    self.after_loop = f;
}
1422
/// Replaces the `before_tool_execution` hook; `None` removes it.
fn set_before_tool_execution(&mut self, f: Option<BeforeToolExecutionFn>) {
    self.before_tool_execution = f;
}
1426
/// Replaces the `after_tool_execution` hook; `None` removes it.
fn set_after_tool_execution(&mut self, f: Option<AfterToolExecutionFn>) {
    self.after_tool_execution = f;
}
1430
/// Replaces the `before_tool_execution_update` hook; `None` removes it.
fn set_before_tool_execution_update(&mut self, f: Option<BeforeToolExecutionUpdateFn>) {
    self.before_tool_execution_update = f;
}
1434
/// Replaces the `after_tool_execution_update` hook; `None` removes it.
fn set_after_tool_execution_update(&mut self, f: Option<AfterToolExecutionUpdateFn>) {
    self.after_tool_execution_update = f;
}
1438
/// Replaces the `convert_to_llm` callback; `None` removes it.
fn set_convert_to_llm(&mut self, f: Option<ConvertToLlmFn>) {
    self.convert_to_llm = f;
}
1442
/// Replaces the `transform_context` callback; `None` removes it.
fn set_transform_context(&mut self, f: Option<TransformContextFn>) {
    self.transform_context = f;
}
1446
1447 fn set_block_compaction_strategy(
1448 &mut self,
1449 s: Option<Arc<dyn crate::context::BlockCompactionStrategy>>,
1450 ) {
1451 if let Some(ref mut ctx) = self.context_config {
1453 ctx.compaction.block_strategy = s;
1454 }
1455 }
1456
/// Replaces the `before_compaction_start` hook; `None` removes it.
fn set_before_compaction_start(&mut self, f: Option<BeforeCompactionStartFn>) {
    self.before_compaction_start = f;
}
1460
/// Replaces the `after_compaction_end` hook; `None` removes it.
fn set_after_compaction_end(&mut self, f: Option<AfterCompactionEndFn>) {
    self.after_compaction_end = f;
}
1464
/// Replaces the context translation strategy; `None` removes it.
fn set_context_translation(&mut self, s: Option<Arc<dyn ContextTranslationStrategy>>) {
    self.context_translation = s;
}
1468
/// Returns a shared handle to the context translation strategy, if one is set.
fn context_translation(&self) -> Option<Arc<dyn ContextTranslationStrategy>> {
    self.context_translation.clone()
}
1472}
1473
#[cfg(test)]
mod tests {
    use super::*;

    /// Poisons `m` by panicking on a helper thread while that thread holds the lock.
    fn poison_mutex<T: Send + 'static>(m: Arc<Mutex<T>>) {
        let worker = std::thread::spawn(move || {
            let _guard = m.lock().unwrap();
            panic!("intentional panic to poison the mutex");
        });
        // The join result carries the expected panic payload; discard it.
        let _ = worker.join();
    }

    /// `lock_queue` must hand back the original contents of a poisoned mutex.
    #[test]
    fn lock_queue_recovers_inner_vec_after_poison() {
        let queue: Arc<Mutex<Vec<u32>>> = Arc::new(Mutex::new(vec![1, 2, 3]));
        poison_mutex(Arc::clone(&queue));
        assert!(
            queue.is_poisoned(),
            "test pre-condition: mutex should be poisoned"
        );

        let recovered = lock_queue(&queue);
        assert_eq!(*recovered, vec![1, 2, 3]);
    }

    /// `steer` and `clear_steering_queue` keep working after the queue is poisoned.
    #[test]
    fn basic_agent_steer_survives_queue_poison() {
        let agent = BasicAgent::new(ModelConfig::anthropic("mock", "mock-model", "test-key"));

        let shared_queue = Arc::clone(&agent.steering_queue);
        let worker = std::thread::spawn(move || {
            let _g = shared_queue.lock().unwrap();
            panic!("poison the steering queue");
        });
        let _ = worker.join();
        assert!(agent.steering_queue.is_poisoned());

        agent.steer(AgentMessage::Llm(LlmMessage::new(Message::user("hi"))));
        assert_eq!(lock_queue(&agent.steering_queue).len(), 1);
        agent.clear_steering_queue();
        assert_eq!(lock_queue(&agent.steering_queue).len(), 0);
    }

    /// `follow_up` and `clear_follow_up_queue` keep working after the queue is poisoned.
    #[test]
    fn basic_agent_follow_up_survives_queue_poison() {
        let agent = BasicAgent::new(ModelConfig::anthropic("mock", "mock-model", "test-key"));

        let shared_queue = Arc::clone(&agent.follow_up_queue);
        let worker = std::thread::spawn(move || {
            let _g = shared_queue.lock().unwrap();
            panic!("poison the follow-up queue");
        });
        let _ = worker.join();
        assert!(agent.follow_up_queue.is_poisoned());

        agent.follow_up(AgentMessage::Llm(LlmMessage::new(Message::user("more"))));
        assert_eq!(lock_queue(&agent.follow_up_queue).len(), 1);
        agent.clear_follow_up_queue();
        assert_eq!(lock_queue(&agent.follow_up_queue).len(), 0);
    }
}