Skip to main content

omni_dev/voice/
reconcile.rs

1//! Pure reconciliation of `events.jsonl` into materialised markdown.
2//!
3//! The `reconcile()` function is a pure function from event log to
4//! markdown + new TTL-expiry events; the `voice review` CLI wrapper
5//! (in [`crate::voice::review`]) handles all I/O around it.
6//!
7//! Reuses [`crate::voice::events::project`] for the bulk of the
8//! reconciliation invariants from #799 (sort-by-event-id,
9//! last-write-wins, idempotent expiry, unknown-id drop,
10//! `reflection.error` skip) and adds TTL eviction on top: any
11//! non-expired non-completed item whose effective expiry has elapsed
12//! gets a synthetic `item.expire { reason: ttl, reflection_id: "review" }`
13//! event minted into `new_expiry_events`. Synthetic events use the
14//! injected [`UlidRng`] for ids so snapshot tests can pin them.
15
16use std::collections::HashMap;
17
18use chrono::{DateTime, Utc};
19
20use crate::voice::det::UlidRng;
21use crate::voice::events::{
22    project, DecisionId, Event, EventKind, ExpireReason, ItemClass, ItemExpire, ItemId,
23    ProjectedDecision, ProjectedItem, Provenance, ReflectionId,
24};
25use crate::voice::render::{render_decisions_md, render_todos_md};
26use crate::voice::session::TtlDefaults;
27use crate::voice::EventId;
28
29/// Markdown + new events produced by a single reconciliation pass.
30#[derive(Debug, Clone, PartialEq, Eq)]
31pub struct ReviewOutput {
32    /// Contents to write to `todos.md`.
33    pub todos_md: String,
34    /// Contents to write to `decisions.md`.
35    pub decisions_md: String,
36    /// Synthetic `item.expire { reason: ttl }` events to append to
37    /// `events.jsonl`. Empty when no items have aged out.
38    pub new_expiry_events: Vec<Event>,
39}
40
41/// Item plus the metadata reconciliation needs but projection drops:
42/// the create-event id (for sort order) and the create-event timestamp
43/// (for class-default TTL computation).
44#[derive(Debug, Clone)]
45pub struct ReconciledItem {
46    /// The projected item (text, class, priority, valid_until, …).
47    pub item: ProjectedItem,
48    /// Event id of the `item.create` that minted the item — used as
49    /// the within-bucket sort key in `todos.md`.
50    pub created_event_id: EventId,
51}
52
53/// Decision plus its create-event id.
54#[derive(Debug, Clone)]
55pub struct ReconciledDecision {
56    /// The projected decision (text, alternatives).
57    pub decision: ProjectedDecision,
58    /// Event id of the `decision.record` — used for the newest-first
59    /// sort in `decisions.md`.
60    pub created_event_id: EventId,
61}
62
63/// Reconciles an event log into materialised markdown and synthesised
64/// TTL-expiry events.
65///
66/// Pure: no I/O, no clock reads. `now` and `rng` are injected so
67/// snapshot tests can pin them.
68///
69/// `defaults` supplies class-default TTLs (used when an `item.create`
70/// omits `valid_until` and no `item.update` ever set it).
71pub fn reconcile(
72    events: &[Event],
73    defaults: &TtlDefaults,
74    now: DateTime<Utc>,
75    rng: &mut dyn UlidRng,
76) -> ReviewOutput {
77    // Projection handles every #799 invariant except TTL.
78    let mut state = project(events.iter().cloned());
79
80    // Walk events once to capture per-item / per-decision creation
81    // metadata that projection doesn't surface (created_ts is needed
82    // for class-default TTL; created_event_id for sort order).
83    let mut item_created: HashMap<ItemId, (DateTime<Utc>, EventId)> = HashMap::new();
84    let mut decision_created: HashMap<DecisionId, EventId> = HashMap::new();
85    let mut sorted = events.to_vec();
86    sorted.sort_by_key(|e| e.event_id);
87    for event in &sorted {
88        match &event.kind {
89            EventKind::ItemCreate(c) => {
90                item_created
91                    .entry(c.item_id)
92                    .or_insert((event.ts, event.event_id));
93            }
94            EventKind::DecisionRecord(d) => {
95                decision_created
96                    .entry(d.decision_id)
97                    .or_insert(event.event_id);
98            }
99            _ => {}
100        }
101    }
102
103    // TTL pass — synthesise `item.expire` for each non-expired,
104    // non-completed item whose effective expiry has elapsed.
105    let mut new_expiry_events = Vec::new();
106    for (id, item) in &mut state.items {
107        if item.expired.is_some() || item.completed {
108            continue;
109        }
110        let Some((created_ts, _)) = item_created.get(id) else {
111            // Item exists in projection without a create event — only
112            // possible if projection's drop-unknown invariant lets a
113            // bare update through, but it doesn't. Defensive skip.
114            continue;
115        };
116        let effective_expiry = item
117            .valid_until
118            .unwrap_or_else(|| *created_ts + ttl_for(&item.class, defaults));
119        if effective_expiry > now {
120            continue;
121        }
122        let event = Event {
123            event_id: rng.next_ulid(),
124            ts: now,
125            reflection_id: ReflectionId::Review,
126            provenance: Provenance {
127                transcript_span: None,
128                model: None,
129                prompt_version: None,
130            },
131            kind: EventKind::ItemExpire(ItemExpire {
132                item_id: *id,
133                reason: ExpireReason::Ttl,
134                superseded_by: None,
135            }),
136        };
137        new_expiry_events.push(event);
138        item.expired = Some(ExpireReason::Ttl);
139    }
140
141    // Render todos: non-expired, non-completed, class ∈ {Todo, Question}.
142    let todos: Vec<ReconciledItem> = state
143        .items
144        .values()
145        .filter(|i| i.expired.is_none() && !i.completed)
146        .filter(|i| matches!(i.class, ItemClass::Todo | ItemClass::Question))
147        .filter_map(|i| {
148            item_created.get(&i.id).map(|(_, eid)| ReconciledItem {
149                item: i.clone(),
150                created_event_id: *eid,
151            })
152        })
153        .collect();
154    let todos_md = render_todos_md(&todos);
155
156    // Render decisions in insertion order (which is event-id order, by
157    // construction of project()); render_decisions_md applies the
158    // newest-first sort.
159    let decisions: Vec<ReconciledDecision> = state
160        .decisions
161        .iter()
162        .filter_map(|d| {
163            decision_created.get(&d.id).map(|eid| ReconciledDecision {
164                decision: d.clone(),
165                created_event_id: *eid,
166            })
167        })
168        .collect();
169    let decisions_md = render_decisions_md(&decisions);
170
171    ReviewOutput {
172        todos_md,
173        decisions_md,
174        new_expiry_events,
175    }
176}
177
178fn ttl_for(class: &ItemClass, defaults: &TtlDefaults) -> chrono::Duration {
179    let std_dur = match class {
180        ItemClass::Todo => defaults.todo,
181        ItemClass::Research => defaults.research,
182        ItemClass::Question => defaults.question,
183    };
184    // `chrono::Duration::from_std` only fails on durations exceeding
185    // ~292 billion years; a malformed config that hits that bound
186    // collapses to zero TTL (the corrupt-input-expires-immediately
187    // policy).
188    chrono::Duration::from_std(std_dur).unwrap_or(chrono::Duration::zero())
189}
190
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use crate::voice::det::CountingUlidRng;
    use crate::voice::events::{
        DecisionRecord, ItemComplete, ItemCreate, ItemUpdate, Priority, TranscriptSpan,
    };
    use chrono::TimeZone;

    /// Fixed "current time" shared by every test.
    fn now() -> DateTime<Utc> {
        Utc.with_ymd_and_hms(2026, 6, 1, 0, 0, 0).unwrap()
    }

    /// Fully-populated provenance, as carried by the hand-built events
    /// in these tests.
    fn provenance() -> Provenance {
        Provenance {
            transcript_span: Some(TranscriptSpan {
                start_event_id: ulid::Ulid::from_parts(0, 1),
                end_event_id: ulid::Ulid::from_parts(0, 2),
            }),
            model: Some("m".into()),
            prompt_version: Some("p".into()),
        }
    }

    /// Builds an event with a pinned id and timestamp around `kind`.
    fn event_at(event_id: u128, ts: DateTime<Utc>, kind: EventKind) -> Event {
        Event {
            event_id: ulid::Ulid::from_parts(0, event_id),
            ts,
            reflection_id: ReflectionId::Ulid(ulid::Ulid::from_parts(0, 100)),
            provenance: provenance(),
            kind,
        }
    }

    /// Deterministic item / decision id derived from a small integer.
    fn id(n: u128) -> ItemId {
        ulid::Ulid::from_parts(0, n)
    }

    /// `item.create` for a plain todo with no priority, tags, or
    /// explicit `valid_until` (so the class-default TTL applies).
    fn create_todo(eid: u128, item: u128, text: &str, ts: DateTime<Utc>) -> Event {
        event_at(
            eid,
            ts,
            EventKind::ItemCreate(ItemCreate {
                item_id: id(item),
                class: ItemClass::Todo,
                text: text.into(),
                priority: None,
                valid_until: None,
                tags: None,
            }),
        )
    }

    #[test]
    fn ttl_expiry_emits_synthetic_event_for_overdue_item() {
        let created_ts = now() - chrono::Duration::days(10);
        let valid_until = now() - chrono::Duration::days(1);
        let events = vec![event_at(
            1,
            created_ts,
            EventKind::ItemCreate(ItemCreate {
                item_id: id(50),
                class: ItemClass::Todo,
                text: "expired".into(),
                priority: None,
                valid_until: Some(valid_until),
                tags: None,
            }),
        )];
        let mut rng = CountingUlidRng::new();
        let out = reconcile(&events, &TtlDefaults::default(), now(), &mut rng);
        assert_eq!(out.new_expiry_events.len(), 1);
        let e = &out.new_expiry_events[0];
        assert_eq!(e.reflection_id, ReflectionId::Review);
        assert!(e.provenance.transcript_span.is_none());
        assert!(e.provenance.model.is_none());
        assert!(e.provenance.prompt_version.is_none());
        match &e.kind {
            EventKind::ItemExpire(ie) => {
                assert_eq!(ie.item_id, id(50));
                assert_eq!(ie.reason, ExpireReason::Ttl);
            }
            other => panic!("expected ItemExpire, got {other:?}"),
        }
    }

    #[test]
    fn ttl_class_default_used_when_valid_until_absent() {
        // Todo default is 7 days; create at T-8d expires, T-5d does not.
        let stale = create_todo(1, 1, "stale", now() - chrono::Duration::days(8));
        let fresh = create_todo(2, 2, "fresh", now() - chrono::Duration::days(5));
        let mut rng = CountingUlidRng::new();
        let out = reconcile(&[stale, fresh], &TtlDefaults::default(), now(), &mut rng);
        assert_eq!(out.new_expiry_events.len(), 1);
        let expired_id = match &out.new_expiry_events[0].kind {
            EventKind::ItemExpire(ie) => ie.item_id,
            other => panic!("expected ItemExpire, got {other:?}"),
        };
        assert_eq!(expired_id, id(1));
    }

    #[test]
    fn ttl_pass_idempotent() {
        let stale = create_todo(1, 1, "stale", now() - chrono::Duration::days(8));
        let mut rng = CountingUlidRng::new();
        let first = reconcile(
            std::slice::from_ref(&stale),
            &TtlDefaults::default(),
            now(),
            &mut rng,
        );
        assert_eq!(first.new_expiry_events.len(), 1);
        // Replay: the original event + the synthesised expiry event,
        // run through reconcile again — no new expiries should fire.
        let mut combined = vec![stale];
        combined.extend(first.new_expiry_events);
        let second = reconcile(&combined, &TtlDefaults::default(), now(), &mut rng);
        assert!(second.new_expiry_events.is_empty());
    }

    #[test]
    fn omits_completed_and_already_expired_items() {
        // Item 1: created, then completed.
        let e_create = create_todo(1, 1, "do it", now() - chrono::Duration::days(1));
        let e_complete = event_at(
            2,
            now(),
            EventKind::ItemComplete(ItemComplete {
                item_id: id(1),
                note: None,
            }),
        );
        // Item 2: created, then explicitly expired by an earlier pass.
        let e_create_expired = create_todo(3, 2, "zombie-task", now() - chrono::Duration::days(1));
        let e_expire = event_at(
            4,
            now(),
            EventKind::ItemExpire(ItemExpire {
                item_id: id(2),
                reason: ExpireReason::Ttl,
                superseded_by: None,
            }),
        );
        let mut rng = CountingUlidRng::new();
        let out = reconcile(
            &[e_create, e_complete, e_create_expired, e_expire],
            &TtlDefaults::default(),
            now(),
            &mut rng,
        );
        // Neither item is eligible for a fresh expiry, and neither
        // should be rendered.
        assert!(out.new_expiry_events.is_empty());
        assert!(out.todos_md.lines().all(|l| !l.contains("do it")));
        assert!(out.todos_md.lines().all(|l| !l.contains("zombie-task")));
    }

    #[test]
    fn todos_md_groups_by_priority_and_sorts() {
        let high = event_at(
            1,
            now(),
            EventKind::ItemCreate(ItemCreate {
                item_id: id(10),
                class: ItemClass::Todo,
                text: "high one".into(),
                priority: Some(Priority::High),
                valid_until: None,
                tags: None,
            }),
        );
        let normal = create_todo(2, 11, "normal one", now());
        let low = event_at(
            3,
            now(),
            EventKind::ItemCreate(ItemCreate {
                item_id: id(12),
                class: ItemClass::Todo,
                text: "low one".into(),
                priority: Some(Priority::Low),
                valid_until: None,
                tags: None,
            }),
        );
        let mut rng = CountingUlidRng::new();
        // Input order is deliberately scrambled relative to priority.
        let out = reconcile(
            &[normal, low, high],
            &TtlDefaults::default(),
            now(),
            &mut rng,
        );
        let high_pos = out.todos_md.find("high one").unwrap();
        let normal_pos = out.todos_md.find("normal one").unwrap();
        let low_pos = out.todos_md.find("low one").unwrap();
        assert!(high_pos < normal_pos && normal_pos < low_pos);
    }

    #[test]
    fn decisions_md_sorts_newest_first() {
        let older = event_at(
            5,
            now(),
            EventKind::DecisionRecord(DecisionRecord {
                decision_id: id(50),
                text: "older".into(),
                alternatives: None,
            }),
        );
        let newer = event_at(
            7,
            now(),
            EventKind::DecisionRecord(DecisionRecord {
                decision_id: id(51),
                text: "newer".into(),
                alternatives: Some(vec!["alt".into()]),
            }),
        );
        let mut rng = CountingUlidRng::new();
        let out = reconcile(&[older, newer], &TtlDefaults::default(), now(), &mut rng);
        let newer_pos = out.decisions_md.find("newer").unwrap();
        let older_pos = out.decisions_md.find("older").unwrap();
        assert!(newer_pos < older_pos);
    }

    #[test]
    fn update_keeps_valid_until_authoritative_for_ttl() {
        // Create at T-10d with explicit valid_until at T-1d (expired);
        // an update at T-5d bumps valid_until to T+5d. Reconciliation
        // should pick up the update and *not* synthesise an expiry.
        let create = event_at(
            1,
            now() - chrono::Duration::days(10),
            EventKind::ItemCreate(ItemCreate {
                item_id: id(1),
                class: ItemClass::Todo,
                text: "x".into(),
                priority: None,
                valid_until: Some(now() - chrono::Duration::days(1)),
                tags: None,
            }),
        );
        let update = event_at(
            2,
            now() - chrono::Duration::days(5),
            EventKind::ItemUpdate(ItemUpdate {
                item_id: id(1),
                valid_until: Some(now() + chrono::Duration::days(5)),
                ..Default::default()
            }),
        );
        let mut rng = CountingUlidRng::new();
        let out = reconcile(&[create, update], &TtlDefaults::default(), now(), &mut rng);
        assert!(
            out.new_expiry_events.is_empty(),
            "{:?}",
            out.new_expiry_events
        );
    }
}