Skip to main content

omni_dev/voice/
session.rs

1//! `~/.omni-dev/voice/<id>/` session directory I/O.
2//!
3//! Lays out and reads the session directory format from #799:
4//!
5//! ```text
6//! ~/.omni-dev/voice/<session-id>/
//!   meta.yaml          # session config (last_reflected_event_id + TTL defaults)
8//!   transcript.jsonl   # append-only TranscriptEvent stream from `voice transcribe`
9//!   events.jsonl       # append-only Event stream from `voice reflect` (and later `voice review`)
10//!   reflections.log    # per-reflection summary line (cost, latency, status)
11//! ```
12//!
13//! Shared with #804 (`voice review`), which reads the same `events.jsonl`
14//! to produce materialised markdown projections. The session root path is
15//! derived from `dirs::home_dir()` by default; the `OMNI_DEV_VOICE_ROOT`
16//! environment variable overrides it (intended for tests, not a stable
17//! user-facing knob).
18
19use std::fs::{File, OpenOptions};
20use std::io::{BufRead, BufReader, BufWriter, Write};
21use std::path::{Path, PathBuf};
22use std::time::Duration;
23
24use anyhow::{bail, Context, Result};
25use serde::{Deserialize, Serialize};
26
27use crate::voice::events::Event;
28use crate::voice::{EventId, TranscriptEvent};
29
/// Locations of every file that makes up one session directory.
#[derive(Debug, Clone)]
pub struct SessionPaths {
    /// The session directory itself (`<voice-root>/<id>`).
    pub root: PathBuf,
    /// `meta.yaml` — session configuration, parsed into [`SessionMeta`].
    pub meta: PathBuf,
    /// `transcript.jsonl` — the `TranscriptEvent` stream.
    pub transcript: PathBuf,
    /// `events.jsonl` — the reflection [`Event`] stream.
    pub events: PathBuf,
    /// `reflections.log` — one summary line per reflection.
    pub log: PathBuf,
}

