use std::collections::BTreeMap;
use std::collections::HashSet;
use anyhow::{Context, Result, bail};
use serde::{Deserialize, Serialize};
use crate::clock::itc::Stamp;
use crate::crdt::OrSet;
use crate::crdt::gset::GSet;
use crate::crdt::item_state::WorkItemState;
use crate::crdt::lww::LwwRegister;
use crate::crdt::state::{EpochPhaseState, Phase};
use crate::event::Event;
use crate::event::data::{EventData, SnapshotData};
use crate::event::types::EventType;
use crate::event::writer;
use crate::model::item::{Kind, Size, Urgency};
use crate::model::item_id::ItemId;
/// Serializable mirror of an [`LwwRegister`].
///
/// The `From` conversions in this file copy all five fields one-to-one in
/// both directions, so a register can be round-tripped through this type.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LwwSnapshot<T> {
/// The register's value.
pub value: T,
/// Clock stamp copied from the register.
pub stamp: Stamp,
/// Timestamp copied from the register's `wall_ts`.
/// NOTE(review): units are not visible in this file — confirm.
pub wall_ts: u64,
/// Agent identifier copied from the register (presumably the writer of
/// `value` — confirm against `LwwRegister`).
pub agent_id: String,
/// Event hash copied from the register (presumably the event that wrote
/// `value` — confirm against `LwwRegister`).
pub event_hash: String,
}
impl<T: Clone> From<&LwwRegister<T>> for LwwSnapshot<T> {
fn from(reg: &LwwRegister<T>) -> Self {
Self {
value: reg.value.clone(),
stamp: reg.stamp.clone(),
wall_ts: reg.wall_ts,
agent_id: reg.agent_id.clone(),
event_hash: reg.event_hash.clone(),
}
}
}
impl<T: Clone> From<&LwwSnapshot<T>> for LwwRegister<T> {
fn from(snap: &LwwSnapshot<T>) -> Self {
Self {
value: snap.value.clone(),
stamp: snap.stamp.clone(),
wall_ts: snap.wall_ts,
agent_id: snap.agent_id.clone(),
event_hash: snap.event_hash.clone(),
}
}
}
/// Serialized form of a `WorkItemState`, stored as the `state` JSON of a
/// snapshot event.
///
/// Built by `WorkItemState::to_snapshot_payload` and turned back into a state
/// by `WorkItemState::from_snapshot_payload`. The three underscore-prefixed
/// keys carry compaction metadata and are not read back into the state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SnapshotPayload {
/// Identifier of the item this snapshot describes.
pub item_id: String,
/// Snapshot of the `title` register.
pub title: LwwSnapshot<String>,
/// Snapshot of the `description` register.
pub description: LwwSnapshot<String>,
/// Snapshot of the `kind` register.
pub kind: LwwSnapshot<Kind>,
/// Snapshot of the `size` register (the value itself may be `None`).
pub size: LwwSnapshot<Option<Size>>,
/// Snapshot of the `urgency` register.
pub urgency: LwwSnapshot<Urgency>,
/// Snapshot of the `parent` register.
pub parent: LwwSnapshot<String>,
/// Snapshot of the `deleted` register.
pub deleted: LwwSnapshot<bool>,
/// Phase state, cloned from the work-item state.
pub state: EpochPhaseState,
/// Assignee set, cloned from the work-item state.
pub assignees: OrSet<String>,
/// Label set, cloned from the work-item state.
pub labels: OrSet<String>,
/// `blocked_by` set, cloned from the work-item state.
pub blocked_by: OrSet<String>,
/// `related_to` set, cloned from the work-item state.
pub related_to: OrSet<String>,
/// Comment set, cloned from the work-item state.
pub comments: GSet<String>,
/// `created_at` copied from the work-item state.
pub created_at: u64,
/// `updated_at` copied from the work-item state.
pub updated_at: u64,
/// Serialized as `_compacted_from`. `compact_item` stores the number of
/// events that were replayed to build the snapshot.
#[serde(rename = "_compacted_from")]
pub compacted_from: usize,
/// Serialized as `_earliest_ts`. `compact_item` stores the smallest
/// `wall_ts_us` among the replayed events.
#[serde(rename = "_earliest_ts")]
pub earliest_ts: i64,
/// Serialized as `_latest_ts`. `compact_item` stores the largest
/// `wall_ts_us` among the replayed events.
#[serde(rename = "_latest_ts")]
pub latest_ts: i64,
}
impl WorkItemState {
    /// Captures this state as a serializable [`SnapshotPayload`].
    ///
    /// Each LWW register is converted to an [`LwwSnapshot`]; the phase state
    /// and the set-valued fields are cloned; `created_at` / `updated_at` are
    /// copied. `item_id`, `compacted_from`, `earliest_ts` and `latest_ts` are
    /// stored in the payload exactly as given.
    #[must_use]
    pub fn to_snapshot_payload(
        &self,
        item_id: &str,
        compacted_from: usize,
        earliest_ts: i64,
        latest_ts: i64,
    ) -> SnapshotPayload {
        // Exhaustive destructuring: a new `WorkItemState` field must be
        // handled here before the code compiles again.
        let Self {
            title,
            description,
            kind,
            state,
            size,
            urgency,
            parent,
            assignees,
            labels,
            blocked_by,
            related_to,
            comments,
            deleted,
            created_at,
            updated_at,
        } = self;

        SnapshotPayload {
            item_id: item_id.to_owned(),
            title: LwwSnapshot::from(title),
            description: LwwSnapshot::from(description),
            kind: LwwSnapshot::from(kind),
            size: LwwSnapshot::from(size),
            urgency: LwwSnapshot::from(urgency),
            parent: LwwSnapshot::from(parent),
            deleted: LwwSnapshot::from(deleted),
            state: state.clone(),
            assignees: assignees.clone(),
            labels: labels.clone(),
            blocked_by: blocked_by.clone(),
            related_to: related_to.clone(),
            comments: comments.clone(),
            created_at: *created_at,
            updated_at: *updated_at,
            compacted_from,
            earliest_ts,
            latest_ts,
        }
    }

