opi-coding-agent 0.5.0

Interactive coding agent CLI with file editing and shell execution
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
//! Session lifecycle coordinator bridging harness, session writer,
//! compaction engine, and usage accumulation.

use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};

use opi_agent::compaction::{CompactionConfig, CompactionEngine, DefaultCompactionHooks, Entry};
use opi_agent::message::{AgentMessage, CompactionSummaryMessage};
use opi_agent::session::{
    CompactionEntry, LeafEntry, MessageEntry, SessionEntry, SessionHeader, SessionWriter,
};
use opi_agent::session_event::{CompactionReason, CompactionResult};
use opi_ai::message::Message;
use opi_ai::stream::{CumulativeUsage, Usage};

use crate::pricing::lookup_pricing;

static ENTRY_SEQ: AtomicU64 = AtomicU64::new(1);

/// Result of a compaction triggered during `on_turn_end`.
///
/// The harness uses these fields to (a) install `[summary, ...kept]` as the
/// new Agent message buffer, (b) emit `CompactionStart`/`End` events, and
/// (c) propagate token-before/after on the session event protocol.
pub struct CompactionResultOutput {
    pub summary: CompactionSummaryMessage,
    /// New Agent message buffer to install: `[summary, ...kept_messages]`.
    pub new_agent_messages: Vec<AgentMessage>,
    pub reason: CompactionReason,
    pub tokens_before: u64,
    pub tokens_after: u64,
    pub first_kept_entry_id: String,
}

pub struct SessionCoordinator {
    writer: SessionWriter,
    compaction: CompactionEngine,
    usage: CumulativeUsage,
    session_id: String,
    session_path: PathBuf,
    model: String,
    /// Entries accumulated so far, used as compaction input.
    /// Indexed in parallel with `agent_message_indices`.
    entries: Vec<Entry>,
    /// For each `entries[i]`, the index into the Agent's internal message
    /// buffer when that entry was appended. Used to compute which Agent
    /// messages survive a compaction.
    agent_message_indices: Vec<usize>,
    /// Running count of how many messages the Agent has accumulated.
    /// Updated on every `on_turn_end` call.
    agent_message_count: usize,
    /// Cumulative token count at the last compaction. The threshold check
    /// uses `current_total - watermark` so compaction doesn't re-trigger
    /// every turn after the threshold is crossed once.
    compaction_watermark_tokens: u64,
}

impl SessionCoordinator {
    pub fn new(
        dir: &Path,
        cwd: &str,
        compaction_config: CompactionConfig,
        model: impl Into<String>,
    ) -> std::io::Result<Self> {
        let id = generate_session_id();
        let timestamp = now_iso();
        let header = SessionHeader::new(id.clone(), timestamp, cwd.into(), None);
        let path = dir.join(format!("{id}.jsonl"));
        std::fs::create_dir_all(dir)?;
        let writer = SessionWriter::create(&path, header)?;
        Ok(Self {
            writer,
            compaction: CompactionEngine::new(compaction_config),
            usage: CumulativeUsage::default(),
            session_id: id,
            session_path: path,
            model: model.into(),
            entries: Vec::new(),
            agent_message_indices: Vec::new(),
            agent_message_count: 0,
            compaction_watermark_tokens: 0,
        })
    }