impl SessionPaths {
    /// Computes the paths for session `id` beneath `voice_root`.
    ///
    /// This is pure path arithmetic: nothing is created, read, or checked on
    /// disk.
    #[must_use]
    pub fn under(voice_root: &Path, id: &str) -> Self {
        let session_dir = voice_root.join(id);
        let file = |name: &str| session_dir.join(name);
        Self {
            meta: file("meta.yaml"),
            transcript: file("transcript.jsonl"),
            events: file("events.jsonl"),
            log: file("reflections.log"),
            root: session_dir,
        }
    }
}
59
60/// Default TTLs per item class (per #799), stored in `meta.yaml` so a
61/// session can override them. Serialised as integer seconds.
62#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
63pub struct TtlDefaults {
64    /// TTL for `class: todo` items.
65    #[serde(with = "ttl_secs")]
66    pub todo: Duration,
67    /// TTL for `class: research` items.
68    #[serde(with = "ttl_secs")]
69    pub research: Duration,
70    /// TTL for `class: question` items.
71    #[serde(with = "ttl_secs")]
72    pub question: Duration,
73}
74
75impl Default for TtlDefaults {
76    fn default() -> Self {
77        Self {
78            todo: Duration::from_secs(7 * 86_400),
79            research: Duration::from_secs(30 * 86_400),
80            question: Duration::from_secs(14 * 86_400),
81        }
82    }
83}
84
85mod ttl_secs {
86    use serde::{Deserialize, Deserializer, Serializer};
87    use std::time::Duration;
88
89    pub fn serialize<S: Serializer>(d: &Duration, s: S) -> Result<S::Ok, S::Error> {
90        s.serialize_u64(d.as_secs())
91    }
92
93    pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Duration, D::Error> {
94        let secs = u64::deserialize(d)?;
95        Ok(Duration::from_secs(secs))
96    }
97}
98
99/// Parsed contents of `meta.yaml`.
100#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
101pub struct SessionMeta {
102    /// `event_id` of the last `TranscriptEvent::Final` consumed by a
103    /// previous `voice reflect` invocation. `None` until the first
104    /// reflection completes.
105    #[serde(default)]
106    pub last_reflected_event_id: Option<EventId>,
107    /// TTL defaults applied at projection time (consumed by #804).
108    #[serde(default)]
109    pub ttl_defaults: TtlDefaults,
110}
111
112/// Combination of paths and the parsed meta document.
113#[derive(Debug, Clone)]
114pub struct Session {
115    /// On-disk paths under the session root.
116    pub paths: SessionPaths,
117    /// Parsed `meta.yaml` contents.
118    pub meta: SessionMeta,
119}
120
121impl Session {
122    /// Reads all `Final` transcript events from `transcript.jsonl` after
123    /// `meta.last_reflected_event_id`. Non-`Final` events are skipped —
124    /// reflection is driven by committed text only.
125    pub fn read_transcript_finals_after(&self) -> Result<Vec<TranscriptEvent>> {
126        read_transcript_finals_after(&self.paths.transcript, self.meta.last_reflected_event_id)
127    }
128
129    /// Reads `events.jsonl` into a [`Vec<Event>`]. Empty when the file
130    /// doesn't exist or contains no events.
131    pub fn read_events(&self) -> Result<Vec<Event>> {
132        read_events(&self.paths.events)
133    }
134
135    /// Appends events to `events.jsonl`.
136    pub fn append_events(&self, events: &[Event]) -> Result<()> {
137        append_events(&self.paths.events, events)
138    }
139
140    /// Updates `meta.last_reflected_event_id` in memory and on disk.
141    pub fn set_last_reflected(&mut self, id: EventId) -> Result<()> {
142        self.meta.last_reflected_event_id = Some(id);
143        write_meta(&self.paths.meta, &self.meta)
144    }
145
146    /// Appends a single line to `reflections.log` (no implicit newline).
147    pub fn append_log(&self, line: &str) -> Result<()> {
148        append_log_line(&self.paths.log, line)
149    }
150}
151
152/// Resolves the session root: `$OMNI_DEV_VOICE_ROOT` if set, else
153/// `~/.omni-dev/voice`.
154pub fn voice_root() -> Result<PathBuf> {
155    if let Ok(override_root) = std::env::var("OMNI_DEV_VOICE_ROOT") {
156        return Ok(PathBuf::from(override_root));
157    }
158    let home = dirs::home_dir().context(
159        "could not determine HOME directory for ~/.omni-dev/voice; \
160         set OMNI_DEV_VOICE_ROOT to override",
161    )?;
162    Ok(home.join(".omni-dev").join("voice"))
163}
164
165/// Opens an existing session, or creates an empty one if the directory
166/// doesn't exist. Bootstrap is idempotent: re-running against an
167/// already-populated session reads the existing `meta.yaml`.
168pub fn open_or_create(id: &str) -> Result<Session> {
169    let root = voice_root()?;
170    open_or_create_under(&root, id)
171}
172
173/// Variant of [`open_or_create`] that takes an explicit voice root —
174/// useful for tests that drive several sessions under a `tempfile`
175/// directory.
176pub fn open_or_create_under(voice_root: &Path, id: &str) -> Result<Session> {
177    if id.is_empty() {
178        bail!("session id cannot be empty");
179    }
180    if id.contains('/') || id.contains('\\') || id == "." || id == ".." {
181        bail!("session id must not contain path separators: {id:?}");
182    }
183    let paths = SessionPaths::under(voice_root, id);
184    std::fs::create_dir_all(&paths.root)
185        .with_context(|| format!("creating session directory at {}", paths.root.display()))?;
186
187    // Bootstrap empty files (touch only — don't truncate existing ones).
188    for p in [&paths.transcript, &paths.events, &paths.log] {
189        if !p.exists() {
190            OpenOptions::new()
191                .create(true)
192                .append(true)
193                .open(p)
194                .with_context(|| format!("creating {}", p.display()))?;
195        }
196    }
197
198    let meta = if paths.meta.exists() {
199        read_meta(&paths.meta)?
200    } else {
201        let m = SessionMeta::default();
202        write_meta(&paths.meta, &m)?;
203        m
204    };
205
206    Ok(Session { paths, meta })
207}
208
209/// Reads and parses `meta.yaml`.
210pub fn read_meta(path: &Path) -> Result<SessionMeta> {
211    let body = std::fs::read_to_string(path)
212        .with_context(|| format!("reading session meta at {}", path.display()))?;
213    serde_yaml::from_str(&body)
214        .with_context(|| format!("parsing session meta at {}", path.display()))
215}
216
217/// Writes `meta.yaml` atomically (write-temp-then-rename).
218pub fn write_meta(path: &Path, meta: &SessionMeta) -> Result<()> {
219    let body = serde_yaml::to_string(meta).context("serialising session meta to YAML")?;
220    let tmp = path.with_extension("yaml.tmp");
221    std::fs::write(&tmp, body.as_bytes())
222        .with_context(|| format!("writing temp meta at {}", tmp.display()))?;
223    std::fs::rename(&tmp, path)
224        .with_context(|| format!("renaming temp meta to {}", path.display()))?;
225    Ok(())
226}
227
228/// Reads all `TranscriptEvent`s from a JSONL file. Blank lines are
229/// skipped; parse errors include the line number.
230pub fn read_transcript(path: &Path) -> Result<Vec<TranscriptEvent>> {
231    let file =
232        File::open(path).with_context(|| format!("opening transcript at {}", path.display()))?;
233    let reader = BufReader::new(file);
234    let mut events = Vec::new();
235    for (idx, line) in reader.lines().enumerate() {
236        let line = line.with_context(|| format!("reading {}:{}", path.display(), idx + 1))?;
237        if line.trim().is_empty() {
238            continue;
239        }
240        let event: TranscriptEvent = serde_json::from_str(&line)
241            .with_context(|| format!("parsing {}:{}", path.display(), idx + 1))?;
242        events.push(event);
243    }
244    Ok(events)
245}
246
247/// Reads only `Final` transcript events after the optional marker.
248///
249/// Uses stream position (not ULID comparison) — finds the marker line
250/// and returns Finals strictly after it. Errors if the marker is set
251/// but not present in the file.
252pub fn read_transcript_finals_after(
253    path: &Path,
254    after: Option<EventId>,
255) -> Result<Vec<TranscriptEvent>> {
256    let all = read_transcript(path)?;
257    let finals: Vec<TranscriptEvent> = all
258        .into_iter()
259        .filter(|e| matches!(e, TranscriptEvent::Final { .. }))
260        .collect();
261    match after {
262        None => Ok(finals),
263        Some(target) => {
264            let pos = finals.iter().position(|e| match e {
265                TranscriptEvent::Final { event_id, .. } => *event_id == target,
266                _ => false,
267            });
268            match pos {
269                Some(idx) => Ok(finals.into_iter().skip(idx + 1).collect()),
270                None => bail!(
271                    "last_reflected_event_id {target} not found in transcript at {}; \
272                     meta.yaml may be inconsistent with transcript.jsonl",
273                    path.display()
274                ),
275            }
276        }
277    }
278}
279
280/// Reads all reflection [`Event`]s from `events.jsonl`. Returns an empty
281/// vec if the file doesn't exist (greenfield session).
282pub fn read_events(path: &Path) -> Result<Vec<Event>> {
283    if !path.exists() {
284        return Ok(Vec::new());
285    }
286    let file =
287        File::open(path).with_context(|| format!("opening events log at {}", path.display()))?;
288    let reader = BufReader::new(file);
289    let mut events = Vec::new();
290    for (idx, line) in reader.lines().enumerate() {
291        let line = line.with_context(|| format!("reading {}:{}", path.display(), idx + 1))?;
292        if line.trim().is_empty() {
293            continue;
294        }
295        let event: Event = serde_json::from_str(&line)
296            .with_context(|| format!("parsing {}:{}", path.display(), idx + 1))?;
297        events.push(event);
298    }
299    Ok(events)
300}
301
302/// Appends events as JSONL to `path`. Each event is one line, flushed
303/// after the batch. Skips silently when `events` is empty.
304pub fn append_events(path: &Path, events: &[Event]) -> Result<()> {
305    if events.is_empty() {
306        return Ok(());
307    }
308    let file = OpenOptions::new()
309        .create(true)
310        .append(true)
311        .open(path)
312        .with_context(|| format!("opening events log for append at {}", path.display()))?;
313    let mut writer = BufWriter::new(file);
314    for e in events {
315        serde_json::to_writer(&mut writer, e)
316            .with_context(|| format!("serialising event to {}", path.display()))?;
317        writer
318            .write_all(b"\n")
319            .with_context(|| format!("appending newline to {}", path.display()))?;
320    }
321    writer
322        .flush()
323        .with_context(|| format!("flushing events log at {}", path.display()))?;
324    Ok(())
325}
326
327/// Appends a single line (with newline) to `reflections.log`. Creates
328/// the file if it does not exist.
329pub fn append_log_line(path: &Path, line: &str) -> Result<()> {
330    let file = OpenOptions::new()
331        .create(true)
332        .append(true)
333        .open(path)
334        .with_context(|| format!("opening reflections log at {}", path.display()))?;
335    let mut writer = BufWriter::new(file);
336    writer
337        .write_all(line.as_bytes())
338        .with_context(|| format!("writing log line to {}", path.display()))?;
339    if !line.ends_with('\n') {
340        writer
341            .write_all(b"\n")
342            .with_context(|| format!("appending newline to {}", path.display()))?;
343    }
344    writer
345        .flush()
346        .with_context(|| format!("flushing reflections log at {}", path.display()))?;
347    Ok(())
348}
349
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use crate::voice::events::{
        EventKind, ItemClass, ItemCreate, Provenance, ReflectionId, TranscriptSpan,
    };
    use crate::voice::transcriber::EndpointKind;
    use chrono::TimeZone;
    use tempfile::TempDir;

    fn fixed_ts() -> chrono::DateTime<chrono::Utc> {
        chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap()
    }

    fn provenance() -> Provenance {
        Provenance {
            transcript_span: Some(TranscriptSpan {
                start_event_id: ulid::Ulid::from_parts(0, 1),
                end_event_id: ulid::Ulid::from_parts(0, 2),
            }),
            model: Some("m".into()),
            prompt_version: Some("p".into()),
        }
    }

    fn make_event(event_id: u128) -> Event {
        Event {
            event_id: ulid::Ulid::from_parts(0, event_id),
            ts: fixed_ts(),
            reflection_id: ReflectionId::Ulid(ulid::Ulid::from_parts(0, 100)),
            provenance: provenance(),
            kind: EventKind::ItemCreate(ItemCreate {
                item_id: ulid::Ulid::from_parts(0, 500),
                class: ItemClass::Todo,
                text: format!("event {event_id}"),
                priority: None,
                valid_until: None,
                tags: None,
            }),
        }
    }

    fn make_final(event_id: u128, text: &str) -> TranscriptEvent {
        TranscriptEvent::Final {
            event_id: ulid::Ulid::from_parts(0, event_id),
            text: text.to_string(),
            start: Duration::from_millis(0),
            end: Duration::from_millis(100),
            confidence: 0.9,
            words: None,
            speaker: None,
            revisable: false,
        }
    }

    #[test]
    fn session_paths_under_lays_out_expected_files() {
        let root = Path::new("/voice-root");
        let paths = SessionPaths::under(root, "abc");
        assert_eq!(paths.root, root.join("abc"));
        assert_eq!(paths.meta, root.join("abc").join("meta.yaml"));
        assert_eq!(paths.transcript, root.join("abc").join("transcript.jsonl"));
        assert_eq!(paths.events, root.join("abc").join("events.jsonl"));
        assert_eq!(paths.log, root.join("abc").join("reflections.log"));
    }

    #[test]
    fn open_or_create_bootstraps_an_empty_session() {
        let tmp = TempDir::new().unwrap();
        let session = open_or_create_under(tmp.path(), "s1").unwrap();
        assert!(session.paths.meta.exists());
        assert!(session.paths.transcript.exists());
        assert!(session.paths.events.exists());
        assert!(session.paths.log.exists());
        assert_eq!(session.meta, SessionMeta::default());
    }

    #[test]
    fn open_or_create_is_idempotent() {
        let tmp = TempDir::new().unwrap();
        let s1 = open_or_create_under(tmp.path(), "s1").unwrap();
        let s2 = open_or_create_under(tmp.path(), "s1").unwrap();
        assert_eq!(s1.meta, s2.meta);
    }

    #[test]
    fn open_or_create_preserves_existing_meta() {
        let tmp = TempDir::new().unwrap();
        let mut s = open_or_create_under(tmp.path(), "s1").unwrap();
        s.set_last_reflected(ulid::Ulid::from_parts(0, 42)).unwrap();
        let reopened = open_or_create_under(tmp.path(), "s1").unwrap();
        assert_eq!(
            reopened.meta.last_reflected_event_id,
            Some(ulid::Ulid::from_parts(0, 42))
        );
    }

    #[test]
    fn rejects_session_id_with_path_separator() {
        let tmp = TempDir::new().unwrap();
        assert!(open_or_create_under(tmp.path(), "a/b").is_err());
        assert!(open_or_create_under(tmp.path(), "a\\b").is_err());
        assert!(open_or_create_under(tmp.path(), "..").is_err());
        assert!(open_or_create_under(tmp.path(), ".").is_err());
        assert!(open_or_create_under(tmp.path(), "").is_err());
    }

    #[test]
    fn ttl_defaults_match_799_defaults() {
        let t = TtlDefaults::default();
        assert_eq!(t.todo, Duration::from_secs(7 * 86_400));
        assert_eq!(t.research, Duration::from_secs(30 * 86_400));
        assert_eq!(t.question, Duration::from_secs(14 * 86_400));
    }

    #[test]
    fn meta_yaml_round_trip_preserves_optional_marker() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("meta.yaml");
        let meta = SessionMeta {
            last_reflected_event_id: Some(ulid::Ulid::from_parts(0, 7)),
            ttl_defaults: TtlDefaults::default(),
        };
        write_meta(&path, &meta).unwrap();
        let back = read_meta(&path).unwrap();
        assert_eq!(meta, back);
    }

    #[test]
    fn append_then_read_events_round_trips() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("events.jsonl");
        append_events(&path, &[make_event(1), make_event(2)]).unwrap();
        let back = read_events(&path).unwrap();
        assert_eq!(back.len(), 2);
        assert_eq!(back[0], make_event(1));
        assert_eq!(back[1], make_event(2));
    }

    #[test]
    fn append_events_with_empty_slice_is_noop() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("events.jsonl");
        append_events(&path, &[]).unwrap();
        assert!(!path.exists());
    }

    #[test]
    fn read_events_on_missing_file_returns_empty() {
        let tmp = TempDir::new().unwrap();
        let result = read_events(&tmp.path().join("nothing.jsonl")).unwrap();
        assert!(result.is_empty());
    }

    #[test]
    fn read_transcript_finals_after_filters_partials_and_endpoints() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("transcript.jsonl");
        std::fs::write(
            &path,
            format!(
                "{}\n{}\n{}\n",
                serde_json::to_string(&TranscriptEvent::Partial {
                    text: "ignored".into(),
                    start: Duration::ZERO,
                    end: Duration::from_millis(50),
                    words: None,
                    speaker: None,
                })
                .unwrap(),
                serde_json::to_string(&make_final(1, "first")).unwrap(),
                serde_json::to_string(&TranscriptEvent::Endpoint {
                    at: Duration::from_secs(1),
                    kind: EndpointKind::StreamEnd,
                })
                .unwrap(),
            ),
        )
        .unwrap();
        let finals = read_transcript_finals_after(&path, None).unwrap();
        assert_eq!(finals.len(), 1);
    }

    #[test]
    fn read_transcript_finals_after_skips_through_marker() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("transcript.jsonl");
        let lines = [
            serde_json::to_string(&make_final(1, "a")).unwrap(),
            serde_json::to_string(&make_final(2, "b")).unwrap(),
            serde_json::to_string(&make_final(3, "c")).unwrap(),
        ];
        std::fs::write(&path, lines.join("\n")).unwrap();
        let after_id = ulid::Ulid::from_parts(0, 2);
        let finals = read_transcript_finals_after(&path, Some(after_id)).unwrap();
        assert_eq!(finals.len(), 1);
        match &finals[0] {
            TranscriptEvent::Final { text, .. } => assert_eq!(text, "c"),
            other => panic!("expected Final, got {other:?}"),
        }
    }

    #[test]
    fn read_transcript_finals_after_errors_when_marker_missing() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("transcript.jsonl");
        std::fs::write(
            &path,
            serde_json::to_string(&make_final(1, "a")).unwrap() + "\n",
        )
        .unwrap();
        let err =
            read_transcript_finals_after(&path, Some(ulid::Ulid::from_parts(0, 99))).unwrap_err();
        assert!(
            err.to_string().contains("not found in transcript"),
            "got: {err}"
        );
    }

    #[test]
    fn append_log_line_creates_file_and_adds_newline() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("reflections.log");
        append_log_line(&path, "first entry").unwrap();
        append_log_line(&path, "second entry\n").unwrap();
        let contents = std::fs::read_to_string(&path).unwrap();
        assert_eq!(contents, "first entry\nsecond entry\n");
    }

    #[test]
    fn read_transcript_skips_blank_lines() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("transcript.jsonl");
        std::fs::write(
            &path,
            format!(
                "\n{}\n\n   \n{}\n",
                serde_json::to_string(&make_final(1, "a")).unwrap(),
                serde_json::to_string(&make_final(2, "b")).unwrap(),
            ),
        )
        .unwrap();
        let events = read_transcript(&path).unwrap();
        assert_eq!(events.len(), 2);
    }

    #[test]
    fn read_transcript_reports_parse_failure_with_line_number() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("transcript.jsonl");
        let good = serde_json::to_string(&make_final(1, "ok")).unwrap();
        std::fs::write(&path, format!("{good}\nnot valid json\n")).unwrap();
        let err = read_transcript(&path).unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("parsing") && msg.contains(":2"),
            "error should point at line 2: {msg}"
        );
    }

    #[test]
    fn read_events_skips_blank_lines() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("events.jsonl");
        append_events(&path, &[make_event(1)]).unwrap();
        // Add a blank line after the existing content.
        use std::io::Write as _;
        let mut f = std::fs::OpenOptions::new()
            .append(true)
            .open(&path)
            .unwrap();
        writeln!(f, "\n  ").unwrap();
        drop(f);
        let events = read_events(&path).unwrap();
        assert_eq!(events.len(), 1);
    }

    #[test]
    fn read_events_reports_parse_failure_with_line_number() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("events.jsonl");
        std::fs::write(&path, "not valid json at all\n").unwrap();
        let err = read_events(&path).unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("parsing") && msg.contains(":1"),
            "error should point at line 1: {msg}"
        );
    }

    /// Restores `OMNI_DEV_VOICE_ROOT` to its prior value (including a
    /// non-UTF-8 one, or unset) when dropped. The reset happens even if the
    /// test panics partway through.
    struct VoiceRootEnvGuard {
        original: Option<std::ffi::OsString>,
    }

    impl Drop for VoiceRootEnvGuard {
        fn drop(&mut self) {
            match self.original.take() {
                Some(v) => std::env::set_var("OMNI_DEV_VOICE_ROOT", v),
                None => std::env::remove_var("OMNI_DEV_VOICE_ROOT"),
            }
        }
    }

    #[test]
    fn voice_root_respects_override_env_var() {
        // Env mutation is process-wide; the guard restores it on exit,
        // including on assertion failure. No serial guard here because no
        // other test in this module reads/writes the var.
        let _guard = VoiceRootEnvGuard {
            original: std::env::var_os("OMNI_DEV_VOICE_ROOT"),
        };
        std::env::set_var("OMNI_DEV_VOICE_ROOT", "/tmp/overridden");
        let root = voice_root().unwrap();
        assert_eq!(root, PathBuf::from("/tmp/overridden"));
    }
}