Skip to main content

opi_agent/
session.rs

1//! Session v1 JSONL storage (S9.3).
2//!
3//! Append-only, versioned JSONL format for session persistence. The first line
4//! is a header; subsequent lines are tree entries forming a conversation tree.
5
6use std::io::Write;
7use std::path::Path;
8
9use serde::{Deserialize, Serialize};
10
11/// Current session format version.
12const FORMAT_VERSION: u32 = 1;
13
14/// Session header — the first line of a JSONL file (S9.3).
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct SessionHeader {
17    #[serde(rename = "type")]
18    pub type_: String,
19    pub version: u32,
20    pub id: String,
21    pub timestamp: String,
22    pub cwd: String,
23    pub parent_session: Option<String>,
24}
25
26impl SessionHeader {
27    pub fn new(id: String, timestamp: String, cwd: String, parent_session: Option<String>) -> Self {
28        Self {
29            type_: "session".to_owned(),
30            version: FORMAT_VERSION,
31            id,
32            timestamp,
33            cwd,
34            parent_session,
35        }
36    }
37}
38
39/// A message tree entry (S9.3 `message` type).
40///
41/// The `message` field uses the provider-facing `Message` type (S7.1), not
42/// `AgentMessage`. Each S9.3 entry type maps to its own payload structure.
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct MessageEntry {
45    pub id: String,
46    pub parent_id: Option<String>,
47    pub timestamp: String,
48    pub message: opi_ai::message::Message,
49}
50
51/// A compaction tree entry (S9.3 `compaction` type).
52#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct CompactionEntry {
54    pub id: String,
55    pub parent_id: Option<String>,
56    pub timestamp: String,
57    pub summary: String,
58    pub first_kept_entry_id: String,
59    pub tokens_before: u64,
60    pub tokens_after: u64,
61}
62
63/// A leaf pointer entry (S9.3 `leaf` type).
64#[derive(Debug, Clone, Serialize, Deserialize)]
65pub struct LeafEntry {
66    pub id: String,
67    pub parent_id: Option<String>,
68    pub timestamp: String,
69    pub entry_id: String,
70}
71
72/// All tree entry types (S9.3).
73#[non_exhaustive]
74#[derive(Debug, Clone, Serialize, Deserialize)]
75#[serde(tag = "type", rename_all = "snake_case")]
76pub enum SessionEntry {
77    Message(MessageEntry),
78    Compaction(CompactionEntry),
79    Leaf(LeafEntry),
80}
81
82impl SessionEntry {
83    /// Return the entry's unique ID regardless of variant.
84    pub fn entry_id(&self) -> &str {
85        match self {
86            SessionEntry::Message(m) => &m.id,
87            SessionEntry::Compaction(c) => &c.id,
88            SessionEntry::Leaf(l) => &l.id,
89        }
90    }
91}
92
/// Crash recovery status returned by `SessionReader`.
///
/// Summarizes what was skipped while loading the entry lines of a session file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CrashRecovery {
    /// No corrupt entries and no unterminated final line.
    Clean,
    /// The file did not end with a newline, so its final line was treated as a
    /// torn write.
    TruncatedLine,
    /// `count` non-blank entry lines failed to parse and were skipped.
    CorruptEntries {
        count: usize,
    },
    /// Both corruption and truncation detected.
    CorruptEntriesWithTruncation {
        count: usize,
    },
}

