1use std::path::{Path, PathBuf};
5use std::sync::atomic::{AtomicU64, Ordering};
6
7use opi_agent::compaction::{CompactionConfig, CompactionEngine, DefaultCompactionHooks, Entry};
8use opi_agent::message::{AgentMessage, CompactionSummaryMessage};
9use opi_agent::session::{
10 CompactionEntry, LeafEntry, MessageEntry, SessionEntry, SessionHeader, SessionWriter,
11};
12use opi_agent::session_event::{CompactionReason, CompactionResult};
13use opi_ai::message::Message;
14use opi_ai::stream::{CumulativeUsage, Usage};
15
16use crate::pricing::lookup_pricing;
17
18static ENTRY_SEQ: AtomicU64 = AtomicU64::new(1);
19
20pub struct CompactionResultOutput {
26 pub summary: CompactionSummaryMessage,
27 pub new_agent_messages: Vec<AgentMessage>,
29 pub reason: CompactionReason,
30 pub tokens_before: u64,
31 pub tokens_after: u64,
32 pub first_kept_entry_id: String,
33}
34
35pub struct SessionCoordinator {
36 writer: SessionWriter,
37 compaction: CompactionEngine,
38 usage: CumulativeUsage,
39 session_id: String,
40 session_path: PathBuf,
41 model: String,
42 entries: Vec<Entry>,
45 agent_message_indices: Vec<usize>,
49 agent_message_count: usize,
52 compaction_watermark_tokens: u64,
56}
57
58impl SessionCoordinator {
59 pub fn new(
60 dir: &Path,
61 cwd: &str,
62 compaction_config: CompactionConfig,
63 model: impl Into<String>,
64 ) -> std::io::Result<Self> {
65 let id = generate_session_id();
66 let timestamp = now_iso();
67 let header = SessionHeader::new(id.clone(), timestamp, cwd.into(), None);
68 let path = dir.join(format!("{id}.jsonl"));
69 std::fs::create_dir_all(dir)?;
70 let writer = SessionWriter::create(&path, header)?;
71 Ok(Self {
72 writer,
73 compaction: CompactionEngine::new(compaction_config),
74 usage: CumulativeUsage::default(),
75 session_id: id,
76 session_path: path,
77 model: model.into(),
78 entries: Vec::new(),
79 agent_message_indices: Vec::new(),
80 agent_message_count: 0,
81 compaction_watermark_tokens: 0,
82 })
83 }
84
85 pub fn open_existing(
103 path: PathBuf,
104 session_id: String,
105 existing_entries: &[SessionEntry],
106 prior_agent_message_count: usize,
107 compaction_config: CompactionConfig,
108 model: impl Into<String>,
109 ) -> std::io::Result<Self> {
110 let writer = SessionWriter::open(&path)?;
111
112 advance_seq_from_entries(existing_entries);
114
115 let ordered = crate::session_cli::select_ordered_entries(existing_entries);
120
121 let mut entries: Vec<Entry> = Vec::new();
122 let mut indices: Vec<usize> = Vec::new();
123 let mut agent_idx: usize = 0;
124 let mut total_input: u64 = 0;
125 let mut total_output: u64 = 0;
126 let mut total_cache_read: u64 = 0;
127 let mut total_cache_write: u64 = 0;
128 let mut user_count: u32 = 0;
133
134 for entry in ordered {
135 match entry {
136 SessionEntry::Message(m) => {
137 match &m.message {
140 Message::Assistant(a) => {
141 total_input += a.usage.input_tokens as u64;
142 total_output += a.usage.output_tokens as u64;
143 total_cache_read += a.usage.cache_read_tokens as u64;
144 total_cache_write += a.usage.cache_write_tokens as u64;
145 }
146 Message::User(_) => {
147 user_count += 1;
148 }
149 _ => {}
150 }
151 entries.push(Entry {
152 id: m.id.clone(),
153 message: AgentMessage::Llm(m.message.clone()),
154 });
155 indices.push(agent_idx);
156 agent_idx += 1;
157 }
158 SessionEntry::Compaction(c) => {
159 let kept_start = entries.iter().position(|e| e.id == c.first_kept_entry_id);
160 let kept: Vec<Entry> = match kept_start {
161 Some(idx) => entries.split_off(idx),
162 None => Vec::new(),
163 };
164 let kept_count = kept.len();
165 let summary_entry = Entry {
170 id: format!("sum-{}", ENTRY_SEQ.fetch_add(1, Ordering::Relaxed)),
171 message: AgentMessage::CompactionSummary(CompactionSummaryMessage {
172 summary: c.summary.clone(),
173 first_kept_entry_id: c.first_kept_entry_id.clone(),
174 tokens_before: c.tokens_before,
175 tokens_after: c.tokens_after,
176 }),
177 };
178 let mut rebuilt = Vec::with_capacity(1 + kept_count);
179 rebuilt.push(summary_entry);
180 rebuilt.extend(kept);
181 entries = rebuilt;
182 indices = (0..=kept_count).collect();
183 agent_idx = 1 + kept_count;
184 }
185 SessionEntry::Leaf(_) => {}
186 _ => {}
187 }
188 }
189
190 let usage = CumulativeUsage::from_totals(
191 total_input,
192 total_output,
193 total_cache_read,
194 total_cache_write,
195 user_count,
196 );
197 let watermark = usage.as_usage().total_tokens();
198
199 Ok(Self {
200 writer,
201 compaction: CompactionEngine::new(compaction_config),
202 usage,
203 session_id,
204 session_path: path,
205 model: model.into(),
206 entries,
207 agent_message_indices: indices,
208 agent_message_count: prior_agent_message_count,
209 compaction_watermark_tokens: watermark,
210 })
211 }
212
213 pub fn on_turn_end(
225 &mut self,
226 new_messages: &[AgentMessage],
227 usage: &Usage,
228 turn_start_agent_index: usize,
229 ) -> Result<Option<CompactionReason>, std::io::Error> {
230 self.usage.accumulate(usage);
231
232 let mut agent_idx = turn_start_agent_index;
233 for msg in new_messages {
234 if let AgentMessage::Llm(m) = msg {
235 let entry_id = format!("msg-{}", ENTRY_SEQ.fetch_add(1, Ordering::Relaxed));
236 let entry = SessionEntry::Message(MessageEntry {
237 id: entry_id.clone(),
238 parent_id: None,
239 timestamp: now_iso(),
240 message: m.clone(),
241 });
242 self.writer.append(&entry)?;
243 self.entries.push(Entry {
244 id: entry_id,
245 message: msg.clone(),
246 });
247 self.agent_message_indices.push(agent_idx);
248 }
249 agent_idx += 1;
250 }
251 self.agent_message_count = agent_idx;
252
253 let total_tokens = self.usage.as_usage().total_tokens();
257 let delta = total_tokens.saturating_sub(self.compaction_watermark_tokens);
258 if self
259 .compaction
260 .should_compact(delta, CompactionReason::Threshold)
261 {
262 Ok(Some(CompactionReason::Threshold))
263 } else {
264 Ok(None)
265 }
266 }
267
268 pub fn execute_compaction(
277 &mut self,
278 reason: CompactionReason,
279 ) -> Result<Option<CompactionResultOutput>, std::io::Error> {
280 self.run_compaction(reason)
281 }
282
283 pub fn on_turn_end_simple(
287 &mut self,
288 new_messages: &[AgentMessage],
289 usage: &Usage,
290 ) -> Result<Option<CompactionResultOutput>, std::io::Error> {
291 let start = self.agent_message_count;
292 let reason = self.on_turn_end(new_messages, usage, start)?;
293 match reason {
294 Some(r) => self.execute_compaction(r),
295 None => Ok(None),
296 }
297 }
298
299 fn run_compaction(
300 &mut self,
301 reason: CompactionReason,
302 ) -> Result<Option<CompactionResultOutput>, std::io::Error> {
303 let hooks = DefaultCompactionHooks;
304 match self.compaction.compact(&self.entries, reason, &hooks) {
305 Ok(output) => {
306 let split = self.entries.len() - output.kept_entries.len();
307 let kept_indices: Vec<usize> = self
308 .agent_message_indices
309 .iter()
310 .skip(split)
311 .copied()
312 .collect();
313 let kept_messages: Vec<AgentMessage> = output
314 .kept_entries
315 .iter()
316 .map(|e| e.message.clone())
317 .collect();
318
319 let summary = CompactionSummaryMessage {
320 summary: output.summary_text.clone(),
321 first_kept_entry_id: output.first_kept_entry_id.clone(),
322 tokens_before: output.tokens_before,
323 tokens_after: output.tokens_after,
324 };
325
326 let compaction_entry = SessionEntry::Compaction(CompactionEntry {
327 id: format!("cmp-{}", ENTRY_SEQ.fetch_add(1, Ordering::Relaxed)),
328 parent_id: None,
329 timestamp: now_iso(),
330 summary: output.summary_text.clone(),
331 first_kept_entry_id: output.first_kept_entry_id.clone(),
332 tokens_before: output.tokens_before,
333 tokens_after: output.tokens_after,
334 });
335
336 self.writer.append(&compaction_entry)?;
340
341 let mut new_entries = Vec::with_capacity(1 + output.kept_entries.len());
345 new_entries.push(Entry {
346 id: format!("sum-{}", ENTRY_SEQ.fetch_add(1, Ordering::Relaxed)),
347 message: AgentMessage::CompactionSummary(summary.clone()),
348 });
349 new_entries.extend(output.kept_entries);
350 self.entries = new_entries;
351 self.agent_message_indices = (0..=kept_indices.len()).collect();
352 self.agent_message_count = 1 + kept_messages.len();
353
354 self.compaction_watermark_tokens = self.usage.as_usage().total_tokens();
357
358 let mut new_agent_messages = Vec::with_capacity(1 + kept_messages.len());
360 new_agent_messages.push(AgentMessage::CompactionSummary(summary.clone()));
361 new_agent_messages.extend(kept_messages);
362
363 Ok(Some(CompactionResultOutput {
364 summary,
365 new_agent_messages,
366 reason: output.reason,
367 tokens_before: output.tokens_before,
368 tokens_after: output.tokens_after,
369 first_kept_entry_id: output.first_kept_entry_id,
370 }))
371 }
372 Err(_) => {
373 Ok(None)
375 }
376 }
377 }
378
379 pub fn session_id(&self) -> &str {
380 &self.session_id
381 }
382
383 pub fn session_path(&self) -> &Path {
384 &self.session_path
385 }
386
387 pub fn usage(&self) -> &CumulativeUsage {
388 &self.usage
389 }
390
391 pub fn compaction_engine(&self) -> &CompactionEngine {
392 &self.compaction
393 }
394
395 pub fn compaction_entries(&self) -> &[Entry] {
398 &self.entries
399 }
400
401 pub fn append_leaf(&mut self, entry_id: &str) -> Result<(), std::io::Error> {
403 let entry = SessionEntry::Leaf(LeafEntry {
404 id: format!("leaf-{}", ENTRY_SEQ.fetch_add(1, Ordering::Relaxed)),
405 parent_id: None,
406 timestamp: now_iso(),
407 entry_id: entry_id.to_owned(),
408 });
409 self.writer.append(&entry)
410 }
411
412 pub fn cost_summary(&self) -> Option<opi_ai::stream::CostBreakdown> {
415 let pricing = lookup_pricing(&self.model)?;
416 Some(opi_ai::stream::calculate_cost(
417 &self.usage.as_usage(),
418 &pricing,
419 ))
420 }
421
422 pub fn model(&self) -> &str {
423 &self.model
424 }
425}
426
427fn entry_seq(id: &str) -> u64 {
430 id.split_once('-')
431 .and_then(|(_, rest)| rest.parse::<u64>().ok())
432 .unwrap_or(0)
433}
434
435fn advance_seq_from_entries(entries: &[SessionEntry]) {
438 let max_seq = entries
439 .iter()
440 .map(|e| entry_seq(e.entry_id()))
441 .max()
442 .unwrap_or(0);
443 if max_seq > 0 {
444 ENTRY_SEQ.fetch_max(max_seq + 1, Ordering::Relaxed);
445 }
446}
447pub fn to_wire_result(out: &CompactionResultOutput) -> CompactionResult {
448 CompactionResult {
449 summary: out.summary.summary.clone(),
450 first_kept_entry_id: out.first_kept_entry_id.clone(),
451 tokens_before: out.tokens_before,
452 tokens_after: out.tokens_after,
453 }
454}
455
456fn generate_session_id() -> String {
457 use std::time::{SystemTime, UNIX_EPOCH};
458 let ts = SystemTime::now()
459 .duration_since(UNIX_EPOCH)
460 .unwrap_or_default()
461 .as_millis();
462 format!("{ts:x}")
463}
464
465fn now_iso() -> String {
466 let secs = std::time::SystemTime::now()
467 .duration_since(std::time::UNIX_EPOCH)
468 .unwrap_or_default()
469 .as_secs();
470 let days = secs / 86400;
471 let tod = secs % 86400;
472 let h = tod / 3600;
473 let m = (tod % 3600) / 60;
474 let s = tod % 60;
475 let (y, mo, d) = days_to_ymd(days);
476 format!("{y:04}-{mo:02}-{d:02}T{h:02}:{m:02}:{s:02}Z")
477}
478
479fn days_to_ymd(mut days: u64) -> (u64, u64, u64) {
480 let mut year = 1970u64;
481 loop {
482 let diy = if is_leap(year) { 366 } else { 365 };
483 if days < diy {
484 break;
485 }
486 days -= diy;
487 year += 1;
488 }
489 let md = [
490 31,
491 if is_leap(year) { 29 } else { 28 },
492 31,
493 30,
494 31,
495 30,
496 31,
497 31,
498 30,
499 31,
500 30,
501 31,
502 ];
503 let mut month = 0u64;
504 for &d in &md {
505 if days < d {
506 break;
507 }
508 days -= d;
509 month += 1;
510 }
511 (year, month + 1, days + 1)
512}
513
514fn is_leap(y: u64) -> bool {
515 (y.is_multiple_of(4) && !y.is_multiple_of(100)) || y.is_multiple_of(400)
516}