Skip to main content

phi_core/agents/
basic_agent.rs

1//! The default in-memory `Agent` implementation.
2//!
3//! [`BasicAgent`] owns a single linear message history and runs the `agent_loop` directly.
4//! It is the concrete type most callers will use. Configuration is done via the builder
5//! pattern; the runtime interface is provided by the [`Agent`](super::Agent) trait.
6
7use super::agent::{Agent, QueueMode};
8use super::profile::AgentProfile;
9use crate::agent_loop::{
10    agent_loop, agent_loop_continue, AfterCompactionEndFn, AfterLoopFn, AfterToolExecutionFn,
11    AfterToolExecutionUpdateFn, AfterTurnFn, AgentLoopConfig, BeforeCompactionStartFn,
12    BeforeLoopFn, BeforeToolExecutionFn, BeforeToolExecutionUpdateFn, BeforeTurnFn, ConvertToLlmFn,
13    OnErrorFn, TransformContextFn,
14};
15use crate::context::{CompactionStrategy, ContextConfig, ExecutionLimits};
16use crate::mcp::{McpClient, McpError, McpToolAdapter};
17use crate::provider::context_translation::ContextTranslationStrategy;
18use crate::provider::{ModelConfig, StreamProvider};
19use crate::types::*;
20use std::collections::HashMap;
21use std::sync::{Arc, Mutex};
22use tokio::sync::mpsc;
23use tokio_util::sync::CancellationToken;
24
25/// Acquire a `Mutex<Vec<T>>` guard tolerating poisoning.
26///
27/// `Mutex` poisoning happens when a thread panics while holding the guard.
28/// The steering and follow-up queues are recoverable data — a panic in a hook
29/// or tool callback should not crash the entire agent session. We log a warning
30/// and recover the inner `Vec` via `PoisonError::into_inner()`.
31fn lock_queue<T>(q: &Mutex<Vec<T>>) -> std::sync::MutexGuard<'_, Vec<T>> {
32    match q.lock() {
33        Ok(g) => g,
34        Err(poison) => {
35            tracing::warn!(
36                "BasicAgent: queue mutex was poisoned; recovering inner Vec. \
37                 A prior hook or tool callback panicked while holding the lock."
38            );
39            poison.into_inner()
40        }
41    }
42}
43
44/*
45ARCHITECTURE: BasicAgent vs agent_loop — stateful wrapper vs stateless functions
46
47The agent loop (agent_loop.rs) is a set of FREE FUNCTIONS — they take all their
48inputs as parameters and return outputs. They have no hidden state.
49
50The BasicAgent struct is an OPTIONAL stateful wrapper that owns:
51  - Message history (Vec<AgentMessage>) — the conversation so far
52  - Tools (Vec<Arc<dyn AgentTool>>) — registered capabilities
53  - ModelConfig — complete provider identity: id, api_key, base_url, api protocol, cost rates
54  - Steering/follow-up queues (Arc<Mutex<>>) — for mid-run interrupts
55
56Why this separation?
57  - Free functions: easier to test, compose, and reason about
58  - BasicAgent struct: easier to use in applications (less boilerplate)
59  - You can use agent_loop() directly if you need more control
60
61The BasicAgent uses the BUILDER PATTERN for construction:
62  BasicAgent::new(ModelConfig::anthropic("claude-sonnet-4-20250514", "Claude Sonnet 4", &key))
63      .with_system_prompt("...")
64      .with_tools(vec![...])
65
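Each synchronous `with_*` method takes `mut self` and returns `Self` — consuming and
returning the same value — so calls chain without intermediate `let` bindings. The
fallible loaders (`with_mcp_server_*`, `with_openapi_*`) return `Result<Self, _>`
instead, so unwrap them with `?` before continuing the chain.
Python analogy: it's like a fluent API but ownership-safe.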
69*/
70
/// Reference implementation of the [`Agent`] trait.
///
/// Custom agents should implement the `Agent` trait directly. New generic agent
/// methods should be defined on the `Agent` trait first, then implemented here —
/// never add public methods to `BasicAgent` without the corresponding trait method.
///
/// Configuration is done via the builder pattern before any prompting. The runtime
/// interface (prompting, state access, control) is provided via the [`Agent`] trait.
pub struct BasicAgent {
    // -- Public configuration (readable/overridable externally) --
    /// System prompt text. `with_skills` appends to it; `with_profile` only fills it
    /// in while it is still empty.
    pub system_prompt: String,
    pub model_config: ModelConfig, // complete provider identity: model id, api_key, base_url, cost rates
    /// Optional provider override. When set, bypasses `ProviderRegistry` dispatch.
    /// Used primarily in tests to inject a `MockProvider`.
    pub provider_override: Option<Arc<dyn StreamProvider>>,
    /// Thinking level; `ThinkingLevel::Off` after `new`. `with_profile` treats `Off`
    /// as "unset" and may replace it with the profile's level.
    pub thinking_level: ThinkingLevel,
    pub max_tokens: Option<u32>,  // None = use model_config.max_tokens
    pub temperature: Option<f32>, // None = use provider default

    // -- Private state --
    // Configured through builder methods. Several fields further down (queues,
    // loop counters, cancel token, streaming flag) are runtime state that changes
    // while the agent runs, not just configuration.
    messages: Vec<AgentMessage>, // full conversation history

    /*
    RUST QUIRK: `Arc<dyn Trait>` — shared trait objects

    `Arc<dyn AgentTool>` means: "I have shared (reference-counted) ownership of a
    heap-allocated value of some type that implements AgentTool."

    Why Arc instead of Box?
    Arc allows the same tool to be shared across parallel agent branches without copying.
    Cloning an `AgentContext` (e.g. to evaluate several branches in parallel) only
    increments each tool's reference count — the tools themselves are never copied.
    Tools are immutable during execution (execute takes &self), so Arc sharing is
    semantically correct.

    Python analogy: tools are shared objects — multiple agents can reference the same
    tool instance without transferring ownership.
    */
    tools: Vec<Arc<dyn AgentTool>>,

    /*
    RUST QUIRK: `Arc<Mutex<Vec<AgentMessage>>>` — shared mutable state across threads

    This is the canonical Rust pattern for "I need to mutate this from multiple places."

    Arc  = Atomically Reference Counted — shared ownership (multiple holders, thread-safe)
    Mutex = Mutual Exclusion — only one thread can access the inner value at a time

    The BasicAgent holds one handle to each queue. The agent loop USES the queues via
    the closures in build_config() — those closures clone the Arc (incrementing the
    reference count) and lock the Mutex to read/drain. A queue stays alive as long as
    ANY handle exists, so it can outlive the BasicAgent if a closure is still held.

    Python analogy: threading.Lock() wrapping a shared list, passed to threads via closure.

    Why Arc instead of Rc?
    Rc (Reference Counted) is NOT thread-safe. Since tokio runs on a thread pool,
    closures may execute on any thread, so we need Arc (atomic = thread-safe).

    Queue access goes through `lock_queue()` (see top of file) which tolerates
    poisoning — a panic in a hook or tool callback logs a warning and recovers the
    inner `Vec` rather than crashing the agent session. Poisoning still indicates a
    bug upstream; we surface it via `tracing::warn!`.
    */
    steering_queue: Arc<Mutex<Vec<AgentMessage>>>,
    follow_up_queue: Arc<Mutex<Vec<AgentMessage>>>,
    steering_mode: QueueMode,  // QueueMode::OneAtATime after new()
    follow_up_mode: QueueMode, // QueueMode::OneAtATime after new()

    // Context, limits & caching
    // `context_config` also carries any custom compaction strategies
    // (`compaction.in_memory_strategy` / `compaction.block_strategy`), so replacing
    // or clearing it discards them too.
    pub context_config: Option<ContextConfig>,
    pub execution_limits: Option<ExecutionLimits>,
    pub cache_config: CacheConfig,
    pub tool_execution: ToolExecutionStrategy,
    pub tool_timeout: Option<std::time::Duration>,
    pub response_format: crate::provider::ResponseFormat,
    pub retry_config: crate::provider::retry::RetryConfig,

    // Lifecycle callbacks (set via the `on_*` builder methods)
    before_turn: Option<BeforeTurnFn>,
    after_turn: Option<AfterTurnFn>,
    on_error: Option<OnErrorFn>,

    // Input filters, applied in insertion order (see `with_input_filter`)
    input_filters: Vec<Arc<dyn InputFilter>>,

    // ── Hook/callback fields (wired into build_config) ──────────────────
    before_loop: Option<BeforeLoopFn>,
    after_loop: Option<AfterLoopFn>,
    before_tool_execution: Option<BeforeToolExecutionFn>,
    after_tool_execution: Option<AfterToolExecutionFn>,
    before_tool_execution_update: Option<BeforeToolExecutionUpdateFn>,
    after_tool_execution_update: Option<AfterToolExecutionUpdateFn>,
    convert_to_llm: Option<ConvertToLlmFn>,
    transform_context: Option<TransformContextFn>,
    before_compaction_start: Option<BeforeCompactionStartFn>,
    after_compaction_end: Option<AfterCompactionEndFn>,
    context_translation: Option<Arc<dyn ContextTranslationStrategy>>,
    // Pending-request queue shared with the prun tools; `Some` only after `with_prun_tool()`.
    prun_pending: Option<Arc<Mutex<Vec<crate::tools::prun::PrunRequest>>>>,
    // Pending-request queue shared with the revert tool; `Some` only after `with_revert_tool()`.
    revert_pending: Option<Arc<Mutex<Vec<crate::tools::revert::RevertRequest>>>>,

    // ── Profile, config identity, and workspace ──────────────────────────
    config_id: Option<String>, // middle segment of loop_id (see with_config_id)
    profile: Option<AgentProfile>,
    workspace: Option<std::path::PathBuf>, // base dir for relative paths in system prompt blocks

    // Control — cancel token is Some during a streaming call, None otherwise
    cancel: Option<CancellationToken>,
    is_streaming: bool, // guard against concurrent prompt() calls

    // ── Session identity ─────────────────────────────────────────────────────
    // These fields give every loop call within this BasicAgent a consistent, traceable identity.
    // agent_id and session_id are generated once at BasicAgent::new() and threaded into every
    // AgentContext built by this BasicAgent.
    //
    // loop_counters: HashMap keyed by "{session_id}.{effective_config_id}" — each unique
    // (session, config) combination has its own monotonic counter, so loop IDs self-document
    // which config produced them:
    //   ses_xyz.anthropic.claude-opus-4.1   ← first claude loop
    //   ses_xyz.openai.gpt-4o.1             ← first openai loop (independent counter)
    //   ses_xyz.anthropic.claude-opus-4.2   ← second claude loop
    //
    // last_loop_id: tracks the most recently started loop so agent_loop_continue() can
    // set parent_loop_id automatically, enabling ancestry tracking across reruns/branches.
    //
    /* ROADMAP — future session/identity capabilities:
       - HITL resume: user cancels mid-run, reviews, resumes → use continue_loop_with_sender(Rerun|Branch)
       - Checkpoint restore: context serialised to disk, later restored → continue_loop_with_sender(Branch)
       - Parallel exploration: multiple branches from same checkpoint, concurrent →
             multiple concurrent continue_loop_with_sender(Branch) calls in the same session
       - Auto origin/continue selection: inspect last message role → if ToolResult, auto-continue
       - Sub-agent parent linking (automatic): BasicAgent::with_sub_agent() could auto-pass
             self.last_loop_id as parent_loop_id to SubAgentTool; currently requires
             manual wiring via SubAgentTool::with_parent_loop_id()
    */
    agent_id: String,
    session_id: String,
    loop_counters: HashMap<String, usize>,
    last_loop_id: Option<String>,
    /// Timestamp of the most recent `prompt_messages_with_sender` call.
    /// Used by [`check_and_rotate`][BasicAgent::check_and_rotate] to detect inactivity.
    last_active_at: Option<chrono::DateTime<chrono::Utc>>,
    /// Optional session for block-based compaction.
    session: Option<crate::session::Session>,
}
214
215impl BasicAgent {
216    pub fn new(model_config: ModelConfig) -> Self {
217        Self {
218            model_config,
219            provider_override: None,
220            system_prompt: String::new(),
221            thinking_level: ThinkingLevel::Off,
222            max_tokens: None,
223            temperature: None,
224            messages: Vec::new(),
225            tools: Vec::new(),
226            steering_queue: Arc::new(Mutex::new(Vec::new())), // empty, shared with closures
227            follow_up_queue: Arc::new(Mutex::new(Vec::new())),
228            steering_mode: QueueMode::OneAtATime,
229            follow_up_mode: QueueMode::OneAtATime,
230            context_config: Some(ContextConfig::default()), // enabled by default
231            execution_limits: Some(ExecutionLimits::default()), // enabled by default
232            cache_config: CacheConfig::default(),
233            tool_execution: ToolExecutionStrategy::default(), // Parallel
234            tool_timeout: None,
235            response_format: crate::provider::ResponseFormat::Text,
236            retry_config: crate::provider::retry::RetryConfig::default(), // 3 retries
237            before_turn: None,
238            after_turn: None,
239            on_error: None,
240            input_filters: Vec::new(),
241            before_loop: None,
242            after_loop: None,
243            before_tool_execution: None,
244            after_tool_execution: None,
245            before_tool_execution_update: None,
246            after_tool_execution_update: None,
247            convert_to_llm: None,
248            transform_context: None,
249            before_compaction_start: None,
250            after_compaction_end: None,
251            context_translation: None,
252            prun_pending: None,
253            revert_pending: None,
254            config_id: None,
255            profile: None,
256            workspace: None,
257            cancel: None,
258            is_streaming: false,
259            agent_id: uuid::Uuid::new_v4().to_string(),
260            session_id: uuid::Uuid::new_v4().to_string(),
261            loop_counters: HashMap::new(),
262            last_loop_id: None,
263            last_active_at: None,
264            session: None,
265        }
266    }
267
268    /// Set a session for block-based compaction.
269    pub fn with_session(mut self, session: crate::session::Session) -> Self {
270        self.session = Some(session);
271        self
272    }
273
274    /// Take the session out of the agent, returning ownership.
275    pub fn take_session(&mut self) -> Option<crate::session::Session> {
276        self.session.take()
277    }
278
    /*
    RUST QUIRK: Builder pattern — `mut self` + return `Self`

    Builder methods take OWNERSHIP of `self` (consume the BasicAgent), modify it, then
    return it. This allows chaining:
      BasicAgent::new(model_config).with_system_prompt("...").with_tools(vec![...])

    `mut self` — self is moved in (consumed), marked mutable for modification.
    `self` in the return — move the (now mutated) value back to the caller.

    NO CLONE IS MADE — ownership transfers in, gets mutated, transfers out.
    A move is at most a shallow bitwise copy of the struct itself; no heap data
    (strings, vectors, tools) is duplicated, and the optimizer often elides the copy.

    Contrast with Python where you'd either mutate self in-place (returning None)
    OR create a copy. Rust's builder pattern gives you chaining WITH ownership safety.
    */
295
296    // -- Builder-style setters --
297
298    pub fn with_system_prompt(mut self, prompt: impl Into<String>) -> Self {
299        self.system_prompt = prompt.into();
300        self
301    }
302
303    pub fn with_thinking(mut self, level: ThinkingLevel) -> Self {
304        self.thinking_level = level;
305        self
306    }
307
308    pub fn with_tools(mut self, tools: Vec<Arc<dyn AgentTool>>) -> Self {
309        self.tools = tools;
310        self
311    }
312
313    /// Read-only view of the currently registered tools. Useful for tests that
314    /// assert the LLM-facing tool registry (e.g. the Composition I opt-in
315    /// guarantee — `revert_to_state` must NOT appear without an explicit
316    /// `with_revert_tool()` call).
317    pub fn tools(&self) -> &[Arc<dyn AgentTool>] {
318        &self.tools
319    }
320
321    pub fn with_model_config(mut self, config: ModelConfig) -> Self {
322        self.model_config = config;
323        self
324    }
325
326    /// Override the provider used by the agent loop, bypassing `ProviderRegistry` dispatch.
327    /// Primarily used in tests to inject a `MockProvider`.
328    pub fn with_provider_override(mut self, provider: Arc<dyn StreamProvider>) -> Self {
329        self.provider_override = Some(provider);
330        self
331    }
332
333    pub fn with_max_tokens(mut self, max: u32) -> Self {
334        self.max_tokens = Some(max);
335        self
336    }
337
338    pub fn with_context_config(mut self, config: ContextConfig) -> Self {
339        self.context_config = Some(config);
340        self
341    }
342
343    pub fn with_cache_config(mut self, config: CacheConfig) -> Self {
344        self.cache_config = config;
345        self
346    }
347
348    pub fn with_tool_execution(mut self, strategy: ToolExecutionStrategy) -> Self {
349        self.tool_execution = strategy;
350        self
351    }
352
353    /// Set the per-tool execution timeout. See [`AgentLoopConfig::tool_timeout`].
354    pub fn with_tool_timeout(mut self, timeout: std::time::Duration) -> Self {
355        self.tool_timeout = Some(timeout);
356        self
357    }
358
359    /// Set the desired LLM output shape. See [`crate::provider::ResponseFormat`].
360    pub fn with_response_format(mut self, format: crate::provider::ResponseFormat) -> Self {
361        self.response_format = format;
362        self
363    }
364
365    pub fn with_retry_config(mut self, config: crate::provider::retry::RetryConfig) -> Self {
366        self.retry_config = config;
367        self
368    }
369
370    /// Load skills and append their index to the system prompt.
371    ///
372    /// The skills index is appended as XML per the [AgentSkills standard](https://agentskills.io).
373    /// The agent can then read individual SKILL.md files using the `read_file` tool
374    /// when it decides a skill is relevant.
375    pub fn with_skills(mut self, skills: crate::context::skills::SkillSet) -> Self {
376        let prompt_fragment = skills.format_for_prompt();
377        if !prompt_fragment.is_empty() {
378            if self.system_prompt.is_empty() {
379                self.system_prompt = prompt_fragment;
380            } else {
381                self.system_prompt = format!("{}\n\n{}", self.system_prompt, prompt_fragment);
382            }
383        }
384        self
385    }
386
387    pub fn with_execution_limits(mut self, limits: ExecutionLimits) -> Self {
388        self.execution_limits = Some(limits);
389        self
390    }
391
392    pub fn with_messages(mut self, msgs: Vec<AgentMessage>) -> Self {
393        self.messages = msgs;
394        self
395    }
396
    /*
    RUST QUIRK: `impl Fn(...) + Send + Sync + 'static` — accepting a callable

    This accepts ANY callable (closure or function) that:
      - Takes (&[AgentMessage], usize) and returns bool   ← the Fn signature
      - Can be moved to and shared across threads         ← Send + Sync
      - Doesn't borrow from the local stack               ← 'static

    `impl Fn` lets callers pass a plain synchronous closure. The setter wraps it in an
    adapter closure that calls `f` immediately and returns the already-computed result
    as a boxed future. It then stores that adapter as `Arc<dyn Fn(...)>`, the
    `BeforeTurnFn` type alias. The Arc lets each AgentLoopConfig get a cheap clone of
    the hook. The loop calls the stored hook through dynamic dispatch whichever way
    the setter received it.
    */
412    /// Set the before-turn hook. Return `false` to abort the turn.
413    ///
414    /// 0.9.0: takes a sync closure for ergonomic back-compat. For async
415    /// bodies, set the field directly via `agent.before_turn = Some(...)`
416    /// using the `BeforeTurnFn` type alias (`Arc<dyn for<'a> Fn(...) ->
417    /// HookFuture<'a, bool> + Send + Sync>`).
418    pub fn on_before_turn(
419        mut self,
420        f: impl Fn(&[AgentMessage], usize) -> bool + Send + Sync + 'static,
421    ) -> Self {
422        self.before_turn = Some(Arc::new(move |msgs, turn| {
423            let r = f(msgs, turn);
424            Box::pin(async move { r })
425        }));
426        self
427    }
428
429    pub fn on_after_turn(
430        mut self,
431        f: impl Fn(&[AgentMessage], &Usage) + Send + Sync + 'static,
432    ) -> Self {
433        self.after_turn = Some(Arc::new(move |msgs, usage| {
434            f(msgs, usage);
435            Box::pin(async move {})
436        }));
437        self
438    }
439
440    pub fn on_error(mut self, f: impl Fn(&str) + Send + Sync + 'static) -> Self {
441        self.on_error = Some(Arc::new(move |err| {
442            f(err);
443            Box::pin(async move {})
444        }));
445        self
446    }
447
448    /// Add an input filter. Filters run in order on user messages before the LLM call.
449    pub fn with_input_filter(mut self, filter: impl InputFilter + 'static) -> Self {
450        self.input_filters.push(Arc::new(filter));
451        self
452    }
453
454    /// Set a custom in-memory compaction strategy on the context config.
455    /// When set, replaces `DefaultCompaction` during context compaction
456    /// for sessionless runs. (G5: stored on CompactionConfig, not BasicAgent.)
457    pub fn with_compaction_strategy(mut self, strategy: impl CompactionStrategy + 'static) -> Self {
458        if let Some(ref mut ctx) = self.context_config {
459            ctx.compaction.in_memory_strategy = Some(Arc::new(strategy));
460        }
461        self
462    }
463
464    /// Set the agent profile blueprint. Also copies profile fields into this agent's
465    /// public fields for backward compatibility (profile values act as defaults;
466    /// existing field values take precedence if already set).
467    pub fn with_profile(mut self, profile: AgentProfile) -> Self {
468        // Copy profile defaults into pub fields (only if not already set)
469        if let Some(ref prompt) = profile.system_prompt {
470            if self.system_prompt.is_empty() {
471                self.system_prompt = prompt.clone();
472            }
473        }
474        if let Some(level) = profile.thinking_level {
475            if self.thinking_level == ThinkingLevel::Off {
476                self.thinking_level = level;
477            }
478        }
479        if let Some(temp) = profile.temperature {
480            if self.temperature.is_none() {
481                self.temperature = Some(temp);
482            }
483        }
484        if let Some(max) = profile.max_tokens {
485            if self.max_tokens.is_none() {
486                self.max_tokens = Some(max);
487            }
488        }
489        if let Some(ref id) = profile.config_id {
490            if self.config_id.is_none() {
491                self.config_id = Some(id.clone());
492            }
493        }
494        self.profile = Some(profile);
495        self
496    }
497
498    /// Set the temperature for LLM calls.
499    pub fn with_temperature(mut self, temp: f32) -> Self {
500        self.temperature = Some(temp);
501        self
502    }
503
504    /// Set the config identity, used as the middle segment of `loop_id`.
505    pub fn with_config_id(mut self, id: impl Into<String>) -> Self {
506        self.config_id = Some(id.into());
507        self
508    }
509
510    /// Set the agent workspace directory. File paths in system prompt blocks
511    /// resolve relative to this directory.
512    pub fn with_workspace(mut self, path: impl Into<std::path::PathBuf>) -> Self {
513        self.workspace = Some(path.into());
514        self
515    }
516
517    /// Set the before-loop hook. Return `false` to abort the loop.
518    ///
519    /// 0.9.0: sync-closure setter for back-compat. Async bodies should set
520    /// the field directly via `BeforeLoopFn`.
521    pub fn on_before_loop(
522        mut self,
523        f: impl Fn(&[AgentMessage], usize) -> bool + Send + Sync + 'static,
524    ) -> Self {
525        self.before_loop = Some(Arc::new(move |msgs, idx| {
526            let r = f(msgs, idx);
527            Box::pin(async move { r })
528        }));
529        self
530    }
531
532    /// Set the after-loop hook.
533    pub fn on_after_loop(
534        mut self,
535        f: impl Fn(&[AgentMessage], &Usage) + Send + Sync + 'static,
536    ) -> Self {
537        self.after_loop = Some(Arc::new(move |msgs, usage| {
538            f(msgs, usage);
539            Box::pin(async move {})
540        }));
541        self
542    }
543
544    /// Set the before-tool-execution hook. Return `false` to skip the tool call.
545    pub fn on_before_tool_execution(
546        mut self,
547        f: impl Fn(&str, &str, &serde_json::Value) -> bool + Send + Sync + 'static,
548    ) -> Self {
549        self.before_tool_execution = Some(Arc::new(move |name, id, args| {
550            let r = f(name, id, args);
551            Box::pin(async move { r })
552        }));
553        self
554    }
555
556    /// Set the after-tool-execution hook.
557    pub fn on_after_tool_execution(
558        mut self,
559        f: impl Fn(&str, &str, bool) + Send + Sync + 'static,
560    ) -> Self {
561        self.after_tool_execution = Some(Arc::new(move |name, id, is_error| {
562            f(name, id, is_error);
563            Box::pin(async move {})
564        }));
565        self
566    }
567
568    /// Set the before-tool-execution-update hook. Return `false` to suppress the event.
569    ///
570    /// Sync hook (no async migration in 0.9.0 — fires from inside the sync
571    /// `ToolUpdateFn` callback).
572    pub fn on_before_tool_execution_update(
573        mut self,
574        f: impl Fn(&str, &str, &str) -> bool + Send + Sync + 'static,
575    ) -> Self {
576        self.before_tool_execution_update = Some(Arc::new(f));
577        self
578    }
579
580    /// Set the after-tool-execution-update hook.
581    ///
582    /// Sync hook (no async migration in 0.9.0 — fires from inside the sync
583    /// `ToolUpdateFn` callback).
584    pub fn on_after_tool_execution_update(
585        mut self,
586        f: impl Fn(&str, &str, &str) + Send + Sync + 'static,
587    ) -> Self {
588        self.after_tool_execution_update = Some(Arc::new(f));
589        self
590    }
591
592    /// Enable the prun tool (both `prun` and `prun_with_memo` variants).
593    /// Adds both tool variants to the tool set and wires up the shared pending queue.
594    pub fn with_prun_tool(mut self) -> Self {
595        let pending = Arc::new(Mutex::new(Vec::new()));
596        self.tools.push(Arc::new(crate::tools::PrunTool::new(
597            pending.clone(),
598            crate::tools::PrunVariant::Prun,
599        )));
600        self.tools.push(Arc::new(crate::tools::PrunTool::new(
601            pending.clone(),
602            crate::tools::PrunVariant::PrunWithMemo,
603        )));
604        self.prun_pending = Some(pending);
605        self
606    }
607
608    /// Enable the `revert_to_state` tool (Composition I braking layer).
609    ///
610    /// Registers a [`RevertTool`](crate::tools::RevertTool) on the agent and
611    /// wires the shared `revert_pending` queue into the loop config so
612    /// `apply_revert` runs between turns. The opt-in guarantee — there is no
613    /// other registration path — is the load-bearing safety invariant for
614    /// downstream consumers that have not yet adopted Composition I.
615    pub fn with_revert_tool(mut self) -> Self {
616        let pending = Arc::new(Mutex::new(Vec::new()));
617        self.tools
618            .push(Arc::new(crate::tools::RevertTool::new(pending.clone())));
619        self.revert_pending = Some(pending);
620        self
621    }
622
623    /// Set a custom convert-to-LLM function.
624    pub fn with_convert_to_llm(
625        mut self,
626        f: impl Fn(&[AgentMessage]) -> Vec<Message> + Send + Sync + 'static,
627    ) -> Self {
628        self.convert_to_llm = Some(Arc::new(f));
629        self
630    }
631
632    /// Set a custom transform-context function.
633    pub fn with_transform_context(
634        mut self,
635        f: impl Fn(Vec<AgentMessage>) -> Vec<AgentMessage> + Send + Sync + 'static,
636    ) -> Self {
637        self.transform_context = Some(Arc::new(f));
638        self
639    }
640
641    /// Set a custom block compaction strategy for Session-aware compaction.
642    /// (G5: stored on CompactionConfig, not BasicAgent.)
643    pub fn with_block_compaction_strategy(
644        mut self,
645        strategy: impl crate::context::BlockCompactionStrategy + 'static,
646    ) -> Self {
647        if let Some(ref mut ctx) = self.context_config {
648            ctx.compaction.block_strategy = Some(Arc::new(strategy));
649        }
650        self
651    }
652
653    /// Set the before-compaction-start hook (G1). Return `false` to skip compaction.
654    pub fn on_before_compaction_start(
655        mut self,
656        f: impl Fn(usize, usize) -> bool + Send + Sync + 'static,
657    ) -> Self {
658        self.before_compaction_start = Some(Arc::new(move |est, mc| {
659            let r = f(est, mc);
660            Box::pin(async move { r })
661        }));
662        self
663    }
664
665    /// Set the after-compaction-end hook (G1).
666    pub fn on_after_compaction_end(
667        mut self,
668        f: impl Fn(usize, usize, usize, usize) + Send + Sync + 'static,
669    ) -> Self {
670        self.after_compaction_end = Some(Arc::new(move |mb, ma, tb, ta| {
671            f(mb, ma, tb, ta);
672            Box::pin(async move {})
673        }));
674        self
675    }
676
677    /// Set the context translation strategy (G8) for cross-provider compatibility.
678    pub fn with_context_translation(
679        mut self,
680        strategy: Arc<dyn ContextTranslationStrategy>,
681    ) -> Self {
682        self.context_translation = Some(strategy);
683        self
684    }
685
686    /// Add a sub-agent tool. The sub-agent runs its own `agent_loop()` when invoked.
687    pub fn with_sub_agent(mut self, sub: crate::agents::SubAgentTool) -> Self {
688        self.tools.push(Arc::new(sub));
689        self
690    }
691
692    /// Disable automatic context compaction
693    pub fn without_context_management(mut self) -> Self {
694        self.context_config = None;
695        self.execution_limits = None;
696        self
697    }
698
699    // -- OpenAPI integration --
700
701    /// Load tools from an OpenAPI spec file and add them to the agent.
702    #[cfg(feature = "openapi")]
703    pub async fn with_openapi_file(
704        mut self,
705        path: impl AsRef<std::path::Path>,
706        config: crate::openapi::OpenApiConfig,
707        filter: &crate::openapi::OperationFilter,
708    ) -> Result<Self, crate::openapi::OpenApiError> {
709        let adapters = crate::openapi::OpenApiToolAdapter::from_file(path, config, filter).await?;
710        for adapter in adapters {
711            self.tools.push(Arc::new(adapter));
712        }
713        Ok(self)
714    }
715
716    /// Fetch an OpenAPI spec from a URL and add its tools to the agent.
717    #[cfg(feature = "openapi")]
718    pub async fn with_openapi_url(
719        mut self,
720        url: &str,
721        config: crate::openapi::OpenApiConfig,
722        filter: &crate::openapi::OperationFilter,
723    ) -> Result<Self, crate::openapi::OpenApiError> {
724        let adapters = crate::openapi::OpenApiToolAdapter::from_url(url, config, filter).await?;
725        for adapter in adapters {
726            self.tools.push(Arc::new(adapter));
727        }
728        Ok(self)
729    }
730
731    /// Parse an OpenAPI spec string and add its tools to the agent.
732    #[cfg(feature = "openapi")]
733    pub fn with_openapi_spec(
734        mut self,
735        spec_str: &str,
736        config: crate::openapi::OpenApiConfig,
737        filter: &crate::openapi::OperationFilter,
738    ) -> Result<Self, crate::openapi::OpenApiError> {
739        let adapters = crate::openapi::OpenApiToolAdapter::from_str(spec_str, config, filter)?;
740        for adapter in adapters {
741            self.tools.push(Arc::new(adapter));
742        }
743        Ok(self)
744    }
745
746    // -- MCP integration --
747
748    /// Connect to an MCP server via stdio and add its tools to the agent.
749    pub async fn with_mcp_server_stdio(
750        mut self,
751        command: &str, // EXECUTABLE — path or name of the MCP server binary (e.g. "npx", "python")
752        args: &[&str], // ARGV — command-line arguments to the binary (e.g. &["-y", "@my/mcp"])
753        env: Option<HashMap<String, String>>, // ENV OVERRIDES — extra env vars for the child process; None = inherit parent env
754    ) -> Result<Self, McpError> {
755        let client = McpClient::connect_stdio(command, args, env).await?;
756        let client = Arc::new(tokio::sync::Mutex::new(client));
757        let adapters = McpToolAdapter::from_client(client).await?;
758        for adapter in adapters {
759            self.tools.push(Arc::new(adapter));
760        }
761        Ok(self)
762    }
763
764    /// Connect to an MCP server via HTTP and add its tools to the agent.
765    pub async fn with_mcp_server_http(mut self, url: &str) -> Result<Self, McpError> {
766        let client = McpClient::connect_http(url).await?;
767        let client = Arc::new(tokio::sync::Mutex::new(client));
768        let adapters = McpToolAdapter::from_client(client).await?;
769        for adapter in adapters {
770            self.tools.push(Arc::new(adapter));
771        }
772        Ok(self)
773    }
774
775    // -- Ergonomic prompting wrappers --
776    // These inherent methods accept `impl Into<String>` so callers can pass `&str` directly.
777    // All other runtime methods (state, mutation, control, queues) are provided solely by
778    // the `Agent` trait impl below — import `use phi_core::Agent` (or `use phi_core::*`)
779    // to call them on a concrete `BasicAgent`.
780
781    /// Send a text prompt. Returns a stream of `AgentEvent`s.
782    ///
783    /// Accepts `impl Into<String>` (e.g. `&str`). The trait's [`Agent::prompt`] default
784    /// requires an owned `String`; use this inherent method to pass `&str` without `.to_string()`.
785    pub async fn prompt(&mut self, text: impl Into<String>) -> mpsc::UnboundedReceiver<AgentEvent> {
786        let (tx, rx) = mpsc::unbounded_channel();
787        let msg = AgentMessage::Llm(LlmMessage::new(Message::user(text)));
788        self.prompt_messages_with_sender(vec![msg], tx).await;
789        rx
790    }
791
792    /// Send a text prompt, streaming events to a caller-provided sender.
793    ///
794    /// Accepts `impl Into<String>` (e.g. `&str`).
795    pub async fn prompt_with_sender(
796        &mut self,
797        text: impl Into<String>,
798        tx: mpsc::UnboundedSender<AgentEvent>,
799    ) {
800        let msg = AgentMessage::Llm(LlmMessage::new(Message::user(text)));
801        self.prompt_messages_with_sender(vec![msg], tx).await;
802    }
803
804    // -- Internal --
805
806    /*
807    next_loop_id — derive the next loop_id for this config within this session.
808
809    DESIGN: loop_id encodes which config produced the loop, making identity self-documenting.
810      Format: "{session_id}.{effective_config_id}.{N}"
811      effective_config_id = config.config_id if set, else "{provider_id}.{model_slug}[.thinking]"
812
813    COUNTER: HashMap keyed by "{session_id}.{effective_config_id}".
814    Each unique (session, config) pair has its own counter — so two different configs
815    in the same session get independent counters (both start at .1), while two calls
816    with the same config get sequential numbers (.1, .2, .3).
817
818    SLUG: Non-alphanumeric chars in the model name are replaced with '-' so the loop_id
819    is a clean, URL-safe identifier. E.g. "claude-opus-4.5" → "claude-opus-4-5".
820    Hyphens are kept as-is (they're valid slug separators).
821    */
822    fn next_loop_id(&mut self, config: &AgentLoopConfig) -> String {
823        let effective_config_id = if let Some(ref id) = config.config_id {
824            id.clone()
825        } else {
826            let slugify = |s: &str| -> String {
827                s.chars()
828                    .map(|c| {
829                        if c.is_alphanumeric() || c == '-' {
830                            c
831                        } else {
832                            '-'
833                        }
834                    })
835                    .collect()
836            };
837            let thinking_part = if config.thinking_level != ThinkingLevel::Off {
838                ".thinking"
839            } else {
840                ""
841            };
842            format!(
843                "{}.{}{}",
844                config.model_config.provider,
845                slugify(&config.model_config.id),
846                thinking_part
847            )
848        };
849        let thread_key = format!("{}.{}", self.session_id, effective_config_id);
850        let n = self.loop_counters.entry(thread_key.clone()).or_insert(0);
851        *n += 1;
852        format!("{}.{}", thread_key, n)
853    }
854
855    /*
856    build_config — assemble AgentLoopConfig from BasicAgent's current state.
857
858    ARCHITECTURE: Why a separate build_config() method?
859
860    AgentLoopConfig is the "parameter bundle" for the stateless agent_loop() function.
861    build_config() constructs it fresh each call — it's not stored on BasicAgent.
862    This means: AgentLoopConfig borrows from BasicAgent (hence the lifetime `'_`),
863    and both share the same Arc<Mutex<>> queues via clone (cheap, no allocation).
864
865    RUST QUIRK: `move` closures for the queue callbacks
866
867    The steering/follow-up closures need to outlive build_config()'s stack frame
868    (they're stored in AgentLoopConfig and called later by the agent loop).
869    So they use `move` to capture `steering_queue` (Arc clone) and `steering_mode` (Copy).
870
871    We clone the Arc before the move:
872      let steering_queue = self.steering_queue.clone();
873    This gives the closure its own Arc reference to the same underlying Mutex.
874    The BasicAgent still holds its own Arc reference. Both are valid simultaneously.
875
876    `self.provider.clone()` — clone the Arc:
877      self.provider is Arc<dyn StreamProvider>
878      .clone() bumps the reference count — cheap, no data duplication
879    Both BasicAgent and AgentLoopConfig now share ownership of the same underlying provider.
880    */
881
882    // ── Standalone compaction API ────────────────────────────────────────
883
884    /// Run block-based compaction on the agent's session and emit the full event lifecycle.
885    ///
886    /// Emits: `AgentStart(Compaction)` → `CompactionStarted` → `CompactionEnded` → `AgentEnd`.
887    ///
888    /// Requires `self.session` to be `Some` and `self.context_config` to be `Some`.
889    /// Panics if either is missing.
890    /// No-op if `self.session` or `self.context_config` is `None`.
891    ///
892    /// 0.9.0: now `async fn` to drive the async `BlockCompactionStrategy`.
893    pub async fn compact_context_with_sender(
894        &mut self,
895        tx: &tokio::sync::mpsc::UnboundedSender<AgentEvent>,
896    ) {
897        let (Some(session), Some(ctx_config)) =
898            (self.session.as_mut(), self.context_config.as_ref())
899        else {
900            return; // No session or config — nothing to compact
901        };
902        let comp = &ctx_config.compaction;
903        let max_tokens = ctx_config.max_context_tokens;
904
905        let loop_id = self
906            .last_loop_id
907            .clone()
908            .unwrap_or_else(|| "compaction".to_string());
909
910        let _ = tx.send(AgentEvent::AgentStart {
911            agent_id: self.agent_id.clone(),
912            session_id: self.session_id.clone(),
913            loop_id: loop_id.clone(),
914            parent_loop_id: self.last_loop_id.clone(),
915            continuation_kind: ContinuationKind::Compaction,
916            timestamp: chrono::Utc::now(),
917            metadata: None,
918            config_snapshot: None, // Compaction pass — no LLM config relevant
919        });
920
921        let msgs_before = self.messages.len();
922        let tokens_before = crate::context::total_tokens(&self.messages);
923
924        let _ = tx.send(AgentEvent::CompactionStarted {
925            loop_id: loop_id.clone(),
926            estimated_tokens: tokens_before,
927            message_count: msgs_before,
928            timestamp: chrono::Utc::now(),
929        });
930
931        let strategy: &dyn crate::context::BlockCompactionStrategy =
932            &crate::context::DefaultBlockCompaction;
933        let current_lid = self.last_loop_id.as_deref().unwrap_or("");
934
935        // Sync messages into the current loop record
936        if let Some(record) = session.get_loop_mut(current_lid) {
937            record.messages = self.messages.clone();
938        }
939
940        crate::context::compact_session_loops(
941            session,
942            current_lid,
943            strategy,
944            comp,
945            max_tokens,
946            None,
947        )
948        .await;
949        self.messages = crate::context::build_context_from_session(
950            session,
951            current_lid,
952            comp,
953            max_tokens,
954            None,
955        );
956
957        let msgs_after = self.messages.len();
958        let tokens_after = crate::context::total_tokens(&self.messages);
959        let chain = session.loop_chain_to(current_lid);
960        let loops_compacted = chain
961            .iter()
962            .filter(|lid| {
963                session
964                    .get_loop(lid)
965                    .map(|r| r.compaction_block.is_some())
966                    .unwrap_or(false)
967            })
968            .count();
969
970        let _ = tx.send(AgentEvent::CompactionEnded {
971            loop_id: loop_id.clone(),
972            messages_before: msgs_before,
973            messages_after: msgs_after,
974            estimated_tokens_before: tokens_before,
975            estimated_tokens_after: tokens_after,
976            loops_compacted,
977            timestamp: chrono::Utc::now(),
978        });
979
980        let _ = tx.send(AgentEvent::AgentEnd {
981            loop_id,
982            messages: vec![],
983            usage: Usage::default(),
984            timestamp: chrono::Utc::now(),
985            rejection: None,
986        });
987    }
988
989    /// Fire-and-forget compaction. Returns the number of loops that received
990    /// new `CompactionBlock`s.
991    ///
992    /// Requires `self.session` to be `Some` and `self.context_config` to be `Some`.
993    /// Returns 0 if `self.session` or `self.context_config` is `None`.
994    ///
995    /// 0.9.0: now `async fn` to drive the async `BlockCompactionStrategy`.
996    pub async fn compact_context(&mut self) -> usize {
997        let (Some(session), Some(ctx_config)) =
998            (self.session.as_mut(), self.context_config.as_ref())
999        else {
1000            return 0; // No session or config — nothing to compact
1001        };
1002        let comp = &ctx_config.compaction;
1003        let max_tokens = ctx_config.max_context_tokens;
1004
1005        let strategy: &dyn crate::context::BlockCompactionStrategy =
1006            &crate::context::DefaultBlockCompaction;
1007        let current_lid = self.last_loop_id.as_deref().unwrap_or("");
1008
1009        if let Some(record) = session.get_loop_mut(current_lid) {
1010            record.messages = self.messages.clone();
1011        }
1012
1013        crate::context::compact_session_loops(
1014            session,
1015            current_lid,
1016            strategy,
1017            comp,
1018            max_tokens,
1019            None,
1020        )
1021        .await;
1022        self.messages = crate::context::build_context_from_session(
1023            session,
1024            current_lid,
1025            comp,
1026            max_tokens,
1027            None,
1028        );
1029
1030        let chain = session.loop_chain_to(current_lid);
1031        chain
1032            .iter()
1033            .filter(|lid| {
1034                session
1035                    .get_loop(lid)
1036                    .map(|r| r.compaction_block.is_some())
1037                    .unwrap_or(false)
1038            })
1039            .count()
1040    }
1041
1042    // -- Internal --
1043
1044    pub fn build_config(&self) -> Result<AgentLoopConfig, super::agent::AgentBuildError> {
1045        // Clone Arc handles before the move closures capture them
1046        let steering_queue = self.steering_queue.clone(); // cheap Arc clone
1047        let steering_mode = self.steering_mode; // Copy — no clone needed
1048
1049        let follow_up_queue = self.follow_up_queue.clone();
1050        let follow_up_mode = self.follow_up_mode;
1051
1052        // BasicAgent's constructor requires a `ModelConfig`, so this branch is
1053        // unreachable — wrap in Ok unconditionally. The Result is in the trait
1054        // signature for the benefit of custom Agent implementors that may not
1055        // have a model_config.
1056        Ok(AgentLoopConfig {
1057            model_config: self.model_config.clone(),
1058            provider_override: self.provider_override.clone(),
1059            thinking_level: self.thinking_level,
1060            max_tokens: self.max_tokens,
1061            temperature: self.temperature,
1062            convert_to_llm: self.convert_to_llm.clone(),
1063            transform_context: self.transform_context.clone(),
1064            get_steering_messages: Some(Box::new(move || {
1065                // This closure runs each time the agent loop checks for steering messages.
1066                // `move` captured: steering_queue (Arc clone), steering_mode (Copy)
1067                let mut queue = lock_queue(&steering_queue); // poison-tolerant lock
1068                match steering_mode {
1069                    QueueMode::OneAtATime => {
1070                        if queue.is_empty() {
1071                            vec![]
1072                        } else {
1073                            vec![queue.remove(0)] // remove and return first element
1074                        }
1075                    }
1076                    QueueMode::All => queue.drain(..).collect(), // drain all and return
1077                }
1078            })),
1079            context_config: self.context_config.clone(),
1080            execution_limits: self.execution_limits.clone(),
1081            cache_config: self.cache_config.clone(),
1082            tool_execution: self.tool_execution.clone(),
1083            tool_timeout: self.tool_timeout,
1084            response_format: self.response_format.clone(),
1085            retry_config: self.retry_config.clone(),
1086            get_follow_up_messages: Some(Box::new(move || {
1087                let mut queue = lock_queue(&follow_up_queue);
1088                match follow_up_mode {
1089                    QueueMode::OneAtATime => {
1090                        if queue.is_empty() {
1091                            vec![]
1092                        } else {
1093                            vec![queue.remove(0)]
1094                        }
1095                    }
1096                    QueueMode::All => queue.drain(..).collect(),
1097                }
1098            })),
1099            before_turn: self.before_turn.clone(),
1100            after_turn: self.after_turn.clone(),
1101            before_loop: self.before_loop.clone(),
1102            after_loop: self.after_loop.clone(),
1103            before_tool_execution: self.before_tool_execution.clone(),
1104            after_tool_execution: self.after_tool_execution.clone(),
1105            before_tool_execution_update: self.before_tool_execution_update.clone(),
1106            after_tool_execution_update: self.after_tool_execution_update.clone(),
1107            before_compaction_start: self.before_compaction_start.clone(),
1108            after_compaction_end: self.after_compaction_end.clone(),
1109            on_error: self.on_error.clone(),
1110            input_filters: self.input_filters.clone(),
1111            first_turn_trigger: TurnTrigger::User,
1112            config_id: self.config_id.clone(),
1113            context_translation: self.context_translation.clone(),
1114            prun_pending: self.prun_pending.clone(),
1115            revert_pending: self.revert_pending.clone(),
1116        })
1117    }
1118
1119    // ── Session management ────────────────────────────────────────────────────
1120
1121    /// Immediately rotate to a new `session_id`.
1122    ///
1123    /// All subsequent loops will belong to the new session. Loop counters are
1124    /// reset so the new session's loop ids start from `.1`.
1125    ///
1126    /// Returns the newly assigned `session_id`.
1127    pub fn new_session(&mut self) -> String {
1128        self.session_id = uuid::Uuid::new_v4().to_string();
1129        self.loop_counters.clear();
1130        self.last_loop_id = None;
1131        // Clear last_active_at so the new session is treated as never-used.
1132        // Without this, a subsequent check_and_rotate would see the old timestamp
1133        // and immediately rotate again without any prompt having run.
1134        self.last_active_at = None;
1135        self.session_id.clone()
1136    }
1137
1138    /// Rotate to a new session if the agent has been idle for longer than `threshold`.
1139    ///
1140    /// Idleness is measured from the last [`prompt_messages_with_sender`][Self::prompt_messages_with_sender]
1141    /// call. If no prompt has ever been issued, returns `None` (no rotation needed
1142    /// — the session has never been used).
1143    ///
1144    /// Returns `Some(new_session_id)` if rotation happened, `None` otherwise.
1145    pub fn check_and_rotate(&mut self, threshold: std::time::Duration) -> Option<String> {
1146        let last = self.last_active_at?;
1147        let elapsed = (chrono::Utc::now() - last)
1148            .to_std()
1149            .unwrap_or(std::time::Duration::ZERO);
1150        if elapsed > threshold {
1151            Some(self.new_session())
1152        } else {
1153            None
1154        }
1155    }
1156}
1157
1158// ── Agent trait implementation ────────────────────────────────────────────────
1159
1160#[async_trait::async_trait]
1161impl Agent for BasicAgent {
1162    // ── Core async implementations ────────────────────────────────────────────
1163
1164    /// Send messages as a prompt, streaming events to a caller-provided sender.
1165    async fn prompt_messages_with_sender(
1166        &mut self,
1167        messages: Vec<AgentMessage>,
1168        tx: mpsc::UnboundedSender<AgentEvent>,
1169    ) {
1170        /*
1171        RUST QUIRK: `assert!()` — panic with a message if condition is false
1172
1173        `assert!(condition, "message")` panics if condition is false.
1174        This is a "programmer error" guard (not a runtime error) — you should
1175        never call prompt() on an already-streaming BasicAgent. If you do, it's a bug.
1176
1177        Python analogy: `assert not self.is_streaming, "..."` (but assert can be
1178        disabled with -O in Python; Rust's assert! is ALWAYS enabled in production.
1179        For debug-only assertions, use `debug_assert!()` in Rust.)
1180        */
1181        assert!(
1182            !self.is_streaming,
1183            "Agent is already streaming. Use steer() or follow_up()."
1184        );
1185
1186        self.last_active_at = Some(chrono::Utc::now());
1187        let cancel = CancellationToken::new();
1188        self.cancel = Some(cancel.clone()); // store a clone so abort() can cancel it
1189        self.is_streaming = true;
1190
1191        /*
1192        RUST QUIRK: `std::mem::take(&mut self.tools)` — efficient ownership transfer
1193
1194        `std::mem::take(dest)` replaces `*dest` with its Default value and returns
1195        the original. For Vec, Default is an empty Vec (no allocation).
1196
1197        Why not `self.tools.clone()`?
1198        For single-loop execution we MOVE the tools into the context (zero allocation).
1199        Arc::clone is cheap (just a reference-count increment), but we still prefer
1200        a move here since BasicAgent temporarily relinquishes the tools anyway.
1201        We want to MOVE the tools into the context, not copy them.
1202
1203        Why not just `self.tools` (move out)?
1204        You can't partially move out of a struct that you still have a &mut reference to.
1205        `mem::take` is the safe way to move a field out, leaving a valid default behind.
1206
1207        After the loop, we move the tools BACK: `self.tools = context.tools;`
1208        So the BasicAgent relinquishes ownership for the duration of the loop,
1209        then reclaims it afterward. Zero allocation.
1210
1211        Python analogy: temporarily `tools = self.tools; self.tools = []` — then restore.
1212        */
1213        // Build config first (only borrows self), then derive loop_id (mutates loop_counters).
1214        // `.expect` is safe: BasicAgent always supplies a model_config (required by ctor).
1215        let config = self
1216            .build_config()
1217            .expect("BasicAgent always provides a model_config");
1218        let loop_id = self.next_loop_id(&config);
1219        self.last_loop_id = Some(loop_id.clone());
1220
1221        let mut context = AgentContext {
1222            system_prompt: self.system_prompt.clone(),
1223            messages: self.messages.clone(),
1224            tools: std::mem::take(&mut self.tools), // MOVE tools out, leaving self.tools = []
1225            agent_id: Some(self.agent_id.clone()),
1226            session_id: Some(self.session_id.clone()),
1227            loop_id: Some(loop_id),
1228            parent_loop_id: None, // origin — no parent
1229            continuation_kind: None,
1230            session: self.session.take(), // Move session into context for block-based compaction
1231            user_context: Vec::new(),
1232            inrun_context: Vec::new(),
1233            active_node_id: None,
1234            next_node_id: 0,
1235        };
1236
1237        let _new_messages = agent_loop(messages, &mut context, &config, tx, cancel).await;
1238
1239        self.tools = context.tools;
1240        self.messages = context.messages;
1241        self.session = context.session; // Reclaim session after loop
1242        self.is_streaming = false;
1243        self.cancel = None;
1244    }
1245
1246    /// Continue from current context, streaming events to a caller-provided sender.
1247    ///
1248    /// `kind` describes how this continuation relates to prior loops:
1249    /// - `Default` — unspecified continuation (preserves current semantics; use when the
1250    ///   Rerun/Branch distinction is not relevant to the caller)
1251    /// - `Rerun { tag }` — retry from the same context state (auto-generates a UTC timestamp tag)
1252    /// - `Branch { tag }` — explore a different path from the same starting point (same tag)
1253    async fn continue_loop_with_sender(
1254        &mut self,
1255        tx: mpsc::UnboundedSender<AgentEvent>, // OBSERVER — events from this continuation pushed here
1256        kind: ContinuationKind,                // CONTINUATION KIND — Default | Rerun | Branch
1257    ) {
1258        assert!(!self.is_streaming, "Agent is already streaming.");
1259        assert!(!self.messages.is_empty(), "No messages to continue from.");
1260
1261        let cancel = CancellationToken::new();
1262        self.cancel = Some(cancel.clone());
1263        self.is_streaming = true;
1264
1265        // Build config first (only borrows self), then derive loop_id (mutates loop_counters).
1266        // `.expect` is safe: BasicAgent always supplies a model_config (required by ctor).
1267        let config = self
1268            .build_config()
1269            .expect("BasicAgent always provides a model_config");
1270        let loop_id = self.next_loop_id(&config);
1271        let parent_loop_id = self.last_loop_id.clone(); // points to the loop this continues from
1272        self.last_loop_id = Some(loop_id.clone());
1273
1274        // Auto-generate the timestamp tag for Rerun/Branch (RFC 3339 UTC).
1275        let tag = chrono::Utc::now().to_rfc3339();
1276        let kind_with_tag = match kind {
1277            ContinuationKind::Initial => ContinuationKind::Default, // Initial → Default when continuing
1278            ContinuationKind::Default => ContinuationKind::Default,
1279            ContinuationKind::Rerun { .. } => ContinuationKind::Rerun { tag },
1280            ContinuationKind::Branch { .. } => ContinuationKind::Branch { tag },
1281            ContinuationKind::Compaction => ContinuationKind::Compaction,
1282        };
1283
1284        // Move tools temporarily into context for the loop; restored after
1285        let mut context = AgentContext {
1286            system_prompt: self.system_prompt.clone(),
1287            messages: self.messages.clone(),
1288            tools: std::mem::take(&mut self.tools),
1289            agent_id: Some(self.agent_id.clone()),
1290            session_id: Some(self.session_id.clone()),
1291            loop_id: Some(loop_id),
1292            parent_loop_id,
1293            continuation_kind: Some(kind_with_tag),
1294            session: self.session.take(),
1295            user_context: Vec::new(),
1296            inrun_context: Vec::new(),
1297            active_node_id: None,
1298            next_node_id: 0,
1299        };
1300
1301        let _new_messages = agent_loop_continue(&mut context, &config, tx, cancel).await;
1302
1303        self.tools = context.tools;
1304        self.messages = context.messages;
1305        self.session = context.session;
1306        self.is_streaming = false;
1307        self.cancel = None;
1308    }
1309
1310    // ── State ─────────────────────────────────────────────────────────────────
1311
1312    fn messages(&self) -> &[AgentMessage] {
1313        &self.messages
1314    }
1315
1316    fn is_streaming(&self) -> bool {
1317        self.is_streaming
1318    }
1319
1320    fn agent_id(&self) -> &str {
1321        &self.agent_id
1322    }
1323
1324    fn session_id(&self) -> &str {
1325        &self.session_id
1326    }
1327
1328    fn last_loop_id(&self) -> Option<&str> {
1329        self.last_loop_id.as_deref()
1330    }
1331
1332    // ── Message mutation ──────────────────────────────────────────────────────
1333
1334    fn clear_messages(&mut self) {
1335        self.messages.clear();
1336    }
1337
1338    fn append_message(&mut self, msg: AgentMessage) {
1339        self.messages.push(msg);
1340    }
1341
1342    fn replace_messages(&mut self, msgs: Vec<AgentMessage>) {
1343        self.messages = msgs;
1344    }
1345
1346    fn save_messages(&self) -> Result<String, serde_json::Error> {
1347        serde_json::to_string(&self.messages)
1348    }
1349
1350    fn restore_messages(&mut self, json: &str) -> Result<(), serde_json::Error> {
1351        let msgs: Vec<AgentMessage> = serde_json::from_str(json)?;
1352        self.messages = msgs;
1353        Ok(())
1354    }
1355
1356    fn set_tools(&mut self, tools: Vec<Arc<dyn AgentTool>>) {
1357        self.tools = tools;
1358    }
1359
1360    // ── Control ───────────────────────────────────────────────────────────────
1361
1362    fn abort(&self) {
1363        if let Some(ref cancel) = self.cancel {
1364            cancel.cancel();
1365        }
1366    }
1367
1368    fn reset(&mut self) {
1369        self.messages.clear();
1370        self.clear_all_queues();
1371        self.is_streaming = false;
1372        self.cancel = None;
1373    }
1374
1375    // ── Steering/follow-up queues ─────────────────────────────────────────────
1376
1377    /*
1378    RUST QUIRK: `&self` vs `&mut self` — `steer()` takes shared reference
1379
1380    Usually, methods that modify the struct take `&mut self` (exclusive borrow).
1381    But `steer()` takes `&self` (shared borrow). How can it modify the queue?
1382
1383    Answer: Interior mutability via `Arc<Mutex<...>>`.
1384    The Mutex provides runtime-checked exclusive access inside a shared reference.
1385    You call `.lock()` to acquire the lock (blocks until available), then mutate.
1386
1387    This design allows `steer()` to be called from OTHER threads or closures
1388    that only have &-access to the BasicAgent (e.g., a button click handler).
1389
1390    Lock acquisition uses `lock_queue()` (see top of file) which tolerates
1391    `Mutex` poisoning. A panic in a hook or tool callback would otherwise crash
1392    every subsequent `steer()` / `follow_up()` call even though the underlying
1393    queue is recoverable data.
1394    */
1395    fn steer(&self, msg: AgentMessage) {
1396        lock_queue(&self.steering_queue).push(msg);
1397    }
1398
1399    fn follow_up(&self, msg: AgentMessage) {
1400        lock_queue(&self.follow_up_queue).push(msg);
1401    }
1402
1403    fn clear_steering_queue(&self) {
1404        lock_queue(&self.steering_queue).clear();
1405    }
1406
1407    fn clear_follow_up_queue(&self) {
1408        lock_queue(&self.follow_up_queue).clear();
1409    }
1410
1411    fn set_steering_mode(&mut self, mode: QueueMode) {
1412        self.steering_mode = mode;
1413    }
1414
1415    fn set_follow_up_mode(&mut self, mode: QueueMode) {
1416        self.follow_up_mode = mode;
1417    }
1418
1419    // ── Configuration access ─────────────────────────────────────────────
1420
1421    fn profile(&self) -> Option<&AgentProfile> {
1422        self.profile.as_ref()
1423    }
1424
1425    fn system_prompt(&self) -> &str {
1426        &self.system_prompt
1427    }
1428
1429    fn model_config(&self) -> Option<&ModelConfig> {
1430        Some(&self.model_config)
1431    }
1432
1433    fn thinking_level(&self) -> ThinkingLevel {
1434        self.thinking_level
1435    }
1436
1437    fn temperature(&self) -> Option<f32> {
1438        self.temperature
1439    }
1440
1441    fn max_tokens(&self) -> Option<u32> {
1442        self.max_tokens
1443    }
1444
1445    fn context_config(&self) -> Option<&ContextConfig> {
1446        self.context_config.as_ref()
1447    }
1448
1449    fn execution_limits(&self) -> Option<&ExecutionLimits> {
1450        self.execution_limits.as_ref()
1451    }
1452
1453    fn cache_config(&self) -> CacheConfig {
1454        self.cache_config.clone()
1455    }
1456
1457    fn tool_execution(&self) -> ToolExecutionStrategy {
1458        self.tool_execution.clone()
1459    }
1460
1461    fn tool_timeout(&self) -> Option<std::time::Duration> {
1462        self.tool_timeout
1463    }
1464
1465    fn response_format(&self) -> crate::provider::ResponseFormat {
1466        self.response_format.clone()
1467    }
1468
1469    fn retry_config(&self) -> crate::provider::retry::RetryConfig {
1470        self.retry_config.clone()
1471    }
1472
1473    // ── Session ──────────────────────────────────────────────────────────
1474
1475    fn session(&self) -> Option<&crate::session::Session> {
1476        self.session.as_ref()
1477    }
1478
1479    fn workspace(&self) -> Option<&std::path::Path> {
1480        self.workspace.as_deref()
1481    }
1482
1483    // ── Hook setters ─────────────────────────────────────────────────────
1484
1485    fn set_before_turn(&mut self, f: Option<BeforeTurnFn>) {
1486        self.before_turn = f;
1487    }
1488
1489    fn set_after_turn(&mut self, f: Option<AfterTurnFn>) {
1490        self.after_turn = f;
1491    }
1492
1493    fn set_before_loop(&mut self, f: Option<BeforeLoopFn>) {
1494        self.before_loop = f;
1495    }
1496
1497    fn set_after_loop(&mut self, f: Option<AfterLoopFn>) {
1498        self.after_loop = f;
1499    }
1500
1501    fn set_before_tool_execution(&mut self, f: Option<BeforeToolExecutionFn>) {
1502        self.before_tool_execution = f;
1503    }
1504
1505    fn set_after_tool_execution(&mut self, f: Option<AfterToolExecutionFn>) {
1506        self.after_tool_execution = f;
1507    }
1508
1509    fn set_before_tool_execution_update(&mut self, f: Option<BeforeToolExecutionUpdateFn>) {
1510        self.before_tool_execution_update = f;
1511    }
1512
1513    fn set_after_tool_execution_update(&mut self, f: Option<AfterToolExecutionUpdateFn>) {
1514        self.after_tool_execution_update = f;
1515    }
1516
1517    fn set_convert_to_llm(&mut self, f: Option<ConvertToLlmFn>) {
1518        self.convert_to_llm = f;
1519    }
1520
1521    fn set_transform_context(&mut self, f: Option<TransformContextFn>) {
1522        self.transform_context = f;
1523    }
1524
1525    fn set_block_compaction_strategy(
1526        &mut self,
1527        s: Option<Arc<dyn crate::context::BlockCompactionStrategy>>,
1528    ) {
1529        // G5: delegate to context_config.compaction
1530        if let Some(ref mut ctx) = self.context_config {
1531            ctx.compaction.block_strategy = s;
1532        }
1533    }
1534
1535    fn set_before_compaction_start(&mut self, f: Option<BeforeCompactionStartFn>) {
1536        self.before_compaction_start = f;
1537    }
1538
1539    fn set_after_compaction_end(&mut self, f: Option<AfterCompactionEndFn>) {
1540        self.after_compaction_end = f;
1541    }
1542
1543    fn set_context_translation(&mut self, s: Option<Arc<dyn ContextTranslationStrategy>>) {
1544        self.context_translation = s;
1545    }
1546
1547    fn context_translation(&self) -> Option<Arc<dyn ContextTranslationStrategy>> {
1548        self.context_translation.clone()
1549    }
1550}
1551
#[cfg(test)]
mod tests {
    use super::*;

    /// Poisons `m` by panicking on a child thread while that thread holds the guard.
    ///
    /// Works for any `Mutex<T>` whose contents can cross a thread boundary. The
    /// child's panic is captured by `JoinHandle::join`. The helper asserts that the
    /// panic really happened, so a test's poisoned-mutex precondition can never be
    /// satisfied by accident. The default panic hook still prints the panic message
    /// to stderr; that output is expected.
    fn poison_mutex<T: Send + 'static>(m: Arc<Mutex<T>>) {
        let joined = std::thread::spawn(move || {
            let _guard = m.lock().unwrap();
            panic!("intentional panic to poison the mutex");
        })
        .join();
        assert!(
            joined.is_err(),
            "helper invariant: the child thread must panic while holding the lock"
        );
    }

    #[test]
    fn lock_queue_recovers_inner_vec_after_poison() {
        let q: Arc<Mutex<Vec<u32>>> = Arc::new(Mutex::new(vec![1, 2, 3]));
        poison_mutex(Arc::clone(&q));
        assert!(
            q.is_poisoned(),
            "test pre-condition: mutex should be poisoned"
        );

        // lock_queue must not panic; it must surface the original Vec.
        let guard = lock_queue(&q);
        assert_eq!(*guard, vec![1, 2, 3]);
    }

    #[test]
    fn basic_agent_steer_survives_queue_poison() {
        let agent = BasicAgent::new(ModelConfig::anthropic("mock", "mock-model", "test-key"));

        // Poison the steering queue by panicking inside a guard.
        poison_mutex(Arc::clone(&agent.steering_queue));
        assert!(agent.steering_queue.is_poisoned());

        // The public steer/clear API should still work.
        agent.steer(AgentMessage::Llm(LlmMessage::new(Message::user("hi"))));
        assert_eq!(lock_queue(&agent.steering_queue).len(), 1);
        agent.clear_steering_queue();
        assert_eq!(lock_queue(&agent.steering_queue).len(), 0);
    }

    #[test]
    fn basic_agent_follow_up_survives_queue_poison() {
        let agent = BasicAgent::new(ModelConfig::anthropic("mock", "mock-model", "test-key"));

        // Poison the follow-up queue by panicking inside a guard.
        poison_mutex(Arc::clone(&agent.follow_up_queue));
        assert!(agent.follow_up_queue.is_poisoned());

        // The public follow_up/clear API should still work.
        agent.follow_up(AgentMessage::Llm(LlmMessage::new(Message::user("more"))));
        assert_eq!(lock_queue(&agent.follow_up_queue).len(), 1);
        agent.clear_follow_up_queue();
        assert_eq!(lock_queue(&agent.follow_up_queue).len(), 0);
    }
}