impl CrashRecovery {
    /// Number of corrupt (unparseable) entries skipped during load.
    ///
    /// Always zero for [`CrashRecovery::Clean`] and [`CrashRecovery::TruncatedLine`].
    pub fn corrupt_count(&self) -> usize {
        match *self {
            Self::CorruptEntries { count } | Self::CorruptEntriesWithTruncation { count } => count,
            Self::Clean | Self::TruncatedLine => 0,
        }
    }
}
117
118/// Append-only JSONL writer with crash-safe flush.
119pub struct SessionWriter {
120    file: std::fs::File,
121}
122
123impl SessionWriter {
124    /// Create a new session file with the given header.
125    pub fn create(path: &Path, header: SessionHeader) -> std::io::Result<Self> {
126        let mut file = std::fs::File::create(path)?;
127        let header_json = serde_json::to_string(&header)?;
128        writeln!(file, "{header_json}")?;
129        file.sync_all()?;
130        Ok(Self { file })
131    }
132
133    /// Open an existing session file for appending (seeks to end).
134    ///
135    /// If the file's last line is incomplete (no trailing newline from a
136    /// crashed write), the incomplete tail is truncated so subsequent appends
137    /// land on a clean line boundary.
138    pub fn open(path: &Path) -> std::io::Result<Self> {
139        use std::io::{Read, Seek, SeekFrom};
140
141        // Open read+write (not append) so set_len works on Windows.
142        let mut file = std::fs::OpenOptions::new()
143            .read(true)
144            .write(true)
145            .open(path)?;
146
147        // Check whether the file ends with a newline. If not, truncate the
148        // incomplete trailing line so the first appended entry lands cleanly.
149        let len = file.seek(SeekFrom::End(0))?;
150        if len > 0 {
151            let mut last = [0u8; 1];
152            file.seek(SeekFrom::End(-1))?;
153            file.read_exact(&mut last)?;
154            if last[0] != b'\n' {
155                // Scan backwards for the last newline to find the truncation point.
156                let mut pos = len;
157                let mut buf = [0u8; 1];
158                let mut found_newline = false;
159                loop {
160                    if pos == 0 {
161                        // No newline found — truncate the whole file to empty.
162                        break;
163                    }
164                    pos -= 1;
165                    file.seek(SeekFrom::Start(pos))?;
166                    file.read_exact(&mut buf)?;
167                    if buf[0] == b'\n' {
168                        found_newline = true;
169                        break;
170                    }
171                }
172                // When a newline was found, keep it (truncate to pos+1) so the
173                // next append starts on a fresh line. Without this the prior
174                // complete entry and the new entry would be concatenated on one
175                // line, corrupting the JSONL.
176                file.set_len(if found_newline { pos + 1 } else { pos })?;
177            }
178            file.seek(SeekFrom::End(0))?;
179        }
180
181        Ok(Self { file })
182    }
183
184    /// Append a session entry as a new JSONL line.
185    pub fn append(&mut self, entry: &SessionEntry) -> std::io::Result<()> {
186        let json = serde_json::to_string(entry)?;
187        writeln!(self.file, "{json}")?;
188        self.file.sync_all()
189    }
190}
191
192/// JSONL reader with crash recovery.
193pub struct SessionReader;
194
195impl SessionReader {
196    /// Read all entries from a session file (strict mode — errors on corrupt data).
197    pub fn read_all(path: &Path) -> std::io::Result<(SessionHeader, Vec<SessionEntry>)> {
198        let (header, entries, _recovery) = Self::read_with_recovery(path)?;
199        Ok((header, entries))
200    }
201
202    /// Read all entries with crash recovery metadata.
203    pub fn read_with_recovery(
204        path: &Path,
205    ) -> std::io::Result<(SessionHeader, Vec<SessionEntry>, CrashRecovery)> {
206        let content = std::fs::read_to_string(path)?;
207
208        if content.is_empty() {
209            return Err(std::io::Error::new(
210                std::io::ErrorKind::UnexpectedEof,
211                "empty session file",
212            ));
213        }
214
215        let last_line_incomplete = !content.ends_with('\n') && !content.ends_with('\r');
216
217        // Single-pass: collect lines, then parse.
218        let all_lines: Vec<&str> = content.lines().collect();
219        if all_lines.is_empty() {
220            return Err(std::io::Error::new(
221                std::io::ErrorKind::UnexpectedEof,
222                "empty session file",
223            ));
224        }
225
226        // First line is the header.
227        let header: SessionHeader = serde_json::from_str(all_lines[0]).map_err(|e| {
228            std::io::Error::new(
229                std::io::ErrorKind::InvalidData,
230                format!("invalid session header: {e}"),
231            )
232        })?;
233
234        // Validate header type and version.
235        if header.type_ != "session" {
236            return Err(std::io::Error::new(
237                std::io::ErrorKind::InvalidData,
238                format!("expected header type 'session', got '{}'", header.type_),
239            ));
240        }
241        if header.version != FORMAT_VERSION {
242            return Err(std::io::Error::new(
243                std::io::ErrorKind::InvalidData,
244                format!(
245                    "unsupported session version {}, expected {}",
246                    header.version, FORMAT_VERSION
247                ),
248            ));
249        }
250
251        let data_lines = &all_lines[1..];
252        let total = data_lines.len();
253        let mut entries = Vec::new();
254        let mut corrupt_count = 0;
255
256        for (i, line) in data_lines.iter().enumerate() {
257            if line.trim().is_empty() {
258                continue;
259            }
260            // Skip the last line if the file ended without a newline (truncated write).
261            if last_line_incomplete && i == total - 1 {
262                continue;
263            }
264            match serde_json::from_str::<SessionEntry>(line) {
265                Ok(entry) => entries.push(entry),
266                Err(_) => corrupt_count += 1,
267            }
268        }
269
270        let recovery = match (corrupt_count > 0, last_line_incomplete) {
271            (true, true) => CrashRecovery::CorruptEntriesWithTruncation {
272                count: corrupt_count,
273            },
274            (true, false) => CrashRecovery::CorruptEntries {
275                count: corrupt_count,
276            },
277            (false, true) => CrashRecovery::TruncatedLine,
278            (false, false) => CrashRecovery::Clean,
279        };
280
281        Ok((header, entries, recovery))
282    }
283}