// bones_core/compact.rs
//! Lattice-based log compaction for the bones event log.
//!
//! Over time the event log grows. Compaction replaces event sequences for
//! completed items with a single `item.snapshot` event — the semilattice join
//! of all events for that item. Compaction is coordination-free: each replica
//! compacts independently and converges to identical state.
//!
//! # Snapshot Semantics
//!
//! **Snapshots are lattice elements, not regular updates.**
//!
//! - For every LWW field the snapshot carries the winning `(stamp, wall_ts,
//!   agent_id, event_hash, value)` tuple — not just the value.
//! - For OR-Sets and G-Sets the snapshot carries the full set state.
//! - Applying a snapshot uses `merge(state, snapshot_state)` — a field-wise
//!   lattice join, *not* "overwrite with snapshot clock".
//!
//! This is critical: if a snapshot used a single event clock for all fields,
//! it would incorrectly dominate concurrent events that were not observed at
//! compaction time, violating semantic preservation.
//!
//! # Redaction Interaction
//!
//! Snapshots check the redaction set before including field values. Compaction
//! must never reintroduce redacted content.
//!
//! # Audit Metadata
//!
//! Each snapshot carries `_compacted_from` (count of original events),
//! `_earliest_ts`, and `_latest_ts` timestamps for audit trail.

32use std::collections::BTreeMap;
33use std::collections::HashSet;
34
35use anyhow::{Context, Result, bail};
36use serde::{Deserialize, Serialize};
37
38use crate::clock::itc::Stamp;
39use crate::crdt::OrSet;
40use crate::crdt::gset::GSet;
41use crate::crdt::item_state::WorkItemState;
42use crate::crdt::lww::LwwRegister;
43use crate::crdt::state::{EpochPhaseState, Phase};
44use crate::event::Event;
45use crate::event::data::{EventData, SnapshotData};
46use crate::event::types::EventType;
47use crate::event::writer;
48use crate::model::item::{Kind, Size, Urgency};
49use crate::model::item_id::ItemId;
50
// ---------------------------------------------------------------------------
// Snapshot payload — per-field CRDT clocks
// ---------------------------------------------------------------------------