    /// Open an existing session file for appending (resume).
    ///
    /// `existing_entries` are the prior session entries already loaded by the
    /// caller via `SessionReader::read_all`. Only entries on the active branch
    /// (determined by the last `Leaf` pointer) are replayed into the internal
    /// compaction buffer — matching the ordering used by `reconstruct_context`
    /// for the Agent's message buffer. Legacy sessions without Leaf entries
    /// fall back to file-order replay.
    ///
    /// Compaction entries are honored by replaying their semantics: the
    /// kept tail (entries from `first_kept_entry_id` onward, persisted before
    /// the marker) is preserved, the summary occupies a synthetic slot at
    /// index 0 of the post-compaction agent buffer, and indices are rebuilt
    /// to match the runtime layout `[summary, ...kept_tail, ...post_marker]`.
    /// If `first_kept_entry_id` cannot be located among the already-replayed
    /// entries (corrupt or forward-incompatible session), the buffer is
    /// reset entirely — matching the legacy defensive behavior.
    pub fn open_existing(
        path: PathBuf,
        session_id: String,
        existing_entries: &[SessionEntry],
        prior_agent_message_count: usize,
        compaction_config: CompactionConfig,
        model: impl Into<String>,
    ) -> std::io::Result<Self> {
        let writer = SessionWriter::open(&path)?;

        // Advance the global sequence counter past any existing IDs.
        advance_seq_from_entries(existing_entries);

        // Replay entries in active-branch order (not raw file order) to seed
        // the compaction buffer. This uses the same Leaf-based branch
        // selection as reconstruct_context so the coordinator's internal
        // state stays aligned with the Agent's message buffer.
        let ordered = crate::session_cli::select_ordered_entries(existing_entries);

        let mut entries: Vec<Entry> = Vec::new();
        let mut indices: Vec<usize> = Vec::new();
        let mut agent_idx: usize = 0;
        let mut total_input: u64 = 0;
        let mut total_output: u64 = 0;
        let mut total_cache_read: u64 = 0;
        let mut total_cache_write: u64 = 0;
        // Count turns as user messages — each user prompt drives exactly one
        // on_turn_end call. Counting assistant messages would overcount because
        // a single user turn can produce multiple assistant messages (tool call
        // + final response).
        let mut user_count: u32 = 0;

        for entry in ordered {
            match entry {
                SessionEntry::Message(m) => {
                    // Accumulate usage from persisted assistant messages and
                    // count turns by user messages.
                    match &m.message {
                        Message::Assistant(a) => {
                            total_input += a.usage.input_tokens as u64;
                            total_output += a.usage.output_tokens as u64;
                            total_cache_read += a.usage.cache_read_tokens as u64;
                            total_cache_write += a.usage.cache_write_tokens as u64;
                        }
                        Message::User(_) => {
                            user_count += 1;
                        }
                        _ => {}
                    }
                    entries.push(Entry {
                        id: m.id.clone(),
                        message: AgentMessage::Llm(m.message.clone()),
                    });
                    indices.push(agent_idx);
                    agent_idx += 1;
                }
                SessionEntry::Compaction(c) => {
                    let kept_start = entries.iter().position(|e| e.id == c.first_kept_entry_id);
                    let kept: Vec<Entry> = match kept_start {
                        Some(idx) => entries.split_off(idx),
                        None => Vec::new(),
                    };
                    let kept_count = kept.len();
                    // Rebuild entries with the compaction summary at index 0,
                    // followed by the kept tail. This mirrors the runtime
                    // compaction layout so a subsequent compaction sees the
                    // full context including prior summaries.
                    let summary_entry = Entry {
                        id: format!("sum-{}", ENTRY_SEQ.fetch_add(1, Ordering::Relaxed)),
                        message: AgentMessage::CompactionSummary(CompactionSummaryMessage {
                            summary: c.summary.clone(),
                            first_kept_entry_id: c.first_kept_entry_id.clone(),
                            tokens_before: c.tokens_before,
                            tokens_after: c.tokens_after,
                        }),
                    };
                    let mut rebuilt = Vec::with_capacity(1 + kept_count);
                    rebuilt.push(summary_entry);
                    rebuilt.extend(kept);
                    entries = rebuilt;
                    indices = (0..=kept_count).collect();
                    agent_idx = 1 + kept_count;
                }
                SessionEntry::Leaf(_) => {}
                _ => {}
            }
        }

        let usage = CumulativeUsage::from_totals(
            total_input,
            total_output,
            total_cache_read,
            total_cache_write,
            user_count,
        );
        let watermark = usage.as_usage().total_tokens();

        Ok(Self {
            writer,
            compaction: CompactionEngine::new(compaction_config),
            usage,
            session_id,
            session_path: path,
            model: model.into(),
            entries,
            agent_message_indices: indices,
            agent_message_count: prior_agent_message_count,
            compaction_watermark_tokens: watermark,
        })
    }

