1use super::agent::{Agent, QueueMode};
8use super::profile::AgentProfile;
9use crate::agent_loop::{
10 agent_loop, agent_loop_continue, AfterCompactionEndFn, AfterLoopFn, AfterToolExecutionFn,
11 AfterToolExecutionUpdateFn, AfterTurnFn, AgentLoopConfig, BeforeCompactionStartFn,
12 BeforeLoopFn, BeforeToolExecutionFn, BeforeToolExecutionUpdateFn, BeforeTurnFn, ConvertToLlmFn,
13 OnErrorFn, TransformContextFn,
14};
15use crate::context::{CompactionStrategy, ContextConfig, ExecutionLimits};
16use crate::mcp::{McpClient, McpError, McpToolAdapter};
17use crate::provider::context_translation::ContextTranslationStrategy;
18use crate::provider::{ModelConfig, StreamProvider};
19use crate::types::*;
20use std::collections::HashMap;
21use std::sync::{Arc, Mutex};
22use tokio::sync::mpsc;
23use tokio_util::sync::CancellationToken;
24
25fn lock_queue<T>(q: &Mutex<Vec<T>>) -> std::sync::MutexGuard<'_, Vec<T>> {
32 match q.lock() {
33 Ok(g) => g,
34 Err(poison) => {
35 tracing::warn!(
36 "BasicAgent: queue mutex was poisoned; recovering inner Vec. \
37 A prior hook or tool callback panicked while holding the lock."
38 );
39 poison.into_inner()
40 }
41 }
42}
43
44pub struct BasicAgent {
80 pub system_prompt: String,
82 pub model_config: ModelConfig, pub provider_override: Option<Arc<dyn StreamProvider>>,
86 pub thinking_level: ThinkingLevel,
87 pub max_tokens: Option<u32>, pub temperature: Option<f32>, messages: Vec<AgentMessage>, tools: Vec<Arc<dyn AgentTool>>,
109
110 steering_queue: Arc<Mutex<Vec<AgentMessage>>>,
134 follow_up_queue: Arc<Mutex<Vec<AgentMessage>>>,
135 steering_mode: QueueMode,
136 follow_up_mode: QueueMode,
137
138 pub context_config: Option<ContextConfig>,
140 pub execution_limits: Option<ExecutionLimits>,
141 pub cache_config: CacheConfig,
142 pub tool_execution: ToolExecutionStrategy,
143 pub tool_timeout: Option<std::time::Duration>,
144 pub response_format: crate::provider::ResponseFormat,
145 pub retry_config: crate::provider::retry::RetryConfig,
146
147 before_turn: Option<BeforeTurnFn>,
149 after_turn: Option<AfterTurnFn>,
150 on_error: Option<OnErrorFn>,
151
152 input_filters: Vec<Arc<dyn InputFilter>>,
154
155 before_loop: Option<BeforeLoopFn>,
157 after_loop: Option<AfterLoopFn>,
158 before_tool_execution: Option<BeforeToolExecutionFn>,
159 after_tool_execution: Option<AfterToolExecutionFn>,
160 before_tool_execution_update: Option<BeforeToolExecutionUpdateFn>,
161 after_tool_execution_update: Option<AfterToolExecutionUpdateFn>,
162 convert_to_llm: Option<ConvertToLlmFn>,
163 transform_context: Option<TransformContextFn>,
164 before_compaction_start: Option<BeforeCompactionStartFn>,
165 after_compaction_end: Option<AfterCompactionEndFn>,
166 context_translation: Option<Arc<dyn ContextTranslationStrategy>>,
167 prun_pending: Option<Arc<Mutex<Vec<crate::tools::prun::PrunRequest>>>>,
168 revert_pending: Option<Arc<Mutex<Vec<crate::tools::revert::RevertRequest>>>>,
169
170 config_id: Option<String>,
172 profile: Option<AgentProfile>,
173 workspace: Option<std::path::PathBuf>,
174
175 cancel: Option<CancellationToken>,
177 is_streaming: bool, agent_id: String,
205 session_id: String,
206 loop_counters: HashMap<String, usize>,
207 last_loop_id: Option<String>,
208 last_active_at: Option<chrono::DateTime<chrono::Utc>>,
211 session: Option<crate::session::Session>,
213}
214
215impl BasicAgent {
216 pub fn new(model_config: ModelConfig) -> Self {
217 Self {
218 model_config,
219 provider_override: None,
220 system_prompt: String::new(),
221 thinking_level: ThinkingLevel::Off,
222 max_tokens: None,
223 temperature: None,
224 messages: Vec::new(),
225 tools: Vec::new(),
226 steering_queue: Arc::new(Mutex::new(Vec::new())), follow_up_queue: Arc::new(Mutex::new(Vec::new())),
228 steering_mode: QueueMode::OneAtATime,
229 follow_up_mode: QueueMode::OneAtATime,
230 context_config: Some(ContextConfig::default()), execution_limits: Some(ExecutionLimits::default()), cache_config: CacheConfig::default(),
233 tool_execution: ToolExecutionStrategy::default(), tool_timeout: None,
235 response_format: crate::provider::ResponseFormat::Text,
236 retry_config: crate::provider::retry::RetryConfig::default(), before_turn: None,
238 after_turn: None,
239 on_error: None,
240 input_filters: Vec::new(),
241 before_loop: None,
242 after_loop: None,
243 before_tool_execution: None,
244 after_tool_execution: None,
245 before_tool_execution_update: None,
246 after_tool_execution_update: None,
247 convert_to_llm: None,
248 transform_context: None,
249 before_compaction_start: None,
250 after_compaction_end: None,
251 context_translation: None,
252 prun_pending: None,
253 revert_pending: None,
254 config_id: None,
255 profile: None,
256 workspace: None,
257 cancel: None,
258 is_streaming: false,
259 agent_id: uuid::Uuid::new_v4().to_string(),
260 session_id: uuid::Uuid::new_v4().to_string(),
261 loop_counters: HashMap::new(),
262 last_loop_id: None,
263 last_active_at: None,
264 session: None,
265 }
266 }
267
268 pub fn with_session(mut self, session: crate::session::Session) -> Self {
270 self.session = Some(session);
271 self
272 }
273
274 pub fn take_session(&mut self) -> Option<crate::session::Session> {
276 self.session.take()
277 }
278
279 pub fn with_system_prompt(mut self, prompt: impl Into<String>) -> Self {
299 self.system_prompt = prompt.into();
300 self
301 }
302
303 pub fn with_thinking(mut self, level: ThinkingLevel) -> Self {
304 self.thinking_level = level;
305 self
306 }
307
308 pub fn with_tools(mut self, tools: Vec<Arc<dyn AgentTool>>) -> Self {
309 self.tools = tools;
310 self
311 }
312
313 pub fn tools(&self) -> &[Arc<dyn AgentTool>] {
318 &self.tools
319 }
320
321 pub fn with_model_config(mut self, config: ModelConfig) -> Self {
322 self.model_config = config;
323 self
324 }
325
326 pub fn with_provider_override(mut self, provider: Arc<dyn StreamProvider>) -> Self {
329 self.provider_override = Some(provider);
330 self
331 }
332
333 pub fn with_max_tokens(mut self, max: u32) -> Self {
334 self.max_tokens = Some(max);
335 self
336 }
337
338 pub fn with_context_config(mut self, config: ContextConfig) -> Self {
339 self.context_config = Some(config);
340 self
341 }
342
343 pub fn with_cache_config(mut self, config: CacheConfig) -> Self {
344 self.cache_config = config;
345 self
346 }
347
348 pub fn with_tool_execution(mut self, strategy: ToolExecutionStrategy) -> Self {
349 self.tool_execution = strategy;
350 self
351 }
352
353 pub fn with_tool_timeout(mut self, timeout: std::time::Duration) -> Self {
355 self.tool_timeout = Some(timeout);
356 self
357 }
358
359 pub fn with_response_format(mut self, format: crate::provider::ResponseFormat) -> Self {
361 self.response_format = format;
362 self
363 }
364
365 pub fn with_retry_config(mut self, config: crate::provider::retry::RetryConfig) -> Self {
366 self.retry_config = config;
367 self
368 }
369
370 pub fn with_skills(mut self, skills: crate::context::skills::SkillSet) -> Self {
376 let prompt_fragment = skills.format_for_prompt();
377 if !prompt_fragment.is_empty() {
378 if self.system_prompt.is_empty() {
379 self.system_prompt = prompt_fragment;
380 } else {
381 self.system_prompt = format!("{}\n\n{}", self.system_prompt, prompt_fragment);
382 }
383 }
384 self
385 }
386
387 pub fn with_execution_limits(mut self, limits: ExecutionLimits) -> Self {
388 self.execution_limits = Some(limits);
389 self
390 }
391
392 pub fn with_messages(mut self, msgs: Vec<AgentMessage>) -> Self {
393 self.messages = msgs;
394 self
395 }
396
397 pub fn on_before_turn(
419 mut self,
420 f: impl Fn(&[AgentMessage], usize) -> bool + Send + Sync + 'static,
421 ) -> Self {
422 self.before_turn = Some(Arc::new(move |msgs, turn| {
423 let r = f(msgs, turn);
424 Box::pin(async move { r })
425 }));
426 self
427 }
428
429 pub fn on_after_turn(
430 mut self,
431 f: impl Fn(&[AgentMessage], &Usage) + Send + Sync + 'static,
432 ) -> Self {
433 self.after_turn = Some(Arc::new(move |msgs, usage| {
434 f(msgs, usage);
435 Box::pin(async move {})
436 }));
437 self
438 }
439
440 pub fn on_error(mut self, f: impl Fn(&str) + Send + Sync + 'static) -> Self {
441 self.on_error = Some(Arc::new(move |err| {
442 f(err);
443 Box::pin(async move {})
444 }));
445 self
446 }
447
448 pub fn with_input_filter(mut self, filter: impl InputFilter + 'static) -> Self {
450 self.input_filters.push(Arc::new(filter));
451 self
452 }
453
454 pub fn with_compaction_strategy(mut self, strategy: impl CompactionStrategy + 'static) -> Self {
458 if let Some(ref mut ctx) = self.context_config {
459 ctx.compaction.in_memory_strategy = Some(Arc::new(strategy));
460 }
461 self
462 }
463
464 pub fn with_profile(mut self, profile: AgentProfile) -> Self {
468 if let Some(ref prompt) = profile.system_prompt {
470 if self.system_prompt.is_empty() {
471 self.system_prompt = prompt.clone();
472 }
473 }
474 if let Some(level) = profile.thinking_level {
475 if self.thinking_level == ThinkingLevel::Off {
476 self.thinking_level = level;
477 }
478 }
479 if let Some(temp) = profile.temperature {
480 if self.temperature.is_none() {
481 self.temperature = Some(temp);
482 }
483 }
484 if let Some(max) = profile.max_tokens {
485 if self.max_tokens.is_none() {
486 self.max_tokens = Some(max);
487 }
488 }
489 if let Some(ref id) = profile.config_id {
490 if self.config_id.is_none() {
491 self.config_id = Some(id.clone());
492 }
493 }
494 self.profile = Some(profile);
495 self
496 }
497
498 pub fn with_temperature(mut self, temp: f32) -> Self {
500 self.temperature = Some(temp);
501 self
502 }
503
504 pub fn with_config_id(mut self, id: impl Into<String>) -> Self {
506 self.config_id = Some(id.into());
507 self
508 }
509
510 pub fn with_workspace(mut self, path: impl Into<std::path::PathBuf>) -> Self {
513 self.workspace = Some(path.into());
514 self
515 }
516
517 pub fn on_before_loop(
522 mut self,
523 f: impl Fn(&[AgentMessage], usize) -> bool + Send + Sync + 'static,
524 ) -> Self {
525 self.before_loop = Some(Arc::new(move |msgs, idx| {
526 let r = f(msgs, idx);
527 Box::pin(async move { r })
528 }));
529 self
530 }
531
532 pub fn on_after_loop(
534 mut self,
535 f: impl Fn(&[AgentMessage], &Usage) + Send + Sync + 'static,
536 ) -> Self {
537 self.after_loop = Some(Arc::new(move |msgs, usage| {
538 f(msgs, usage);
539 Box::pin(async move {})
540 }));
541 self
542 }
543
544 pub fn on_before_tool_execution(
546 mut self,
547 f: impl Fn(&str, &str, &serde_json::Value) -> bool + Send + Sync + 'static,
548 ) -> Self {
549 self.before_tool_execution = Some(Arc::new(move |name, id, args| {
550 let r = f(name, id, args);
551 Box::pin(async move { r })
552 }));
553 self
554 }
555
556 pub fn on_after_tool_execution(
558 mut self,
559 f: impl Fn(&str, &str, bool) + Send + Sync + 'static,
560 ) -> Self {
561 self.after_tool_execution = Some(Arc::new(move |name, id, is_error| {
562 f(name, id, is_error);
563 Box::pin(async move {})
564 }));
565 self
566 }
567
568 pub fn on_before_tool_execution_update(
573 mut self,
574 f: impl Fn(&str, &str, &str) -> bool + Send + Sync + 'static,
575 ) -> Self {
576 self.before_tool_execution_update = Some(Arc::new(f));
577 self
578 }
579
580 pub fn on_after_tool_execution_update(
585 mut self,
586 f: impl Fn(&str, &str, &str) + Send + Sync + 'static,
587 ) -> Self {
588 self.after_tool_execution_update = Some(Arc::new(f));
589 self
590 }
591
592 pub fn with_prun_tool(mut self) -> Self {
595 let pending = Arc::new(Mutex::new(Vec::new()));
596 self.tools.push(Arc::new(crate::tools::PrunTool::new(
597 pending.clone(),
598 crate::tools::PrunVariant::Prun,
599 )));
600 self.tools.push(Arc::new(crate::tools::PrunTool::new(
601 pending.clone(),
602 crate::tools::PrunVariant::PrunWithMemo,
603 )));
604 self.prun_pending = Some(pending);
605 self
606 }
607
608 pub fn with_revert_tool(mut self) -> Self {
616 let pending = Arc::new(Mutex::new(Vec::new()));
617 self.tools
618 .push(Arc::new(crate::tools::RevertTool::new(pending.clone())));
619 self.revert_pending = Some(pending);
620 self
621 }
622
623 pub fn with_convert_to_llm(
625 mut self,
626 f: impl Fn(&[AgentMessage]) -> Vec<Message> + Send + Sync + 'static,
627 ) -> Self {
628 self.convert_to_llm = Some(Arc::new(f));
629 self
630 }
631
632 pub fn with_transform_context(
634 mut self,
635 f: impl Fn(Vec<AgentMessage>) -> Vec<AgentMessage> + Send + Sync + 'static,
636 ) -> Self {
637 self.transform_context = Some(Arc::new(f));
638 self
639 }
640
641 pub fn with_block_compaction_strategy(
644 mut self,
645 strategy: impl crate::context::BlockCompactionStrategy + 'static,
646 ) -> Self {
647 if let Some(ref mut ctx) = self.context_config {
648 ctx.compaction.block_strategy = Some(Arc::new(strategy));
649 }
650 self
651 }
652
653 pub fn on_before_compaction_start(
655 mut self,
656 f: impl Fn(usize, usize) -> bool + Send + Sync + 'static,
657 ) -> Self {
658 self.before_compaction_start = Some(Arc::new(move |est, mc| {
659 let r = f(est, mc);
660 Box::pin(async move { r })
661 }));
662 self
663 }
664
665 pub fn on_after_compaction_end(
667 mut self,
668 f: impl Fn(usize, usize, usize, usize) + Send + Sync + 'static,
669 ) -> Self {
670 self.after_compaction_end = Some(Arc::new(move |mb, ma, tb, ta| {
671 f(mb, ma, tb, ta);
672 Box::pin(async move {})
673 }));
674 self
675 }
676
677 pub fn with_context_translation(
679 mut self,
680 strategy: Arc<dyn ContextTranslationStrategy>,
681 ) -> Self {
682 self.context_translation = Some(strategy);
683 self
684 }
685
686 pub fn with_sub_agent(mut self, sub: crate::agents::SubAgentTool) -> Self {
688 self.tools.push(Arc::new(sub));
689 self
690 }
691
692 pub fn without_context_management(mut self) -> Self {
694 self.context_config = None;
695 self.execution_limits = None;
696 self
697 }
698
699 #[cfg(feature = "openapi")]
703 pub async fn with_openapi_file(
704 mut self,
705 path: impl AsRef<std::path::Path>,
706 config: crate::openapi::OpenApiConfig,
707 filter: &crate::openapi::OperationFilter,
708 ) -> Result<Self, crate::openapi::OpenApiError> {
709 let adapters = crate::openapi::OpenApiToolAdapter::from_file(path, config, filter).await?;
710 for adapter in adapters {
711 self.tools.push(Arc::new(adapter));
712 }
713 Ok(self)
714 }
715
716 #[cfg(feature = "openapi")]
718 pub async fn with_openapi_url(
719 mut self,
720 url: &str,
721 config: crate::openapi::OpenApiConfig,
722 filter: &crate::openapi::OperationFilter,
723 ) -> Result<Self, crate::openapi::OpenApiError> {
724 let adapters = crate::openapi::OpenApiToolAdapter::from_url(url, config, filter).await?;
725 for adapter in adapters {
726 self.tools.push(Arc::new(adapter));
727 }
728 Ok(self)
729 }
730
731 #[cfg(feature = "openapi")]
733 pub fn with_openapi_spec(
734 mut self,
735 spec_str: &str,
736 config: crate::openapi::OpenApiConfig,
737 filter: &crate::openapi::OperationFilter,
738 ) -> Result<Self, crate::openapi::OpenApiError> {
739 let adapters = crate::openapi::OpenApiToolAdapter::from_str(spec_str, config, filter)?;
740 for adapter in adapters {
741 self.tools.push(Arc::new(adapter));
742 }
743 Ok(self)
744 }
745
746 pub async fn with_mcp_server_stdio(
750 mut self,
751 command: &str, args: &[&str], env: Option<HashMap<String, String>>, ) -> Result<Self, McpError> {
755 let client = McpClient::connect_stdio(command, args, env).await?;
756 let client = Arc::new(tokio::sync::Mutex::new(client));
757 let adapters = McpToolAdapter::from_client(client).await?;
758 for adapter in adapters {
759 self.tools.push(Arc::new(adapter));
760 }
761 Ok(self)
762 }
763
764 pub async fn with_mcp_server_http(mut self, url: &str) -> Result<Self, McpError> {
766 let client = McpClient::connect_http(url).await?;
767 let client = Arc::new(tokio::sync::Mutex::new(client));
768 let adapters = McpToolAdapter::from_client(client).await?;
769 for adapter in adapters {
770 self.tools.push(Arc::new(adapter));
771 }
772 Ok(self)
773 }
774
775 pub async fn prompt(&mut self, text: impl Into<String>) -> mpsc::UnboundedReceiver<AgentEvent> {
786 let (tx, rx) = mpsc::unbounded_channel();
787 let msg = AgentMessage::Llm(LlmMessage::new(Message::user(text)));
788 self.prompt_messages_with_sender(vec![msg], tx).await;
789 rx
790 }
791
792 pub async fn prompt_with_sender(
796 &mut self,
797 text: impl Into<String>,
798 tx: mpsc::UnboundedSender<AgentEvent>,
799 ) {
800 let msg = AgentMessage::Llm(LlmMessage::new(Message::user(text)));
801 self.prompt_messages_with_sender(vec![msg], tx).await;
802 }
803
804 fn next_loop_id(&mut self, config: &AgentLoopConfig) -> String {
823 let effective_config_id = if let Some(ref id) = config.config_id {
824 id.clone()
825 } else {
826 let slugify = |s: &str| -> String {
827 s.chars()
828 .map(|c| {
829 if c.is_alphanumeric() || c == '-' {
830 c
831 } else {
832 '-'
833 }
834 })
835 .collect()
836 };
837 let thinking_part = if config.thinking_level != ThinkingLevel::Off {
838 ".thinking"
839 } else {
840 ""
841 };
842 format!(
843 "{}.{}{}",
844 config.model_config.provider,
845 slugify(&config.model_config.id),
846 thinking_part
847 )
848 };
849 let thread_key = format!("{}.{}", self.session_id, effective_config_id);
850 let n = self.loop_counters.entry(thread_key.clone()).or_insert(0);
851 *n += 1;
852 format!("{}.{}", thread_key, n)
853 }
854
855 pub async fn compact_context_with_sender(
894 &mut self,
895 tx: &tokio::sync::mpsc::UnboundedSender<AgentEvent>,
896 ) {
897 let (Some(session), Some(ctx_config)) =
898 (self.session.as_mut(), self.context_config.as_ref())
899 else {
900 return; };
902 let comp = &ctx_config.compaction;
903 let max_tokens = ctx_config.max_context_tokens;
904
905 let loop_id = self
906 .last_loop_id
907 .clone()
908 .unwrap_or_else(|| "compaction".to_string());
909
910 let _ = tx.send(AgentEvent::AgentStart {
911 agent_id: self.agent_id.clone(),
912 session_id: self.session_id.clone(),
913 loop_id: loop_id.clone(),
914 parent_loop_id: self.last_loop_id.clone(),
915 continuation_kind: ContinuationKind::Compaction,
916 timestamp: chrono::Utc::now(),
917 metadata: None,
918 config_snapshot: None, });
920
921 let msgs_before = self.messages.len();
922 let tokens_before = crate::context::total_tokens(&self.messages);
923
924 let _ = tx.send(AgentEvent::CompactionStarted {
925 loop_id: loop_id.clone(),
926 estimated_tokens: tokens_before,
927 message_count: msgs_before,
928 timestamp: chrono::Utc::now(),
929 });
930
931 let strategy: &dyn crate::context::BlockCompactionStrategy =
932 &crate::context::DefaultBlockCompaction;
933 let current_lid = self.last_loop_id.as_deref().unwrap_or("");
934
935 if let Some(record) = session.get_loop_mut(current_lid) {
937 record.messages = self.messages.clone();
938 }
939
940 crate::context::compact_session_loops(
941 session,
942 current_lid,
943 strategy,
944 comp,
945 max_tokens,
946 None,
947 )
948 .await;
949 self.messages = crate::context::build_context_from_session(
950 session,
951 current_lid,
952 comp,
953 max_tokens,
954 None,
955 );
956
957 let msgs_after = self.messages.len();
958 let tokens_after = crate::context::total_tokens(&self.messages);
959 let chain = session.loop_chain_to(current_lid);
960 let loops_compacted = chain
961 .iter()
962 .filter(|lid| {
963 session
964 .get_loop(lid)
965 .map(|r| r.compaction_block.is_some())
966 .unwrap_or(false)
967 })
968 .count();
969
970 let _ = tx.send(AgentEvent::CompactionEnded {
971 loop_id: loop_id.clone(),
972 messages_before: msgs_before,
973 messages_after: msgs_after,
974 estimated_tokens_before: tokens_before,
975 estimated_tokens_after: tokens_after,
976 loops_compacted,
977 timestamp: chrono::Utc::now(),
978 });
979
980 let _ = tx.send(AgentEvent::AgentEnd {
981 loop_id,
982 messages: vec![],
983 usage: Usage::default(),
984 timestamp: chrono::Utc::now(),
985 rejection: None,
986 });
987 }
988
989 pub async fn compact_context(&mut self) -> usize {
997 let (Some(session), Some(ctx_config)) =
998 (self.session.as_mut(), self.context_config.as_ref())
999 else {
1000 return 0; };
1002 let comp = &ctx_config.compaction;
1003 let max_tokens = ctx_config.max_context_tokens;
1004
1005 let strategy: &dyn crate::context::BlockCompactionStrategy =
1006 &crate::context::DefaultBlockCompaction;
1007 let current_lid = self.last_loop_id.as_deref().unwrap_or("");
1008
1009 if let Some(record) = session.get_loop_mut(current_lid) {
1010 record.messages = self.messages.clone();
1011 }
1012
1013 crate::context::compact_session_loops(
1014 session,
1015 current_lid,
1016 strategy,
1017 comp,
1018 max_tokens,
1019 None,
1020 )
1021 .await;
1022 self.messages = crate::context::build_context_from_session(
1023 session,
1024 current_lid,
1025 comp,
1026 max_tokens,
1027 None,
1028 );
1029
1030 let chain = session.loop_chain_to(current_lid);
1031 chain
1032 .iter()
1033 .filter(|lid| {
1034 session
1035 .get_loop(lid)
1036 .map(|r| r.compaction_block.is_some())
1037 .unwrap_or(false)
1038 })
1039 .count()
1040 }
1041
1042 pub fn build_config(&self) -> Result<AgentLoopConfig, super::agent::AgentBuildError> {
1045 let steering_queue = self.steering_queue.clone(); let steering_mode = self.steering_mode; let follow_up_queue = self.follow_up_queue.clone();
1050 let follow_up_mode = self.follow_up_mode;
1051
1052 Ok(AgentLoopConfig {
1057 model_config: self.model_config.clone(),
1058 provider_override: self.provider_override.clone(),
1059 thinking_level: self.thinking_level,
1060 max_tokens: self.max_tokens,
1061 temperature: self.temperature,
1062 convert_to_llm: self.convert_to_llm.clone(),
1063 transform_context: self.transform_context.clone(),
1064 get_steering_messages: Some(Box::new(move || {
1065 let mut queue = lock_queue(&steering_queue); match steering_mode {
1069 QueueMode::OneAtATime => {
1070 if queue.is_empty() {
1071 vec![]
1072 } else {
1073 vec![queue.remove(0)] }
1075 }
1076 QueueMode::All => queue.drain(..).collect(), }
1078 })),
1079 context_config: self.context_config.clone(),
1080 execution_limits: self.execution_limits.clone(),
1081 cache_config: self.cache_config.clone(),
1082 tool_execution: self.tool_execution.clone(),
1083 tool_timeout: self.tool_timeout,
1084 response_format: self.response_format.clone(),
1085 retry_config: self.retry_config.clone(),
1086 get_follow_up_messages: Some(Box::new(move || {
1087 let mut queue = lock_queue(&follow_up_queue);
1088 match follow_up_mode {
1089 QueueMode::OneAtATime => {
1090 if queue.is_empty() {
1091 vec![]
1092 } else {
1093 vec![queue.remove(0)]
1094 }
1095 }
1096 QueueMode::All => queue.drain(..).collect(),
1097 }
1098 })),
1099 before_turn: self.before_turn.clone(),
1100 after_turn: self.after_turn.clone(),
1101 before_loop: self.before_loop.clone(),
1102 after_loop: self.after_loop.clone(),
1103 before_tool_execution: self.before_tool_execution.clone(),
1104 after_tool_execution: self.after_tool_execution.clone(),
1105 before_tool_execution_update: self.before_tool_execution_update.clone(),
1106 after_tool_execution_update: self.after_tool_execution_update.clone(),
1107 before_compaction_start: self.before_compaction_start.clone(),
1108 after_compaction_end: self.after_compaction_end.clone(),
1109 on_error: self.on_error.clone(),
1110 input_filters: self.input_filters.clone(),
1111 first_turn_trigger: TurnTrigger::User,
1112 config_id: self.config_id.clone(),
1113 context_translation: self.context_translation.clone(),
1114 prun_pending: self.prun_pending.clone(),
1115 revert_pending: self.revert_pending.clone(),
1116 })
1117 }
1118
1119 pub fn new_session(&mut self) -> String {
1128 self.session_id = uuid::Uuid::new_v4().to_string();
1129 self.loop_counters.clear();
1130 self.last_loop_id = None;
1131 self.last_active_at = None;
1135 self.session_id.clone()
1136 }
1137
1138 pub fn check_and_rotate(&mut self, threshold: std::time::Duration) -> Option<String> {
1146 let last = self.last_active_at?;
1147 let elapsed = (chrono::Utc::now() - last)
1148 .to_std()
1149 .unwrap_or(std::time::Duration::ZERO);
1150 if elapsed > threshold {
1151 Some(self.new_session())
1152 } else {
1153 None
1154 }
1155 }
1156}
1157
1158#[async_trait::async_trait]
1161impl Agent for BasicAgent {
1162 async fn prompt_messages_with_sender(
1166 &mut self,
1167 messages: Vec<AgentMessage>,
1168 tx: mpsc::UnboundedSender<AgentEvent>,
1169 ) {
1170 assert!(
1182 !self.is_streaming,
1183 "Agent is already streaming. Use steer() or follow_up()."
1184 );
1185
1186 self.last_active_at = Some(chrono::Utc::now());
1187 let cancel = CancellationToken::new();
1188 self.cancel = Some(cancel.clone()); self.is_streaming = true;
1190
1191 let config = self
1216 .build_config()
1217 .expect("BasicAgent always provides a model_config");
1218 let loop_id = self.next_loop_id(&config);
1219 self.last_loop_id = Some(loop_id.clone());
1220
1221 let mut context = AgentContext {
1222 system_prompt: self.system_prompt.clone(),
1223 messages: self.messages.clone(),
1224 tools: std::mem::take(&mut self.tools), agent_id: Some(self.agent_id.clone()),
1226 session_id: Some(self.session_id.clone()),
1227 loop_id: Some(loop_id),
1228 parent_loop_id: None, continuation_kind: None,
1230 session: self.session.take(), user_context: Vec::new(),
1232 inrun_context: Vec::new(),
1233 active_node_id: None,
1234 next_node_id: 0,
1235 };
1236
1237 let _new_messages = agent_loop(messages, &mut context, &config, tx, cancel).await;
1238
1239 self.tools = context.tools;
1240 self.messages = context.messages;
1241 self.session = context.session; self.is_streaming = false;
1243 self.cancel = None;
1244 }
1245
1246 async fn continue_loop_with_sender(
1254 &mut self,
1255 tx: mpsc::UnboundedSender<AgentEvent>, kind: ContinuationKind, ) {
1258 assert!(!self.is_streaming, "Agent is already streaming.");
1259 assert!(!self.messages.is_empty(), "No messages to continue from.");
1260
1261 let cancel = CancellationToken::new();
1262 self.cancel = Some(cancel.clone());
1263 self.is_streaming = true;
1264
1265 let config = self
1268 .build_config()
1269 .expect("BasicAgent always provides a model_config");
1270 let loop_id = self.next_loop_id(&config);
1271 let parent_loop_id = self.last_loop_id.clone(); self.last_loop_id = Some(loop_id.clone());
1273
1274 let tag = chrono::Utc::now().to_rfc3339();
1276 let kind_with_tag = match kind {
1277 ContinuationKind::Initial => ContinuationKind::Default, ContinuationKind::Default => ContinuationKind::Default,
1279 ContinuationKind::Rerun { .. } => ContinuationKind::Rerun { tag },
1280 ContinuationKind::Branch { .. } => ContinuationKind::Branch { tag },
1281 ContinuationKind::Compaction => ContinuationKind::Compaction,
1282 };
1283
1284 let mut context = AgentContext {
1286 system_prompt: self.system_prompt.clone(),
1287 messages: self.messages.clone(),
1288 tools: std::mem::take(&mut self.tools),
1289 agent_id: Some(self.agent_id.clone()),
1290 session_id: Some(self.session_id.clone()),
1291 loop_id: Some(loop_id),
1292 parent_loop_id,
1293 continuation_kind: Some(kind_with_tag),
1294 session: self.session.take(),
1295 user_context: Vec::new(),
1296 inrun_context: Vec::new(),
1297 active_node_id: None,
1298 next_node_id: 0,
1299 };
1300
1301 let _new_messages = agent_loop_continue(&mut context, &config, tx, cancel).await;
1302
1303 self.tools = context.tools;
1304 self.messages = context.messages;
1305 self.session = context.session;
1306 self.is_streaming = false;
1307 self.cancel = None;
1308 }
1309
1310 fn messages(&self) -> &[AgentMessage] {
1313 &self.messages
1314 }
1315
1316 fn is_streaming(&self) -> bool {
1317 self.is_streaming
1318 }
1319
1320 fn agent_id(&self) -> &str {
1321 &self.agent_id
1322 }
1323
1324 fn session_id(&self) -> &str {
1325 &self.session_id
1326 }
1327
1328 fn last_loop_id(&self) -> Option<&str> {
1329 self.last_loop_id.as_deref()
1330 }
1331
1332 fn clear_messages(&mut self) {
1335 self.messages.clear();
1336 }
1337
1338 fn append_message(&mut self, msg: AgentMessage) {
1339 self.messages.push(msg);
1340 }
1341
1342 fn replace_messages(&mut self, msgs: Vec<AgentMessage>) {
1343 self.messages = msgs;
1344 }
1345
1346 fn save_messages(&self) -> Result<String, serde_json::Error> {
1347 serde_json::to_string(&self.messages)
1348 }
1349
1350 fn restore_messages(&mut self, json: &str) -> Result<(), serde_json::Error> {
1351 let msgs: Vec<AgentMessage> = serde_json::from_str(json)?;
1352 self.messages = msgs;
1353 Ok(())
1354 }
1355
1356 fn set_tools(&mut self, tools: Vec<Arc<dyn AgentTool>>) {
1357 self.tools = tools;
1358 }
1359
1360 fn abort(&self) {
1363 if let Some(ref cancel) = self.cancel {
1364 cancel.cancel();
1365 }
1366 }
1367
1368 fn reset(&mut self) {
1369 self.messages.clear();
1370 self.clear_all_queues();
1371 self.is_streaming = false;
1372 self.cancel = None;
1373 }
1374
1375 fn steer(&self, msg: AgentMessage) {
1396 lock_queue(&self.steering_queue).push(msg);
1397 }
1398
1399 fn follow_up(&self, msg: AgentMessage) {
1400 lock_queue(&self.follow_up_queue).push(msg);
1401 }
1402
1403 fn clear_steering_queue(&self) {
1404 lock_queue(&self.steering_queue).clear();
1405 }
1406
1407 fn clear_follow_up_queue(&self) {
1408 lock_queue(&self.follow_up_queue).clear();
1409 }
1410
1411 fn set_steering_mode(&mut self, mode: QueueMode) {
1412 self.steering_mode = mode;
1413 }
1414
1415 fn set_follow_up_mode(&mut self, mode: QueueMode) {
1416 self.follow_up_mode = mode;
1417 }
1418
1419 fn profile(&self) -> Option<&AgentProfile> {
1422 self.profile.as_ref()
1423 }
1424
1425 fn system_prompt(&self) -> &str {
1426 &self.system_prompt
1427 }
1428
1429 fn model_config(&self) -> Option<&ModelConfig> {
1430 Some(&self.model_config)
1431 }
1432
1433 fn thinking_level(&self) -> ThinkingLevel {
1434 self.thinking_level
1435 }
1436
1437 fn temperature(&self) -> Option<f32> {
1438 self.temperature
1439 }
1440
1441 fn max_tokens(&self) -> Option<u32> {
1442 self.max_tokens
1443 }
1444
1445 fn context_config(&self) -> Option<&ContextConfig> {
1446 self.context_config.as_ref()
1447 }
1448
1449 fn execution_limits(&self) -> Option<&ExecutionLimits> {
1450 self.execution_limits.as_ref()
1451 }
1452
1453 fn cache_config(&self) -> CacheConfig {
1454 self.cache_config.clone()
1455 }
1456
1457 fn tool_execution(&self) -> ToolExecutionStrategy {
1458 self.tool_execution.clone()
1459 }
1460
1461 fn tool_timeout(&self) -> Option<std::time::Duration> {
1462 self.tool_timeout
1463 }
1464
1465 fn response_format(&self) -> crate::provider::ResponseFormat {
1466 self.response_format.clone()
1467 }
1468
1469 fn retry_config(&self) -> crate::provider::retry::RetryConfig {
1470 self.retry_config.clone()
1471 }
1472
1473 fn session(&self) -> Option<&crate::session::Session> {
1476 self.session.as_ref()
1477 }
1478
1479 fn workspace(&self) -> Option<&std::path::Path> {
1480 self.workspace.as_deref()
1481 }
1482
1483 fn set_before_turn(&mut self, f: Option<BeforeTurnFn>) {
1486 self.before_turn = f;
1487 }
1488
1489 fn set_after_turn(&mut self, f: Option<AfterTurnFn>) {
1490 self.after_turn = f;
1491 }
1492
1493 fn set_before_loop(&mut self, f: Option<BeforeLoopFn>) {
1494 self.before_loop = f;
1495 }
1496
1497 fn set_after_loop(&mut self, f: Option<AfterLoopFn>) {
1498 self.after_loop = f;
1499 }
1500
1501 fn set_before_tool_execution(&mut self, f: Option<BeforeToolExecutionFn>) {
1502 self.before_tool_execution = f;
1503 }
1504
1505 fn set_after_tool_execution(&mut self, f: Option<AfterToolExecutionFn>) {
1506 self.after_tool_execution = f;
1507 }
1508
1509 fn set_before_tool_execution_update(&mut self, f: Option<BeforeToolExecutionUpdateFn>) {
1510 self.before_tool_execution_update = f;
1511 }
1512
1513 fn set_after_tool_execution_update(&mut self, f: Option<AfterToolExecutionUpdateFn>) {
1514 self.after_tool_execution_update = f;
1515 }
1516
1517 fn set_convert_to_llm(&mut self, f: Option<ConvertToLlmFn>) {
1518 self.convert_to_llm = f;
1519 }
1520
1521 fn set_transform_context(&mut self, f: Option<TransformContextFn>) {
1522 self.transform_context = f;
1523 }
1524
1525 fn set_block_compaction_strategy(
1526 &mut self,
1527 s: Option<Arc<dyn crate::context::BlockCompactionStrategy>>,
1528 ) {
1529 if let Some(ref mut ctx) = self.context_config {
1531 ctx.compaction.block_strategy = s;
1532 }
1533 }
1534
1535 fn set_before_compaction_start(&mut self, f: Option<BeforeCompactionStartFn>) {
1536 self.before_compaction_start = f;
1537 }
1538
1539 fn set_after_compaction_end(&mut self, f: Option<AfterCompactionEndFn>) {
1540 self.after_compaction_end = f;
1541 }
1542
1543 fn set_context_translation(&mut self, s: Option<Arc<dyn ContextTranslationStrategy>>) {
1544 self.context_translation = s;
1545 }
1546
1547 fn context_translation(&self) -> Option<Arc<dyn ContextTranslationStrategy>> {
1548 self.context_translation.clone()
1549 }
1550}
1551
1552#[cfg(test)]
1553mod tests {
1554 use super::*;
1555
1556 fn poison_mutex<T: Send + 'static>(m: Arc<Mutex<T>>) {
1558 let m = m.clone();
1559 let _ = std::thread::spawn(move || {
1560 let _guard = m.lock().unwrap();
1561 panic!("intentional panic to poison the mutex");
1562 })
1563 .join(); }
1565
1566 #[test]
1567 fn lock_queue_recovers_inner_vec_after_poison() {
1568 let q: Arc<Mutex<Vec<u32>>> = Arc::new(Mutex::new(vec![1, 2, 3]));
1569 poison_mutex(q.clone());
1570 assert!(
1571 q.is_poisoned(),
1572 "test pre-condition: mutex should be poisoned"
1573 );
1574
1575 let guard = lock_queue(&q);
1577 assert_eq!(*guard, vec![1, 2, 3]);
1578 }
1579
1580 #[test]
1581 fn basic_agent_steer_survives_queue_poison() {
1582 let agent = BasicAgent::new(ModelConfig::anthropic("mock", "mock-model", "test-key"));
1583
1584 let q = agent.steering_queue.clone();
1586 let _ = std::thread::spawn(move || {
1587 let _g = q.lock().unwrap();
1588 panic!("poison the steering queue");
1589 })
1590 .join();
1591 assert!(agent.steering_queue.is_poisoned());
1592
1593 agent.steer(AgentMessage::Llm(LlmMessage::new(Message::user("hi"))));
1595 assert_eq!(lock_queue(&agent.steering_queue).len(), 1);
1596 agent.clear_steering_queue();
1597 assert_eq!(lock_queue(&agent.steering_queue).len(), 0);
1598 }
1599
1600 #[test]
1601 fn basic_agent_follow_up_survives_queue_poison() {
1602 let agent = BasicAgent::new(ModelConfig::anthropic("mock", "mock-model", "test-key"));
1603
1604 let q = agent.follow_up_queue.clone();
1605 let _ = std::thread::spawn(move || {
1606 let _g = q.lock().unwrap();
1607 panic!("poison the follow-up queue");
1608 })
1609 .join();
1610 assert!(agent.follow_up_queue.is_poisoned());
1611
1612 agent.follow_up(AgentMessage::Llm(LlmMessage::new(Message::user("more"))));
1613 assert_eq!(lock_queue(&agent.follow_up_queue).len(), 1);
1614 agent.clear_follow_up_queue();
1615 assert_eq!(lock_queue(&agent.follow_up_queue).len(), 0);
1616 }
1617}