1use anyhow::{Context, Result};
21use rusqlite::{Connection, params};
22
23use crate::db::query;
24use crate::event::Event;
25use crate::event::data::{AssignAction, EventData};
26use crate::event::types::EventType;
27use crate::shard::ShardManager;
28
29#[derive(Debug, Clone, Default, PartialEq, Eq)]
35pub struct ProjectionStats {
36 pub projected: usize,
38 pub duplicates: usize,
40 pub errors: usize,
42}
43
44pub struct Projector<'conn> {
53 conn: &'conn Connection,
54 has_agent_column: bool,
55}
56
57impl<'conn> Projector<'conn> {
58 pub fn new(conn: &'conn Connection) -> Self {
63 let _ = ensure_tracking_table(conn);
66 let has_agent_column = projected_events_has_agent_column(conn).unwrap_or(false);
67 Self {
68 conn,
69 has_agent_column,
70 }
71 }
72
73 pub fn project_batch(&self, events: &[Event]) -> Result<ProjectionStats> {
84 let mut stats = ProjectionStats::default();
85
86 self.conn
88 .execute_batch("BEGIN IMMEDIATE")
89 .context("begin projection transaction")?;
90
91 for event in events {
92 match self.project_event_inner(event) {
93 Ok(ProjectResult::Projected) => stats.projected += 1,
94 Ok(ProjectResult::Duplicate) => stats.duplicates += 1,
95 Err(e) => {
96 tracing::warn!(
97 event_hash = %event.event_hash,
98 event_type = %event.event_type,
99 item_id = %event.item_id,
100 error = %e,
101 "skipping event due to projection error"
102 );
103 stats.errors += 1;
104 }
105 }
106 }
107
108 self.conn
109 .execute_batch("COMMIT")
110 .context("commit projection transaction")?;
111
112 Ok(stats)
113 }
114
115 pub fn project_event(&self, event: &Event) -> Result<bool> {
124 let projected = match self.project_event_inner(event)? {
125 ProjectResult::Projected => true,
126 ProjectResult::Duplicate => false,
127 };
128
129 if let Err(err) = self.update_cursor_to_event_log_end(&event.event_hash) {
130 tracing::warn!(
131 event_hash = %event.event_hash,
132 error = %err,
133 "failed to update projection cursor after single-event projection"
134 );
135 }
136
137 Ok(projected)
138 }
139
140 fn project_event_inner(&self, event: &Event) -> Result<ProjectResult> {
145 if self.is_event_projected(&event.event_hash)? {
147 return Ok(ProjectResult::Duplicate);
148 }
149
150 match event.event_type {
151 EventType::Create => self.project_create(event)?,
152 EventType::Update => self.project_update(event)?,
153 EventType::Move => self.project_move(event)?,
154 EventType::Assign => self.project_assign(event)?,
155 EventType::Comment => self.project_comment(event)?,
156 EventType::Link => self.project_link(event)?,
157 EventType::Unlink => self.project_unlink(event)?,
158 EventType::Delete => self.project_delete(event)?,
159 EventType::Compact => self.project_compact(event)?,
160 EventType::Snapshot => self.project_snapshot(event)?,
161 EventType::Redact => self.project_redact(event)?,
162 }
163
164 self.record_projected_hash(&event.event_hash, event)?;
166
167 Ok(ProjectResult::Projected)
168 }
169
170 fn is_event_projected(&self, event_hash: &str) -> Result<bool> {
175 let exists: bool = self
182 .conn
183 .query_row(
184 "SELECT EXISTS(SELECT 1 FROM projected_events WHERE event_hash = ?1)",
185 params![event_hash],
186 |row| row.get(0),
187 )
188 .unwrap_or(false);
189 Ok(exists)
190 }
191
192 fn is_event_redacted(&self, event_hash: &str) -> Result<bool> {
193 let exists: bool = self
194 .conn
195 .query_row(
196 "SELECT EXISTS(SELECT 1 FROM event_redactions WHERE target_event_hash = ?1)",
197 params![event_hash],
198 |row| row.get(0),
199 )
200 .unwrap_or(false);
201 Ok(exists)
202 }
203
204 fn record_projected_hash(&self, event_hash: &str, event: &Event) -> Result<()> {
205 if self.has_agent_column {
206 self.conn
207 .execute(
208 "INSERT OR IGNORE INTO projected_events (event_hash, item_id, event_type, projected_at_us, agent) \
209 VALUES (?1, ?2, ?3, ?4, ?5)",
210 params![
211 event_hash,
212 event.item_id.as_str(),
213 event.event_type.as_str(),
214 event.wall_ts_us,
215 event.agent.as_str(),
216 ],
217 )
218 .context("record projected event hash")?;
219 return Ok(());
220 }
221
222 self.conn
223 .execute(
224 "INSERT OR IGNORE INTO projected_events (event_hash, item_id, event_type, projected_at_us) \
225 VALUES (?1, ?2, ?3, ?4)",
226 params![
227 event_hash,
228 event.item_id.as_str(),
229 event.event_type.as_str(),
230 event.wall_ts_us,
231 ],
232 )
233 .context("record projected event hash")?;
234 Ok(())
235 }
236
237 fn update_cursor_to_event_log_end(&self, event_hash: &str) -> Result<()> {
238 let main_db_file: String = self
239 .conn
240 .query_row(
241 "SELECT file FROM pragma_database_list WHERE name = 'main'",
242 [],
243 |row| row.get(0),
244 )
245 .context("read main database file from pragma_database_list")?;
246
247 if main_db_file.trim().is_empty() {
248 return Ok(());
250 }
251
252 let db_path = std::path::Path::new(&main_db_file);
253 let Some(bones_dir) = db_path.parent() else {
254 return Ok(());
255 };
256
257 let shard_mgr = ShardManager::new(bones_dir);
258 let total_len = shard_mgr
259 .total_content_len()
260 .map_err(|e| anyhow::anyhow!("read event-log size for cursor update: {e}"))?;
261 let total_len_i64 = i64::try_from(total_len).unwrap_or(i64::MAX);
262
263 query::update_projection_cursor(self.conn, total_len_i64, Some(event_hash))
264 .context("write projection cursor after single-event projection")?;
265
266 Ok(())
267 }
268
269 fn project_create(&self, event: &Event) -> Result<()> {
274 let EventData::Create(ref data) = event.data else {
275 anyhow::bail!("expected Create data for item.create event");
276 };
277
278 let is_redacted = self.is_event_redacted(&event.event_hash)?;
279 let title = if is_redacted { "" } else { &data.title };
280 let description = if is_redacted {
281 None
282 } else {
283 data.description.as_deref()
284 };
285 let labels_str = if is_redacted {
286 String::new()
287 } else {
288 data.labels.join(" ")
289 };
290
291 self.conn
292 .execute(
293 "INSERT INTO items (
294 item_id, title, description, kind, state, urgency,
295 size, parent_id, is_deleted, search_labels,
296 created_at_us, updated_at_us
297 ) VALUES (?1, ?2, ?3, ?4, 'open', ?5, ?6, ?7, 0, ?8, ?9, ?10)
298 ON CONFLICT(item_id) DO UPDATE SET
299 title = excluded.title,
300 description = excluded.description,
301 kind = excluded.kind,
302 urgency = excluded.urgency,
303 size = excluded.size,
304 parent_id = excluded.parent_id,
305 created_at_us = excluded.created_at_us,
306 search_labels = excluded.search_labels,
307 updated_at_us = excluded.updated_at_us
308 WHERE items.title = ''",
309 params![
310 event.item_id.as_str(),
311 title,
312 description,
313 data.kind.to_string(),
314 data.urgency.to_string(),
315 data.size.map(|s| s.to_string()),
316 data.parent.as_deref(),
317 labels_str,
318 event.wall_ts_us,
319 event.wall_ts_us,
320 ],
321 )
322 .with_context(|| format!("project create for {}", event.item_id))?;
323
324 if !is_redacted {
326 for label in &data.labels {
327 self.conn
328 .execute(
329 "INSERT OR IGNORE INTO item_labels (item_id, label, created_at_us)
330 VALUES (?1, ?2, ?3)",
331 params![event.item_id.as_str(), label, event.wall_ts_us],
332 )
333 .with_context(|| format!("insert label '{label}' for {}", event.item_id))?;
334 }
335 }
336
337 Ok(())
338 }
339
340 #[allow(clippy::too_many_lines)]
341 fn project_update(&self, event: &Event) -> Result<()> {
342 let EventData::Update(ref data) = event.data else {
343 anyhow::bail!("expected Update data for item.update event");
344 };
345
346 self.ensure_item_exists(event)?;
347
348 let is_redacted = self.is_event_redacted(&event.event_hash)?;
349
350 match data.field.as_str() {
351 "title" => {
352 let value = if is_redacted {
353 "[redacted]"
354 } else {
355 data.value.as_str().unwrap_or_default()
356 };
357 self.conn.execute(
358 "UPDATE items SET title = ?1, updated_at_us = ?2 WHERE item_id = ?3",
359 params![value, event.wall_ts_us, event.item_id.as_str()],
360 )?;
361 }
362 "description" => {
363 let value = if is_redacted {
364 Some("[redacted]".to_string())
365 } else {
366 data.value.as_str().map(String::from)
367 };
368 self.conn.execute(
369 "UPDATE items SET description = ?1, updated_at_us = ?2 WHERE item_id = ?3",
370 params![value, event.wall_ts_us, event.item_id.as_str()],
371 )?;
372 }
373 "kind" => {
374 let value = data.value.as_str().unwrap_or("task");
375 self.conn.execute(
376 "UPDATE items SET kind = ?1, updated_at_us = ?2 WHERE item_id = ?3",
377 params![value, event.wall_ts_us, event.item_id.as_str()],
378 )?;
379 }
380 "size" => {
381 let value = data.value.as_str().map(String::from);
382 self.conn.execute(
383 "UPDATE items SET size = ?1, updated_at_us = ?2 WHERE item_id = ?3",
384 params![value, event.wall_ts_us, event.item_id.as_str()],
385 )?;
386 }
387 "urgency" => {
388 let value = data.value.as_str().unwrap_or("default");
389 self.conn.execute(
390 "UPDATE items SET urgency = ?1, updated_at_us = ?2 WHERE item_id = ?3",
391 params![value, event.wall_ts_us, event.item_id.as_str()],
392 )?;
393 }
394 "parent" => {
395 let value = data.value.as_str().map(String::from);
396 self.conn.execute(
397 "UPDATE items SET parent_id = ?1, updated_at_us = ?2 WHERE item_id = ?3",
398 params![value, event.wall_ts_us, event.item_id.as_str()],
399 )?;
400 }
401 "labels" => {
402 let mut changed = false;
405
406 if let Some(labels) = data.value.as_array() {
407 self.conn.execute(
409 "DELETE FROM item_labels WHERE item_id = ?1",
410 params![event.item_id.as_str()],
411 )?;
412
413 for label_val in labels {
414 if let Some(label) = label_val.as_str() {
415 self.conn.execute(
416 "INSERT OR IGNORE INTO item_labels (item_id, label, created_at_us)
417 VALUES (?1, ?2, ?3)",
418 params![event.item_id.as_str(), label, event.wall_ts_us],
419 )?;
420 }
421 }
422 changed = true;
423 } else if let Some(obj) = data.value.as_object() {
424 let action = obj.get("action").and_then(|v| v.as_str()).unwrap_or("");
426 let label = obj.get("label").and_then(|v| v.as_str()).unwrap_or("");
427
428 if !label.is_empty() {
429 match action {
430 "add" => {
431 self.conn.execute(
432 "INSERT OR IGNORE INTO item_labels (item_id, label, created_at_us)
433 VALUES (?1, ?2, ?3)",
434 params![event.item_id.as_str(), label, event.wall_ts_us],
435 )?;
436 changed = true;
437 }
438 "remove" => {
439 self.conn.execute(
440 "DELETE FROM item_labels WHERE item_id = ?1 AND label = ?2",
441 params![event.item_id.as_str(), label],
442 )?;
443 changed = true;
444 }
445 _ => {}
446 }
447 }
448 }
449
450 if changed {
451 let mut stmt = self.conn.prepare(
453 "SELECT label FROM item_labels WHERE item_id = ?1 ORDER BY label",
454 )?;
455 let label_rows = stmt.query_map(params![event.item_id.as_str()], |row| {
456 row.get::<_, String>(0)
457 })?;
458
459 let mut label_strings = Vec::new();
460 for label_res in label_rows {
461 label_strings.push(label_res?);
462 }
463
464 let search_labels = label_strings.join(" ");
465 self.conn.execute(
466 "UPDATE items SET search_labels = ?1, updated_at_us = ?2 WHERE item_id = ?3",
467 params![search_labels, event.wall_ts_us, event.item_id.as_str()],
468 )?;
469 }
470 }
471 _ => {
472 tracing::debug!(
474 field = %data.field,
475 item_id = %event.item_id,
476 "ignoring update for unknown field"
477 );
478 self.conn.execute(
479 "UPDATE items SET updated_at_us = ?1 WHERE item_id = ?2",
480 params![event.wall_ts_us, event.item_id.as_str()],
481 )?;
482 }
483 }
484
485 Ok(())
486 }
487
488 fn project_move(&self, event: &Event) -> Result<()> {
489 let EventData::Move(ref data) = event.data else {
490 anyhow::bail!("expected Move data for item.move event");
491 };
492
493 self.ensure_item_exists(event)?;
494
495 self.conn
496 .execute(
497 "UPDATE items SET state = ?1, updated_at_us = ?2 WHERE item_id = ?3",
498 params![
499 data.state.to_string(),
500 event.wall_ts_us,
501 event.item_id.as_str(),
502 ],
503 )
504 .with_context(|| format!("project move for {}", event.item_id))?;
505
506 Ok(())
507 }
508
509 fn project_assign(&self, event: &Event) -> Result<()> {
510 let EventData::Assign(ref data) = event.data else {
511 anyhow::bail!("expected Assign data for item.assign event");
512 };
513
514 self.ensure_item_exists(event)?;
515
516 match data.action {
517 AssignAction::Assign => {
518 self.conn
519 .execute(
520 "INSERT OR IGNORE INTO item_assignees (item_id, agent, created_at_us)
521 VALUES (?1, ?2, ?3)",
522 params![event.item_id.as_str(), data.agent, event.wall_ts_us],
523 )
524 .with_context(|| format!("assign {} to {}", data.agent, event.item_id))?;
525 }
526 AssignAction::Unassign => {
527 self.conn
528 .execute(
529 "DELETE FROM item_assignees WHERE item_id = ?1 AND agent = ?2",
530 params![event.item_id.as_str(), data.agent],
531 )
532 .with_context(|| format!("unassign {} from {}", data.agent, event.item_id))?;
533 }
534 }
535
536 self.conn.execute(
538 "UPDATE items SET updated_at_us = ?1 WHERE item_id = ?2",
539 params![event.wall_ts_us, event.item_id.as_str()],
540 )?;
541
542 Ok(())
543 }
544
545 fn project_comment(&self, event: &Event) -> Result<()> {
546 let EventData::Comment(ref data) = event.data else {
547 anyhow::bail!("expected Comment data for item.comment event");
548 };
549
550 self.ensure_item_exists(event)?;
551
552 let is_redacted = self.is_event_redacted(&event.event_hash)?;
553 let body = if is_redacted {
554 "[redacted]"
555 } else {
556 &data.body
557 };
558
559 self.conn
560 .execute(
561 "INSERT OR IGNORE INTO item_comments (item_id, event_hash, author, body, created_at_us)
562 VALUES (?1, ?2, ?3, ?4, ?5)",
563 params![
564 event.item_id.as_str(),
565 event.event_hash,
566 event.agent,
567 body,
568 event.wall_ts_us,
569 ],
570 )
571 .with_context(|| format!("project comment for {}", event.item_id))?;
572
573 self.conn.execute(
575 "UPDATE items SET updated_at_us = ?1 WHERE item_id = ?2",
576 params![event.wall_ts_us, event.item_id.as_str()],
577 )?;
578
579 Ok(())
580 }
581
582 fn project_link(&self, event: &Event) -> Result<()> {
583 let EventData::Link(ref data) = event.data else {
584 anyhow::bail!("expected Link data for item.link event");
585 };
586
587 self.ensure_item_exists(event)?;
588
589 self.conn
590 .execute(
591 "INSERT OR IGNORE INTO item_dependencies (item_id, depends_on_item_id, link_type, created_at_us)
592 VALUES (?1, ?2, ?3, ?4)",
593 params![
594 event.item_id.as_str(),
595 data.target,
596 data.link_type,
597 event.wall_ts_us,
598 ],
599 )
600 .with_context(|| {
601 format!("project link {} -> {}", event.item_id, data.target)
602 })?;
603
604 self.conn.execute(
606 "UPDATE items SET updated_at_us = ?1 WHERE item_id = ?2",
607 params![event.wall_ts_us, event.item_id.as_str()],
608 )?;
609
610 Ok(())
611 }
612
613 fn project_unlink(&self, event: &Event) -> Result<()> {
614 let EventData::Unlink(ref data) = event.data else {
615 anyhow::bail!("expected Unlink data for item.unlink event");
616 };
617
618 self.ensure_item_exists(event)?;
619
620 if let Some(ref link_type) = data.link_type {
621 self.conn
622 .execute(
623 "DELETE FROM item_dependencies \
624 WHERE item_id = ?1 AND depends_on_item_id = ?2 AND link_type = ?3",
625 params![event.item_id.as_str(), data.target, link_type],
626 )
627 .with_context(|| format!("unlink {} -/-> {}", event.item_id, data.target))?;
628 } else {
629 self.conn
631 .execute(
632 "DELETE FROM item_dependencies \
633 WHERE item_id = ?1 AND depends_on_item_id = ?2",
634 params![event.item_id.as_str(), data.target],
635 )
636 .with_context(|| format!("unlink all {} -/-> {}", event.item_id, data.target))?;
637 }
638
639 self.conn.execute(
641 "UPDATE items SET updated_at_us = ?1 WHERE item_id = ?2",
642 params![event.wall_ts_us, event.item_id.as_str()],
643 )?;
644
645 Ok(())
646 }
647
648 fn project_delete(&self, event: &Event) -> Result<()> {
649 self.ensure_item_exists(event)?;
650
651 self.conn
652 .execute(
653 "UPDATE items SET is_deleted = 1, deleted_at_us = ?1, updated_at_us = ?1 \
654 WHERE item_id = ?2",
655 params![event.wall_ts_us, event.item_id.as_str()],
656 )
657 .with_context(|| format!("project delete for {}", event.item_id))?;
658
659 Ok(())
660 }
661
662 fn project_compact(&self, event: &Event) -> Result<()> {
663 let EventData::Compact(ref data) = event.data else {
664 anyhow::bail!("expected Compact data for item.compact event");
665 };
666
667 self.ensure_item_exists(event)?;
668
669 let is_redacted = self.is_event_redacted(&event.event_hash)?;
670 let summary = if is_redacted {
671 "[redacted]"
672 } else {
673 data.summary.as_str()
674 };
675
676 self.conn
677 .execute(
678 "UPDATE items SET compact_summary = ?1, updated_at_us = ?2 WHERE item_id = ?3",
679 params![summary, event.wall_ts_us, event.item_id.as_str()],
680 )
681 .with_context(|| format!("project compact for {}", event.item_id))?;
682
683 Ok(())
684 }
685
686 fn project_snapshot(&self, event: &Event) -> Result<()> {
687 let EventData::Snapshot(ref data) = event.data else {
688 anyhow::bail!("expected Snapshot data for item.snapshot event");
689 };
690
691 self.ensure_item_exists(event)?;
692
693 let json_str = serde_json::to_string(&data.state).context("serialize snapshot state")?;
694
695 self.conn
696 .execute(
697 "UPDATE items SET snapshot_json = ?1, updated_at_us = ?2 WHERE item_id = ?3",
698 params![json_str, event.wall_ts_us, event.item_id.as_str()],
699 )
700 .with_context(|| format!("project snapshot for {}", event.item_id))?;
701
702 Ok(())
703 }
704
705 fn project_redact(&self, event: &Event) -> Result<()> {
706 let EventData::Redact(ref data) = event.data else {
707 anyhow::bail!("expected Redact data for item.redact event");
708 };
709
710 self.conn
712 .execute(
713 "INSERT OR IGNORE INTO event_redactions \
714 (target_event_hash, item_id, reason, redacted_by, redacted_at_us) \
715 VALUES (?1, ?2, ?3, ?4, ?5)",
716 params![
717 data.target_hash,
718 event.item_id.as_str(),
719 data.reason,
720 event.agent,
721 event.wall_ts_us,
722 ],
723 )
724 .with_context(|| {
725 format!(
726 "project redact for {} targeting {}",
727 event.item_id, data.target_hash
728 )
729 })?;
730
731 self.conn
733 .execute(
734 "UPDATE item_comments SET body = '[redacted]' WHERE event_hash = ?1",
735 params![data.target_hash],
736 )
737 .context("redact comment body")?;
738
739 Ok(())
740 }
741
742 fn ensure_item_exists(&self, event: &Event) -> Result<()> {
751 let exists: bool = self
752 .conn
753 .query_row(
754 "SELECT EXISTS(SELECT 1 FROM items WHERE item_id = ?1)",
755 params![event.item_id.as_str()],
756 |row| row.get(0),
757 )
758 .context("check item exists")?;
759
760 if !exists {
761 self.conn
762 .execute(
763 "INSERT INTO items (
764 item_id, title, kind, state, urgency,
765 is_deleted, search_labels, created_at_us, updated_at_us
766 ) VALUES (?1, '', 'task', 'open', 'default', 0, '', ?2, ?2)",
767 params![event.item_id.as_str(), event.wall_ts_us],
768 )
769 .with_context(|| format!("create placeholder item for {}", event.item_id))?;
770 }
771
772 Ok(())
773 }
774}
775
776enum ProjectResult {
777 Projected,
778 Duplicate,
779}
780
781pub const PROJECTED_EVENTS_DDL: &str = "\
790CREATE TABLE IF NOT EXISTS projected_events (
791 event_hash TEXT PRIMARY KEY,
792 item_id TEXT NOT NULL,
793 event_type TEXT NOT NULL,
794 projected_at_us INTEGER NOT NULL,
795 agent TEXT NOT NULL DEFAULT ''
796);
797CREATE INDEX IF NOT EXISTS idx_projected_events_item
798 ON projected_events(item_id);
799CREATE INDEX IF NOT EXISTS idx_projected_events_agent
800 ON projected_events(agent);
801";
802
803pub fn ensure_tracking_table(conn: &Connection) -> Result<()> {
812 conn.execute_batch(PROJECTED_EVENTS_DDL)
813 .context("create projected_events tracking table")?;
814
815 if !projected_events_has_agent_column(conn)? {
816 conn.execute(
817 "ALTER TABLE projected_events ADD COLUMN agent TEXT NOT NULL DEFAULT ''",
818 [],
819 )
820 .context("add agent column to projected_events")?;
821 }
822
823 conn.execute_batch(
824 "CREATE INDEX IF NOT EXISTS idx_projected_events_agent ON projected_events(agent);",
825 )
826 .context("create projected_events_agent index")?;
827
828 Ok(())
829}
830
831fn projected_events_has_agent_column(conn: &Connection) -> Result<bool> {
832 let mut stmt = conn
833 .prepare("PRAGMA table_info(projected_events)")
834 .context("inspect projected_events schema")?;
835 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
836
837 for row in rows {
838 let name = row.context("read projected_events column")?;
839 if name == "agent" {
840 return Ok(true);
841 }
842 }
843
844 Ok(false)
845}
846
847pub fn clear_projection(conn: &Connection) -> Result<()> {
856 conn.execute_batch(
857 "DELETE FROM event_redactions;
858 DELETE FROM item_comments;
859 DELETE FROM item_dependencies;
860 DELETE FROM item_assignees;
861 DELETE FROM item_labels;
862 DELETE FROM items;
863 DELETE FROM projected_events;
864 UPDATE projection_meta SET last_event_offset = 0, last_event_hash = NULL WHERE id = 1;",
865 )
866 .context("clear projection tables")?;
867 Ok(())
868}
869
870#[cfg(test)]
875mod tests {
876 use super::*;
877 use crate::db::{migrations, query};
878 use crate::event::data::*;
879 use crate::event::types::EventType;
880 use crate::event::writer;
881 use crate::model::item::{Kind, Size, State, Urgency};
882 use crate::model::item_id::ItemId;
883 use crate::shard::ShardManager;
884 use rusqlite::Connection;
885 use std::collections::BTreeMap;
886 use tempfile::TempDir;
887
888 fn test_db() -> Connection {
889 let mut conn = Connection::open_in_memory().expect("open in-memory db");
890 migrations::migrate(&mut conn).expect("migrate");
891 ensure_tracking_table(&conn).expect("create tracking table");
892 conn
893 }
894
895 fn make_event(
896 event_type: EventType,
897 item_id: &str,
898 data: EventData,
899 hash: &str,
900 ts: i64,
901 ) -> Event {
902 Event {
903 wall_ts_us: ts,
904 agent: "test-agent".into(),
905 itc: "itc:AQ".into(),
906 parents: vec![],
907 event_type,
908 item_id: ItemId::new_unchecked(item_id),
909 data,
910 event_hash: format!("blake3:{hash}"),
911 }
912 }
913
914 fn make_create(id: &str, title: &str, hash: &str, ts: i64) -> Event {
915 make_event(
916 EventType::Create,
917 id,
918 EventData::Create(CreateData {
919 title: title.into(),
920 kind: Kind::Task,
921 size: Some(Size::M),
922 urgency: Urgency::Default,
923 labels: vec!["backend".into(), "auth".into()],
924 parent: None,
925 causation: None,
926 description: Some("A detailed description".into()),
927 extra: BTreeMap::new(),
928 }),
929 hash,
930 ts,
931 )
932 }
933
934 #[test]
935 fn project_event_updates_projection_cursor_for_file_backed_db() {
936 let dir = TempDir::new().expect("tempdir");
937 let bones_dir = dir.path().join(".bones");
938 std::fs::create_dir_all(&bones_dir).expect("create .bones");
939
940 let shard_mgr = ShardManager::new(&bones_dir);
941 shard_mgr.init().expect("init shard manager");
942
943 let db_path = bones_dir.join("bones.db");
944 let mut conn = Connection::open(&db_path).expect("open projection db");
945 migrations::migrate(&mut conn).expect("migrate");
946 ensure_tracking_table(&conn).expect("create tracking table");
947
948 let projector = Projector::new(&conn);
949 let mut event = make_create("bn-001", "Cursor update", "h1", 1000);
950 let line = writer::write_event(&mut event).expect("serialize event");
951 shard_mgr
952 .append(&line, false, std::time::Duration::from_secs(5))
953 .expect("append event line");
954
955 projector.project_event(&event).expect("project event");
956
957 let (offset, hash) = query::get_projection_cursor(&conn).expect("read projection cursor");
958 let expected_offset =
959 i64::try_from(shard_mgr.total_content_len().expect("content len")).unwrap_or(i64::MAX);
960
961 assert_eq!(offset, expected_offset);
962 assert_eq!(hash.as_deref(), Some(event.event_hash.as_str()));
963 }
964
965 #[test]
970 fn project_create_inserts_item() {
971 let conn = test_db();
972 let projector = Projector::new(&conn);
973 let event = make_create("bn-001", "Fix auth timeout", "aaa", 1000);
974
975 let result = projector.project_event(&event).unwrap();
976 assert!(result, "should return true for new projection");
977
978 let item = query::get_item(&conn, "bn-001", false).unwrap().unwrap();
979 assert_eq!(item.title, "Fix auth timeout");
980 assert_eq!(item.kind, "task");
981 assert_eq!(item.state, "open");
982 assert_eq!(item.urgency, "default");
983 assert_eq!(item.size.as_deref(), Some("m"));
984 assert_eq!(item.description.as_deref(), Some("A detailed description"));
985 assert_eq!(item.created_at_us, 1000);
986 assert_eq!(item.updated_at_us, 1000);
987 }
988
989 #[test]
990 fn project_create_inserts_labels() {
991 let conn = test_db();
992 let projector = Projector::new(&conn);
993 let event = make_create("bn-001", "Fix auth", "aaa", 1000);
994 projector.project_event(&event).unwrap();
995
996 let labels = query::get_labels(&conn, "bn-001").unwrap();
997 assert_eq!(labels.len(), 2);
998 assert_eq!(labels[0].label, "auth");
999 assert_eq!(labels[1].label, "backend");
1000 }
1001
1002 #[test]
1007 fn project_update_title() {
1008 let conn = test_db();
1009 let projector = Projector::new(&conn);
1010 projector
1011 .project_event(&make_create("bn-001", "Old title", "aaa", 1000))
1012 .unwrap();
1013
1014 let update = make_event(
1015 EventType::Update,
1016 "bn-001",
1017 EventData::Update(UpdateData {
1018 field: "title".into(),
1019 value: serde_json::json!("New title"),
1020 extra: BTreeMap::new(),
1021 }),
1022 "bbb",
1023 2000,
1024 );
1025 projector.project_event(&update).unwrap();
1026
1027 let item = query::get_item(&conn, "bn-001", false).unwrap().unwrap();
1028 assert_eq!(item.title, "New title");
1029 assert_eq!(item.updated_at_us, 2000);
1030 }
1031
1032 #[test]
1033 fn project_update_description() {
1034 let conn = test_db();
1035 let projector = Projector::new(&conn);
1036 projector
1037 .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1038 .unwrap();
1039
1040 let update = make_event(
1041 EventType::Update,
1042 "bn-001",
1043 EventData::Update(UpdateData {
1044 field: "description".into(),
1045 value: serde_json::json!("Updated description"),
1046 extra: BTreeMap::new(),
1047 }),
1048 "bbb",
1049 2000,
1050 );
1051 projector.project_event(&update).unwrap();
1052
1053 let item = query::get_item(&conn, "bn-001", false).unwrap().unwrap();
1054 assert_eq!(item.description.as_deref(), Some("Updated description"));
1055 }
1056
1057 #[test]
1058 fn project_update_labels() {
1059 let conn = test_db();
1060 let projector = Projector::new(&conn);
1061 projector
1062 .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1063 .unwrap();
1064
1065 let update = make_event(
1066 EventType::Update,
1067 "bn-001",
1068 EventData::Update(UpdateData {
1069 field: "labels".into(),
1070 value: serde_json::json!(["frontend", "urgent"]),
1071 extra: BTreeMap::new(),
1072 }),
1073 "bbb",
1074 2000,
1075 );
1076 projector.project_event(&update).unwrap();
1077
1078 let labels = query::get_labels(&conn, "bn-001").unwrap();
1079 assert_eq!(labels.len(), 2);
1080 assert_eq!(labels[0].label, "frontend");
1081 assert_eq!(labels[1].label, "urgent");
1082 }
1083
1084 #[test]
1085 fn project_update_unknown_field_bumps_updated() {
1086 let conn = test_db();
1087 let projector = Projector::new(&conn);
1088 projector
1089 .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1090 .unwrap();
1091
1092 let update = make_event(
1093 EventType::Update,
1094 "bn-001",
1095 EventData::Update(UpdateData {
1096 field: "future_field".into(),
1097 value: serde_json::json!("whatever"),
1098 extra: BTreeMap::new(),
1099 }),
1100 "bbb",
1101 2000,
1102 );
1103 projector.project_event(&update).unwrap();
1104
1105 let item = query::get_item(&conn, "bn-001", false).unwrap().unwrap();
1106 assert_eq!(item.updated_at_us, 2000);
1107 }
1108
1109 #[test]
1114 fn project_move_updates_state() {
1115 let conn = test_db();
1116 let projector = Projector::new(&conn);
1117 projector
1118 .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1119 .unwrap();
1120
1121 let mv = make_event(
1122 EventType::Move,
1123 "bn-001",
1124 EventData::Move(MoveData {
1125 state: State::Doing,
1126 reason: Some("Starting work".into()),
1127 extra: BTreeMap::new(),
1128 }),
1129 "bbb",
1130 2000,
1131 );
1132 projector.project_event(&mv).unwrap();
1133
1134 let item = query::get_item(&conn, "bn-001", false).unwrap().unwrap();
1135 assert_eq!(item.state, "doing");
1136 }
1137
1138 #[test]
1143 fn project_assign_and_unassign() {
1144 let conn = test_db();
1145 let projector = Projector::new(&conn);
1146 projector
1147 .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1148 .unwrap();
1149
1150 let assign = make_event(
1152 EventType::Assign,
1153 "bn-001",
1154 EventData::Assign(AssignData {
1155 agent: "alice".into(),
1156 action: AssignAction::Assign,
1157 extra: BTreeMap::new(),
1158 }),
1159 "bbb",
1160 2000,
1161 );
1162 projector.project_event(&assign).unwrap();
1163
1164 let assignees = query::get_assignees(&conn, "bn-001").unwrap();
1165 assert_eq!(assignees.len(), 1);
1166 assert_eq!(assignees[0].agent, "alice");
1167
1168 let unassign = make_event(
1170 EventType::Assign,
1171 "bn-001",
1172 EventData::Assign(AssignData {
1173 agent: "alice".into(),
1174 action: AssignAction::Unassign,
1175 extra: BTreeMap::new(),
1176 }),
1177 "ccc",
1178 3000,
1179 );
1180 projector.project_event(&unassign).unwrap();
1181
1182 let assignees = query::get_assignees(&conn, "bn-001").unwrap();
1183 assert!(assignees.is_empty());
1184 }
1185
1186 #[test]
1191 fn project_comment_inserts_row() {
1192 let conn = test_db();
1193 let projector = Projector::new(&conn);
1194 projector
1195 .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1196 .unwrap();
1197
1198 let comment = make_event(
1199 EventType::Comment,
1200 "bn-001",
1201 EventData::Comment(CommentData {
1202 body: "This is a comment".into(),
1203 extra: BTreeMap::new(),
1204 }),
1205 "bbb",
1206 2000,
1207 );
1208 projector.project_event(&comment).unwrap();
1209
1210 let comments = query::get_comments(&conn, "bn-001", None, None).unwrap();
1211 assert_eq!(comments.len(), 1);
1212 assert_eq!(comments[0].body, "This is a comment");
1213 assert_eq!(comments[0].author, "test-agent");
1214 assert_eq!(comments[0].event_hash, "blake3:bbb");
1215 }
1216
1217 #[test]
1222 fn project_link_and_unlink() {
1223 let conn = test_db();
1224 let projector = Projector::new(&conn);
1225 projector
1226 .project_event(&make_create("bn-001", "Blocker", "aaa", 1000))
1227 .unwrap();
1228 projector
1229 .project_event(&make_create("bn-002", "Blocked", "bbb", 1001))
1230 .unwrap();
1231
1232 let link = make_event(
1234 EventType::Link,
1235 "bn-002",
1236 EventData::Link(LinkData {
1237 target: "bn-001".into(),
1238 link_type: "blocks".into(),
1239 extra: BTreeMap::new(),
1240 }),
1241 "ccc",
1242 2000,
1243 );
1244 projector.project_event(&link).unwrap();
1245
1246 let deps = query::get_dependencies(&conn, "bn-002").unwrap();
1247 assert_eq!(deps.len(), 1);
1248 assert_eq!(deps[0].depends_on_item_id, "bn-001");
1249
1250 let unlink = make_event(
1252 EventType::Unlink,
1253 "bn-002",
1254 EventData::Unlink(UnlinkData {
1255 target: "bn-001".into(),
1256 link_type: Some("blocks".into()),
1257 extra: BTreeMap::new(),
1258 }),
1259 "ddd",
1260 3000,
1261 );
1262 projector.project_event(&unlink).unwrap();
1263
1264 let deps = query::get_dependencies(&conn, "bn-002").unwrap();
1265 assert!(deps.is_empty());
1266 }
1267
1268 #[test]
1273 fn project_delete_soft_deletes() {
1274 let conn = test_db();
1275 let projector = Projector::new(&conn);
1276 projector
1277 .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1278 .unwrap();
1279
1280 let delete = make_event(
1281 EventType::Delete,
1282 "bn-001",
1283 EventData::Delete(DeleteData {
1284 reason: Some("Duplicate".into()),
1285 extra: BTreeMap::new(),
1286 }),
1287 "bbb",
1288 2000,
1289 );
1290 projector.project_event(&delete).unwrap();
1291
1292 assert!(query::get_item(&conn, "bn-001", false).unwrap().is_none());
1294 let item = query::get_item(&conn, "bn-001", true).unwrap().unwrap();
1296 assert!(item.is_deleted);
1297 assert_eq!(item.deleted_at_us, Some(2000));
1298 }
1299
1300 #[test]
1305 fn project_compact_sets_summary() {
1306 let conn = test_db();
1307 let projector = Projector::new(&conn);
1308 projector
1309 .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1310 .unwrap();
1311
1312 let compact = make_event(
1313 EventType::Compact,
1314 "bn-001",
1315 EventData::Compact(CompactData {
1316 summary: "TL;DR: auth fix".into(),
1317 extra: BTreeMap::new(),
1318 }),
1319 "bbb",
1320 2000,
1321 );
1322 projector.project_event(&compact).unwrap();
1323
1324 let item = query::get_item(&conn, "bn-001", false).unwrap().unwrap();
1325 assert_eq!(item.compact_summary.as_deref(), Some("TL;DR: auth fix"));
1326 }
1327
1328 #[test]
1333 fn project_snapshot_stores_json() {
1334 let conn = test_db();
1335 let projector = Projector::new(&conn);
1336 projector
1337 .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1338 .unwrap();
1339
1340 let snapshot = make_event(
1341 EventType::Snapshot,
1342 "bn-001",
1343 EventData::Snapshot(SnapshotData {
1344 state: serde_json::json!({"id": "bn-001", "title": "Snapshotted"}),
1345 extra: BTreeMap::new(),
1346 }),
1347 "bbb",
1348 2000,
1349 );
1350 projector.project_event(&snapshot).unwrap();
1351
1352 let row: String = conn
1353 .query_row(
1354 "SELECT snapshot_json FROM items WHERE item_id = 'bn-001'",
1355 [],
1356 |row| row.get(0),
1357 )
1358 .unwrap();
1359 let parsed: serde_json::Value = serde_json::from_str(&row).unwrap();
1360 assert_eq!(parsed["title"], "Snapshotted");
1361 }
1362
1363 #[test]
1368 fn project_redact_records_and_blanks_comment() {
1369 let conn = test_db();
1370 let projector = Projector::new(&conn);
1371 projector
1372 .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1373 .unwrap();
1374
1375 let comment = make_event(
1377 EventType::Comment,
1378 "bn-001",
1379 EventData::Comment(CommentData {
1380 body: "Secret password: hunter2".into(),
1381 extra: BTreeMap::new(),
1382 }),
1383 "comment_hash",
1384 2000,
1385 );
1386 projector.project_event(&comment).unwrap();
1387
1388 let redact = make_event(
1390 EventType::Redact,
1391 "bn-001",
1392 EventData::Redact(RedactData {
1393 target_hash: "blake3:comment_hash".into(),
1394 reason: "Accidental secret".into(),
1395 extra: BTreeMap::new(),
1396 }),
1397 "redact_hash",
1398 3000,
1399 );
1400 projector.project_event(&redact).unwrap();
1401
1402 let count: i64 = conn
1404 .query_row(
1405 "SELECT COUNT(*) FROM event_redactions WHERE target_event_hash = 'blake3:comment_hash'",
1406 [],
1407 |row| row.get(0),
1408 )
1409 .unwrap();
1410 assert_eq!(count, 1);
1411
1412 let comments = query::get_comments(&conn, "bn-001", None, None).unwrap();
1414 assert_eq!(comments.len(), 1);
1415 assert_eq!(comments[0].body, "[redacted]");
1416 }
1417
1418 #[test]
1423 fn duplicate_events_are_skipped() {
1424 let conn = test_db();
1425 let projector = Projector::new(&conn);
1426
1427 let event = make_create("bn-001", "Item", "aaa", 1000);
1428 assert!(projector.project_event(&event).unwrap()); assert!(!projector.project_event(&event).unwrap()); let count: i64 = conn
1433 .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
1434 .unwrap();
1435 assert_eq!(count, 1);
1436 }
1437
1438 #[test]
1439 fn batch_dedup_counts() {
1440 let conn = test_db();
1441 let projector = Projector::new(&conn);
1442
1443 let event1 = make_create("bn-001", "Item 1", "aaa", 1000);
1444 let event2 = make_create("bn-002", "Item 2", "bbb", 1001);
1445
1446 let stats1 = projector
1448 .project_batch(&[event1.clone(), event2.clone()])
1449 .unwrap();
1450 assert_eq!(stats1.projected, 2);
1451 assert_eq!(stats1.duplicates, 0);
1452
1453 let stats2 = projector.project_batch(&[event1, event2]).unwrap();
1455 assert_eq!(stats2.projected, 0);
1456 assert_eq!(stats2.duplicates, 2);
1457 }
1458
1459 #[test]
1464 fn incremental_matches_full_replay() {
1465 let events = vec![
1466 make_create("bn-001", "Auth bug", "h1", 1000),
1467 make_event(
1468 EventType::Move,
1469 "bn-001",
1470 EventData::Move(MoveData {
1471 state: State::Doing,
1472 reason: None,
1473 extra: BTreeMap::new(),
1474 }),
1475 "h2",
1476 2000,
1477 ),
1478 make_event(
1479 EventType::Assign,
1480 "bn-001",
1481 EventData::Assign(AssignData {
1482 agent: "alice".into(),
1483 action: AssignAction::Assign,
1484 extra: BTreeMap::new(),
1485 }),
1486 "h3",
1487 3000,
1488 ),
1489 make_event(
1490 EventType::Comment,
1491 "bn-001",
1492 EventData::Comment(CommentData {
1493 body: "Working on it".into(),
1494 extra: BTreeMap::new(),
1495 }),
1496 "h4",
1497 4000,
1498 ),
1499 make_event(
1500 EventType::Update,
1501 "bn-001",
1502 EventData::Update(UpdateData {
1503 field: "title".into(),
1504 value: serde_json::json!("Auth bug (fixed)"),
1505 extra: BTreeMap::new(),
1506 }),
1507 "h5",
1508 5000,
1509 ),
1510 make_event(
1511 EventType::Move,
1512 "bn-001",
1513 EventData::Move(MoveData {
1514 state: State::Done,
1515 reason: Some("Shipped".into()),
1516 extra: BTreeMap::new(),
1517 }),
1518 "h6",
1519 6000,
1520 ),
1521 ];
1522
1523 let conn_full = test_db();
1525 let proj_full = Projector::new(&conn_full);
1526 proj_full.project_batch(&events).unwrap();
1527
1528 let conn_inc = test_db();
1530 let proj_inc = Projector::new(&conn_inc);
1531 for event in &events {
1532 proj_inc.project_event(event).unwrap();
1533 }
1534
1535 let item_full = query::get_item(&conn_full, "bn-001", false)
1537 .unwrap()
1538 .unwrap();
1539 let item_inc = query::get_item(&conn_inc, "bn-001", false)
1540 .unwrap()
1541 .unwrap();
1542
1543 assert_eq!(item_full.title, item_inc.title);
1544 assert_eq!(item_full.state, item_inc.state);
1545 assert_eq!(item_full.updated_at_us, item_inc.updated_at_us);
1546
1547 let assignees_full = query::get_assignees(&conn_full, "bn-001").unwrap();
1548 let assignees_inc = query::get_assignees(&conn_inc, "bn-001").unwrap();
1549 assert_eq!(assignees_full.len(), assignees_inc.len());
1550
1551 let comments_full = query::get_comments(&conn_full, "bn-001", None, None).unwrap();
1552 let comments_inc = query::get_comments(&conn_inc, "bn-001", None, None).unwrap();
1553 assert_eq!(comments_full.len(), comments_inc.len());
1554 }
1555
1556 #[test]
1561 fn clear_and_replay_produces_same_result() {
1562 let conn = test_db();
1563 let projector = Projector::new(&conn);
1564
1565 let events = vec![
1566 make_create("bn-001", "Item 1", "h1", 1000),
1567 make_create("bn-002", "Item 2", "h2", 1001),
1568 make_event(
1569 EventType::Link,
1570 "bn-002",
1571 EventData::Link(LinkData {
1572 target: "bn-001".into(),
1573 link_type: "blocks".into(),
1574 extra: BTreeMap::new(),
1575 }),
1576 "h3",
1577 2000,
1578 ),
1579 ];
1580
1581 projector.project_batch(&events).unwrap();
1583 let count1: i64 = conn
1584 .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
1585 .unwrap();
1586 assert_eq!(count1, 2);
1587
1588 clear_projection(&conn).unwrap();
1590 let count_after_clear: i64 = conn
1591 .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
1592 .unwrap();
1593 assert_eq!(count_after_clear, 0);
1594
1595 projector.project_batch(&events).unwrap();
1596 let count2: i64 = conn
1597 .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
1598 .unwrap();
1599 assert_eq!(count2, 2);
1600
1601 let deps = query::get_dependencies(&conn, "bn-002").unwrap();
1602 assert_eq!(deps.len(), 1);
1603 }
1604
1605 #[test]
1610 fn events_on_missing_item_create_placeholder() {
1611 let conn = test_db();
1612 let projector = Projector::new(&conn);
1613
1614 let comment = make_event(
1616 EventType::Comment,
1617 "bn-ghost",
1618 EventData::Comment(CommentData {
1619 body: "Comment on missing item".into(),
1620 extra: BTreeMap::new(),
1621 }),
1622 "ccc",
1623 2000,
1624 );
1625 projector.project_event(&comment).unwrap();
1626
1627 let item = query::get_item(&conn, "bn-ghost", false).unwrap().unwrap();
1629 assert_eq!(item.title, ""); assert_eq!(item.state, "open");
1631 }
1632
1633 #[test]
1638 fn project_create_populates_fts() {
1639 let conn = test_db();
1640 let projector = Projector::new(&conn);
1641 projector
1642 .project_event(&make_create(
1643 "bn-001",
1644 "Authentication timeout",
1645 "aaa",
1646 1000,
1647 ))
1648 .unwrap();
1649
1650 let hits = query::search(&conn, "authentication", 10).unwrap();
1651 assert_eq!(hits.len(), 1);
1652 assert_eq!(hits[0].item_id, "bn-001");
1653 }
1654
1655 #[test]
1656 fn project_update_title_updates_fts() {
1657 let conn = test_db();
1658 let projector = Projector::new(&conn);
1659 projector
1660 .project_event(&make_create("bn-001", "Old title", "aaa", 1000))
1661 .unwrap();
1662
1663 let update = make_event(
1664 EventType::Update,
1665 "bn-001",
1666 EventData::Update(UpdateData {
1667 field: "title".into(),
1668 value: serde_json::json!("Authorization failure"),
1669 extra: BTreeMap::new(),
1670 }),
1671 "bbb",
1672 2000,
1673 );
1674 projector.project_event(&update).unwrap();
1675
1676 let hits_old = query::search(&conn, "Old", 10).unwrap();
1678 assert!(hits_old.is_empty());
1679
1680 let hits_new = query::search(&conn, "authorization", 10).unwrap();
1682 assert_eq!(hits_new.len(), 1);
1683 }
1684
1685 #[test]
1690 fn batch_reports_correct_stats() {
1691 let conn = test_db();
1692 let projector = Projector::new(&conn);
1693
1694 let events = vec![
1695 make_create("bn-001", "Item 1", "h1", 1000),
1696 make_create("bn-002", "Item 2", "h2", 1001),
1697 make_create("bn-003", "Item 3", "h3", 1002),
1698 ];
1699
1700 let stats = projector.project_batch(&events).unwrap();
1701 assert_eq!(stats.projected, 3);
1702 assert_eq!(stats.duplicates, 0);
1703 assert_eq!(stats.errors, 0);
1704 }
1705
1706 #[test]
1711 fn full_lifecycle_all_event_types() {
1712 let conn = test_db();
1713 let projector = Projector::new(&conn);
1714
1715 let mut events = vec![
1716 make_create("bn-001", "Auth bug", "h01", 1000),
1718 make_create("bn-002", "Dep item", "h02", 1001),
1720 ];
1721
1722 events.push(make_event(
1724 EventType::Update,
1725 "bn-001",
1726 EventData::Update(UpdateData {
1727 field: "title".into(),
1728 value: serde_json::json!("Auth timeout bug"),
1729 extra: BTreeMap::new(),
1730 }),
1731 "h03",
1732 2000,
1733 ));
1734
1735 events.push(make_event(
1737 EventType::Move,
1738 "bn-001",
1739 EventData::Move(MoveData {
1740 state: State::Doing,
1741 reason: None,
1742 extra: BTreeMap::new(),
1743 }),
1744 "h04",
1745 3000,
1746 ));
1747
1748 events.push(make_event(
1750 EventType::Assign,
1751 "bn-001",
1752 EventData::Assign(AssignData {
1753 agent: "alice".into(),
1754 action: AssignAction::Assign,
1755 extra: BTreeMap::new(),
1756 }),
1757 "h05",
1758 4000,
1759 ));
1760
1761 events.push(make_event(
1763 EventType::Comment,
1764 "bn-001",
1765 EventData::Comment(CommentData {
1766 body: "Found root cause".into(),
1767 extra: BTreeMap::new(),
1768 }),
1769 "h06",
1770 5000,
1771 ));
1772
1773 events.push(make_event(
1775 EventType::Link,
1776 "bn-001",
1777 EventData::Link(LinkData {
1778 target: "bn-002".into(),
1779 link_type: "blocks".into(),
1780 extra: BTreeMap::new(),
1781 }),
1782 "h07",
1783 6000,
1784 ));
1785
1786 events.push(make_event(
1788 EventType::Unlink,
1789 "bn-001",
1790 EventData::Unlink(UnlinkData {
1791 target: "bn-002".into(),
1792 link_type: Some("blocks".into()),
1793 extra: BTreeMap::new(),
1794 }),
1795 "h08",
1796 7000,
1797 ));
1798
1799 events.push(make_event(
1801 EventType::Compact,
1802 "bn-001",
1803 EventData::Compact(CompactData {
1804 summary: "Auth token refresh race".into(),
1805 extra: BTreeMap::new(),
1806 }),
1807 "h09",
1808 8000,
1809 ));
1810
1811 events.push(make_event(
1813 EventType::Snapshot,
1814 "bn-001",
1815 EventData::Snapshot(SnapshotData {
1816 state: serde_json::json!({"id": "bn-001", "resolved": true}),
1817 extra: BTreeMap::new(),
1818 }),
1819 "h10",
1820 9000,
1821 ));
1822
1823 events.push(make_event(
1825 EventType::Redact,
1826 "bn-001",
1827 EventData::Redact(RedactData {
1828 target_hash: "blake3:h06".into(),
1829 reason: "Contained secret".into(),
1830 extra: BTreeMap::new(),
1831 }),
1832 "h11",
1833 10000,
1834 ));
1835
1836 events.push(make_event(
1838 EventType::Delete,
1839 "bn-001",
1840 EventData::Delete(DeleteData {
1841 reason: Some("Duplicate".into()),
1842 extra: BTreeMap::new(),
1843 }),
1844 "h12",
1845 11000,
1846 ));
1847
1848 let stats = projector.project_batch(&events).unwrap();
1849 assert_eq!(stats.projected, 12); assert_eq!(stats.duplicates, 0);
1851 assert_eq!(stats.errors, 0);
1852
1853 let item = query::get_item(&conn, "bn-001", true).unwrap().unwrap();
1855 assert_eq!(item.title, "Auth timeout bug");
1856 assert_eq!(item.state, "doing");
1857 assert!(item.is_deleted);
1858 assert_eq!(
1859 item.compact_summary.as_deref(),
1860 Some("Auth token refresh race")
1861 );
1862 let snapshot: Option<String> = conn
1863 .query_row(
1864 "SELECT snapshot_json FROM items WHERE item_id = 'bn-001'",
1865 [],
1866 |row| row.get(0),
1867 )
1868 .unwrap();
1869 assert!(snapshot.is_some());
1870
1871 let comments = query::get_comments(&conn, "bn-001", None, None).unwrap();
1873 assert_eq!(comments.len(), 1);
1874 assert_eq!(comments[0].body, "[redacted]");
1875
1876 let deps = query::get_dependencies(&conn, "bn-001").unwrap();
1878 assert!(deps.is_empty());
1879
1880 let redaction_count: i64 = conn
1882 .query_row("SELECT COUNT(*) FROM event_redactions", [], |row| {
1883 row.get(0)
1884 })
1885 .unwrap();
1886 assert_eq!(redaction_count, 1);
1887 }
1888
1889 #[test]
1890 fn late_create_populates_placeholder() {
1891 let conn = test_db();
1892 let projector = Projector::new(&conn);
1893
1894 let comment = make_event(
1896 EventType::Comment,
1897 "bn-late",
1898 EventData::Comment(CommentData {
1899 body: "Comment first".into(),
1900 extra: BTreeMap::new(),
1901 }),
1902 "h1",
1903 1000,
1904 );
1905 projector.project_event(&comment).unwrap();
1906
1907 let item = query::get_item(&conn, "bn-late", false).unwrap().unwrap();
1908 assert_eq!(item.title, "");
1909 assert_eq!(item.created_at_us, 1000);
1910
1911 let create = make_create("bn-late", "Real Title", "h2", 900);
1913
1914 projector.project_event(&create).unwrap();
1915
1916 let item = query::get_item(&conn, "bn-late", false).unwrap().unwrap();
1917 assert_eq!(item.title, "Real Title");
1918 assert_eq!(item.created_at_us, 900);
1919 }
1920
1921 #[test]
1925 fn projector_new_creates_tracking_table_on_fresh_db() {
1926 let mut conn = Connection::open_in_memory().expect("open in-memory db");
1927 migrations::migrate(&mut conn).expect("migrate");
1928 let projector = Projector::new(&conn);
1932 let event = make_create("bn-fresh", "Fresh item", "h1", 1000);
1933 projector
1934 .project_event(&event)
1935 .expect("project_event should succeed on fresh DB");
1936
1937 let item = query::get_item(&conn, "bn-fresh", false).unwrap().unwrap();
1938 assert_eq!(item.title, "Fresh item");
1939 }
1940}