    /// Persist only the new messages from a completed turn.
    ///
    /// `new_messages` should contain only the messages produced during this
    /// turn (not the full conversation history). The caller is responsible for
    /// slicing appropriately. `turn_start_agent_index` is the index in the
    /// Agent's full message buffer where `new_messages[0]` lives.
    ///
    /// Returns `Ok(Some(CompactionReason))` if compaction should be triggered
    /// (the caller should emit `CompactionStart`, then call
    /// `execute_compaction`). Returns `Ok(None)` if no compaction is needed.
    /// Returns `Err` if a session write failed.
    pub fn on_turn_end(
        &mut self,
        new_messages: &[AgentMessage],
        usage: &Usage,
        turn_start_agent_index: usize,
    ) -> Result<Option<CompactionReason>, std::io::Error> {
        self.usage.accumulate(usage);

        let mut agent_idx = turn_start_agent_index;
        for msg in new_messages {
            if let AgentMessage::Llm(m) = msg {
                let entry_id = format!("msg-{}", ENTRY_SEQ.fetch_add(1, Ordering::Relaxed));
                let entry = SessionEntry::Message(MessageEntry {
                    id: entry_id.clone(),
                    parent_id: None,
                    timestamp: now_iso(),
                    message: m.clone(),
                });
                self.writer.append(&entry)?;
                self.entries.push(Entry {
                    id: entry_id,
                    message: msg.clone(),
                });
                self.agent_message_indices.push(agent_idx);
            }
            agent_idx += 1;
        }
        self.agent_message_count = agent_idx;

        // Check threshold-based compaction after each turn.
        // Use tokens accumulated since the last compaction (watermark) so
        // compaction doesn't re-trigger every turn after the first crossing.
        let total_tokens = self.usage.as_usage().total_tokens();
        let delta = total_tokens.saturating_sub(self.compaction_watermark_tokens);
        if self
            .compaction
            .should_compact(delta, CompactionReason::Threshold)
        {
            Ok(Some(CompactionReason::Threshold))
        } else {
            Ok(None)
        }
    }

    /// Execute compaction after `on_turn_end` returned `Some(reason)`.
    /// The caller should emit `CompactionStart` before calling this and
    /// `CompactionEnd` afterwards.
    ///
    /// Returns `Err` if the compaction marker could not be persisted — in this
    /// case the in-memory state is left unchanged (no buffer replacement, no
    /// watermark advance) so the session file stays consistent with the
    /// runtime.
    pub fn execute_compaction(
        &mut self,
        reason: CompactionReason,
    ) -> Result<Option<CompactionResultOutput>, std::io::Error> {
        self.run_compaction(reason)
    }

    /// Backwards-compatible variant used by tests that don't track Agent indices.
    /// Assumes `new_messages` are appended starting at the current message count.
    /// Runs compaction inline if needed (no separate event emission).
    pub fn on_turn_end_simple(
        &mut self,
        new_messages: &[AgentMessage],
        usage: &Usage,
    ) -> Result<Option<CompactionResultOutput>, std::io::Error> {
        let start = self.agent_message_count;
        let reason = self.on_turn_end(new_messages, usage, start)?;
        match reason {
            Some(r) => self.execute_compaction(r),
            None => Ok(None),
        }
    }

    fn run_compaction(
        &mut self,
        reason: CompactionReason,
    ) -> Result<Option<CompactionResultOutput>, std::io::Error> {
        let hooks = DefaultCompactionHooks;
        match self.compaction.compact(&self.entries, reason, &hooks) {
            Ok(output) => {
                let split = self.entries.len() - output.kept_entries.len();
                let kept_indices: Vec<usize> = self
                    .agent_message_indices
                    .iter()
                    .skip(split)
                    .copied()
                    .collect();
                let kept_messages: Vec<AgentMessage> = output
                    .kept_entries
                    .iter()
                    .map(|e| e.message.clone())
                    .collect();

                let summary = CompactionSummaryMessage {
                    summary: output.summary_text.clone(),
                    first_kept_entry_id: output.first_kept_entry_id.clone(),
                    tokens_before: output.tokens_before,
                    tokens_after: output.tokens_after,
                };

                let compaction_entry = SessionEntry::Compaction(CompactionEntry {
                    id: format!("cmp-{}", ENTRY_SEQ.fetch_add(1, Ordering::Relaxed)),
                    parent_id: None,
                    timestamp: now_iso(),
                    summary: output.summary_text.clone(),
                    first_kept_entry_id: output.first_kept_entry_id.clone(),
                    tokens_before: output.tokens_before,
                    tokens_after: output.tokens_after,
                });

                // Persist the compaction marker BEFORE mutating in-memory state.
                // If this fails, the runtime context remains un-compacted so
                // the session file and memory stay consistent.
                self.writer.append(&compaction_entry)?;

                // Reset internal entries to [summary, ...kept]. The summary
                // must be included so that a subsequent compaction can see the
                // full context including earlier compaction summaries.
                let mut new_entries = Vec::with_capacity(1 + output.kept_entries.len());
                new_entries.push(Entry {
                    id: format!("sum-{}", ENTRY_SEQ.fetch_add(1, Ordering::Relaxed)),
                    message: AgentMessage::CompactionSummary(summary.clone()),
                });
                new_entries.extend(output.kept_entries);
                self.entries = new_entries;
                self.agent_message_indices = (0..=kept_indices.len()).collect();
                self.agent_message_count = 1 + kept_messages.len();

                // Advance the watermark so the next threshold check measures
                // tokens accumulated from this point forward.
                self.compaction_watermark_tokens = self.usage.as_usage().total_tokens();

                // Build the new Agent buffer: [summary, ...kept].
                let mut new_agent_messages = Vec::with_capacity(1 + kept_messages.len());
                new_agent_messages.push(AgentMessage::CompactionSummary(summary.clone()));
                new_agent_messages.extend(kept_messages);

                Ok(Some(CompactionResultOutput {
                    summary,
                    new_agent_messages,
                    reason: output.reason,
                    tokens_before: output.tokens_before,
                    tokens_after: output.tokens_after,
                    first_kept_entry_id: output.first_kept_entry_id,
                }))
            }
            Err(_) => {
                // Nothing to compact (too few entries) — no-op.
                Ok(None)
            }
        }
    }