/// Serializable representation of a single LWW register with its clock.
///
/// Preserves the full tie-breaking chain for correct lattice merge.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LwwSnapshot<T> {
    /// The winning value for this register.
    pub value: T,
    /// ITC stamp of the event that wrote `value`.
    pub stamp: Stamp,
    /// Wall-clock timestamp of the winning write.
    pub wall_ts: u64,
    /// Agent that performed the winning write.
    pub agent_id: String,
    /// Hash of the event that performed the winning write.
    pub event_hash: String,
}
66
67impl<T: Clone> From<&LwwRegister<T>> for LwwSnapshot<T> {
68    fn from(reg: &LwwRegister<T>) -> Self {
69        Self {
70            value: reg.value.clone(),
71            stamp: reg.stamp.clone(),
72            wall_ts: reg.wall_ts,
73            agent_id: reg.agent_id.clone(),
74            event_hash: reg.event_hash.clone(),
75        }
76    }
77}
78
79impl<T: Clone> From<&LwwSnapshot<T>> for LwwRegister<T> {
80    fn from(snap: &LwwSnapshot<T>) -> Self {
81        Self {
82            value: snap.value.clone(),
83            stamp: snap.stamp.clone(),
84            wall_ts: snap.wall_ts,
85            agent_id: snap.agent_id.clone(),
86            event_hash: snap.event_hash.clone(),
87        }
88    }
89}
90
/// Full snapshot payload encoding every CRDT field with its clock metadata.
///
/// This is the `state` JSON inside an `item.snapshot` event's [`SnapshotData`].
/// It preserves enough information for field-wise lattice join on merge.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SnapshotPayload {
    /// Item identifier.
    pub item_id: String,

    // -- LWW scalar fields with per-field clocks --
    // Each field carries its own winning (stamp, wall_ts, agent_id,
    // event_hash) tuple so merges stay per-field, never per-snapshot.
    pub title: LwwSnapshot<String>,
    pub description: LwwSnapshot<String>,
    pub kind: LwwSnapshot<Kind>,
    pub size: LwwSnapshot<Option<Size>>,
    pub urgency: LwwSnapshot<Urgency>,
    pub parent: LwwSnapshot<String>,
    // Soft-delete flag (LWW, so undelete can win over delete and vice versa).
    pub deleted: LwwSnapshot<bool>,

    // -- Epoch+Phase lifecycle state --
    pub state: EpochPhaseState,

    // -- OR-Set fields (full state with elements and tombstones) --
    pub assignees: OrSet<String>,
    pub labels: OrSet<String>,
    pub blocked_by: OrSet<String>,
    pub related_to: OrSet<String>,

    // -- G-Set (grow-only comment hashes) --
    pub comments: GSet<String>,

    // -- Timestamps --
    pub created_at: u64,
    pub updated_at: u64,

    // -- Audit metadata --
    /// Number of original events that were compacted into this snapshot.
    #[serde(rename = "_compacted_from")]
    pub compacted_from: usize,
    /// Wall-clock timestamp (microseconds) of the earliest original event.
    #[serde(rename = "_earliest_ts")]
    pub earliest_ts: i64,
    /// Wall-clock timestamp (microseconds) of the latest original event.
    #[serde(rename = "_latest_ts")]
    pub latest_ts: i64,
}
136
137// ---------------------------------------------------------------------------
138// WorkItemState ↔ SnapshotPayload conversion
139// ---------------------------------------------------------------------------
140
141impl WorkItemState {
142    /// Serialize the full CRDT aggregate to a [`SnapshotPayload`].
143    ///
144    /// This captures per-field clock metadata needed for correct lattice
145    /// merge when the snapshot is applied on another replica.
146    #[must_use]
147    pub fn to_snapshot_payload(
148        &self,
149        item_id: &str,
150        compacted_from: usize,
151        earliest_ts: i64,
152        latest_ts: i64,
153    ) -> SnapshotPayload {
154        SnapshotPayload {
155            item_id: item_id.to_string(),
156            title: LwwSnapshot::from(&self.title),
157            description: LwwSnapshot::from(&self.description),
158            kind: LwwSnapshot::from(&self.kind),
159            size: LwwSnapshot::from(&self.size),
160            urgency: LwwSnapshot::from(&self.urgency),
161            parent: LwwSnapshot::from(&self.parent),
162            deleted: LwwSnapshot::from(&self.deleted),
163            state: self.state.clone(),
164            assignees: self.assignees.clone(),
165            labels: self.labels.clone(),
166            blocked_by: self.blocked_by.clone(),
167            related_to: self.related_to.clone(),
168            comments: self.comments.clone(),
169            created_at: self.created_at,
170            updated_at: self.updated_at,
171            compacted_from,
172            earliest_ts,
173            latest_ts,
174        }
175    }
176
177    /// Reconstruct a [`WorkItemState`] from a [`SnapshotPayload`].
178    ///
179    /// The resulting state can be merged with other states via the normal
180    /// `WorkItemState::merge` — this is how snapshots participate in the
181    /// lattice.
182    #[must_use]
183    pub fn from_snapshot_payload(payload: &SnapshotPayload) -> Self {
184        Self {
185            title: LwwRegister::from(&payload.title),
186            description: LwwRegister::from(&payload.description),
187            kind: LwwRegister::from(&payload.kind),
188            state: payload.state.clone(),
189            size: LwwRegister::from(&payload.size),
190            urgency: LwwRegister::from(&payload.urgency),
191            parent: LwwRegister::from(&payload.parent),
192            assignees: payload.assignees.clone(),
193            labels: payload.labels.clone(),
194            blocked_by: payload.blocked_by.clone(),
195            related_to: payload.related_to.clone(),
196            comments: payload.comments.clone(),
197            deleted: LwwRegister::from(&payload.deleted),
198            created_at: payload.created_at,
199            updated_at: payload.updated_at,
200        }
201    }
202}
203
204// ---------------------------------------------------------------------------
205// CompactionReport
206// ---------------------------------------------------------------------------
207
208/// Report from compacting eligible items.
209#[derive(Debug, Clone, Serialize)]
210pub struct CompactionReport {
211    /// Number of items compacted.
212    pub items_compacted: usize,
213    /// Number of events replaced by snapshots.
214    pub events_replaced: usize,
215    /// Number of snapshot events created (one per compacted item).
216    pub snapshots_created: usize,
217    /// Items skipped (not eligible).
218    pub items_skipped: usize,
219}
220
221// ---------------------------------------------------------------------------
222// Core compaction functions
223// ---------------------------------------------------------------------------
224
225/// Compact all events for a single item into one `item.snapshot` event.
226///
227/// Replays `events` to produce final CRDT state, then creates a single
228/// snapshot event encoding the full `WorkItemState` with per-field clocks.
229///
230/// # Arguments
231///
232/// * `item_id` — The work item identifier.
233/// * `events` — All events for this item, in causal/chronological order.
234/// * `agent` — Agent identifier for the snapshot event.
235/// * `redacted_hashes` — Set of event hashes that have been redacted.
236///   If any source events are redacted, the compaction is skipped (returns None).
237///
238/// # Returns
239///
240/// `Some(Event)` with the snapshot, or `None` if the item has redacted events
241/// (compaction must not reintroduce redacted content).
242///
243/// # Panics
244///
245/// Panics if the `SnapshotPayload` cannot be serialized to JSON. This should
246/// never happen with valid CRDT state.
247#[must_use]
248pub fn compact_item<S: ::std::hash::BuildHasher>(
249    item_id: &str,
250    events: &[Event],
251    agent: &str,
252    redacted_hashes: &HashSet<String, S>,
253) -> Option<Event> {
254    if events.is_empty() {
255        return None;
256    }
257
258    // Check for redacted events — refuse to compact if any source events
259    // are redacted, since the snapshot would reintroduce the content.
260    for event in events {
261        if redacted_hashes.contains(&event.event_hash) {
262            return None;
263        }
264    }
265
266    // Replay all events to build the final CRDT state.
267    let mut state = WorkItemState::new();
268    for event in events {
269        state.apply_event(event);
270    }
271
272    // Compute audit metadata.
273    let earliest_ts = events.iter().map(|e| e.wall_ts_us).min().unwrap_or(0);
274    let latest_ts = events.iter().map(|e| e.wall_ts_us).max().unwrap_or(0);
275
276    // Build the snapshot payload.
277    let payload = state.to_snapshot_payload(item_id, events.len(), earliest_ts, latest_ts);
278
279    // Serialize payload to JSON value for SnapshotData.
280    let state_json =
281        serde_json::to_value(&payload).expect("SnapshotPayload should always serialize");
282
283    // Build the snapshot event.
284    // Use the latest event's timestamp + 1µs for the snapshot,
285    // and reference all leaf events as parents.
286    let snapshot_ts = latest_ts + 1;
287    let parents: Vec<String> = events
288        .iter()
289        .map(|e| e.event_hash.clone())
290        .collect::<HashSet<_>>()
291        .into_iter()
292        .collect::<Vec<_>>();
293
294    let mut sorted_parents = parents;
295    sorted_parents.sort();
296
297    let itc = events
298        .last()
299        .map_or_else(|| "itc:AQ".to_string(), |e| e.itc.clone());
300
301    let item_id_parsed = ItemId::new_unchecked(item_id);
302
303    let mut snapshot_event = Event {
304        wall_ts_us: snapshot_ts,
305        agent: agent.to_string(),
306        itc,
307        parents: sorted_parents,
308        event_type: EventType::Snapshot,
309        item_id: item_id_parsed,
310        data: EventData::Snapshot(SnapshotData {
311            state: state_json,
312            extra: BTreeMap::new(),
313        }),
314        event_hash: String::new(), // Will be computed
315    };
316
317    // Compute and set the event hash.
318    snapshot_event.event_hash =
319        writer::compute_event_hash(&snapshot_event).expect("snapshot event should always hash");
320
321    Some(snapshot_event)
322}
323
324/// Check if a work item is eligible for compaction.
325///
326/// An item is eligible if:
327/// - Its lifecycle state is Done or Archived
328/// - It has been in that terminal state for at least `min_age_days`
329/// - It is not soft-deleted (deleted items are already lightweight)
330///
331/// # Arguments
332///
333/// * `state` — The current CRDT state of the item.
334/// * `min_age_days` — Minimum days in done/archived before compaction.
335/// * `now_us` — Current wall-clock time in microseconds.
336#[must_use]
337pub fn is_eligible(state: &WorkItemState, min_age_days: u32, now_us: i64) -> bool {
338    // Must be in a terminal phase.
339    let phase = state.phase();
340    if phase != Phase::Done && phase != Phase::Archived {
341        return false;
342    }
343
344    // Must not be soft-deleted.
345    if state.is_deleted() {
346        return false;
347    }
348
349    // Check age: updated_at represents when the item last changed state.
350    let age_us = now_us.saturating_sub(state.updated_at.cast_signed());
351    let min_age_us = i64::from(min_age_days) * 24 * 60 * 60 * 1_000_000;
352
353    age_us >= min_age_us
354}
355
356/// Compact eligible items from a collection of events grouped by item ID.
357///
358/// # Arguments
359///
360/// * `events_by_item` — Map from `item_id` to all events for that item.
361/// * `agent` — Agent identifier for snapshot events.
362/// * `min_age_days` — Minimum days in done/archived before compaction.
363/// * `now_us` — Current wall-clock time in microseconds.
364/// * `redacted_hashes` — Set of event hashes that have been redacted.
365///
366/// # Returns
367///
368/// A tuple of (`snapshot_events`, report).
369pub fn compact_items<S: ::std::hash::BuildHasher>(
370    events_by_item: &BTreeMap<String, Vec<Event>>,
371    agent: &str,
372    min_age_days: u32,
373    now_us: i64,
374    redacted_hashes: &HashSet<String, S>,
375) -> (Vec<Event>, CompactionReport) {
376    let mut snapshots = Vec::new();
377    let mut report = CompactionReport {
378        items_compacted: 0,
379        events_replaced: 0,
380        snapshots_created: 0,
381        items_skipped: 0,
382    };
383
384    for (item_id, events) in events_by_item {
385        if events.is_empty() {
386            report.items_skipped += 1;
387            continue;
388        }
389
390        // Skip items that already consist of a single snapshot event.
391        if events.len() == 1 && events[0].event_type == EventType::Snapshot {
392            report.items_skipped += 1;
393            continue;
394        }
395
396        // Replay to determine eligibility.
397        let mut state = WorkItemState::new();
398        for event in events {
399            state.apply_event(event);
400        }
401
402        if !is_eligible(&state, min_age_days, now_us) {
403            report.items_skipped += 1;
404            continue;
405        }
406
407        // Attempt compaction.
408        match compact_item(item_id, events, agent, redacted_hashes) {
409            Some(snapshot) => {
410                report.items_compacted += 1;
411                report.events_replaced += events.len();
412                report.snapshots_created += 1;
413                snapshots.push(snapshot);
414            }
415            None => {
416                // Redacted events prevented compaction.
417                report.items_skipped += 1;
418            }
419        }
420    }
421
422    (snapshots, report)
423}
424
425/// Verify that compacted state matches uncompacted state.
426///
427/// Replays original events to produce a `WorkItemState`, then reconstructs
428/// a `WorkItemState` from the snapshot event's payload and compares them
429/// field by field.
430///
431/// # Returns
432///
433/// `Ok(true)` if the states match, `Ok(false)` if they diverge,
434/// or `Err` if the snapshot event cannot be parsed.
435///
436/// # Errors
437///
438/// Returns an error if the snapshot event cannot be deserialized into a
439/// `SnapshotPayload`.
440pub fn verify_compaction(
441    item_id: &str,
442    original_events: &[Event],
443    snapshot_event: &Event,
444) -> Result<bool> {
445    // Replay original events.
446    let mut original_state = WorkItemState::new();
447    for event in original_events {
448        original_state.apply_event(event);
449    }
450
451    // Parse snapshot payload.
452    let payload = extract_snapshot_payload(snapshot_event)
453        .with_context(|| format!("parse snapshot for {item_id}"))?;
454
455    // Reconstruct state from snapshot.
456    let snapshot_state = WorkItemState::from_snapshot_payload(&payload);
457
458    // Compare field by field.
459    Ok(states_match(&original_state, &snapshot_state))
460}
461
462/// Verify lattice join property: merge(original, snapshot) == original.
463///
464/// If compaction is correct, the snapshot is the join of all events, so
465/// merging the snapshot with the original state should produce identical
466/// state (idempotency of join with self's join).
467///
468/// # Errors
469///
470/// Returns an error if the snapshot event cannot be deserialized.
471pub fn verify_lattice_join(original_events: &[Event], snapshot_event: &Event) -> Result<bool> {
472    // Build original state.
473    let mut original_state = WorkItemState::new();
474    for event in original_events {
475        original_state.apply_event(event);
476    }
477
478    // Build snapshot state.
479    let payload = extract_snapshot_payload(snapshot_event)?;
480    let snapshot_state = WorkItemState::from_snapshot_payload(&payload);
481
482    // Merge snapshot into original (should be no-op since snapshot == join).
483    let mut merged = original_state.clone();
484    merged.merge(&snapshot_state);
485
486    Ok(states_match(&original_state, &merged))
487}
488
489/// Extract and deserialize the [`SnapshotPayload`] from an `item.snapshot` event.
490///
491/// # Errors
492///
493/// Returns an error if the event is not a snapshot type or if the payload
494/// cannot be deserialized.
495pub fn extract_snapshot_payload(event: &Event) -> Result<SnapshotPayload> {
496    if event.event_type != EventType::Snapshot {
497        bail!("expected item.snapshot event, got {}", event.event_type);
498    }
499
500    let state_json = match &event.data {
501        EventData::Snapshot(data) => &data.state,
502        _ => bail!("event data is not Snapshot variant"),
503    };
504
505    let payload: SnapshotPayload = serde_json::from_value(state_json.clone())
506        .context("deserialize SnapshotPayload from snapshot event")?;
507
508    Ok(payload)
509}
510
511// ---------------------------------------------------------------------------
512// Comparison helper
513// ---------------------------------------------------------------------------
514
515/// Compare two `WorkItemState` instances for semantic equality.
516///
517/// This checks all fields including CRDT metadata. It is used during
518/// verification to ensure compaction is semantics-preserving.
519fn states_match(a: &WorkItemState, b: &WorkItemState) -> bool {
520    // LWW fields: compare value and clock metadata.
521    a.title.value == b.title.value
522        && a.title.wall_ts == b.title.wall_ts
523        && a.title.agent_id == b.title.agent_id
524        && a.title.event_hash == b.title.event_hash
525        && a.description.value == b.description.value
526        && a.description.wall_ts == b.description.wall_ts
527        && a.kind.value == b.kind.value
528        && a.kind.wall_ts == b.kind.wall_ts
529        && a.size.value == b.size.value
530        && a.size.wall_ts == b.size.wall_ts
531        && a.urgency.value == b.urgency.value
532        && a.urgency.wall_ts == b.urgency.wall_ts
533        && a.parent.value == b.parent.value
534        && a.parent.wall_ts == b.parent.wall_ts
535        && a.deleted.value == b.deleted.value
536        && a.deleted.wall_ts == b.deleted.wall_ts
537        // EpochPhaseState
538        && a.state == b.state
539        // OR-Sets
540        && a.assignees == b.assignees
541        && a.labels == b.labels
542        && a.blocked_by == b.blocked_by
543        && a.related_to == b.related_to
544        // G-Set
545        && a.comments == b.comments
546        // Timestamps
547        && a.created_at == b.created_at
548        && a.updated_at == b.updated_at
549}
550
// ---------------------------------------------------------------------------
// Compaction policy configuration
// ---------------------------------------------------------------------------