    /// Rebuilds a work-item state from a [`SnapshotPayload`].
    ///
    /// The inverse of [`WorkItemState::to_snapshot_payload`]: registers are
    /// converted back to [`LwwRegister`]s and everything else is cloned or
    /// copied. The payload's `item_id` and the `_compacted_from` /
    /// `_earliest_ts` / `_latest_ts` metadata are not part of the state and
    /// are ignored.
    #[must_use]
    pub fn from_snapshot_payload(payload: &SnapshotPayload) -> Self {
        let SnapshotPayload {
            title,
            description,
            kind,
            size,
            urgency,
            parent,
            deleted,
            state,
            assignees,
            labels,
            blocked_by,
            related_to,
            comments,
            created_at,
            updated_at,
            ..
        } = payload;

        Self {
            title: LwwRegister::from(title),
            description: LwwRegister::from(description),
            kind: LwwRegister::from(kind),
            state: state.clone(),
            size: LwwRegister::from(size),
            urgency: LwwRegister::from(urgency),
            parent: LwwRegister::from(parent),
            assignees: assignees.clone(),
            labels: labels.clone(),
            blocked_by: blocked_by.clone(),
            related_to: related_to.clone(),
            comments: comments.clone(),
            deleted: LwwRegister::from(deleted),
            created_at: *created_at,
            updated_at: *updated_at,
        }
    }
}
/// Counters describing the outcome of one `compact_items` run.
#[derive(Debug, Clone, Serialize)]
pub struct CompactionReport {
/// Number of items for which a snapshot was produced.
pub items_compacted: usize,
/// Total number of events across all compacted items (sum of each
/// compacted item's event count).
pub events_replaced: usize,
/// Number of snapshot events produced; `compact_items` increments this
/// together with `items_compacted`.
pub snapshots_created: usize,
/// Number of items left untouched: empty logs, logs that are already a
/// single snapshot, ineligible items, and items `compact_item` declined.
pub items_skipped: usize,
}
/// Replays `events` for one item and condenses them into a single snapshot
/// event.
///
/// Returns `None` when `events` is empty or when any event's hash is listed
/// in `redacted_hashes`; in both cases nothing is replayed.
///
/// The returned event:
/// - has type [`EventType::Snapshot`] and carries the serialized
///   [`SnapshotPayload`] of the replayed state;
/// - is attributed to `agent`;
/// - is timestamped one tick after the latest replayed event (saturating at
///   `i64::MAX` rather than overflowing);
/// - reuses the `itc` string of the last event in `events`;
/// - lists the sorted, de-duplicated hashes of all replayed events as its
///   parents;
/// - has its `event_hash` computed by `writer::compute_event_hash`.
///
/// # Panics
///
/// Panics if the payload cannot be serialized to JSON or if hashing the
/// snapshot event fails.
#[must_use]
pub fn compact_item<S: ::std::hash::BuildHasher>(
    item_id: &str,
    events: &[Event],
    agent: &str,
    redacted_hashes: &HashSet<String, S>,
) -> Option<Event> {
    // `last()` doubles as the emptiness check: no events, no snapshot.
    let last_event = events.last()?;

    // A redacted event must stay individually addressable, so the whole item
    // is left uncompacted.
    if events
        .iter()
        .any(|event| redacted_hashes.contains(&event.event_hash))
    {
        return None;
    }

    let mut state = WorkItemState::new();
    for event in events {
        state.apply_event(event);
    }

    // Single pass for both bounds; seeded from a real event because the slice
    // is known to be non-empty.
    let (earliest_ts, latest_ts) = events.iter().fold(
        (last_event.wall_ts_us, last_event.wall_ts_us),
        |(lo, hi), event| (lo.min(event.wall_ts_us), hi.max(event.wall_ts_us)),
    );

    let payload = state.to_snapshot_payload(item_id, events.len(), earliest_ts, latest_ts);
    let state_json =
        serde_json::to_value(&payload).expect("SnapshotPayload should always serialize");

    // Sorted + de-duplicated so the parent list (and therefore the event
    // hash) does not depend on input order or repeated hashes.
    let mut parents: Vec<String> = events.iter().map(|e| e.event_hash.clone()).collect();
    parents.sort_unstable();
    parents.dedup();

    let mut snapshot_event = Event {
        // Saturate instead of overflowing when the latest timestamp is
        // already `i64::MAX`.
        wall_ts_us: latest_ts.saturating_add(1),
        agent: agent.to_string(),
        itc: last_event.itc.clone(),
        parents,
        event_type: EventType::Snapshot,
        item_id: ItemId::new_unchecked(item_id),
        data: EventData::Snapshot(SnapshotData {
            state: state_json,
            extra: BTreeMap::new(),
        }),
        // Placeholder: the hash is computed over the event built with an
        // empty hash field, then filled in below.
        event_hash: String::new(),
    };
    snapshot_event.event_hash =
        writer::compute_event_hash(&snapshot_event).expect("snapshot event should always hash");
    Some(snapshot_event)
}
/// Decides whether an item may be compacted.
///
/// An item is eligible when all of the following hold:
/// - its phase is [`Phase::Done`] or [`Phase::Archived`];
/// - it is not deleted;
/// - at least `min_age_days` days have passed between `state.updated_at` and
///   `now_us` (both treated as microseconds).
///
/// The age threshold saturates at `i64::MAX` instead of overflowing, so an
/// extremely large `min_age_days` means "practically never" rather than
/// wrapping around to a small or negative threshold.
#[must_use]
pub fn is_eligible(state: &WorkItemState, min_age_days: u32, now_us: i64) -> bool {
    const MICROS_PER_DAY: i64 = 24 * 60 * 60 * 1_000_000;

    // Phase is checked before the deleted flag, and the flag is only
    // consulted when the phase qualifies.
    if !matches!(state.phase(), Phase::Done | Phase::Archived) || state.is_deleted() {
        return false;
    }

    // `cast_signed` reinterprets the bits, so an `updated_at` above
    // `i64::MAX` would become negative here.
    let age_us = now_us.saturating_sub(state.updated_at.cast_signed());

    // `u32::MAX` days does not fit in an `i64` of microseconds; saturate.
    let min_age_us = i64::from(min_age_days).saturating_mul(MICROS_PER_DAY);

    age_us >= min_age_us
}
/// Compacts every eligible item in `events_by_item`.
///
/// For each `(item_id, events)` entry the item is skipped when:
/// - it has no events;
/// - its log is already a single snapshot event;
/// - the replayed state fails [`is_eligible`] for `min_age_days` / `now_us`;
/// - [`compact_item`] returns `None` (for example because of a redacted
///   event).
///
/// Otherwise the produced snapshot is collected. Returns the snapshots in map
/// iteration order together with a [`CompactionReport`] of the counts.
pub fn compact_items<S: ::std::hash::BuildHasher>(
    events_by_item: &BTreeMap<String, Vec<Event>>,
    agent: &str,
    min_age_days: u32,
    now_us: i64,
    redacted_hashes: &HashSet<String, S>,
) -> (Vec<Event>, CompactionReport) {
    let mut produced = Vec::new();
    let mut tally = CompactionReport {
        items_compacted: 0,
        events_replaced: 0,
        snapshots_created: 0,
        items_skipped: 0,
    };

    for (item_id, events) in events_by_item {
        // Nothing to do for an empty log or one that is already a lone snapshot.
        let already_compacted = matches!(
            events.as_slice(),
            [only] if only.event_type == EventType::Snapshot
        );
        if events.is_empty() || already_compacted {
            tally.items_skipped += 1;
            continue;
        }

        // Replay to learn the item's current phase / age.
        let replayed = events
            .iter()
            .fold(WorkItemState::new(), |mut acc, event| {
                acc.apply_event(event);
                acc
            });
        if !is_eligible(&replayed, min_age_days, now_us) {
            tally.items_skipped += 1;
            continue;
        }

        let Some(snapshot) = compact_item(item_id, events, agent, redacted_hashes) else {
            tally.items_skipped += 1;
            continue;
        };

        tally.items_compacted += 1;
        tally.events_replaced += events.len();
        tally.snapshots_created += 1;
        produced.push(snapshot);
    }

    (produced, tally)
}
/// Checks that `snapshot_event` reproduces the state obtained by replaying
/// `original_events`.
///
/// Returns `Ok(true)` when the replayed state and the state decoded from the
/// snapshot agree according to `states_match`, `Ok(false)` otherwise.
///
/// # Errors
///
/// Returns an error (with `item_id` in its context) when the snapshot event
/// cannot be decoded into a [`SnapshotPayload`].
pub fn verify_compaction(
    item_id: &str,
    original_events: &[Event],
    snapshot_event: &Event,
) -> Result<bool> {
    let replayed = original_events
        .iter()
        .fold(WorkItemState::new(), |mut acc, event| {
            acc.apply_event(event);
            acc
        });

    let restored = extract_snapshot_payload(snapshot_event)
        .map(|payload| WorkItemState::from_snapshot_payload(&payload))
        .with_context(|| format!("parse snapshot for {item_id}"))?;

    Ok(states_match(&replayed, &restored))
}
/// Checks that merging the snapshot into the replayed state changes nothing.
///
/// Replays `original_events`, decodes the state carried by `snapshot_event`,
/// merges the latter into a copy of the former and returns whether the result
/// still matches the replayed state according to `states_match`.
///
/// # Errors
///
/// Returns an error when the snapshot event cannot be decoded into a
/// [`SnapshotPayload`].
pub fn verify_lattice_join(original_events: &[Event], snapshot_event: &Event) -> Result<bool> {
    let replayed = original_events
        .iter()
        .fold(WorkItemState::new(), |mut acc, event| {
            acc.apply_event(event);
            acc
        });

    let from_snapshot =
        WorkItemState::from_snapshot_payload(&extract_snapshot_payload(snapshot_event)?);

    let mut joined = replayed.clone();
    joined.merge(&from_snapshot);

    Ok(states_match(&replayed, &joined))
}
pub fn extract_snapshot_payload(event: &Event) -> Result<SnapshotPayload> {
if event.event_type != EventType::Snapshot {
bail!("expected item.snapshot event, got {}", event.event_type);
}
let state_json = match &event.data {
EventData::Snapshot(data) => &data.state,
_ => bail!("event data is not Snapshot variant"),
};
let payload: SnapshotPayload = serde_json::from_value(state_json.clone())
.context("deserialize SnapshotPayload from snapshot event")?;
Ok(payload)
}
fn states_match(a: &WorkItemState, b: &WorkItemState) -> bool {
a.title.value == b.title.value
&& a.title.wall_ts == b.title.wall_ts
&& a.title.agent_id == b.title.agent_id
&& a.title.event_hash == b.title.event_hash
&& a.description.value == b.description.value
&& a.description.wall_ts == b.description.wall_ts
&& a.kind.value == b.kind.value
&& a.kind.wall_ts == b.kind.wall_ts
&& a.size.value == b.size.value
&& a.size.wall_ts == b.size.wall_ts
&& a.urgency.value == b.urgency.value
&& a.urgency.wall_ts == b.urgency.wall_ts
&& a.parent.value == b.parent.value
&& a.parent.wall_ts == b.parent.wall_ts
&& a.deleted.value == b.deleted.value
&& a.deleted.wall_ts == b.deleted.wall_ts
&& a.state == b.state
&& a.assignees == b.assignees
&& a.labels == b.labels
&& a.blocked_by == b.blocked_by
&& a.related_to == b.related_to
&& a.comments == b.comments
&& a.created_at == b.created_at
&& a.updated_at == b.updated_at
}
/// Configuration knobs for a compaction run.
///
/// NOTE(review): none of the functions visible in this file read this struct;
/// `is_eligible` takes `min_age_days` directly and hard-codes the Done /
/// Archived phases. Confirm how callers apply it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompactionPolicy {
/// Minimum age, in days, before an item is compacted (default 30).
pub min_age_days: u32,
/// Names of the states whose items are compaction targets
/// (default `"done"` and `"archived"`).
pub target_states: Vec<String>,
/// Dry-run flag (default `false`). Presumably "report only, write
/// nothing" — confirm against the caller.
pub dry_run: bool,
}
/// Default policy: 30-day minimum age, targeting `"done"` and `"archived"`,
/// with dry-run disabled.
impl Default for CompactionPolicy {
    fn default() -> Self {
        let target_states = ["done", "archived"]
            .iter()
            .copied()
            .map(String::from)
            .collect();

        Self {
            min_age_days: 30,
            target_states,
            dry_run: false,
        }
    }
}
/// Unit tests for snapshot creation, eligibility, batch compaction and the
/// verification helpers.
#[cfg(test)]
mod tests {
use super::*;
use crate::clock::itc::Stamp;
use crate::event::data::*;
use crate::model::item::{Kind, Size, State, Urgency};
use std::collections::BTreeMap;
/// Builds an `Event` with no parents, an `itc` string taken from a seed
/// stamp after one `event()` call, and the caller-supplied hash (the hash is
/// not computed from the content).
fn make_event(
event_type: EventType,
data: EventData,
wall_ts_us: i64,
agent: &str,
event_hash: &str,
item_id: &str,
) -> Event {
let mut stamp = Stamp::seed();
stamp.event();
Event {
wall_ts_us,
agent: agent.to_string(),
itc: stamp.to_string(),
parents: vec![],
event_type,
item_id: ItemId::new_unchecked(item_id),
data,
event_hash: event_hash.to_string(),
}
}
/// Create event for a `Task` of size `M`, default urgency, label
/// `"backend"`, no parent, and the description `"A description"`.
fn create_event(title: &str, wall_ts: i64, agent: &str, hash: &str, item_id: &str) -> Event {
make_event(
EventType::Create,
EventData::Create(CreateData {
title: title.to_string(),
kind: Kind::Task,
size: Some(Size::M),
urgency: Urgency::Default,
labels: vec!["backend".to_string()],
parent: None,
causation: None,
description: Some("A description".to_string()),
extra: BTreeMap::new(),
}),
wall_ts,
agent,
hash,
item_id,
)
}
/// Move event to `state` with no reason.
fn move_event(state: State, wall_ts: i64, agent: &str, hash: &str, item_id: &str) -> Event {
make_event(
EventType::Move,
EventData::Move(MoveData {
state,
reason: None,
extra: BTreeMap::new(),
}),
wall_ts,
agent,
hash,
item_id,
)
}
/// Assign event that assigns `target_agent`; `agent` is the event author.
fn assign_event(
target_agent: &str,
wall_ts: i64,
agent: &str,
hash: &str,
item_id: &str,
) -> Event {
make_event(
EventType::Assign,
EventData::Assign(AssignData {
agent: target_agent.to_string(),
action: AssignAction::Assign,
extra: BTreeMap::new(),
}),
wall_ts,
agent,
hash,
item_id,
)
}
/// Comment event with the given body.
fn comment_event(body: &str, wall_ts: i64, agent: &str, hash: &str, item_id: &str) -> Event {
make_event(
EventType::Comment,
EventData::Comment(CommentData {
body: body.to_string(),
extra: BTreeMap::new(),
}),
wall_ts,
agent,
hash,
item_id,
)
}
/// Update event that sets the `"title"` field to `title`.
fn update_title_event(
title: &str,
wall_ts: i64,
agent: &str,
hash: &str,
item_id: &str,
) -> Event {
make_event(
EventType::Update,
EventData::Update(UpdateData {
field: "title".to_string(),
value: serde_json::Value::String(title.to_string()),
extra: BTreeMap::new(),
}),
wall_ts,
agent,
hash,
item_id,
)
}
/// Six-event history for one item: create, assign to bob, move to `Doing`,
/// comment, title update, move to `Done`, with timestamps 1_000_000 through
/// 6_000_000 and hashes `blake3:e1` .. `blake3:e6`.
fn sample_item_events(item_id: &str) -> Vec<Event> {
vec![
create_event("Fix auth retry", 1_000_000, "alice", "blake3:e1", item_id),
assign_event("bob", 2_000_000, "alice", "blake3:e2", item_id),
move_event(State::Doing, 3_000_000, "bob", "blake3:e3", item_id),
comment_event("Found root cause", 4_000_000, "bob", "blake3:e4", item_id),
update_title_event(
"Fix auth retry logic",
5_000_000,
"bob",
"blake3:e5",
item_id,
),
move_event(State::Done, 6_000_000, "bob", "blake3:e6", item_id),
]
}
/// A non-empty, unredacted history yields a snapshot event with the expected
/// type, item id, agent and a `blake3:`-prefixed hash.
#[test]
fn compact_item_produces_snapshot() {
let events = sample_item_events("bn-test1");
let redacted = HashSet::new();
let snapshot = compact_item("bn-test1", &events, "compactor", &redacted)
.expect("should produce snapshot");
assert_eq!(snapshot.event_type, EventType::Snapshot);
assert_eq!(snapshot.item_id.as_str(), "bn-test1");
assert_eq!(snapshot.agent, "compactor");
assert!(snapshot.event_hash.starts_with("blake3:"));
}
/// An empty event slice produces no snapshot.
#[test]
fn compact_item_empty_events_returns_none() {
let redacted = HashSet::new();
assert!(compact_item("bn-test1", &[], "compactor", &redacted).is_none());
}
/// A single redacted hash in the history blocks compaction of the item.
#[test]
fn compact_item_redacted_events_returns_none() {
let events = sample_item_events("bn-test1");
let mut redacted = HashSet::new();
redacted.insert("blake3:e3".to_string());
assert!(compact_item("bn-test1", &events, "compactor", &redacted).is_none());
}
/// The payload records the event count and the earliest / latest timestamps.
#[test]
fn compact_item_snapshot_payload_has_audit_metadata() {
let events = sample_item_events("bn-test1");
let redacted = HashSet::new();
let snapshot = compact_item("bn-test1", &events, "compactor", &redacted).unwrap();
let payload = extract_snapshot_payload(&snapshot).unwrap();
assert_eq!(payload.compacted_from, 6);
assert_eq!(payload.earliest_ts, 1_000_000);
assert_eq!(payload.latest_ts, 6_000_000);
}
/// The payload reflects the final replayed field values.
#[test]
fn compact_item_snapshot_preserves_state() {
let events = sample_item_events("bn-test1");
let redacted = HashSet::new();
let snapshot = compact_item("bn-test1", &events, "compactor", &redacted).unwrap();
let payload = extract_snapshot_payload(&snapshot).unwrap();
assert_eq!(payload.title.value, "Fix auth retry logic");
assert_eq!(payload.kind.value, Kind::Task);
assert_eq!(payload.size.value, Some(Size::M));
assert_eq!(payload.state.phase, Phase::Done);
assert_eq!(payload.description.value, "A description");
assert!(!payload.deleted.value);
}
/// `verify_compaction` accepts a snapshot built from the same events.
#[test]
fn verify_compaction_matches() {
let events = sample_item_events("bn-test1");
let redacted = HashSet::new();
let snapshot = compact_item("bn-test1", &events, "compactor", &redacted).unwrap();
let matches = verify_compaction("bn-test1", &events, &snapshot).unwrap();
assert!(matches, "compacted state should match replayed state");
}
/// `verify_lattice_join` accepts a snapshot built from the same events.
#[test]
fn verify_lattice_join_holds() {
let events = sample_item_events("bn-test1");
let redacted = HashSet::new();
let snapshot = compact_item("bn-test1", &events, "compactor", &redacted).unwrap();
let holds = verify_lattice_join(&events, &snapshot).unwrap();
assert!(holds, "merge(original, snapshot) should equal original");
}
/// A payload survives a JSON string round-trip (spot-checks three fields).
#[test]
fn snapshot_payload_serde_roundtrip() {
let events = sample_item_events("bn-test1");
let redacted = HashSet::new();
let snapshot = compact_item("bn-test1", &events, "compactor", &redacted).unwrap();
let payload = extract_snapshot_payload(&snapshot).unwrap();
let json = serde_json::to_string(&payload).expect("serialize");
let roundtripped: SnapshotPayload = serde_json::from_str(&json).expect("deserialize");
assert_eq!(roundtripped.item_id, payload.item_id);
assert_eq!(roundtripped.title.value, payload.title.value);
assert_eq!(roundtripped.compacted_from, payload.compacted_from);
}
/// State -> payload -> state yields a state that matches the original.
#[test]
fn from_snapshot_payload_roundtrips_state() {
let events = sample_item_events("bn-test1");
let mut original = WorkItemState::new();
for event in &events {
original.apply_event(event);
}
let payload = original.to_snapshot_payload("bn-test1", events.len(), 1_000_000, 6_000_000);
let reconstructed = WorkItemState::from_snapshot_payload(&payload);
assert!(states_match(&original, &reconstructed));
}
/// A Done item last touched 31 days ago is eligible at a 30-day threshold.
#[test]
fn eligible_done_item_old_enough() {
let events = sample_item_events("bn-test1");
let mut state = WorkItemState::new();
for event in &events {
state.apply_event(event);
}
let now = 6_000_000 + 31 * 24 * 60 * 60 * 1_000_000;
assert!(is_eligible(&state, 30, now));
}
/// A Done item last touched 10 days ago is not eligible at a 30-day threshold.
#[test]
fn not_eligible_done_item_too_new() {
let events = sample_item_events("bn-test1");
let mut state = WorkItemState::new();
for event in &events {
state.apply_event(event);
}
let now = 6_000_000 + 10 * 24 * 60 * 60 * 1_000_000;
assert!(!is_eligible(&state, 30, now));
}
/// An item that was only created (never moved) is not eligible, however old.
#[test]
fn not_eligible_open_item() {
let events = vec![create_event(
"Title",
1_000_000,
"alice",
"blake3:c1",
"bn-test1",
)];
let mut state = WorkItemState::new();
for event in &events {
state.apply_event(event);
}
let now = 1_000_000 + 365 * 24 * 60 * 60 * 1_000_000;
assert!(!is_eligible(&state, 30, now));
}
/// A Done item that was subsequently deleted is not eligible.
#[test]
fn not_eligible_deleted_item() {
let events = vec![
create_event("Title", 1_000_000, "alice", "blake3:c1", "bn-test1"),
move_event(State::Done, 2_000_000, "alice", "blake3:m1", "bn-test1"),
make_event(
EventType::Delete,
EventData::Delete(DeleteData {
reason: Some("dup".to_string()),
extra: BTreeMap::new(),
}),
3_000_000,
"alice",
"blake3:d1",
"bn-test1",
),
];
let mut state = WorkItemState::new();
for event in &events {
state.apply_event(event);
}
let now = 3_000_000 + 365 * 24 * 60 * 60 * 1_000_000;
assert!(!is_eligible(&state, 30, now));
}
/// Batch run over one eligible Done item and one open item: one snapshot,
/// six events replaced, one item skipped.
#[test]
fn compact_items_batch() {
let item1_events = sample_item_events("bn-test1");
let item2_events = vec![create_event(
"Open item",
1_000_000,
"alice",
"blake3:o1",
"bn-test2",
)];
let mut events_by_item = BTreeMap::new();
events_by_item.insert("bn-test1".to_string(), item1_events);
events_by_item.insert("bn-test2".to_string(), item2_events);
let now = 6_000_000 + 31 * 24 * 60 * 60 * 1_000_000;
let redacted = HashSet::new();
let (snapshots, report) = compact_items(&events_by_item, "compactor", 30, now, &redacted);
assert_eq!(snapshots.len(), 1);
assert_eq!(report.items_compacted, 1);
assert_eq!(report.events_replaced, 6);
assert_eq!(report.snapshots_created, 1);
assert_eq!(report.items_skipped, 1);
}
/// An item whose log is already a single snapshot event is skipped.
#[test]
fn compact_items_skips_already_compacted() {
let events = sample_item_events("bn-test1");
let redacted = HashSet::new();
let snapshot = compact_item("bn-test1", &events, "compactor", &redacted).unwrap();
let mut events_by_item = BTreeMap::new();
events_by_item.insert("bn-test1".to_string(), vec![snapshot]);
let now = 6_000_000 + 365 * 24 * 60 * 60 * 1_000_000;
let (snapshots, report) = compact_items(&events_by_item, "compactor", 30, now, &redacted);
assert_eq!(snapshots.len(), 0);
assert_eq!(report.items_skipped, 1);
}
/// Merging the snapshot-derived state into the original leaves it unchanged.
#[test]
fn snapshot_merge_is_idempotent_with_original() {
let events = sample_item_events("bn-test1");
let mut original = WorkItemState::new();
for event in &events {
original.apply_event(event);
}
let payload = original.to_snapshot_payload("bn-test1", events.len(), 1_000_000, 6_000_000);
let snapshot_state = WorkItemState::from_snapshot_payload(&payload);
let mut merged = original.clone();
merged.merge(&snapshot_state);
assert!(
states_match(&original, &merged),
"merge(original, snapshot) should equal original"
);
}
/// Merging original into snapshot and snapshot into original give matching
/// states.
#[test]
fn snapshot_merge_commutative() {
let events = sample_item_events("bn-test1");
let mut original = WorkItemState::new();
for event in &events {
original.apply_event(event);
}
let payload = original.to_snapshot_payload("bn-test1", events.len(), 1_000_000, 6_000_000);
let snapshot_state = WorkItemState::from_snapshot_payload(&payload);
let mut ab = original.clone();
ab.merge(&snapshot_state);
let mut ba = snapshot_state.clone();
ba.merge(&original);
assert!(
states_match(&ab, &ba),
"snapshot merge should be commutative"
);
}
/// The default policy is 30 days, `done` + `archived`, dry-run off.
#[test]
fn compaction_policy_defaults() {
let policy = CompactionPolicy::default();
assert_eq!(policy.min_age_days, 30);
assert_eq!(
policy.target_states,
vec!["done".to_string(), "archived".to_string()]
);
assert!(!policy.dry_run);
}
}