    pub fn session_id(&self) -> &str {
        &self.session_id
    }

    pub fn session_path(&self) -> &Path {
        &self.session_path
    }

    pub fn usage(&self) -> &CumulativeUsage {
        &self.usage
    }

    pub fn compaction_engine(&self) -> &CompactionEngine {
        &self.compaction
    }

    /// Read-only view of the entries currently tracked for compaction.
    /// Exposed for tests that need to assert resume correctness.
    pub fn compaction_entries(&self) -> &[Entry] {
        &self.entries
    }

    /// Append a Leaf pointer marking the selected active branch tip.
    pub fn append_leaf(&mut self, entry_id: &str) -> Result<(), std::io::Error> {
        let entry = SessionEntry::Leaf(LeafEntry {
            id: format!("leaf-{}", ENTRY_SEQ.fetch_add(1, Ordering::Relaxed)),
            parent_id: None,
            timestamp: now_iso(),
            entry_id: entry_id.to_owned(),
        });
        self.writer.append(&entry)
    }

    /// Compute the cost summary from the accumulated usage and the model
    /// pricing table. Returns `None` if no pricing is known for the model.
    pub fn cost_summary(&self) -> Option<opi_ai::stream::CostBreakdown> {
        let pricing = lookup_pricing(&self.model)?;
        Some(opi_ai::stream::calculate_cost(
            &self.usage.as_usage(),
            &pricing,
        ))
    }

    pub fn model(&self) -> &str {
        &self.model
    }
}

/// Extract the numeric suffix from entry IDs like `msg-3` or `cmp-7`.
/// Returns 0 for entries that don't match the pattern.
fn entry_seq(id: &str) -> u64 {
    id.split_once('-')
        .and_then(|(_, rest)| rest.parse::<u64>().ok())
        .unwrap_or(0)
}

/// Advance the global `ENTRY_SEQ` past any IDs found in existing session
/// entries so resumed sessions don't produce duplicate IDs.
fn advance_seq_from_entries(entries: &[SessionEntry]) {
    let max_seq = entries
        .iter()
        .map(|e| entry_seq(e.entry_id()))
        .max()
        .unwrap_or(0);
    if max_seq > 0 {
        ENTRY_SEQ.fetch_max(max_seq + 1, Ordering::Relaxed);
    }
}
pub fn to_wire_result(out: &CompactionResultOutput) -> CompactionResult {
    CompactionResult {
        summary: out.summary.summary.clone(),
        first_kept_entry_id: out.first_kept_entry_id.clone(),
        tokens_before: out.tokens_before,
        tokens_after: out.tokens_after,
    }
}

fn generate_session_id() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};
    let ts = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis();
    format!("{ts:x}")
}

fn now_iso() -> String {
    let secs = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();
    let days = secs / 86400;
    let tod = secs % 86400;
    let h = tod / 3600;
    let m = (tod % 3600) / 60;
    let s = tod % 60;
    let (y, mo, d) = days_to_ymd(days);
    format!("{y:04}-{mo:02}-{d:02}T{h:02}:{m:02}:{s:02}Z")
}

fn days_to_ymd(mut days: u64) -> (u64, u64, u64) {
    let mut year = 1970u64;
    loop {
        let diy = if is_leap(year) { 366 } else { 365 };
        if days < diy {
            break;
        }
        days -= diy;
        year += 1;
    }
    let md = [
        31,
        if is_leap(year) { 29 } else { 28 },
        31,
        30,
        31,
        30,
        31,
        31,
        30,
        31,
        30,
        31,
    ];
    let mut month = 0u64;
    for &d in &md {
        if days < d {
            break;
        }
        days -= d;
        month += 1;
    }
    (year, month + 1, days + 1)
}

fn is_leap(y: u64) -> bool {
    (y.is_multiple_of(4) && !y.is_multiple_of(100)) || y.is_multiple_of(400)
}