Skip to main content

opi_coding_agent/
session_coordinator.rs

//! Session lifecycle coordinator bridging the harness, session writer,
//! compaction engine, and usage accumulation.
3
4use std::path::{Path, PathBuf};
5use std::sync::atomic::{AtomicU64, Ordering};
6
7use opi_agent::compaction::{CompactionConfig, CompactionEngine, DefaultCompactionHooks, Entry};
8use opi_agent::message::{AgentMessage, CompactionSummaryMessage};
9use opi_agent::session::{
10    CompactionEntry, LeafEntry, MessageEntry, SessionEntry, SessionHeader, SessionWriter,
11};
12use opi_agent::session_event::{CompactionReason, CompactionResult};
13use opi_ai::message::Message;
14use opi_ai::stream::{CumulativeUsage, Usage};
15
16use crate::pricing::lookup_pricing;
17
/// Process-wide counter used to mint unique session entry IDs
/// (`msg-N`, `cmp-N`, `sum-N`, `leaf-N`).
///
/// Starts at 1 and only ever moves forward: `fetch_add` when an ID is minted,
/// and `fetch_max` (via `advance_seq_from_entries`) when a session is resumed,
/// so new IDs do not collide with ones already on disk. `Ordering::Relaxed` is
/// sufficient at every call site: each `fetch_add` returns a distinct value
/// regardless of ordering, and no other memory is published through it.
static ENTRY_SEQ: AtomicU64 = AtomicU64::new(1);
19
/// Result of a compaction triggered during `on_turn_end`.
///
/// The harness uses these fields to (a) install `[summary, ...kept]` as the
/// new Agent message buffer, (b) emit `CompactionStart`/`End` events, and
/// (c) propagate token-before/after on the session event protocol
/// (`to_wire_result` builds the wire-level `CompactionResult` from it).
pub struct CompactionResultOutput {
    /// Summary message that stands in for the compacted prefix.
    pub summary: CompactionSummaryMessage,
    /// New Agent message buffer to install: `[summary, ...kept_messages]`.
    pub new_agent_messages: Vec<AgentMessage>,
    /// Why compaction ran, as reported back by the compaction engine.
    pub reason: CompactionReason,
    /// Token count before compaction, as reported by the compaction engine.
    pub tokens_before: u64,
    /// Token count after compaction, as reported by the compaction engine.
    pub tokens_after: u64,
    /// ID of the first session entry kept verbatim after the summary.
    pub first_kept_entry_id: String,
}
34
/// Coordinates one session's lifecycle: appends turn messages, compaction
/// markers and leaf pointers to the session file, tracks the entries the
/// compaction engine works on, and accumulates token usage for threshold
/// checks and cost reporting.
pub struct SessionCoordinator {
    /// Writer that all session entries are appended to.
    writer: SessionWriter,
    /// Engine that decides when to compact and produces the summary.
    compaction: CompactionEngine,
    /// Token usage accumulated across turns; on resume it is seeded from the
    /// persisted assistant messages of the active branch.
    usage: CumulativeUsage,
    /// Session identifier.
    session_id: String,
    /// Path of the session file being appended to.
    session_path: PathBuf,
    /// Model name, used to look up pricing in `cost_summary`.
    model: String,
    /// Entries accumulated so far, used as compaction input.
    /// Indexed in parallel with `agent_message_indices`.
    entries: Vec<Entry>,
    /// For each `entries[i]`, the index into the Agent's internal message
    /// buffer when that entry was appended. Used to compute which Agent
    /// messages survive a compaction.
    agent_message_indices: Vec<usize>,
    /// Running count of how many messages the Agent has accumulated.
    /// Updated on every `on_turn_end` call and reset by compaction.
    agent_message_count: usize,
    /// Cumulative token count at the last compaction (or at resume time).
    /// The threshold check uses `current_total - watermark` so compaction
    /// doesn't re-trigger every turn after the threshold is crossed once.
    compaction_watermark_tokens: u64,
}
57
58impl SessionCoordinator {
59    pub fn new(
60        dir: &Path,
61        cwd: &str,
62        compaction_config: CompactionConfig,
63        model: impl Into<String>,
64    ) -> std::io::Result<Self> {
65        let id = generate_session_id();
66        let timestamp = now_iso();
67        let header = SessionHeader::new(id.clone(), timestamp, cwd.into(), None);
68        let path = dir.join(format!("{id}.jsonl"));
69        std::fs::create_dir_all(dir)?;
70        let writer = SessionWriter::create(&path, header)?;
71        Ok(Self {
72            writer,
73            compaction: CompactionEngine::new(compaction_config),
74            usage: CumulativeUsage::default(),
75            session_id: id,
76            session_path: path,
77            model: model.into(),
78            entries: Vec::new(),
79            agent_message_indices: Vec::new(),
80            agent_message_count: 0,
81            compaction_watermark_tokens: 0,
82        })
83    }
84
85    /// Open an existing session file for appending (resume).
86    ///
87    /// `existing_entries` are the prior session entries already loaded by the
88    /// caller via `SessionReader::read_all`. Only entries on the active branch
89    /// (determined by the last `Leaf` pointer) are replayed into the internal
90    /// compaction buffer — matching the ordering used by `reconstruct_context`
91    /// for the Agent's message buffer. Legacy sessions without Leaf entries
92    /// fall back to file-order replay.
93    ///
94    /// Compaction entries are honored by replaying their semantics: the
95    /// kept tail (entries from `first_kept_entry_id` onward, persisted before
96    /// the marker) is preserved, the summary occupies a synthetic slot at
97    /// index 0 of the post-compaction agent buffer, and indices are rebuilt
98    /// to match the runtime layout `[summary, ...kept_tail, ...post_marker]`.
99    /// If `first_kept_entry_id` cannot be located among the already-replayed
100    /// entries (corrupt or forward-incompatible session), the buffer is
101    /// reset entirely — matching the legacy defensive behavior.
102    pub fn open_existing(
103        path: PathBuf,
104        session_id: String,
105        existing_entries: &[SessionEntry],
106        prior_agent_message_count: usize,
107        compaction_config: CompactionConfig,
108        model: impl Into<String>,
109    ) -> std::io::Result<Self> {
110        let writer = SessionWriter::open(&path)?;
111
112        // Advance the global sequence counter past any existing IDs.
113        advance_seq_from_entries(existing_entries);
114
115        // Replay entries in active-branch order (not raw file order) to seed
116        // the compaction buffer. This uses the same Leaf-based branch
117        // selection as reconstruct_context so the coordinator's internal
118        // state stays aligned with the Agent's message buffer.
119        let ordered = crate::session_cli::select_ordered_entries(existing_entries);
120
121        let mut entries: Vec<Entry> = Vec::new();
122        let mut indices: Vec<usize> = Vec::new();
123        let mut agent_idx: usize = 0;
124        let mut total_input: u64 = 0;
125        let mut total_output: u64 = 0;
126        let mut total_cache_read: u64 = 0;
127        let mut total_cache_write: u64 = 0;
128        // Count turns as user messages — each user prompt drives exactly one
129        // on_turn_end call. Counting assistant messages would overcount because
130        // a single user turn can produce multiple assistant messages (tool call
131        // + final response).
132        let mut user_count: u32 = 0;
133
134        for entry in ordered {
135            match entry {
136                SessionEntry::Message(m) => {
137                    // Accumulate usage from persisted assistant messages and
138                    // count turns by user messages.
139                    match &m.message {
140                        Message::Assistant(a) => {
141                            total_input += a.usage.input_tokens as u64;
142                            total_output += a.usage.output_tokens as u64;
143                            total_cache_read += a.usage.cache_read_tokens as u64;
144                            total_cache_write += a.usage.cache_write_tokens as u64;
145                        }
146                        Message::User(_) => {
147                            user_count += 1;
148                        }
149                        _ => {}
150                    }
151                    entries.push(Entry {
152                        id: m.id.clone(),
153                        message: AgentMessage::Llm(m.message.clone()),
154                    });
155                    indices.push(agent_idx);
156                    agent_idx += 1;
157                }
158                SessionEntry::Compaction(c) => {
159                    let kept_start = entries.iter().position(|e| e.id == c.first_kept_entry_id);
160                    let kept: Vec<Entry> = match kept_start {
161                        Some(idx) => entries.split_off(idx),
162                        None => Vec::new(),
163                    };
164                    let kept_count = kept.len();
165                    // Rebuild entries with the compaction summary at index 0,
166                    // followed by the kept tail. This mirrors the runtime
167                    // compaction layout so a subsequent compaction sees the
168                    // full context including prior summaries.
169                    let summary_entry = Entry {
170                        id: format!("sum-{}", ENTRY_SEQ.fetch_add(1, Ordering::Relaxed)),
171                        message: AgentMessage::CompactionSummary(CompactionSummaryMessage {
172                            summary: c.summary.clone(),
173                            first_kept_entry_id: c.first_kept_entry_id.clone(),
174                            tokens_before: c.tokens_before,
175                            tokens_after: c.tokens_after,
176                        }),
177                    };
178                    let mut rebuilt = Vec::with_capacity(1 + kept_count);
179                    rebuilt.push(summary_entry);
180                    rebuilt.extend(kept);
181                    entries = rebuilt;
182                    indices = (0..=kept_count).collect();
183                    agent_idx = 1 + kept_count;
184                }
185                SessionEntry::Leaf(_) => {}
186                _ => {}
187            }
188        }
189
190        let usage = CumulativeUsage::from_totals(
191            total_input,
192            total_output,
193            total_cache_read,
194            total_cache_write,
195            user_count,
196        );
197        let watermark = usage.as_usage().total_tokens();
198
199        Ok(Self {
200            writer,
201            compaction: CompactionEngine::new(compaction_config),
202            usage,
203            session_id,
204            session_path: path,
205            model: model.into(),
206            entries,
207            agent_message_indices: indices,
208            agent_message_count: prior_agent_message_count,
209            compaction_watermark_tokens: watermark,
210        })
211    }
212
213    /// Persist only the new messages from a completed turn.
214    ///
215    /// `new_messages` should contain only the messages produced during this
216    /// turn (not the full conversation history). The caller is responsible for
217    /// slicing appropriately. `turn_start_agent_index` is the index in the
218    /// Agent's full message buffer where `new_messages[0]` lives.
219    ///
220    /// Returns `Ok(Some(CompactionReason))` if compaction should be triggered
221    /// (the caller should emit `CompactionStart`, then call
222    /// `execute_compaction`). Returns `Ok(None)` if no compaction is needed.
223    /// Returns `Err` if a session write failed.
224    pub fn on_turn_end(
225        &mut self,
226        new_messages: &[AgentMessage],
227        usage: &Usage,
228        turn_start_agent_index: usize,
229    ) -> Result<Option<CompactionReason>, std::io::Error> {
230        self.usage.accumulate(usage);
231
232        let mut agent_idx = turn_start_agent_index;
233        for msg in new_messages {
234            if let AgentMessage::Llm(m) = msg {
235                let entry_id = format!("msg-{}", ENTRY_SEQ.fetch_add(1, Ordering::Relaxed));
236                let entry = SessionEntry::Message(MessageEntry {
237                    id: entry_id.clone(),
238                    parent_id: None,
239                    timestamp: now_iso(),
240                    message: m.clone(),
241                });
242                self.writer.append(&entry)?;
243                self.entries.push(Entry {
244                    id: entry_id,
245                    message: msg.clone(),
246                });
247                self.agent_message_indices.push(agent_idx);
248            }
249            agent_idx += 1;
250        }
251        self.agent_message_count = agent_idx;
252
253        // Check threshold-based compaction after each turn.
254        // Use tokens accumulated since the last compaction (watermark) so
255        // compaction doesn't re-trigger every turn after the first crossing.
256        let total_tokens = self.usage.as_usage().total_tokens();
257        let delta = total_tokens.saturating_sub(self.compaction_watermark_tokens);
258        if self
259            .compaction
260            .should_compact(delta, CompactionReason::Threshold)
261        {
262            Ok(Some(CompactionReason::Threshold))
263        } else {
264            Ok(None)
265        }
266    }
267
268    /// Execute compaction after `on_turn_end` returned `Some(reason)`.
269    /// The caller should emit `CompactionStart` before calling this and
270    /// `CompactionEnd` afterwards.
271    ///
272    /// Returns `Err` if the compaction marker could not be persisted — in this
273    /// case the in-memory state is left unchanged (no buffer replacement, no
274    /// watermark advance) so the session file stays consistent with the
275    /// runtime.
276    pub fn execute_compaction(
277        &mut self,
278        reason: CompactionReason,
279    ) -> Result<Option<CompactionResultOutput>, std::io::Error> {
280        self.run_compaction(reason)
281    }
282
283    /// Backwards-compatible variant used by tests that don't track Agent indices.
284    /// Assumes `new_messages` are appended starting at the current message count.
285    /// Runs compaction inline if needed (no separate event emission).
286    pub fn on_turn_end_simple(
287        &mut self,
288        new_messages: &[AgentMessage],
289        usage: &Usage,
290    ) -> Result<Option<CompactionResultOutput>, std::io::Error> {
291        let start = self.agent_message_count;
292        let reason = self.on_turn_end(new_messages, usage, start)?;
293        match reason {
294            Some(r) => self.execute_compaction(r),
295            None => Ok(None),
296        }
297    }
298
299    fn run_compaction(
300        &mut self,
301        reason: CompactionReason,
302    ) -> Result<Option<CompactionResultOutput>, std::io::Error> {
303        let hooks = DefaultCompactionHooks;
304        match self.compaction.compact(&self.entries, reason, &hooks) {
305            Ok(output) => {
306                let split = self.entries.len() - output.kept_entries.len();
307                let kept_indices: Vec<usize> = self
308                    .agent_message_indices
309                    .iter()
310                    .skip(split)
311                    .copied()
312                    .collect();
313                let kept_messages: Vec<AgentMessage> = output
314                    .kept_entries
315                    .iter()
316                    .map(|e| e.message.clone())
317                    .collect();
318
319                let summary = CompactionSummaryMessage {
320                    summary: output.summary_text.clone(),
321                    first_kept_entry_id: output.first_kept_entry_id.clone(),
322                    tokens_before: output.tokens_before,
323                    tokens_after: output.tokens_after,
324                };
325
326                let compaction_entry = SessionEntry::Compaction(CompactionEntry {
327                    id: format!("cmp-{}", ENTRY_SEQ.fetch_add(1, Ordering::Relaxed)),
328                    parent_id: None,
329                    timestamp: now_iso(),
330                    summary: output.summary_text.clone(),
331                    first_kept_entry_id: output.first_kept_entry_id.clone(),
332                    tokens_before: output.tokens_before,
333                    tokens_after: output.tokens_after,
334                });
335
336                // Persist the compaction marker BEFORE mutating in-memory state.
337                // If this fails, the runtime context remains un-compacted so
338                // the session file and memory stay consistent.
339                self.writer.append(&compaction_entry)?;
340
341                // Reset internal entries to [summary, ...kept]. The summary
342                // must be included so that a subsequent compaction can see the
343                // full context including earlier compaction summaries.
344                let mut new_entries = Vec::with_capacity(1 + output.kept_entries.len());
345                new_entries.push(Entry {
346                    id: format!("sum-{}", ENTRY_SEQ.fetch_add(1, Ordering::Relaxed)),
347                    message: AgentMessage::CompactionSummary(summary.clone()),
348                });
349                new_entries.extend(output.kept_entries);
350                self.entries = new_entries;
351                self.agent_message_indices = (0..=kept_indices.len()).collect();
352                self.agent_message_count = 1 + kept_messages.len();
353
354                // Advance the watermark so the next threshold check measures
355                // tokens accumulated from this point forward.
356                self.compaction_watermark_tokens = self.usage.as_usage().total_tokens();
357
358                // Build the new Agent buffer: [summary, ...kept].
359                let mut new_agent_messages = Vec::with_capacity(1 + kept_messages.len());
360                new_agent_messages.push(AgentMessage::CompactionSummary(summary.clone()));
361                new_agent_messages.extend(kept_messages);
362
363                Ok(Some(CompactionResultOutput {
364                    summary,
365                    new_agent_messages,
366                    reason: output.reason,
367                    tokens_before: output.tokens_before,
368                    tokens_after: output.tokens_after,
369                    first_kept_entry_id: output.first_kept_entry_id,
370                }))
371            }
372            Err(_) => {
373                // Nothing to compact (too few entries) — no-op.
374                Ok(None)
375            }
376        }
377    }
378
379    pub fn session_id(&self) -> &str {
380        &self.session_id
381    }
382
383    pub fn session_path(&self) -> &Path {
384        &self.session_path
385    }
386
387    pub fn usage(&self) -> &CumulativeUsage {
388        &self.usage
389    }
390
391    pub fn compaction_engine(&self) -> &CompactionEngine {
392        &self.compaction
393    }
394
395    /// Read-only view of the entries currently tracked for compaction.
396    /// Exposed for tests that need to assert resume correctness.
397    pub fn compaction_entries(&self) -> &[Entry] {
398        &self.entries
399    }
400
401    /// Append a Leaf pointer marking the selected active branch tip.
402    pub fn append_leaf(&mut self, entry_id: &str) -> Result<(), std::io::Error> {
403        let entry = SessionEntry::Leaf(LeafEntry {
404            id: format!("leaf-{}", ENTRY_SEQ.fetch_add(1, Ordering::Relaxed)),
405            parent_id: None,
406            timestamp: now_iso(),
407            entry_id: entry_id.to_owned(),
408        });
409        self.writer.append(&entry)
410    }
411
412    /// Compute the cost summary from the accumulated usage and the model
413    /// pricing table. Returns `None` if no pricing is known for the model.
414    pub fn cost_summary(&self) -> Option<opi_ai::stream::CostBreakdown> {
415        let pricing = lookup_pricing(&self.model)?;
416        Some(opi_ai::stream::calculate_cost(
417            &self.usage.as_usage(),
418            &pricing,
419        ))
420    }
421
422    pub fn model(&self) -> &str {
423        &self.model
424    }
425}
426
/// Parse the numeric sequence suffix of an entry ID such as `msg-3` or `cmp-7`.
///
/// Everything after the *first* `-` must parse as a `u64`; IDs without a
/// `-`, or whose suffix is not a plain number (e.g. `a-b-1`, `msg-x`, or a
/// value exceeding `u64::MAX`), yield 0.
fn entry_seq(id: &str) -> u64 {
    match id.split_once('-') {
        Some((_prefix, suffix)) => suffix.parse::<u64>().unwrap_or(0),
        None => 0,
    }
}
434
435/// Advance the global `ENTRY_SEQ` past any IDs found in existing session
436/// entries so resumed sessions don't produce duplicate IDs.
437fn advance_seq_from_entries(entries: &[SessionEntry]) {
438    let max_seq = entries
439        .iter()
440        .map(|e| entry_seq(e.entry_id()))
441        .max()
442        .unwrap_or(0);
443    if max_seq > 0 {
444        ENTRY_SEQ.fetch_max(max_seq + 1, Ordering::Relaxed);
445    }
446}
447pub fn to_wire_result(out: &CompactionResultOutput) -> CompactionResult {
448    CompactionResult {
449        summary: out.summary.summary.clone(),
450        first_kept_entry_id: out.first_kept_entry_id.clone(),
451        tokens_before: out.tokens_before,
452        tokens_after: out.tokens_after,
453    }
454}
455
/// Generate a session ID from the current wall-clock time: milliseconds
/// since the Unix epoch, rendered as lowercase hexadecimal.
///
/// A clock reading before the epoch yields `"0"`.
/// NOTE(review): two calls within the same millisecond return the same ID.
fn generate_session_id() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};
    let millis: u128 = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    };
    format!("{millis:x}")
}
464
465fn now_iso() -> String {
466    let secs = std::time::SystemTime::now()
467        .duration_since(std::time::UNIX_EPOCH)
468        .unwrap_or_default()
469        .as_secs();
470    let days = secs / 86400;
471    let tod = secs % 86400;
472    let h = tod / 3600;
473    let m = (tod % 3600) / 60;
474    let s = tod % 60;
475    let (y, mo, d) = days_to_ymd(days);
476    format!("{y:04}-{mo:02}-{d:02}T{h:02}:{m:02}:{s:02}Z")
477}
478
479fn days_to_ymd(mut days: u64) -> (u64, u64, u64) {
480    let mut year = 1970u64;
481    loop {
482        let diy = if is_leap(year) { 366 } else { 365 };
483        if days < diy {
484            break;
485        }
486        days -= diy;
487        year += 1;
488    }
489    let md = [
490        31,
491        if is_leap(year) { 29 } else { 28 },
492        31,
493        30,
494        31,
495        30,
496        31,
497        31,
498        30,
499        31,
500        30,
501        31,
502    ];
503    let mut month = 0u64;
504    for &d in &md {
505        if days < d {
506            break;
507        }
508        days -= d;
509        month += 1;
510    }
511    (year, month + 1, days + 1)
512}
513
/// Gregorian leap-year test: every 4th year, except century years that are
/// not divisible by 400.
fn is_leap(year: u64) -> bool {
    if year.is_multiple_of(100) {
        return year.is_multiple_of(400);
    }
    year.is_multiple_of(4)
}