/// Configuration for the compaction policy.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompactionPolicy {
    /// Minimum days an item must be in done/archived state before compaction.
    /// Default: 30.
    pub min_age_days: u32,

    /// Target lifecycle states eligible for compaction.
    /// Default: `["done", "archived"]`.
    /// NOTE(review): not consumed by `is_eligible` in this module, which
    /// hard-codes Done/Archived — confirm this field is honored elsewhere.
    pub target_states: Vec<String>,

    /// If true, perform a dry run (report what would be compacted, but don't
    /// write any snapshot events).
    pub dry_run: bool,
}
570
571impl Default for CompactionPolicy {
572    fn default() -> Self {
573        Self {
574            min_age_days: 30,
575            target_states: vec!["done".to_string(), "archived".to_string()],
576            dry_run: false,
577        }
578    }
579}
580
581// ---------------------------------------------------------------------------
582// Tests
583// ---------------------------------------------------------------------------
584
585#[cfg(test)]
586mod tests {
587    use super::*;
588    use crate::clock::itc::Stamp;
589    use crate::event::data::*;
590    use crate::model::item::{Kind, Size, State, Urgency};
591    use std::collections::BTreeMap;
592
    // -----------------------------------------------------------------------
    // Test helpers
    // -----------------------------------------------------------------------

    /// Build an [`Event`] of the given type with a fresh seed ITC stamp.
    ///
    /// `event_hash` is supplied by the caller rather than computed, so tests
    /// can use stable, readable hashes like `"blake3:e1"`.
    fn make_event(
        event_type: EventType,
        data: EventData,
        wall_ts_us: i64,
        agent: &str,
        event_hash: &str,
        item_id: &str,
    ) -> Event {
        let mut stamp = Stamp::seed();
        stamp.event();
        Event {
            wall_ts_us,
            agent: agent.to_string(),
            itc: stamp.to_string(),
            parents: vec![],
            event_type,
            item_id: ItemId::new_unchecked(item_id),
            data,
            event_hash: event_hash.to_string(),
        }
    }
618
    /// Create-event fixture: a Task of size M with one label and a description.
    fn create_event(title: &str, wall_ts: i64, agent: &str, hash: &str, item_id: &str) -> Event {
        make_event(
            EventType::Create,
            EventData::Create(CreateData {
                title: title.to_string(),
                kind: Kind::Task,
                size: Some(Size::M),
                urgency: Urgency::Default,
                labels: vec!["backend".to_string()],
                parent: None,
                causation: None,
                description: Some("A description".to_string()),
                extra: BTreeMap::new(),
            }),
            wall_ts,
            agent,
            hash,
            item_id,
        )
    }
639
    /// Move-event fixture transitioning the item to `state`.
    fn move_event(state: State, wall_ts: i64, agent: &str, hash: &str, item_id: &str) -> Event {
        make_event(
            EventType::Move,
            EventData::Move(MoveData {
                state,
                reason: None,
                extra: BTreeMap::new(),
            }),
            wall_ts,
            agent,
            hash,
            item_id,
        )
    }
654
    /// Assign-event fixture assigning `target_agent` to the item.
    fn assign_event(
        target_agent: &str,
        wall_ts: i64,
        agent: &str,
        hash: &str,
        item_id: &str,
    ) -> Event {
        make_event(
            EventType::Assign,
            EventData::Assign(AssignData {
                agent: target_agent.to_string(),
                action: AssignAction::Assign,
                extra: BTreeMap::new(),
            }),
            wall_ts,
            agent,
            hash,
            item_id,
        )
    }
675
    /// Comment-event fixture with the given body text.
    fn comment_event(body: &str, wall_ts: i64, agent: &str, hash: &str, item_id: &str) -> Event {
        make_event(
            EventType::Comment,
            EventData::Comment(CommentData {
                body: body.to_string(),
                extra: BTreeMap::new(),
            }),
            wall_ts,
            agent,
            hash,
            item_id,
        )
    }
689
    /// Update-event fixture setting the `title` field to `title`.
    fn update_title_event(
        title: &str,
        wall_ts: i64,
        agent: &str,
        hash: &str,
        item_id: &str,
    ) -> Event {
        make_event(
            EventType::Update,
            EventData::Update(UpdateData {
                field: "title".to_string(),
                value: serde_json::Value::String(title.to_string()),
                extra: BTreeMap::new(),
            }),
            wall_ts,
            agent,
            hash,
            item_id,
        )
    }
710
    /// Six-event history: create → assign → doing → comment → retitle → done.
    /// Timestamps run 1s–6s; hashes are blake3:e1 … blake3:e6.
    fn sample_item_events(item_id: &str) -> Vec<Event> {
        vec![
            create_event("Fix auth retry", 1_000_000, "alice", "blake3:e1", item_id),
            assign_event("bob", 2_000_000, "alice", "blake3:e2", item_id),
            move_event(State::Doing, 3_000_000, "bob", "blake3:e3", item_id),
            comment_event("Found root cause", 4_000_000, "bob", "blake3:e4", item_id),
            update_title_event(
                "Fix auth retry logic",
                5_000_000,
                "bob",
                "blake3:e5",
                item_id,
            ),
            move_event(State::Done, 6_000_000, "bob", "blake3:e6", item_id),
        ]
    }
727
    // -----------------------------------------------------------------------
    // compact_item
    // -----------------------------------------------------------------------

    /// A normal event history compacts to a single snapshot event carrying
    /// the right type, item id, agent, and a computed blake3 hash.
    #[test]
    fn compact_item_produces_snapshot() {
        let events = sample_item_events("bn-test1");
        let redacted = HashSet::new();

        let snapshot = compact_item("bn-test1", &events, "compactor", &redacted)
            .expect("should produce snapshot");

        assert_eq!(snapshot.event_type, EventType::Snapshot);
        assert_eq!(snapshot.item_id.as_str(), "bn-test1");
        assert_eq!(snapshot.agent, "compactor");
        assert!(snapshot.event_hash.starts_with("blake3:"));
    }
745
    /// An empty event slice cannot be compacted.
    #[test]
    fn compact_item_empty_events_returns_none() {
        let redacted = HashSet::new();
        assert!(compact_item("bn-test1", &[], "compactor", &redacted).is_none());
    }
751
    /// A single redacted source event blocks compaction of the whole item.
    #[test]
    fn compact_item_redacted_events_returns_none() {
        let events = sample_item_events("bn-test1");
        let mut redacted = HashSet::new();
        redacted.insert("blake3:e3".to_string());

        assert!(compact_item("bn-test1", &events, "compactor", &redacted).is_none());
    }
760
    /// Snapshot audit metadata reflects the event count and timestamp range
    /// of the compacted history (6 events, 1s–6s).
    #[test]
    fn compact_item_snapshot_payload_has_audit_metadata() {
        let events = sample_item_events("bn-test1");
        let redacted = HashSet::new();

        let snapshot = compact_item("bn-test1", &events, "compactor", &redacted).unwrap();
        let payload = extract_snapshot_payload(&snapshot).unwrap();

        assert_eq!(payload.compacted_from, 6);
        assert_eq!(payload.earliest_ts, 1_000_000);
        assert_eq!(payload.latest_ts, 6_000_000);
    }
773
    /// The snapshot payload carries the final replayed field values,
    /// including the last title update and the Done phase.
    #[test]
    fn compact_item_snapshot_preserves_state() {
        let events = sample_item_events("bn-test1");
        let redacted = HashSet::new();

        let snapshot = compact_item("bn-test1", &events, "compactor", &redacted).unwrap();
        let payload = extract_snapshot_payload(&snapshot).unwrap();

        assert_eq!(payload.title.value, "Fix auth retry logic");
        assert_eq!(payload.kind.value, Kind::Task);
        assert_eq!(payload.size.value, Some(Size::M));
        assert_eq!(payload.state.phase, Phase::Done);
        assert_eq!(payload.description.value, "A description");
        assert!(!payload.deleted.value);
    }
789
    // -----------------------------------------------------------------------
    // verify_compaction
    // -----------------------------------------------------------------------

    /// Replayed state and snapshot-reconstructed state must be identical.
    #[test]
    fn verify_compaction_matches() {
        let events = sample_item_events("bn-test1");
        let redacted = HashSet::new();

        let snapshot = compact_item("bn-test1", &events, "compactor", &redacted).unwrap();
        let matches = verify_compaction("bn-test1", &events, &snapshot).unwrap();

        assert!(matches, "compacted state should match replayed state");
    }
804
    /// merge(original, snapshot) == original — join idempotence holds.
    #[test]
    fn verify_lattice_join_holds() {
        let events = sample_item_events("bn-test1");
        let redacted = HashSet::new();

        let snapshot = compact_item("bn-test1", &events, "compactor", &redacted).unwrap();
        let holds = verify_lattice_join(&events, &snapshot).unwrap();

        assert!(holds, "merge(original, snapshot) should equal original");
    }
815
    // -----------------------------------------------------------------------
    // SnapshotPayload roundtrip
    // -----------------------------------------------------------------------

    /// SnapshotPayload survives a JSON serialize → deserialize roundtrip.
    #[test]
    fn snapshot_payload_serde_roundtrip() {
        let events = sample_item_events("bn-test1");
        let redacted = HashSet::new();

        let snapshot = compact_item("bn-test1", &events, "compactor", &redacted).unwrap();
        let payload = extract_snapshot_payload(&snapshot).unwrap();

        // Serialize to JSON and back.
        let json = serde_json::to_string(&payload).expect("serialize");
        let roundtripped: SnapshotPayload = serde_json::from_str(&json).expect("deserialize");

        assert_eq!(roundtripped.item_id, payload.item_id);
        assert_eq!(roundtripped.title.value, payload.title.value);
        assert_eq!(roundtripped.compacted_from, payload.compacted_from);
    }
836
    /// to_snapshot_payload → from_snapshot_payload roundtrips the full
    /// CRDT state, including per-field clock metadata.
    #[test]
    fn from_snapshot_payload_roundtrips_state() {
        let events = sample_item_events("bn-test1");

        // Build original state.
        let mut original = WorkItemState::new();
        for event in &events {
            original.apply_event(event);
        }

        // Convert to payload and back.
        let payload = original.to_snapshot_payload("bn-test1", events.len(), 1_000_000, 6_000_000);
        let reconstructed = WorkItemState::from_snapshot_payload(&payload);

        assert!(states_match(&original, &reconstructed));
    }
853
    // -----------------------------------------------------------------------
    // is_eligible
    // -----------------------------------------------------------------------

    /// A Done item older than the minimum age is eligible for compaction.
    #[test]
    fn eligible_done_item_old_enough() {
        let events = sample_item_events("bn-test1");
        let mut state = WorkItemState::new();
        for event in &events {
            state.apply_event(event);
        }

        // 31 days after the last event (min_age_days is 30).
        let now = 6_000_000 + 31 * 24 * 60 * 60 * 1_000_000;
        assert!(is_eligible(&state, 30, now));
    }
870
871    #[test]
872    fn not_eligible_done_item_too_new() {
873        let events = sample_item_events("bn-test1");
874        let mut state = WorkItemState::new();
875        for event in &events {
876            state.apply_event(event);
877        }
878
879        // Only 10 days after.
880        let now = 6_000_000 + 10 * 24 * 60 * 60 * 1_000_000;
881        assert!(!is_eligible(&state, 30, now));
882    }
883
884    #[test]
885    fn not_eligible_open_item() {
886        let events = vec![create_event(
887            "Title",
888            1_000_000,
889            "alice",
890            "blake3:c1",
891            "bn-test1",
892        )];
893        let mut state = WorkItemState::new();
894        for event in &events {
895            state.apply_event(event);
896        }
897
898        let now = 1_000_000 + 365 * 24 * 60 * 60 * 1_000_000;
899        assert!(!is_eligible(&state, 30, now));
900    }
901
902    #[test]
903    fn not_eligible_deleted_item() {
904        let events = vec![
905            create_event("Title", 1_000_000, "alice", "blake3:c1", "bn-test1"),
906            move_event(State::Done, 2_000_000, "alice", "blake3:m1", "bn-test1"),
907            make_event(
908                EventType::Delete,
909                EventData::Delete(DeleteData {
910                    reason: Some("dup".to_string()),
911                    extra: BTreeMap::new(),
912                }),
913                3_000_000,
914                "alice",
915                "blake3:d1",
916                "bn-test1",
917            ),
918        ];
919        let mut state = WorkItemState::new();
920        for event in &events {
921            state.apply_event(event);
922        }
923
924        let now = 3_000_000 + 365 * 24 * 60 * 60 * 1_000_000;
925        assert!(!is_eligible(&state, 30, now));
926    }
927
928    // -----------------------------------------------------------------------
929    // compact_items batch
930    // -----------------------------------------------------------------------
931
932    #[test]
933    fn compact_items_batch() {
934        let item1_events = sample_item_events("bn-test1");
935        let item2_events = vec![create_event(
936            "Open item",
937            1_000_000,
938            "alice",
939            "blake3:o1",
940            "bn-test2",
941        )];
942
943        let mut events_by_item = BTreeMap::new();
944        events_by_item.insert("bn-test1".to_string(), item1_events);
945        events_by_item.insert("bn-test2".to_string(), item2_events);
946
947        let now = 6_000_000 + 31 * 24 * 60 * 60 * 1_000_000;
948        let redacted = HashSet::new();
949
950        let (snapshots, report) = compact_items(&events_by_item, "compactor", 30, now, &redacted);
951
952        assert_eq!(snapshots.len(), 1);
953        assert_eq!(report.items_compacted, 1);
954        assert_eq!(report.events_replaced, 6);
955        assert_eq!(report.snapshots_created, 1);
956        assert_eq!(report.items_skipped, 1);
957    }
958
959    #[test]
960    fn compact_items_skips_already_compacted() {
961        // Create an item that's already a single snapshot.
962        let events = sample_item_events("bn-test1");
963        let redacted = HashSet::new();
964        let snapshot = compact_item("bn-test1", &events, "compactor", &redacted).unwrap();
965
966        let mut events_by_item = BTreeMap::new();
967        events_by_item.insert("bn-test1".to_string(), vec![snapshot]);
968
969        let now = 6_000_000 + 365 * 24 * 60 * 60 * 1_000_000;
970        let (snapshots, report) = compact_items(&events_by_item, "compactor", 30, now, &redacted);
971
972        assert_eq!(snapshots.len(), 0);
973        assert_eq!(report.items_skipped, 1);
974    }
975
976    // -----------------------------------------------------------------------
977    // Semilattice property: merge(state, snapshot) == state
978    // -----------------------------------------------------------------------
979
980    #[test]
981    fn snapshot_merge_is_idempotent_with_original() {
982        let events = sample_item_events("bn-test1");
983
984        // Build original state.
985        let mut original = WorkItemState::new();
986        for event in &events {
987            original.apply_event(event);
988        }
989
990        // Build snapshot state from payload.
991        let payload = original.to_snapshot_payload("bn-test1", events.len(), 1_000_000, 6_000_000);
992        let snapshot_state = WorkItemState::from_snapshot_payload(&payload);
993
994        // Merge snapshot into original.
995        let mut merged = original.clone();
996        merged.merge(&snapshot_state);
997
998        assert!(
999            states_match(&original, &merged),
1000            "merge(original, snapshot) should equal original"
1001        );
1002    }
1003
1004    #[test]
1005    fn snapshot_merge_commutative() {
1006        let events = sample_item_events("bn-test1");
1007
1008        let mut original = WorkItemState::new();
1009        for event in &events {
1010            original.apply_event(event);
1011        }
1012
1013        let payload = original.to_snapshot_payload("bn-test1", events.len(), 1_000_000, 6_000_000);
1014        let snapshot_state = WorkItemState::from_snapshot_payload(&payload);
1015
1016        // merge(original, snapshot)
1017        let mut ab = original.clone();
1018        ab.merge(&snapshot_state);
1019
1020        // merge(snapshot, original)
1021        let mut ba = snapshot_state.clone();
1022        ba.merge(&original);
1023
1024        assert!(
1025            states_match(&ab, &ba),
1026            "snapshot merge should be commutative"
1027        );
1028    }
1029
1030    // -----------------------------------------------------------------------
1031    // CompactionPolicy
1032    // -----------------------------------------------------------------------
1033
1034    #[test]
1035    fn compaction_policy_defaults() {
1036        let policy = CompactionPolicy::default();
1037        assert_eq!(policy.min_age_days, 30);
1038        assert_eq!(
1039            policy.target_states,
1040            vec!["done".to_string(), "archived".to_string()]
1041        );
1042        assert!(!policy.dry_run);
1043    }
1044}