// rab/agent/agent_session.rs
1use crate::agent::branch_summary::{collect_entries_for_branch_summary, generate_branch_summary};
2use crate::agent::compaction::{
3    self, CompactionReason, CompactionResult, CompactionSettings, compact, prepare_compaction,
4};
5use crate::agent::extension::Extension;
6use crate::agent::session::SessionManager;
7use crate::agent::session_storage::{InMemorySessionStorage, SessionMetadata, SessionStorage};
8use crate::agent::types::{message_dedup_key, message_text, tool_result_message, user_message};
9use std::collections::HashSet;
10use yoagent::types::AgentMessage;
11use yoagent::types::Content;
12
13// ── Compaction lifecycle events ─────────────────────────────────────
14
15/// Events emitted during the compaction lifecycle.
16/// Matches pi's `compaction_start` / `compaction_end` event semantics.
17#[derive(Debug, Clone)]
18pub enum CompactionEvent {
19    /// Compaction has started with the given reason.
20    Start { reason: CompactionReason },
21    /// Compaction completed successfully.
22    End {
23        reason: CompactionReason,
24        result: CompactionResult,
25        aborted: bool,
26        will_retry: bool,
27        error_message: Option<String>,
28    },
29}
30
31/// Callback for compaction lifecycle events.
32pub type CompactionEventCallback = Box<dyn Fn(&CompactionEvent) + Send + Sync>;
33
/// A session metadata write that is deferred while an agent run is in progress.
///
/// Writes are queued by `publish_session_write` and applied in bulk by
/// `flush_pending_writes` (pi-compatible batching at turn boundaries).
// Every variant describes a kind of "change", so the shared `Change` suffix is
// deliberate; silence clippy's `enum_variant_names` lint for it.
#[allow(clippy::enum_variant_names)]
pub(crate) enum PendingSessionWrite {
    /// Record a switch to `model_id` served by `provider`.
    ModelChange {
        provider: String,
        model_id: String,
    },
    /// Record a new thinking level (e.g. `"high"`).
    ThinkingLevelChange(String),
    /// Record a new set of active tool names.
    ActiveToolsChange(Vec<String>),
}
42
43/// Bridges the agent loop events and session persistence.
44///
45/// Handles:
46/// - Event-driven message persistence (persist tool results as they arrive)
47/// - Automatic model/thinking/tool change detection and persistence
48///
49/// Usage:
50/// ```ignore
51/// let mut agent_session = AgentSession::new(session);
52///
53/// // In your agent event handler:
54/// agent_session.handle_event(&event);
55///
56/// // For model/thinking/tool changes at runtime:
57/// agent_session.on_model_change("opencode_go", "deepseek-v4-pro");
58/// agent_session.on_thinking_level_change("high");
59/// ```
60pub struct AgentSession {
61    /// The core session (wraps SessionStorage).
62    session: crate::agent::session::Session,
63    /// Session storage directory on disk.
64    session_dir: std::path::PathBuf,
65    /// Working directory for this session.
66    cwd: std::path::PathBuf,
67    /// Whether session persistence is enabled.
68    persist: bool,
69    /// Whether the session file has been written at least once (lazy write).
70    flushed: bool,
71    /// Last known model for change detection.
72    last_model: Option<(String, String)>,
73    /// Last known thinking level for change detection.
74    last_thinking_level: String,
75    /// Last known active tool names for change detection.
76    last_active_tools: Option<Vec<String>>,
77    /// IDs of messages already persisted via event-driven persistence,
78    /// to avoid duplicates when AgentEnd fires.
79    persisted_message_ids: HashSet<String>,
80    /// Tool call IDs already persisted (for tool result dedup).
81    persisted_tool_call_ids: HashSet<String>,
82    /// Compaction settings (default: enabled).
83    compaction_settings: CompactionSettings,
84    /// Model context window in tokens (for shouldCompact check).
85    context_window: u64,
86    /// Model name to use for compaction LLM calls.
87    model_name: String,
88    /// API key for compaction LLM calls.
89    compaction_api_key: Option<String>,
90    /// Model configuration for compaction LLM calls (base URL, compat flags, etc.).
91    model_config: Option<yoagent::provider::model::ModelConfig>,
92    /// Current thinking level from the session (for compaction summarization).
93    thinking_level: yoagent::types::ThinkingLevel,
94    /// Registered extensions (for compaction hooks).
95    extensions: Vec<Box<dyn Extension>>,
96    /// Lifecycle event listeners.
97    event_listeners: Vec<CompactionEventCallback>,
98    /// Whether overflow recovery has already been attempted (prevents loops).
99    overflow_recovery_attempted: bool,
100    /// Cancellation token for in-progress compaction (pi-compatible abort).
101    compaction_cancel: crate::agent::extension::Cancel,
102    /// Queued session writes, flushed at turn boundaries (pi-compatible).
103    pending_writes: Vec<PendingSessionWrite>,
104}
105
106impl AgentSession {
107    /// Create a new AgentSession from a SessionManager (extracts inner Session + config).
108    pub fn new(mgr: SessionManager) -> Self {
109        // Snapshot current metadata from the session context for change detection.
110        let ctx = mgr.build_session_context();
111
112        // Extract config before consuming mgr
113        let cwd = mgr.cwd().to_path_buf();
114        let session_dir = mgr.session_dir().to_path_buf();
115        let persist = mgr.is_persisted();
116        let session = mgr.into_session();
117
118        // If the session has no thinking level change entries, set last_thinking_level
119        // to empty so the first on_thinking_level_change always detects a change.
120        let has_thinking_entries = !session.find_entries("thinking_level_change").is_empty();
121        let last_thinking_level = if has_thinking_entries {
122            ctx.thinking_level
123        } else {
124            String::new()
125        };
126
127        Self {
128            session,
129            session_dir,
130            cwd,
131            persist,
132            flushed: false,
133            last_model: ctx.model,
134            last_thinking_level,
135            last_active_tools: ctx.active_tool_names,
136            persisted_message_ids: HashSet::new(),
137            persisted_tool_call_ids: HashSet::new(),
138            compaction_settings: CompactionSettings::default(),
139            context_window: 200_000,
140            model_name: String::new(),
141            compaction_api_key: None,
142            model_config: None,
143            thinking_level: yoagent::types::ThinkingLevel::Off,
144            extensions: Vec::new(),
145            event_listeners: Vec::new(),
146            overflow_recovery_attempted: false,
147            compaction_cancel: crate::agent::extension::Cancel::new(),
148            pending_writes: Vec::new(),
149        }
150    }
151
152    // ── Static factory methods ─────────────────────────────────
153
154    /// Create a new persisted session.
155    pub fn create(cwd: &std::path::Path, session_dir: Option<&std::path::Path>) -> Self {
156        Self::new(SessionManager::create(cwd, session_dir))
157    }
158
159    /// Open a specific session file.
160    pub fn open(
161        path: &std::path::Path,
162        session_dir: Option<&std::path::Path>,
163        cwd_override: Option<&std::path::Path>,
164    ) -> Self {
165        Self::new(SessionManager::open(path, session_dir, cwd_override))
166    }
167
168    /// Create an in-memory session (no persistence).
169    pub fn in_memory(cwd: &std::path::Path) -> Self {
170        Self::new(SessionManager::in_memory(cwd))
171    }
172
173    /// Continue most recent session or create new.
174    pub fn continue_recent(cwd: &std::path::Path, session_dir: Option<&std::path::Path>) -> Self {
175        Self::new(SessionManager::continue_recent(cwd, session_dir))
176    }
177
178    /// Fork a session from another project directory.
179    pub fn fork_from(
180        source_path: &std::path::Path,
181        target_cwd: &std::path::Path,
182        session_dir: Option<&std::path::Path>,
183        options: Option<&crate::agent::session::NewSessionOptions>,
184    ) -> std::io::Result<Self> {
185        SessionManager::fork_from(source_path, target_cwd, session_dir, options).map(Self::new)
186    }
187
188    /// Configure compaction with API key, model, context window, and model config.
189    pub fn set_compaction_config(
190        &mut self,
191        api_key: String,
192        model_name: &str,
193        context_window: u64,
194        model_config: Option<yoagent::provider::model::ModelConfig>,
195    ) {
196        self.compaction_api_key = Some(api_key);
197        self.model_name = model_name.to_string();
198        self.context_window = context_window;
199        self.model_config = model_config;
200    }
201
202    /// Enable or disable auto-compaction.
203    pub fn set_auto_compact(&mut self, enabled: bool) {
204        self.compaction_settings.enabled = enabled;
205    }
206
207    /// Sync the thinking level from the session context.
208    /// Should be called after the session context changes.
209    pub fn sync_thinking_level(&mut self) {
210        let ctx = self.session.build_session_context();
211        let level_str = ctx.thinking_level.to_lowercase();
212        self.thinking_level = match level_str.as_str() {
213            "off" => yoagent::types::ThinkingLevel::Off,
214            "minimal" => yoagent::types::ThinkingLevel::Minimal,
215            "low" => yoagent::types::ThinkingLevel::Low,
216            "medium" => yoagent::types::ThinkingLevel::Medium,
217            "high" => yoagent::types::ThinkingLevel::High,
218            _ => yoagent::types::ThinkingLevel::Off,
219        };
220    }
221
222    /// Get the current compaction settings (mutable, for modification).
223    pub fn compaction_settings_mut(&mut self) -> &mut CompactionSettings {
224        &mut self.compaction_settings
225    }
226
227    /// Get the current compaction settings.
228    pub fn compaction_settings(&self) -> &CompactionSettings {
229        &self.compaction_settings
230    }
231
232    /// Set the list of extensions (for compaction hooks).
233    pub fn set_extensions(&mut self, extensions: Vec<Box<dyn Extension>>) {
234        self.extensions = extensions;
235    }
236
237    /// Abort any in-progress compaction (matching pi's `abortCompaction()`).
238    /// The cancellation will be picked up by extension hooks on their next
239    /// `cancel.is_cancelled()` check.
240    pub fn abort_compaction(&self) {
241        self.compaction_cancel.cancel();
242    }
243
244    /// Register a compaction lifecycle event listener.
245    pub fn on_compaction_event(&mut self, callback: CompactionEventCallback) {
246        self.event_listeners.push(callback);
247    }
248
249    /// Emit a compaction event to all registered listeners.
250    fn emit_compaction_event(&self, event: &CompactionEvent) {
251        for listener in &self.event_listeners {
252            listener(event);
253        }
254    }
255
256    /// Reset overflow recovery state (called when starting a new turn).
257    pub fn reset_overflow_recovery(&mut self) {
258        self.overflow_recovery_attempted = false;
259        self.compaction_cancel = crate::agent::extension::Cancel::new();
260    }
261
262    /// Check if a provider error indicates context overflow.
263    /// Matches pi's context overflow detection patterns.
264    pub fn is_context_overflow_error(msg: &AgentMessage) -> bool {
265        let text = message_text(msg);
266        let lower = text.to_lowercase();
267        // Pi-compatible: detect HTTP 413, "prompt too long", "context_length_exceeded", etc.
268        lower.contains("413")
269            || lower.contains("request_too_large")
270            || lower.contains("prompt too long")
271            || lower.contains("context_length_exceeded")
272            || lower.contains("context overflow")
273            || lower.contains("max context length")
274            || lower.contains("exceeded max tokens")
275            || lower.contains("maximum context length")
276    }
277
278    // ── Accessors ─────────────────────────────────────────────────
279
280    /// Borrow the underlying session manager.
281    /// Borrow the underlying Session.
282    pub fn session(&self) -> &crate::agent::session::Session {
283        &self.session
284    }
285
286    /// Mutably borrow the underlying Session.
287    pub fn session_mut(&mut self) -> &mut crate::agent::session::Session {
288        &mut self.session
289    }
290
291    /// Consume and return the inner Session.
292    pub fn into_session(self) -> crate::agent::session::Session {
293        self.session
294    }
295
296    /// Ensure the session file has been written (lazy write on first assistant message).
297    pub fn ensure_flushed(&mut self) {
298        if self.flushed || !self.persist {
299            return;
300        }
301        let id = self.session.session_id();
302        let cwd_str = self.cwd.to_string_lossy().to_string();
303        let parent_session = self.session.metadata().parent_session_path.clone();
304        let created_at = self.session.metadata().created_at.clone();
305        let file_ts = created_at.replace([':', '.'], "-");
306        let file_path = self.session_dir.join(format!("{}_{}.jsonl", file_ts, id));
307
308        let existing_entries = self.session.get_entries();
309
310        match crate::agent::session_storage::JsonlSessionStorage::create(
311            file_path,
312            &cwd_str,
313            &id,
314            parent_session,
315        ) {
316            Ok(mut file_storage) => {
317                for entry in &existing_entries {
318                    if let Err(e) = file_storage.append_entry(entry.clone()) {
319                        eprintln!("Warning: failed to write entry to session file: {}", e);
320                    }
321                }
322                self.session = crate::agent::session::Session::new(Box::new(file_storage));
323                self.flushed = true;
324            }
325            Err(e) => {
326                eprintln!("Warning: failed to create session file: {}", e);
327                self.flushed = true;
328            }
329        }
330    }
331
332    // ── App-level accessors ────────────────────────────────────
333
334    pub fn cwd(&self) -> &std::path::Path {
335        &self.cwd
336    }
337
338    pub fn session_dir(&self) -> &std::path::Path {
339        &self.session_dir
340    }
341
342    pub fn is_persisted(&self) -> bool {
343        self.persist
344    }
345
346    pub fn session_id(&self) -> String {
347        self.session.session_id()
348    }
349
350    pub fn session_file(&self) -> Option<std::path::PathBuf> {
351        self.session.session_file()
352    }
353
354    pub fn session_name(&self) -> Option<String> {
355        self.session.session_name()
356    }
357
358    // ── Pending writes (pi-compatible batching) ─────────────────
359
360    /// Queue a session write for batching. Pi-compatible: flushes at turn boundaries.
361    pub(crate) fn publish_session_write(&mut self, write: PendingSessionWrite) {
362        self.pending_writes.push(write);
363    }
364
365    /// Flush all queued writes to the underlying session storage.
366    /// Called at the end of `on_agent_event` and `on_agent_end`.
367    pub fn flush_pending_writes(&mut self) {
368        for write in self.pending_writes.drain(..) {
369            match write {
370                PendingSessionWrite::ModelChange { provider, model_id } => {
371                    self.session.append_model_change(&provider, &model_id);
372                }
373                PendingSessionWrite::ThinkingLevelChange(level) => {
374                    self.session.append_thinking_level_change(&level);
375                }
376                PendingSessionWrite::ActiveToolsChange(tools) => {
377                    self.session.append_active_tools_change(&tools);
378                }
379            }
380        }
381    }
382
383    // ── Model / thinking / tool change tracking ─────────────────
384
385    /// Persist a model change if it differs from the last known model.
386    /// Returns true if a change entry was enqueued.
387    pub fn on_model_change(&mut self, provider: &str, model_id: &str) -> bool {
388        let new = (provider.to_string(), model_id.to_string());
389        if self.last_model.as_ref() != Some(&new) {
390            self.publish_session_write(PendingSessionWrite::ModelChange {
391                provider: provider.to_string(),
392                model_id: model_id.to_string(),
393            });
394            self.last_model = Some(new);
395            true
396        } else {
397            false
398        }
399    }
400
401    /// Persist a thinking level change if it differs from the last known level.
402    /// Returns true if a change entry was enqueued.
403    pub fn on_thinking_level_change(&mut self, level: &str) -> bool {
404        if self.last_thinking_level != level {
405            self.publish_session_write(PendingSessionWrite::ThinkingLevelChange(level.to_string()));
406            self.last_thinking_level = level.to_string();
407            true
408        } else {
409            false
410        }
411    }
412
413    /// Persist an active tools change if it differs from the last known set.
414    /// Returns true if a change entry was enqueued.
415    pub fn on_active_tools_change(&mut self, tools: &[String]) -> bool {
416        let tools_vec = tools.to_vec();
417        if self.last_active_tools.as_ref() != Some(&tools_vec) {
418            self.publish_session_write(PendingSessionWrite::ActiveToolsChange(tools_vec.clone()));
419            self.last_active_tools = Some(tools_vec);
420            true
421        } else {
422            false
423        }
424    }
425
426    // ── User message submission ───────────────────────────────────
427
428    /// Reset the session (creates a new empty session) and clear
429    /// all tracked state so the new session starts fresh.
430    pub fn new_session(&mut self) {
431        // Create a fresh in-memory session
432        let meta = SessionMetadata {
433            id: uuid::Uuid::new_v4().to_string(),
434            created_at: chrono::Utc::now().to_rfc3339(),
435            cwd: self.cwd.to_string_lossy().to_string(),
436            path: None,
437            parent_session_path: None,
438        };
439        let storage = Box::new(InMemorySessionStorage::new(meta));
440        self.session = crate::agent::session::Session::new(storage);
441        self.flushed = false;
442        self.persisted_message_ids.clear();
443        self.persisted_tool_call_ids.clear();
444        self.last_model = None;
445        self.last_thinking_level = String::new();
446        self.last_active_tools = None;
447        self.compaction_cancel = crate::agent::extension::Cancel::new();
448    }
449
450    /// Append a user message to the session and register it as persisted.
451    /// Returns the entry id.
452    pub fn send_user_message(&mut self, content: &str) -> String {
453        let msg = user_message(content);
454        let id = self.session.append_message(&msg);
455        self.persisted_message_ids.insert(message_dedup_key(&msg));
456        id
457    }
458
459    /// Append a user message (pre-constructed) to the session.
460    /// Returns the entry id.
461    pub fn send_user_message_obj(&mut self, msg: &AgentMessage) -> String {
462        let id = self.session.append_message(msg);
463        self.persisted_message_ids.insert(message_dedup_key(msg));
464        id
465    }
466
467    // ── Event-driven persistence ──────────────────────────────────
468
469    /// Process an agent event for automatic persistence (pi-compatible).
470    ///
471    /// - `ToolResult` events are persisted immediately (crash-safe).
472    /// - `MessageEnd` persists every message in real-time (pi-compatible, crash-safe).
473    /// - `AgentEnd` persists any remaining assistant messages not yet captured.
474    ///
475    /// Call this from your agent event handler alongside any UI updates.
476    /// This is the mode-agnostic persistence handler, matching pi's `_handleAgentEvent`.
477    pub fn on_agent_event(&mut self, event: &yoagent::types::AgentEvent) {
478        use yoagent::types::AgentEvent as YoEvent;
479        match event {
480            YoEvent::ToolExecutionEnd {
481                tool_call_id,
482                tool_name,
483                result,
484                is_error,
485                ..
486            } => {
487                let content = result
488                    .content
489                    .iter()
490                    .filter_map(|c| {
491                        if let Content::Text { text } = c {
492                            Some(text.clone())
493                        } else {
494                            None
495                        }
496                    })
497                    .collect::<Vec<_>>()
498                    .join("");
499                let msg = tool_result_message(tool_call_id, tool_name, content, *is_error);
500                self.persist_message(&msg);
501                // Pi-compatible: flush tool result writes immediately (crash-safe)
502                self.flush_pending_writes();
503            }
504            YoEvent::MessageEnd { message } => {
505                // Pi-compatible: reset overflow recovery when a user message arrives
506                // (matches pi's _overflowRecoveryAttempted reset in message_start for user role).
507                if crate::agent::types::message_is_user(message) {
508                    self.reset_overflow_recovery();
509                }
510                // Pi-compatible: persist every message immediately on message_end,
511                // not deferred to agent_end. Extension messages use custom_message
512                // entries (excluded from LLM context); all others use regular messages.
513                if crate::agent::types::message_is_extension(message) {
514                    self.persist_extension_message(message);
515                } else {
516                    self.persist_message_end(message);
517                }
518            }
519            YoEvent::AgentEnd { messages } => {
520                self.on_agent_end(messages);
521            }
522            _ => {}
523        }
524    }
525
526    /// Persist all new messages from an agent run that haven't been
527    /// persisted yet (e.g. assistant messages not captured by event-driven
528    /// persistence, or error messages).
529    ///
530    /// Call this when the agent loop finishes, or let `handle_event` do it
531    /// automatically on `AgentEnd`.
532    pub fn on_agent_end(&mut self, messages: &[AgentMessage]) {
533        for msg in messages {
534            if crate::agent::types::message_is_user(msg) {
535                continue;
536            }
537            // Skip Llm-form error messages — they're already persisted as
538            // Extension (custom_message) in the MessageEnd handler and should
539            // not be persisted again as Llm messages, which would be included
540            // in the LLM context on subsequent turns.
541            if crate::agent::types::message_error(msg).is_some() {
542                continue;
543            }
544            // Skip tool results already persisted via event-driven persistence
545            if crate::agent::types::message_is_tool_result(msg)
546                && let Some(tcid) = crate::agent::types::message_tool_call_id(msg)
547                && self.persisted_tool_call_ids.contains(tcid)
548            {
549                continue;
550            }
551            if !self.persisted_message_ids.contains(&message_dedup_key(msg)) {
552                self.session.append_message(msg);
553                self.persisted_message_ids.insert(message_dedup_key(msg));
554            }
555        }
556        // Pi-compatible: flush queued metadata writes at turn end
557        self.flush_pending_writes();
558    }
559
560    // ── Compaction ────────────────────────────────────────────────
561
562    /// Check if compaction should run and execute it if needed.
563    /// Should be called after the agent finishes a turn (after on_agent_end).
564    /// Returns `true` if compaction was performed.
565    pub async fn check_auto_compact(&mut self) -> Result<bool, String> {
566        Ok(self
567            ._run_compaction(CompactionReason::Threshold, None, false)
568            .await?
569            .is_some())
570    }
571
572    /// Run compaction after a context overflow error.
573    /// If `will_retry` is true, the agent turn will be retried after compaction.
574    /// Returns `Ok(true)` if compaction was performed, `Ok(false)` if recovery already attempted.
575    pub async fn check_overflow_compact(&mut self, will_retry: bool) -> Result<bool, String> {
576        if self.overflow_recovery_attempted {
577            return Ok(false);
578        }
579        self.overflow_recovery_attempted = true;
580        Ok(self
581            ._run_compaction(CompactionReason::Overflow, None, will_retry)
582            .await?
583            .is_some())
584    }
585
586    /// Run compaction manually (ignores auto-compact setting).
587    /// Returns the compaction summary text, or an error message.
588    pub async fn run_manual_compact(
589        &mut self,
590        custom_instructions: Option<&str>,
591    ) -> Result<String, String> {
592        let result = self
593            ._run_compaction(CompactionReason::Manual, custom_instructions, false)
594            .await?;
595        Ok(result.map(|r| r.summary).unwrap_or_default())
596    }
597
598    /// Internal: run compaction with the given reason, emitting lifecycle events.
599    /// Returns the CompactionResult if compaction was performed, or None if skipped.
600    async fn _run_compaction(
601        &mut self,
602        reason: CompactionReason,
603        custom_instructions: Option<&str>,
604        will_retry: bool,
605    ) -> Result<Option<CompactionResult>, String> {
606        // For threshold compaction, check if auto-compact is enabled
607        if reason == CompactionReason::Threshold && !self.compaction_settings.enabled {
608            return Ok(None);
609        }
610
611        if self.compaction_api_key.is_none() || self.model_name.is_empty() {
612            return Ok(None);
613        }
614
615        // Create a fresh cancellation token for this compaction run
616        // (pi-compatible: matches AbortController per compaction call)
617        self.compaction_cancel = crate::agent::extension::Cancel::new();
618        let cancel = self.compaction_cancel.clone();
619
620        // Emit compaction_start
621        self.emit_compaction_event(&CompactionEvent::Start { reason });
622
623        // Check for cancellation before proceeding
624        if cancel.is_cancelled() {
625            return Ok(None);
626        }
627
628        let entries = self.session.get_entries();
629
630        // Check threshold for auto-compact
631        if reason == CompactionReason::Threshold {
632            let context_msgs = self.session.build_session_context().messages;
633            let context_tokens = compaction::estimate_context_tokens(&context_msgs);
634            if !compaction::should_compact(
635                context_tokens,
636                self.context_window,
637                &self.compaction_settings,
638            ) {
639                return Ok(None);
640            }
641        }
642
643        let Some(prep) = prepare_compaction(&entries, &self.compaction_settings) else {
644            return Ok(None);
645        };
646
647        // Extension hooks: before_compact
648        let mut from_hook = false;
649        let mut hook_summary: Option<String> = None;
650        let mut hook_details: Option<serde_json::Value> = None;
651
652        for ext in &self.extensions {
653            if cancel.is_cancelled() {
654                break;
655            }
656            if let Some(result) = ext.before_compact(
657                &prep.first_kept_entry_id,
658                prep.tokens_before,
659                &reason.to_string(),
660                &cancel,
661            ) {
662                if result.cancel {
663                    self.emit_compaction_event(&CompactionEvent::End {
664                        reason,
665                        aborted: true,
666                        will_retry: false,
667                        error_message: Some("Compaction cancelled by extension".to_string()),
668                        result: CompactionResult {
669                            summary: String::new(),
670                            first_kept_entry_id: prep.first_kept_entry_id.clone(),
671                            tokens_before: prep.tokens_before,
672                            estimated_tokens_after: 0,
673                            details: None,
674                        },
675                    });
676                    return Ok(None);
677                }
678                if result.summary.is_some() {
679                    hook_summary = result.summary;
680                    hook_details = result.details;
681                    from_hook = true;
682                    break;
683                }
684            }
685        }
686
687        let result = if let Some(summary) = hook_summary {
688            // Extension provided custom summary
689            CompactionResult {
690                summary,
691                first_kept_entry_id: prep.first_kept_entry_id.clone(),
692                tokens_before: prep.tokens_before,
693                estimated_tokens_after: 0, // will be computed after append
694                details: hook_details,
695            }
696        } else {
697            // Call provider for summarization
698            let api_key = self.compaction_api_key.as_ref().unwrap();
699            compact(
700                &prep,
701                api_key,
702                &self.model_name,
703                custom_instructions,
704                self.thinking_level,
705                self.model_config.clone(),
706            )
707            .await?
708        };
709
710        // Append the compaction entry to the session
711        self.session.append_compaction(
712            &result.summary,
713            &result.first_kept_entry_id,
714            result.tokens_before,
715            result.details.clone(),
716            Some(from_hook),
717        );
718
719        // Compute estimated tokens after compaction
720        let context_after = self.session.build_session_context().messages;
721        let estimated_tokens_after = compaction::estimate_context_tokens(&context_after);
722
723        let final_result = CompactionResult {
724            estimated_tokens_after,
725            ..result
726        };
727
728        // Extension hooks: after_compact
729        for ext in &self.extensions {
730            if cancel.is_cancelled() {
731                break;
732            }
733            ext.after_compact(
734                &final_result.summary,
735                &final_result.first_kept_entry_id,
736                final_result.tokens_before,
737                final_result.estimated_tokens_after,
738                from_hook,
739                &reason.to_string(),
740                &cancel,
741            );
742        }
743
744        // Emit compaction_end
745        self.emit_compaction_event(&CompactionEvent::End {
746            reason,
747            result: final_result.clone(),
748            aborted: false,
749            will_retry,
750            error_message: None,
751        });
752
753        Ok(Some(final_result))
754    }
755
756    // ── Branch summarization ───────────────────────────────────────
757
758    /// Summarise the abandoned branch when navigating to a different node.
759    ///
760    /// Collects entries between `old_leaf_id` and the common ancestor with
761    /// `target_id`, summarises them via the provider, and appends a
762    /// `BranchSummaryEntry` to the session.
763    ///
764    /// Returns the summary text, or an error message.
765    pub async fn summarize_branch_navigation(
766        &mut self,
767        old_leaf_id: Option<&str>,
768        target_id: &str,
769    ) -> Result<String, String> {
770        if self.compaction_api_key.is_none() || self.model_name.is_empty() {
771            return Err("No provider configured for summarization".to_string());
772        }
773
774        let (entries, _common_ancestor) =
775            collect_entries_for_branch_summary(self.session(), old_leaf_id, target_id);
776
777        if entries.is_empty() {
778            return Err("No abandoned entries to summarize".to_string());
779        }
780
781        let api_key = self.compaction_api_key.as_ref().unwrap();
782        generate_branch_summary(
783            &mut self.session,
784            &entries,
785            target_id,
786            api_key,
787            &self.model_name,
788            self.thinking_level,
789            self.model_config.clone(),
790        )
791        .await
792    }
793
794    /// Move the leaf pointer to an earlier entry (starts a new branch).
795    /// Optionally summarizes the abandoned path if a provider is configured.
796    /// Returns the branch summary text if summarization was performed.
797    pub async fn set_branch(&mut self, branch_from_id: &str) -> Result<Option<String>, String> {
798        let old_leaf = self.session.get_leaf_id();
799
800        let summary = if self.compaction_api_key.is_some()
801            && !self.model_name.is_empty()
802            && let Some(ref old) = old_leaf
803            && old != branch_from_id
804        {
805            // Summarize the abandoned path
806            match self
807                .summarize_branch_navigation(Some(old), branch_from_id)
808                .await
809            {
810                Ok(s) => Some(s),
811                Err(e) => {
812                    // Non-fatal: still allow the branch move
813                    eprintln!("Warning: branch summarization failed: {}", e);
814                    None
815                }
816            }
817        } else {
818            None
819        };
820
821        self.session
822            .set_leaf_id(Some(branch_from_id))
823            .map_err(|e| format!("Failed to set branch: {}", e))?;
824
825        Ok(summary)
826    }
827
828    /// Persist a tool result message (public so the agent loop can persist crash-safely).
829    /// Deduplicates by tool_call_id.
830    pub fn persist_tool_result(
831        &mut self,
832        tool_call_id: &str,
833        tool_name: &str,
834        content: String,
835        is_error: bool,
836    ) {
837        let msg = tool_result_message(tool_call_id, tool_name, content, is_error);
838        self.persist_message(&msg);
839    }
840
841    /// Persist an Extension message as a `custom_message` session entry (pi-compatible).
842    /// Extension messages are NOT persisted as regular messages — they use the
843    /// `custom_message` entry type which supports `custom_type`, `display`, and `details`.
844    pub fn persist_extension_message(&mut self, msg: &AgentMessage) {
845        let Some(kind) = crate::agent::types::message_extension_kind(msg) else {
846            return;
847        };
848        let text = crate::agent::types::message_extension_text(msg)
849            .unwrap_or_else(|| crate::agent::types::message_text(msg));
850        let content = serde_json::json!({"text": text});
851        self.session
852            .append_custom_message_entry(kind, content, true, None);
853    }
854
855    /// Persist a single message on `message_end` (pi-compatible pattern).
856    ///
857    /// Pi persists every message (user, assistant, toolResult) immediately on `message_end`,
858    /// not deferred to `agent_end`. This method handles dedup for tool results (already
859    /// persisted via `persist_tool_result`) and dedup by text for other message types.
860    pub fn persist_message_end(&mut self, msg: &AgentMessage) {
861        // Tool results are already persisted crash-safely via persist_tool_result on
862        // ToolExecutionEnd — skip them here to avoid duplicates.
863        if crate::agent::types::message_is_tool_result(msg)
864            && let Some(tcid) = crate::agent::types::message_tool_call_id(msg)
865            && self.persisted_tool_call_ids.contains(tcid)
866        {
867            return;
868        }
869        // Use persist_message for dedup (checks both tool_call_id and text)
870        self.persist_message(msg);
871    }
872
873    // ── Internal helpers ──────────────────────────────────────────
874
875    /// Persist a single message, skipping if already persisted (dedup).
876    /// Tool results are deduped by tool_call_id; other messages by text.
877    /// Persist a message directly (pi-compatible: messages are written immediately, not queued).
878    fn persist_message(&mut self, msg: &AgentMessage) {
879        // Dedup tool results by tool_call_id
880        if crate::agent::types::message_is_tool_result(msg)
881            && let Some(tcid) = crate::agent::types::message_tool_call_id(msg)
882        {
883            if self.persisted_tool_call_ids.contains(tcid) {
884                return;
885            }
886            self.session.append_message(msg);
887            self.persisted_tool_call_ids.insert(tcid.to_string());
888            self.persisted_message_ids.insert(message_dedup_key(msg));
889            return;
890        }
891        // Dedup other messages by dedup key (role + content signature)
892        if self.persisted_message_ids.contains(&message_dedup_key(msg)) {
893            return;
894        }
895        self.session.append_message(msg);
896        self.persisted_message_ids.insert(message_dedup_key(msg));
897    }
898}