1use crate::agent::branch_summary::{collect_entries_for_branch_summary, generate_branch_summary};
2use crate::agent::compaction::{
3 self, CompactionReason, CompactionResult, CompactionSettings, compact, prepare_compaction,
4};
5use crate::agent::extension::Extension;
6use crate::agent::session::SessionManager;
7use crate::agent::session_storage::{InMemorySessionStorage, SessionMetadata, SessionStorage};
8use crate::agent::types::{message_dedup_key, message_text, tool_result_message, user_message};
9use std::collections::HashSet;
10use yoagent::types::AgentMessage;
11use yoagent::types::Content;
12
13#[derive(Debug, Clone)]
18pub enum CompactionEvent {
19 Start { reason: CompactionReason },
21 End {
23 reason: CompactionReason,
24 result: CompactionResult,
25 aborted: bool,
26 will_retry: bool,
27 error_message: Option<String>,
28 },
29}
30
31pub type CompactionEventCallback = Box<dyn Fn(&CompactionEvent) + Send + Sync>;
33
34#[allow(clippy::enum_variant_names)]
37pub(crate) enum PendingSessionWrite {
38 ModelChange { provider: String, model_id: String },
39 ThinkingLevelChange(String),
40 ActiveToolsChange(Vec<String>),
41}
42
43pub struct AgentSession {
61 session: crate::agent::session::Session,
63 session_dir: std::path::PathBuf,
65 cwd: std::path::PathBuf,
67 persist: bool,
69 flushed: bool,
71 last_model: Option<(String, String)>,
73 last_thinking_level: String,
75 last_active_tools: Option<Vec<String>>,
77 persisted_message_ids: HashSet<String>,
80 persisted_tool_call_ids: HashSet<String>,
82 compaction_settings: CompactionSettings,
84 context_window: u64,
86 model_name: String,
88 compaction_api_key: Option<String>,
90 model_config: Option<yoagent::provider::model::ModelConfig>,
92 thinking_level: yoagent::types::ThinkingLevel,
94 extensions: Vec<Box<dyn Extension>>,
96 event_listeners: Vec<CompactionEventCallback>,
98 overflow_recovery_attempted: bool,
100 compaction_cancel: crate::agent::extension::Cancel,
102 pending_writes: Vec<PendingSessionWrite>,
104}
105
106impl AgentSession {
107 pub fn new(mgr: SessionManager) -> Self {
109 let ctx = mgr.build_session_context();
111
112 let cwd = mgr.cwd().to_path_buf();
114 let session_dir = mgr.session_dir().to_path_buf();
115 let persist = mgr.is_persisted();
116 let session = mgr.into_session();
117
118 let has_thinking_entries = !session.find_entries("thinking_level_change").is_empty();
121 let last_thinking_level = if has_thinking_entries {
122 ctx.thinking_level
123 } else {
124 String::new()
125 };
126
127 Self {
128 session,
129 session_dir,
130 cwd,
131 persist,
132 flushed: false,
133 last_model: ctx.model,
134 last_thinking_level,
135 last_active_tools: ctx.active_tool_names,
136 persisted_message_ids: HashSet::new(),
137 persisted_tool_call_ids: HashSet::new(),
138 compaction_settings: CompactionSettings::default(),
139 context_window: 200_000,
140 model_name: String::new(),
141 compaction_api_key: None,
142 model_config: None,
143 thinking_level: yoagent::types::ThinkingLevel::Off,
144 extensions: Vec::new(),
145 event_listeners: Vec::new(),
146 overflow_recovery_attempted: false,
147 compaction_cancel: crate::agent::extension::Cancel::new(),
148 pending_writes: Vec::new(),
149 }
150 }
151
152 pub fn create(cwd: &std::path::Path, session_dir: Option<&std::path::Path>) -> Self {
156 Self::new(SessionManager::create(cwd, session_dir))
157 }
158
159 pub fn open(
161 path: &std::path::Path,
162 session_dir: Option<&std::path::Path>,
163 cwd_override: Option<&std::path::Path>,
164 ) -> Self {
165 Self::new(SessionManager::open(path, session_dir, cwd_override))
166 }
167
168 pub fn in_memory(cwd: &std::path::Path) -> Self {
170 Self::new(SessionManager::in_memory(cwd))
171 }
172
173 pub fn continue_recent(cwd: &std::path::Path, session_dir: Option<&std::path::Path>) -> Self {
175 Self::new(SessionManager::continue_recent(cwd, session_dir))
176 }
177
178 pub fn fork_from(
180 source_path: &std::path::Path,
181 target_cwd: &std::path::Path,
182 session_dir: Option<&std::path::Path>,
183 options: Option<&crate::agent::session::NewSessionOptions>,
184 ) -> std::io::Result<Self> {
185 SessionManager::fork_from(source_path, target_cwd, session_dir, options).map(Self::new)
186 }
187
188 pub fn set_compaction_config(
190 &mut self,
191 api_key: String,
192 model_name: &str,
193 context_window: u64,
194 model_config: Option<yoagent::provider::model::ModelConfig>,
195 ) {
196 self.compaction_api_key = Some(api_key);
197 self.model_name = model_name.to_string();
198 self.context_window = context_window;
199 self.model_config = model_config;
200 }
201
202 pub fn set_auto_compact(&mut self, enabled: bool) {
204 self.compaction_settings.enabled = enabled;
205 }
206
207 pub fn sync_thinking_level(&mut self) {
210 let ctx = self.session.build_session_context();
211 let level_str = ctx.thinking_level.to_lowercase();
212 self.thinking_level = match level_str.as_str() {
213 "off" => yoagent::types::ThinkingLevel::Off,
214 "minimal" => yoagent::types::ThinkingLevel::Minimal,
215 "low" => yoagent::types::ThinkingLevel::Low,
216 "medium" => yoagent::types::ThinkingLevel::Medium,
217 "high" => yoagent::types::ThinkingLevel::High,
218 _ => yoagent::types::ThinkingLevel::Off,
219 };
220 }
221
222 pub fn compaction_settings_mut(&mut self) -> &mut CompactionSettings {
224 &mut self.compaction_settings
225 }
226
227 pub fn compaction_settings(&self) -> &CompactionSettings {
229 &self.compaction_settings
230 }
231
232 pub fn set_extensions(&mut self, extensions: Vec<Box<dyn Extension>>) {
234 self.extensions = extensions;
235 }
236
237 pub fn abort_compaction(&self) {
241 self.compaction_cancel.cancel();
242 }
243
244 pub fn on_compaction_event(&mut self, callback: CompactionEventCallback) {
246 self.event_listeners.push(callback);
247 }
248
249 fn emit_compaction_event(&self, event: &CompactionEvent) {
251 for listener in &self.event_listeners {
252 listener(event);
253 }
254 }
255
256 pub fn reset_overflow_recovery(&mut self) {
258 self.overflow_recovery_attempted = false;
259 self.compaction_cancel = crate::agent::extension::Cancel::new();
260 }
261
262 pub fn is_context_overflow_error(msg: &AgentMessage) -> bool {
265 let text = message_text(msg);
266 let lower = text.to_lowercase();
267 lower.contains("413")
269 || lower.contains("request_too_large")
270 || lower.contains("prompt too long")
271 || lower.contains("context_length_exceeded")
272 || lower.contains("context overflow")
273 || lower.contains("max context length")
274 || lower.contains("exceeded max tokens")
275 || lower.contains("maximum context length")
276 }
277
278 pub fn session(&self) -> &crate::agent::session::Session {
283 &self.session
284 }
285
286 pub fn session_mut(&mut self) -> &mut crate::agent::session::Session {
288 &mut self.session
289 }
290
291 pub fn into_session(self) -> crate::agent::session::Session {
293 self.session
294 }
295
296 pub fn ensure_flushed(&mut self) {
298 if self.flushed || !self.persist {
299 return;
300 }
301 let id = self.session.session_id();
302 let cwd_str = self.cwd.to_string_lossy().to_string();
303 let parent_session = self.session.metadata().parent_session_path.clone();
304 let created_at = self.session.metadata().created_at.clone();
305 let file_ts = created_at.replace([':', '.'], "-");
306 let file_path = self.session_dir.join(format!("{}_{}.jsonl", file_ts, id));
307
308 let existing_entries = self.session.get_entries();
309
310 match crate::agent::session_storage::JsonlSessionStorage::create(
311 file_path,
312 &cwd_str,
313 &id,
314 parent_session,
315 ) {
316 Ok(mut file_storage) => {
317 for entry in &existing_entries {
318 if let Err(e) = file_storage.append_entry(entry.clone()) {
319 eprintln!("Warning: failed to write entry to session file: {}", e);
320 }
321 }
322 self.session = crate::agent::session::Session::new(Box::new(file_storage));
323 self.flushed = true;
324 }
325 Err(e) => {
326 eprintln!("Warning: failed to create session file: {}", e);
327 self.flushed = true;
328 }
329 }
330 }
331
332 pub fn cwd(&self) -> &std::path::Path {
335 &self.cwd
336 }
337
338 pub fn session_dir(&self) -> &std::path::Path {
339 &self.session_dir
340 }
341
342 pub fn is_persisted(&self) -> bool {
343 self.persist
344 }
345
346 pub fn session_id(&self) -> String {
347 self.session.session_id()
348 }
349
350 pub fn session_file(&self) -> Option<std::path::PathBuf> {
351 self.session.session_file()
352 }
353
354 pub fn session_name(&self) -> Option<String> {
355 self.session.session_name()
356 }
357
358 pub(crate) fn publish_session_write(&mut self, write: PendingSessionWrite) {
362 self.pending_writes.push(write);
363 }
364
365 pub fn flush_pending_writes(&mut self) {
368 for write in self.pending_writes.drain(..) {
369 match write {
370 PendingSessionWrite::ModelChange { provider, model_id } => {
371 self.session.append_model_change(&provider, &model_id);
372 }
373 PendingSessionWrite::ThinkingLevelChange(level) => {
374 self.session.append_thinking_level_change(&level);
375 }
376 PendingSessionWrite::ActiveToolsChange(tools) => {
377 self.session.append_active_tools_change(&tools);
378 }
379 }
380 }
381 }
382
383 pub fn on_model_change(&mut self, provider: &str, model_id: &str) -> bool {
388 let new = (provider.to_string(), model_id.to_string());
389 if self.last_model.as_ref() != Some(&new) {
390 self.publish_session_write(PendingSessionWrite::ModelChange {
391 provider: provider.to_string(),
392 model_id: model_id.to_string(),
393 });
394 self.last_model = Some(new);
395 true
396 } else {
397 false
398 }
399 }
400
401 pub fn on_thinking_level_change(&mut self, level: &str) -> bool {
404 if self.last_thinking_level != level {
405 self.publish_session_write(PendingSessionWrite::ThinkingLevelChange(level.to_string()));
406 self.last_thinking_level = level.to_string();
407 true
408 } else {
409 false
410 }
411 }
412
413 pub fn on_active_tools_change(&mut self, tools: &[String]) -> bool {
416 let tools_vec = tools.to_vec();
417 if self.last_active_tools.as_ref() != Some(&tools_vec) {
418 self.publish_session_write(PendingSessionWrite::ActiveToolsChange(tools_vec.clone()));
419 self.last_active_tools = Some(tools_vec);
420 true
421 } else {
422 false
423 }
424 }
425
426 pub fn new_session(&mut self) {
431 let meta = SessionMetadata {
433 id: uuid::Uuid::new_v4().to_string(),
434 created_at: chrono::Utc::now().to_rfc3339(),
435 cwd: self.cwd.to_string_lossy().to_string(),
436 path: None,
437 parent_session_path: None,
438 };
439 let storage = Box::new(InMemorySessionStorage::new(meta));
440 self.session = crate::agent::session::Session::new(storage);
441 self.flushed = false;
442 self.persisted_message_ids.clear();
443 self.persisted_tool_call_ids.clear();
444 self.last_model = None;
445 self.last_thinking_level = String::new();
446 self.last_active_tools = None;
447 self.compaction_cancel = crate::agent::extension::Cancel::new();
448 }
449
450 pub fn send_user_message(&mut self, content: &str) -> String {
453 let msg = user_message(content);
454 let id = self.session.append_message(&msg);
455 self.persisted_message_ids.insert(message_dedup_key(&msg));
456 id
457 }
458
459 pub fn send_user_message_obj(&mut self, msg: &AgentMessage) -> String {
462 let id = self.session.append_message(msg);
463 self.persisted_message_ids.insert(message_dedup_key(msg));
464 id
465 }
466
467 pub fn on_agent_event(&mut self, event: &yoagent::types::AgentEvent) {
478 use yoagent::types::AgentEvent as YoEvent;
479 match event {
480 YoEvent::ToolExecutionEnd {
481 tool_call_id,
482 tool_name,
483 result,
484 is_error,
485 ..
486 } => {
487 let content = result
488 .content
489 .iter()
490 .filter_map(|c| {
491 if let Content::Text { text } = c {
492 Some(text.clone())
493 } else {
494 None
495 }
496 })
497 .collect::<Vec<_>>()
498 .join("");
499 let msg = tool_result_message(tool_call_id, tool_name, content, *is_error);
500 self.persist_message(&msg);
501 self.flush_pending_writes();
503 }
504 YoEvent::MessageEnd { message } => {
505 if crate::agent::types::message_is_user(message) {
508 self.reset_overflow_recovery();
509 }
510 if crate::agent::types::message_is_extension(message) {
514 self.persist_extension_message(message);
515 } else {
516 self.persist_message_end(message);
517 }
518 }
519 YoEvent::AgentEnd { messages } => {
520 self.on_agent_end(messages);
521 }
522 _ => {}
523 }
524 }
525
526 pub fn on_agent_end(&mut self, messages: &[AgentMessage]) {
533 for msg in messages {
534 if crate::agent::types::message_is_user(msg) {
535 continue;
536 }
537 if crate::agent::types::message_error(msg).is_some() {
542 continue;
543 }
544 if crate::agent::types::message_is_tool_result(msg)
546 && let Some(tcid) = crate::agent::types::message_tool_call_id(msg)
547 && self.persisted_tool_call_ids.contains(tcid)
548 {
549 continue;
550 }
551 if !self.persisted_message_ids.contains(&message_dedup_key(msg)) {
552 self.session.append_message(msg);
553 self.persisted_message_ids.insert(message_dedup_key(msg));
554 }
555 }
556 self.flush_pending_writes();
558 }
559
560 pub async fn check_auto_compact(&mut self) -> Result<bool, String> {
566 Ok(self
567 ._run_compaction(CompactionReason::Threshold, None, false)
568 .await?
569 .is_some())
570 }
571
572 pub async fn check_overflow_compact(&mut self, will_retry: bool) -> Result<bool, String> {
576 if self.overflow_recovery_attempted {
577 return Ok(false);
578 }
579 self.overflow_recovery_attempted = true;
580 Ok(self
581 ._run_compaction(CompactionReason::Overflow, None, will_retry)
582 .await?
583 .is_some())
584 }
585
586 pub async fn run_manual_compact(
589 &mut self,
590 custom_instructions: Option<&str>,
591 ) -> Result<String, String> {
592 let result = self
593 ._run_compaction(CompactionReason::Manual, custom_instructions, false)
594 .await?;
595 Ok(result.map(|r| r.summary).unwrap_or_default())
596 }
597
598 async fn _run_compaction(
601 &mut self,
602 reason: CompactionReason,
603 custom_instructions: Option<&str>,
604 will_retry: bool,
605 ) -> Result<Option<CompactionResult>, String> {
606 if reason == CompactionReason::Threshold && !self.compaction_settings.enabled {
608 return Ok(None);
609 }
610
611 if self.compaction_api_key.is_none() || self.model_name.is_empty() {
612 return Ok(None);
613 }
614
615 self.compaction_cancel = crate::agent::extension::Cancel::new();
618 let cancel = self.compaction_cancel.clone();
619
620 self.emit_compaction_event(&CompactionEvent::Start { reason });
622
623 if cancel.is_cancelled() {
625 return Ok(None);
626 }
627
628 let entries = self.session.get_entries();
629
630 if reason == CompactionReason::Threshold {
632 let context_msgs = self.session.build_session_context().messages;
633 let context_tokens = compaction::estimate_context_tokens(&context_msgs);
634 if !compaction::should_compact(
635 context_tokens,
636 self.context_window,
637 &self.compaction_settings,
638 ) {
639 return Ok(None);
640 }
641 }
642
643 let Some(prep) = prepare_compaction(&entries, &self.compaction_settings) else {
644 return Ok(None);
645 };
646
647 let mut from_hook = false;
649 let mut hook_summary: Option<String> = None;
650 let mut hook_details: Option<serde_json::Value> = None;
651
652 for ext in &self.extensions {
653 if cancel.is_cancelled() {
654 break;
655 }
656 if let Some(result) = ext.before_compact(
657 &prep.first_kept_entry_id,
658 prep.tokens_before,
659 &reason.to_string(),
660 &cancel,
661 ) {
662 if result.cancel {
663 self.emit_compaction_event(&CompactionEvent::End {
664 reason,
665 aborted: true,
666 will_retry: false,
667 error_message: Some("Compaction cancelled by extension".to_string()),
668 result: CompactionResult {
669 summary: String::new(),
670 first_kept_entry_id: prep.first_kept_entry_id.clone(),
671 tokens_before: prep.tokens_before,
672 estimated_tokens_after: 0,
673 details: None,
674 },
675 });
676 return Ok(None);
677 }
678 if result.summary.is_some() {
679 hook_summary = result.summary;
680 hook_details = result.details;
681 from_hook = true;
682 break;
683 }
684 }
685 }
686
687 let result = if let Some(summary) = hook_summary {
688 CompactionResult {
690 summary,
691 first_kept_entry_id: prep.first_kept_entry_id.clone(),
692 tokens_before: prep.tokens_before,
693 estimated_tokens_after: 0, details: hook_details,
695 }
696 } else {
697 let api_key = self.compaction_api_key.as_ref().unwrap();
699 compact(
700 &prep,
701 api_key,
702 &self.model_name,
703 custom_instructions,
704 self.thinking_level,
705 self.model_config.clone(),
706 )
707 .await?
708 };
709
710 self.session.append_compaction(
712 &result.summary,
713 &result.first_kept_entry_id,
714 result.tokens_before,
715 result.details.clone(),
716 Some(from_hook),
717 );
718
719 let context_after = self.session.build_session_context().messages;
721 let estimated_tokens_after = compaction::estimate_context_tokens(&context_after);
722
723 let final_result = CompactionResult {
724 estimated_tokens_after,
725 ..result
726 };
727
728 for ext in &self.extensions {
730 if cancel.is_cancelled() {
731 break;
732 }
733 ext.after_compact(
734 &final_result.summary,
735 &final_result.first_kept_entry_id,
736 final_result.tokens_before,
737 final_result.estimated_tokens_after,
738 from_hook,
739 &reason.to_string(),
740 &cancel,
741 );
742 }
743
744 self.emit_compaction_event(&CompactionEvent::End {
746 reason,
747 result: final_result.clone(),
748 aborted: false,
749 will_retry,
750 error_message: None,
751 });
752
753 Ok(Some(final_result))
754 }
755
756 pub async fn summarize_branch_navigation(
766 &mut self,
767 old_leaf_id: Option<&str>,
768 target_id: &str,
769 ) -> Result<String, String> {
770 if self.compaction_api_key.is_none() || self.model_name.is_empty() {
771 return Err("No provider configured for summarization".to_string());
772 }
773
774 let (entries, _common_ancestor) =
775 collect_entries_for_branch_summary(self.session(), old_leaf_id, target_id);
776
777 if entries.is_empty() {
778 return Err("No abandoned entries to summarize".to_string());
779 }
780
781 let api_key = self.compaction_api_key.as_ref().unwrap();
782 generate_branch_summary(
783 &mut self.session,
784 &entries,
785 target_id,
786 api_key,
787 &self.model_name,
788 self.thinking_level,
789 self.model_config.clone(),
790 )
791 .await
792 }
793
794 pub async fn set_branch(&mut self, branch_from_id: &str) -> Result<Option<String>, String> {
798 let old_leaf = self.session.get_leaf_id();
799
800 let summary = if self.compaction_api_key.is_some()
801 && !self.model_name.is_empty()
802 && let Some(ref old) = old_leaf
803 && old != branch_from_id
804 {
805 match self
807 .summarize_branch_navigation(Some(old), branch_from_id)
808 .await
809 {
810 Ok(s) => Some(s),
811 Err(e) => {
812 eprintln!("Warning: branch summarization failed: {}", e);
814 None
815 }
816 }
817 } else {
818 None
819 };
820
821 self.session
822 .set_leaf_id(Some(branch_from_id))
823 .map_err(|e| format!("Failed to set branch: {}", e))?;
824
825 Ok(summary)
826 }
827
828 pub fn persist_tool_result(
831 &mut self,
832 tool_call_id: &str,
833 tool_name: &str,
834 content: String,
835 is_error: bool,
836 ) {
837 let msg = tool_result_message(tool_call_id, tool_name, content, is_error);
838 self.persist_message(&msg);
839 }
840
841 pub fn persist_extension_message(&mut self, msg: &AgentMessage) {
845 let Some(kind) = crate::agent::types::message_extension_kind(msg) else {
846 return;
847 };
848 let text = crate::agent::types::message_extension_text(msg)
849 .unwrap_or_else(|| crate::agent::types::message_text(msg));
850 let content = serde_json::json!({"text": text});
851 self.session
852 .append_custom_message_entry(kind, content, true, None);
853 }
854
855 pub fn persist_message_end(&mut self, msg: &AgentMessage) {
861 if crate::agent::types::message_is_tool_result(msg)
864 && let Some(tcid) = crate::agent::types::message_tool_call_id(msg)
865 && self.persisted_tool_call_ids.contains(tcid)
866 {
867 return;
868 }
869 self.persist_message(msg);
871 }
872
873 fn persist_message(&mut self, msg: &AgentMessage) {
879 if crate::agent::types::message_is_tool_result(msg)
881 && let Some(tcid) = crate::agent::types::message_tool_call_id(msg)
882 {
883 if self.persisted_tool_call_ids.contains(tcid) {
884 return;
885 }
886 self.session.append_message(msg);
887 self.persisted_tool_call_ids.insert(tcid.to_string());
888 self.persisted_message_ids.insert(message_dedup_key(msg));
889 return;
890 }
891 if self.persisted_message_ids.contains(&message_dedup_key(msg)) {
893 return;
894 }
895 self.session.append_message(msg);
896 self.persisted_message_ids.insert(message_dedup_key(msg));
897 }
898}