1use anyhow::{Context, Result};
21use rusqlite::{Connection, params};
22
23use crate::db::query;
24use crate::event::Event;
25use crate::event::data::{AssignAction, EventData};
26use crate::event::types::EventType;
27use crate::shard::ShardManager;
28
/// Counters summarising the outcome of a projection run.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ProjectionStats {
    /// Events that were applied to the projection.
    pub projected: usize,
    /// Events skipped because their hash was already recorded as projected.
    pub duplicates: usize,
    /// Events whose projection returned an error and were skipped.
    pub errors: usize,
}
43
44pub struct Projector<'conn> {
53 conn: &'conn Connection,
54 has_agent_column: bool,
55}
56
57impl<'conn> Projector<'conn> {
58 pub fn new(conn: &'conn Connection) -> Self {
63 let _ = ensure_tracking_table(conn);
66 let has_agent_column = projected_events_has_agent_column(conn).unwrap_or(false);
67 Self {
68 conn,
69 has_agent_column,
70 }
71 }
72
73 pub fn project_batch(&self, events: &[Event]) -> Result<ProjectionStats> {
84 let mut stats = ProjectionStats::default();
85
86 self.conn
88 .execute_batch("BEGIN IMMEDIATE")
89 .context("begin projection transaction")?;
90
91 for event in events {
92 match self.project_event_inner(event) {
93 Ok(ProjectResult::Projected) => stats.projected += 1,
94 Ok(ProjectResult::Duplicate) => stats.duplicates += 1,
95 Err(e) => {
96 tracing::warn!(
97 event_hash = %event.event_hash,
98 event_type = %event.event_type,
99 item_id = %event.item_id,
100 error = %e,
101 "skipping event due to projection error"
102 );
103 stats.errors += 1;
104 }
105 }
106 }
107
108 self.conn
109 .execute_batch("COMMIT")
110 .context("commit projection transaction")?;
111
112 if stats.errors > 0 {
113 let reason = format!(
114 "project_batch encountered {} projection errors while applying {} events",
115 stats.errors,
116 events.len()
117 );
118 if let Err(err) = crate::db::mark_projection_dirty_from_connection(self.conn, &reason) {
119 tracing::warn!(error = %err, "failed to mark projection dirty after batch errors");
120 }
121 }
122
123 Ok(stats)
124 }
125
126 pub fn project_event(&self, event: &Event) -> Result<bool> {
135 let projected = match self.project_event_inner(event) {
136 Ok(ProjectResult::Projected) => true,
137 Ok(ProjectResult::Duplicate) => false,
138 Err(err) => {
139 let reason = format!(
140 "project_event failed hash={} type={}",
141 event.event_hash, event.event_type
142 );
143 if let Err(mark_err) =
144 crate::db::mark_projection_dirty_from_connection(self.conn, &reason)
145 {
146 tracing::warn!(
147 error = %mark_err,
148 event_hash = %event.event_hash,
149 "failed to mark projection dirty after single-event projection failure"
150 );
151 }
152 return Err(err);
153 }
154 };
155
156 if let Err(err) = self.update_cursor_to_event_log_end(&event.event_hash) {
157 tracing::warn!(
158 event_hash = %event.event_hash,
159 error = %err,
160 "failed to update projection cursor after single-event projection"
161 );
162 let reason = format!(
163 "cursor update failed after projecting hash={} error={err}",
164 event.event_hash
165 );
166 let _ = crate::db::mark_projection_dirty_from_connection(self.conn, &reason);
167 }
168
169 Ok(projected)
170 }
171
172 fn project_event_inner(&self, event: &Event) -> Result<ProjectResult> {
177 if self.is_event_projected(&event.event_hash)? {
179 return Ok(ProjectResult::Duplicate);
180 }
181
182 match event.event_type {
183 EventType::Create => self.project_create(event)?,
184 EventType::Update => self.project_update(event)?,
185 EventType::Move => self.project_move(event)?,
186 EventType::Assign => self.project_assign(event)?,
187 EventType::Comment => self.project_comment(event)?,
188 EventType::Link => self.project_link(event)?,
189 EventType::Unlink => self.project_unlink(event)?,
190 EventType::Delete => self.project_delete(event)?,
191 EventType::Compact => self.project_compact(event)?,
192 EventType::Snapshot => self.project_snapshot(event)?,
193 EventType::Redact => self.project_redact(event)?,
194 }
195
196 self.record_projected_hash(&event.event_hash, event)?;
198
199 Ok(ProjectResult::Projected)
200 }
201
202 fn is_event_projected(&self, event_hash: &str) -> Result<bool> {
207 let exists: bool = self
214 .conn
215 .query_row(
216 "SELECT EXISTS(SELECT 1 FROM projected_events WHERE event_hash = ?1)",
217 params![event_hash],
218 |row| row.get(0),
219 )
220 .unwrap_or(false);
221 Ok(exists)
222 }
223
224 fn is_event_redacted(&self, event_hash: &str) -> Result<bool> {
225 let exists: bool = self
226 .conn
227 .query_row(
228 "SELECT EXISTS(SELECT 1 FROM event_redactions WHERE target_event_hash = ?1)",
229 params![event_hash],
230 |row| row.get(0),
231 )
232 .unwrap_or(false);
233 Ok(exists)
234 }
235
236 fn record_projected_hash(&self, event_hash: &str, event: &Event) -> Result<()> {
237 if self.has_agent_column {
238 self.conn
239 .execute(
240 "INSERT OR IGNORE INTO projected_events (event_hash, item_id, event_type, projected_at_us, agent) \
241 VALUES (?1, ?2, ?3, ?4, ?5)",
242 params![
243 event_hash,
244 event.item_id.as_str(),
245 event.event_type.as_str(),
246 event.wall_ts_us,
247 event.agent.as_str(),
248 ],
249 )
250 .context("record projected event hash")?;
251 return Ok(());
252 }
253
254 self.conn
255 .execute(
256 "INSERT OR IGNORE INTO projected_events (event_hash, item_id, event_type, projected_at_us) \
257 VALUES (?1, ?2, ?3, ?4)",
258 params![
259 event_hash,
260 event.item_id.as_str(),
261 event.event_type.as_str(),
262 event.wall_ts_us,
263 ],
264 )
265 .context("record projected event hash")?;
266 Ok(())
267 }
268
269 fn update_cursor_to_event_log_end(&self, event_hash: &str) -> Result<()> {
270 let main_db_file: String = self
271 .conn
272 .query_row(
273 "SELECT file FROM pragma_database_list WHERE name = 'main'",
274 [],
275 |row| row.get(0),
276 )
277 .context("read main database file from pragma_database_list")?;
278
279 if main_db_file.trim().is_empty() {
280 return Ok(());
282 }
283
284 let db_path = std::path::Path::new(&main_db_file);
285 let Some(bones_dir) = db_path.parent() else {
286 return Ok(());
287 };
288
289 let shard_mgr = ShardManager::new(bones_dir);
290 let total_len = shard_mgr
291 .total_content_len()
292 .map_err(|e| anyhow::anyhow!("read event-log size for cursor update: {e}"))?;
293 let total_len_i64 = i64::try_from(total_len).unwrap_or(i64::MAX);
294
295 query::update_projection_cursor(self.conn, total_len_i64, Some(event_hash))
296 .context("write projection cursor after single-event projection")?;
297
298 Ok(())
299 }
300
301 fn project_create(&self, event: &Event) -> Result<()> {
306 let EventData::Create(ref data) = event.data else {
307 anyhow::bail!("expected Create data for item.create event");
308 };
309
310 let is_redacted = self.is_event_redacted(&event.event_hash)?;
311 let title = if is_redacted { "" } else { &data.title };
312 let description = if is_redacted {
313 None
314 } else {
315 data.description.as_deref()
316 };
317 let labels_str = if is_redacted {
318 String::new()
319 } else {
320 data.labels.join(" ")
321 };
322
323 self.conn
324 .execute(
325 "INSERT INTO items (
326 item_id, title, description, kind, state, urgency,
327 size, parent_id, is_deleted, search_labels,
328 created_at_us, updated_at_us
329 ) VALUES (?1, ?2, ?3, ?4, 'open', ?5, ?6, ?7, 0, ?8, ?9, ?10)
330 ON CONFLICT(item_id) DO UPDATE SET
331 title = excluded.title,
332 description = excluded.description,
333 kind = excluded.kind,
334 urgency = excluded.urgency,
335 size = excluded.size,
336 parent_id = excluded.parent_id,
337 created_at_us = excluded.created_at_us,
338 search_labels = excluded.search_labels,
339 updated_at_us = excluded.updated_at_us
340 WHERE items.title = ''",
341 params![
342 event.item_id.as_str(),
343 title,
344 description,
345 data.kind.to_string(),
346 data.urgency.to_string(),
347 data.size.map(|s| s.to_string()),
348 data.parent.as_deref(),
349 labels_str,
350 event.wall_ts_us,
351 event.wall_ts_us,
352 ],
353 )
354 .with_context(|| format!("project create for {}", event.item_id))?;
355
356 if !is_redacted {
358 for label in &data.labels {
359 self.conn
360 .execute(
361 "INSERT OR IGNORE INTO item_labels (item_id, label, created_at_us)
362 VALUES (?1, ?2, ?3)",
363 params![event.item_id.as_str(), label, event.wall_ts_us],
364 )
365 .with_context(|| format!("insert label '{label}' for {}", event.item_id))?;
366 }
367 }
368
369 Ok(())
370 }
371
372 #[allow(clippy::too_many_lines)]
373 fn project_update(&self, event: &Event) -> Result<()> {
374 let EventData::Update(ref data) = event.data else {
375 anyhow::bail!("expected Update data for item.update event");
376 };
377
378 self.ensure_item_exists(event)?;
379
380 let is_redacted = self.is_event_redacted(&event.event_hash)?;
381
382 match data.field.as_str() {
383 "title" => {
384 let value = if is_redacted {
385 "[redacted]"
386 } else {
387 data.value.as_str().unwrap_or_default()
388 };
389 self.conn.execute(
390 "UPDATE items SET title = ?1, updated_at_us = ?2 WHERE item_id = ?3",
391 params![value, event.wall_ts_us, event.item_id.as_str()],
392 )?;
393 }
394 "description" => {
395 let value = if is_redacted {
396 Some("[redacted]".to_string())
397 } else {
398 data.value.as_str().map(String::from)
399 };
400 self.conn.execute(
401 "UPDATE items SET description = ?1, updated_at_us = ?2 WHERE item_id = ?3",
402 params![value, event.wall_ts_us, event.item_id.as_str()],
403 )?;
404 }
405 "kind" => {
406 let value = data.value.as_str().unwrap_or("task");
407 self.conn.execute(
408 "UPDATE items SET kind = ?1, updated_at_us = ?2 WHERE item_id = ?3",
409 params![value, event.wall_ts_us, event.item_id.as_str()],
410 )?;
411 }
412 "size" => {
413 let value = data.value.as_str().map(String::from);
414 self.conn.execute(
415 "UPDATE items SET size = ?1, updated_at_us = ?2 WHERE item_id = ?3",
416 params![value, event.wall_ts_us, event.item_id.as_str()],
417 )?;
418 }
419 "urgency" => {
420 let value = data.value.as_str().unwrap_or("default");
421 self.conn.execute(
422 "UPDATE items SET urgency = ?1, updated_at_us = ?2 WHERE item_id = ?3",
423 params![value, event.wall_ts_us, event.item_id.as_str()],
424 )?;
425 }
426 "parent" => {
427 let value = data.value.as_str().map(String::from);
428 self.conn.execute(
429 "UPDATE items SET parent_id = ?1, updated_at_us = ?2 WHERE item_id = ?3",
430 params![value, event.wall_ts_us, event.item_id.as_str()],
431 )?;
432 }
433 "labels" => {
434 let mut changed = false;
437
438 if let Some(labels) = data.value.as_array() {
439 self.conn.execute(
441 "DELETE FROM item_labels WHERE item_id = ?1",
442 params![event.item_id.as_str()],
443 )?;
444
445 for label_val in labels {
446 if let Some(label) = label_val.as_str() {
447 self.conn.execute(
448 "INSERT OR IGNORE INTO item_labels (item_id, label, created_at_us)
449 VALUES (?1, ?2, ?3)",
450 params![event.item_id.as_str(), label, event.wall_ts_us],
451 )?;
452 }
453 }
454 changed = true;
455 } else if let Some(obj) = data.value.as_object() {
456 let action = obj.get("action").and_then(|v| v.as_str()).unwrap_or("");
458 let label = obj.get("label").and_then(|v| v.as_str()).unwrap_or("");
459
460 if !label.is_empty() {
461 match action {
462 "add" => {
463 self.conn.execute(
464 "INSERT OR IGNORE INTO item_labels (item_id, label, created_at_us)
465 VALUES (?1, ?2, ?3)",
466 params![event.item_id.as_str(), label, event.wall_ts_us],
467 )?;
468 changed = true;
469 }
470 "remove" => {
471 self.conn.execute(
472 "DELETE FROM item_labels WHERE item_id = ?1 AND label = ?2",
473 params![event.item_id.as_str(), label],
474 )?;
475 changed = true;
476 }
477 _ => {}
478 }
479 }
480 }
481
482 if changed {
483 let mut stmt = self.conn.prepare(
485 "SELECT label FROM item_labels WHERE item_id = ?1 ORDER BY label",
486 )?;
487 let label_rows = stmt.query_map(params![event.item_id.as_str()], |row| {
488 row.get::<_, String>(0)
489 })?;
490
491 let mut label_strings = Vec::new();
492 for label_res in label_rows {
493 label_strings.push(label_res?);
494 }
495
496 let search_labels = label_strings.join(" ");
497 self.conn.execute(
498 "UPDATE items SET search_labels = ?1, updated_at_us = ?2 WHERE item_id = ?3",
499 params![search_labels, event.wall_ts_us, event.item_id.as_str()],
500 )?;
501 }
502 }
503 _ => {
504 tracing::debug!(
506 field = %data.field,
507 item_id = %event.item_id,
508 "ignoring update for unknown field"
509 );
510 self.conn.execute(
511 "UPDATE items SET updated_at_us = ?1 WHERE item_id = ?2",
512 params![event.wall_ts_us, event.item_id.as_str()],
513 )?;
514 }
515 }
516
517 Ok(())
518 }
519
520 fn project_move(&self, event: &Event) -> Result<()> {
521 let EventData::Move(ref data) = event.data else {
522 anyhow::bail!("expected Move data for item.move event");
523 };
524
525 self.ensure_item_exists(event)?;
526
527 self.conn
528 .execute(
529 "UPDATE items SET state = ?1, updated_at_us = ?2 WHERE item_id = ?3",
530 params![
531 data.state.to_string(),
532 event.wall_ts_us,
533 event.item_id.as_str(),
534 ],
535 )
536 .with_context(|| format!("project move for {}", event.item_id))?;
537
538 Ok(())
539 }
540
541 fn project_assign(&self, event: &Event) -> Result<()> {
542 let EventData::Assign(ref data) = event.data else {
543 anyhow::bail!("expected Assign data for item.assign event");
544 };
545
546 self.ensure_item_exists(event)?;
547
548 match data.action {
549 AssignAction::Assign => {
550 self.conn
551 .execute(
552 "INSERT OR IGNORE INTO item_assignees (item_id, agent, created_at_us)
553 VALUES (?1, ?2, ?3)",
554 params![event.item_id.as_str(), data.agent, event.wall_ts_us],
555 )
556 .with_context(|| format!("assign {} to {}", data.agent, event.item_id))?;
557 }
558 AssignAction::Unassign => {
559 self.conn
560 .execute(
561 "DELETE FROM item_assignees WHERE item_id = ?1 AND agent = ?2",
562 params![event.item_id.as_str(), data.agent],
563 )
564 .with_context(|| format!("unassign {} from {}", data.agent, event.item_id))?;
565 }
566 }
567
568 self.conn.execute(
570 "UPDATE items SET updated_at_us = ?1 WHERE item_id = ?2",
571 params![event.wall_ts_us, event.item_id.as_str()],
572 )?;
573
574 Ok(())
575 }
576
577 fn project_comment(&self, event: &Event) -> Result<()> {
578 let EventData::Comment(ref data) = event.data else {
579 anyhow::bail!("expected Comment data for item.comment event");
580 };
581
582 self.ensure_item_exists(event)?;
583
584 let is_redacted = self.is_event_redacted(&event.event_hash)?;
585 let body = if is_redacted {
586 "[redacted]"
587 } else {
588 &data.body
589 };
590
591 self.conn
592 .execute(
593 "INSERT OR IGNORE INTO item_comments (item_id, event_hash, author, body, created_at_us)
594 VALUES (?1, ?2, ?3, ?4, ?5)",
595 params![
596 event.item_id.as_str(),
597 event.event_hash,
598 event.agent,
599 body,
600 event.wall_ts_us,
601 ],
602 )
603 .with_context(|| format!("project comment for {}", event.item_id))?;
604
605 self.conn.execute(
607 "UPDATE items SET updated_at_us = ?1 WHERE item_id = ?2",
608 params![event.wall_ts_us, event.item_id.as_str()],
609 )?;
610
611 Ok(())
612 }
613
614 fn project_link(&self, event: &Event) -> Result<()> {
615 let EventData::Link(ref data) = event.data else {
616 anyhow::bail!("expected Link data for item.link event");
617 };
618
619 self.ensure_item_exists(event)?;
620
621 self.conn
622 .execute(
623 "INSERT OR IGNORE INTO item_dependencies (item_id, depends_on_item_id, link_type, created_at_us)
624 VALUES (?1, ?2, ?3, ?4)",
625 params![
626 event.item_id.as_str(),
627 data.target,
628 data.link_type,
629 event.wall_ts_us,
630 ],
631 )
632 .with_context(|| {
633 format!("project link {} -> {}", event.item_id, data.target)
634 })?;
635
636 self.conn.execute(
638 "UPDATE items SET updated_at_us = ?1 WHERE item_id = ?2",
639 params![event.wall_ts_us, event.item_id.as_str()],
640 )?;
641
642 Ok(())
643 }
644
645 fn project_unlink(&self, event: &Event) -> Result<()> {
646 let EventData::Unlink(ref data) = event.data else {
647 anyhow::bail!("expected Unlink data for item.unlink event");
648 };
649
650 self.ensure_item_exists(event)?;
651
652 if let Some(ref link_type) = data.link_type {
653 self.conn
654 .execute(
655 "DELETE FROM item_dependencies \
656 WHERE item_id = ?1 AND depends_on_item_id = ?2 AND link_type = ?3",
657 params![event.item_id.as_str(), data.target, link_type],
658 )
659 .with_context(|| format!("unlink {} -/-> {}", event.item_id, data.target))?;
660 } else {
661 self.conn
663 .execute(
664 "DELETE FROM item_dependencies \
665 WHERE item_id = ?1 AND depends_on_item_id = ?2",
666 params![event.item_id.as_str(), data.target],
667 )
668 .with_context(|| format!("unlink all {} -/-> {}", event.item_id, data.target))?;
669 }
670
671 self.conn.execute(
673 "UPDATE items SET updated_at_us = ?1 WHERE item_id = ?2",
674 params![event.wall_ts_us, event.item_id.as_str()],
675 )?;
676
677 Ok(())
678 }
679
680 fn project_delete(&self, event: &Event) -> Result<()> {
681 self.ensure_item_exists(event)?;
682
683 self.conn
684 .execute(
685 "UPDATE items SET is_deleted = 1, deleted_at_us = ?1, updated_at_us = ?1 \
686 WHERE item_id = ?2",
687 params![event.wall_ts_us, event.item_id.as_str()],
688 )
689 .with_context(|| format!("project delete for {}", event.item_id))?;
690
691 Ok(())
692 }
693
694 fn project_compact(&self, event: &Event) -> Result<()> {
695 let EventData::Compact(ref data) = event.data else {
696 anyhow::bail!("expected Compact data for item.compact event");
697 };
698
699 self.ensure_item_exists(event)?;
700
701 let is_redacted = self.is_event_redacted(&event.event_hash)?;
702 let summary = if is_redacted {
703 "[redacted]"
704 } else {
705 data.summary.as_str()
706 };
707
708 self.conn
709 .execute(
710 "UPDATE items SET compact_summary = ?1, updated_at_us = ?2 WHERE item_id = ?3",
711 params![summary, event.wall_ts_us, event.item_id.as_str()],
712 )
713 .with_context(|| format!("project compact for {}", event.item_id))?;
714
715 Ok(())
716 }
717
718 fn project_snapshot(&self, event: &Event) -> Result<()> {
719 let EventData::Snapshot(ref data) = event.data else {
720 anyhow::bail!("expected Snapshot data for item.snapshot event");
721 };
722
723 self.ensure_item_exists(event)?;
724
725 let json_str = serde_json::to_string(&data.state).context("serialize snapshot state")?;
726
727 self.conn
728 .execute(
729 "UPDATE items SET snapshot_json = ?1, updated_at_us = ?2 WHERE item_id = ?3",
730 params![json_str, event.wall_ts_us, event.item_id.as_str()],
731 )
732 .with_context(|| format!("project snapshot for {}", event.item_id))?;
733
734 Ok(())
735 }
736
737 fn project_redact(&self, event: &Event) -> Result<()> {
738 let EventData::Redact(ref data) = event.data else {
739 anyhow::bail!("expected Redact data for item.redact event");
740 };
741
742 self.conn
744 .execute(
745 "INSERT OR IGNORE INTO event_redactions \
746 (target_event_hash, item_id, reason, redacted_by, redacted_at_us) \
747 VALUES (?1, ?2, ?3, ?4, ?5)",
748 params![
749 data.target_hash,
750 event.item_id.as_str(),
751 data.reason,
752 event.agent,
753 event.wall_ts_us,
754 ],
755 )
756 .with_context(|| {
757 format!(
758 "project redact for {} targeting {}",
759 event.item_id, data.target_hash
760 )
761 })?;
762
763 self.conn
765 .execute(
766 "UPDATE item_comments SET body = '[redacted]' WHERE event_hash = ?1",
767 params![data.target_hash],
768 )
769 .context("redact comment body")?;
770
771 Ok(())
772 }
773
774 fn ensure_item_exists(&self, event: &Event) -> Result<()> {
783 let exists: bool = self
784 .conn
785 .query_row(
786 "SELECT EXISTS(SELECT 1 FROM items WHERE item_id = ?1)",
787 params![event.item_id.as_str()],
788 |row| row.get(0),
789 )
790 .context("check item exists")?;
791
792 if !exists {
793 self.conn
794 .execute(
795 "INSERT INTO items (
796 item_id, title, kind, state, urgency,
797 is_deleted, search_labels, created_at_us, updated_at_us
798 ) VALUES (?1, '', 'task', 'open', 'default', 0, '', ?2, ?2)",
799 params![event.item_id.as_str(), event.wall_ts_us],
800 )
801 .with_context(|| format!("create placeholder item for {}", event.item_id))?;
802 }
803
804 Ok(())
805 }
806}
807
/// Outcome of attempting to project a single event.
enum ProjectResult {
    /// The event was applied and its hash recorded.
    Projected,
    /// The event's hash was already recorded, so nothing was applied.
    Duplicate,
}
812
/// DDL for the `projected_events` tracking table and its indexes.
///
/// Every statement is `IF NOT EXISTS`, so the batch can be re-run. Note that
/// it declares an index on `agent`: against a pre-existing table that lacks
/// that column, the table statement is a no-op and the index statement fails.
pub const PROJECTED_EVENTS_DDL: &str = "\
CREATE TABLE IF NOT EXISTS projected_events (
    event_hash TEXT PRIMARY KEY,
    item_id TEXT NOT NULL,
    event_type TEXT NOT NULL,
    projected_at_us INTEGER NOT NULL,
    agent TEXT NOT NULL DEFAULT ''
);
CREATE INDEX IF NOT EXISTS idx_projected_events_item
    ON projected_events(item_id);
CREATE INDEX IF NOT EXISTS idx_projected_events_agent
    ON projected_events(agent);
";
834
835pub fn ensure_tracking_table(conn: &Connection) -> Result<()> {
844 conn.execute_batch(PROJECTED_EVENTS_DDL)
845 .context("create projected_events tracking table")?;
846
847 if !projected_events_has_agent_column(conn)? {
848 conn.execute(
849 "ALTER TABLE projected_events ADD COLUMN agent TEXT NOT NULL DEFAULT ''",
850 [],
851 )
852 .context("add agent column to projected_events")?;
853 }
854
855 conn.execute_batch(
856 "CREATE INDEX IF NOT EXISTS idx_projected_events_agent ON projected_events(agent);",
857 )
858 .context("create projected_events_agent index")?;
859
860 Ok(())
861}
862
863fn projected_events_has_agent_column(conn: &Connection) -> Result<bool> {
864 let mut stmt = conn
865 .prepare("PRAGMA table_info(projected_events)")
866 .context("inspect projected_events schema")?;
867 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
868
869 for row in rows {
870 let name = row.context("read projected_events column")?;
871 if name == "agent" {
872 return Ok(true);
873 }
874 }
875
876 Ok(false)
877}
878
879pub fn clear_projection(conn: &Connection) -> Result<()> {
888 conn.execute_batch(
889 "DELETE FROM event_redactions;
890 DELETE FROM item_comments;
891 DELETE FROM item_dependencies;
892 DELETE FROM item_assignees;
893 DELETE FROM item_labels;
894 DELETE FROM items;
895 DELETE FROM projected_events;
896 UPDATE projection_meta SET last_event_offset = 0, last_event_hash = NULL WHERE id = 1;",
897 )
898 .context("clear projection tables")?;
899 Ok(())
900}
901
902#[cfg(test)]
907mod tests {
908 use super::*;
909 use crate::db::{migrations, query};
910 use crate::event::data::*;
911 use crate::event::types::EventType;
912 use crate::event::writer;
913 use crate::model::item::{Kind, Size, State, Urgency};
914 use crate::model::item_id::ItemId;
915 use crate::shard::ShardManager;
916 use rusqlite::Connection;
917 use std::collections::BTreeMap;
918 use tempfile::TempDir;
919
920 fn test_db() -> Connection {
921 let mut conn = Connection::open_in_memory().expect("open in-memory db");
922 migrations::migrate(&mut conn).expect("migrate");
923 ensure_tracking_table(&conn).expect("create tracking table");
924 conn
925 }
926
927 fn make_event(
928 event_type: EventType,
929 item_id: &str,
930 data: EventData,
931 hash: &str,
932 ts: i64,
933 ) -> Event {
934 Event {
935 wall_ts_us: ts,
936 agent: "test-agent".into(),
937 itc: "itc:AQ".into(),
938 parents: vec![],
939 event_type,
940 item_id: ItemId::new_unchecked(item_id),
941 data,
942 event_hash: format!("blake3:{hash}"),
943 }
944 }
945
946 fn make_create(id: &str, title: &str, hash: &str, ts: i64) -> Event {
947 make_event(
948 EventType::Create,
949 id,
950 EventData::Create(CreateData {
951 title: title.into(),
952 kind: Kind::Task,
953 size: Some(Size::M),
954 urgency: Urgency::Default,
955 labels: vec!["backend".into(), "auth".into()],
956 parent: None,
957 causation: None,
958 description: Some("A detailed description".into()),
959 extra: BTreeMap::new(),
960 }),
961 hash,
962 ts,
963 )
964 }
965
/// With a file-backed database, projecting one event moves the cursor to the
/// end of the event log and records that event's hash.
#[test]
fn project_event_updates_projection_cursor_for_file_backed_db() {
    let tmp = TempDir::new().expect("tempdir");
    let bones_dir = tmp.path().join(".bones");
    std::fs::create_dir_all(&bones_dir).expect("create .bones");

    let shards = ShardManager::new(&bones_dir);
    shards.init().expect("init shard manager");

    let mut conn = Connection::open(bones_dir.join("bones.db")).expect("open projection db");
    migrations::migrate(&mut conn).expect("migrate");
    ensure_tracking_table(&conn).expect("create tracking table");

    let projector = Projector::new(&conn);
    let mut event = make_create("bn-001", "Cursor update", "h1", 1000);
    let line = writer::write_event(&mut event).expect("serialize event");
    let timeout = std::time::Duration::from_secs(5);
    shards
        .append(&line, false, timeout)
        .expect("append event line");

    projector.project_event(&event).expect("project event");

    let (offset, hash) = query::get_projection_cursor(&conn).expect("read projection cursor");
    let log_len = shards.total_content_len().expect("content len");
    let expected_offset = i64::try_from(log_len).unwrap_or(i64::MAX);

    assert_eq!(offset, expected_offset);
    assert_eq!(hash.as_deref(), Some(event.event_hash.as_str()));
}
996
/// A create event yields an item row carrying the event's fields.
#[test]
fn project_create_inserts_item() {
    let conn = test_db();
    let projector = Projector::new(&conn);

    let create = make_create("bn-001", "Fix auth timeout", "aaa", 1000);
    let newly_projected = projector.project_event(&create).unwrap();
    assert!(newly_projected, "should return true for new projection");

    let row = query::get_item(&conn, "bn-001", false).unwrap().unwrap();
    assert_eq!(row.title, "Fix auth timeout");
    assert_eq!(row.kind, "task");
    assert_eq!(row.state, "open");
    assert_eq!(row.urgency, "default");
    assert_eq!(row.size.as_deref(), Some("m"));
    assert_eq!(row.description.as_deref(), Some("A detailed description"));
    assert_eq!(row.created_at_us, 1000);
    assert_eq!(row.updated_at_us, 1000);
}
1020
/// A create event stores both of its labels for the item.
#[test]
fn project_create_inserts_labels() {
    let conn = test_db();
    let projector = Projector::new(&conn);

    let create = make_create("bn-001", "Fix auth", "aaa", 1000);
    projector.project_event(&create).unwrap();

    let stored = query::get_labels(&conn, "bn-001").unwrap();
    assert_eq!(stored.len(), 2);
    assert_eq!(stored[0].label, "auth");
    assert_eq!(stored[1].label, "backend");
}
1033
/// A title update replaces the title and bumps `updated_at_us`.
#[test]
fn project_update_title() {
    let conn = test_db();
    let projector = Projector::new(&conn);

    let create = make_create("bn-001", "Old title", "aaa", 1000);
    projector.project_event(&create).unwrap();

    let payload = UpdateData {
        field: "title".into(),
        value: serde_json::json!("New title"),
        extra: BTreeMap::new(),
    };
    let update = make_event(
        EventType::Update,
        "bn-001",
        EventData::Update(payload),
        "bbb",
        2000,
    );
    projector.project_event(&update).unwrap();

    let row = query::get_item(&conn, "bn-001", false).unwrap().unwrap();
    assert_eq!(row.title, "New title");
    assert_eq!(row.updated_at_us, 2000);
}
1063
/// A description update replaces the stored description.
#[test]
fn project_update_description() {
    let conn = test_db();
    let projector = Projector::new(&conn);

    let create = make_create("bn-001", "Item", "aaa", 1000);
    projector.project_event(&create).unwrap();

    let payload = UpdateData {
        field: "description".into(),
        value: serde_json::json!("Updated description"),
        extra: BTreeMap::new(),
    };
    let update = make_event(
        EventType::Update,
        "bn-001",
        EventData::Update(payload),
        "bbb",
        2000,
    );
    projector.project_event(&update).unwrap();

    let row = query::get_item(&conn, "bn-001", false).unwrap().unwrap();
    assert_eq!(row.description.as_deref(), Some("Updated description"));
}
1088
/// A labels update carrying an array replaces the item's label set.
#[test]
fn project_update_labels() {
    let conn = test_db();
    let projector = Projector::new(&conn);

    let create = make_create("bn-001", "Item", "aaa", 1000);
    projector.project_event(&create).unwrap();

    let payload = UpdateData {
        field: "labels".into(),
        value: serde_json::json!(["frontend", "urgent"]),
        extra: BTreeMap::new(),
    };
    let update = make_event(
        EventType::Update,
        "bn-001",
        EventData::Update(payload),
        "bbb",
        2000,
    );
    projector.project_event(&update).unwrap();

    let stored = query::get_labels(&conn, "bn-001").unwrap();
    assert_eq!(stored.len(), 2);
    assert_eq!(stored[0].label, "frontend");
    assert_eq!(stored[1].label, "urgent");
}
1115
/// An update for an unrecognised field still bumps `updated_at_us`.
#[test]
fn project_update_unknown_field_bumps_updated() {
    let conn = test_db();
    let projector = Projector::new(&conn);

    let create = make_create("bn-001", "Item", "aaa", 1000);
    projector.project_event(&create).unwrap();

    let payload = UpdateData {
        field: "future_field".into(),
        value: serde_json::json!("whatever"),
        extra: BTreeMap::new(),
    };
    let update = make_event(
        EventType::Update,
        "bn-001",
        EventData::Update(payload),
        "bbb",
        2000,
    );
    projector.project_event(&update).unwrap();

    let row = query::get_item(&conn, "bn-001", false).unwrap().unwrap();
    assert_eq!(row.updated_at_us, 2000);
}
1140
/// A move event writes the new state onto the item.
#[test]
fn project_move_updates_state() {
    let conn = test_db();
    let projector = Projector::new(&conn);

    let create = make_create("bn-001", "Item", "aaa", 1000);
    projector.project_event(&create).unwrap();

    let payload = MoveData {
        state: State::Doing,
        reason: Some("Starting work".into()),
        extra: BTreeMap::new(),
    };
    let mv = make_event(
        EventType::Move,
        "bn-001",
        EventData::Move(payload),
        "bbb",
        2000,
    );
    projector.project_event(&mv).unwrap();

    let row = query::get_item(&conn, "bn-001", false).unwrap().unwrap();
    assert_eq!(row.state, "doing");
}
1169
/// Assigning adds the agent to the item's assignees; unassigning removes it.
#[test]
fn project_assign_and_unassign() {
    let conn = test_db();
    let projector = Projector::new(&conn);

    let create = make_create("bn-001", "Item", "aaa", 1000);
    projector.project_event(&create).unwrap();

    let assign_payload = AssignData {
        agent: "alice".into(),
        action: AssignAction::Assign,
        extra: BTreeMap::new(),
    };
    let assign = make_event(
        EventType::Assign,
        "bn-001",
        EventData::Assign(assign_payload),
        "bbb",
        2000,
    );
    projector.project_event(&assign).unwrap();

    let after_assign = query::get_assignees(&conn, "bn-001").unwrap();
    assert_eq!(after_assign.len(), 1);
    assert_eq!(after_assign[0].agent, "alice");

    let unassign_payload = AssignData {
        agent: "alice".into(),
        action: AssignAction::Unassign,
        extra: BTreeMap::new(),
    };
    let unassign = make_event(
        EventType::Assign,
        "bn-001",
        EventData::Assign(unassign_payload),
        "ccc",
        3000,
    );
    projector.project_event(&unassign).unwrap();

    let after_unassign = query::get_assignees(&conn, "bn-001").unwrap();
    assert!(after_unassign.is_empty());
}
1217
/// A comment event yields one comment row with body, author and event hash.
#[test]
fn project_comment_inserts_row() {
    let conn = test_db();
    let projector = Projector::new(&conn);

    let create = make_create("bn-001", "Item", "aaa", 1000);
    projector.project_event(&create).unwrap();

    let payload = CommentData {
        body: "This is a comment".into(),
        extra: BTreeMap::new(),
    };
    let comment = make_event(
        EventType::Comment,
        "bn-001",
        EventData::Comment(payload),
        "bbb",
        2000,
    );
    projector.project_event(&comment).unwrap();

    let stored = query::get_comments(&conn, "bn-001", None, None).unwrap();
    assert_eq!(stored.len(), 1);
    assert_eq!(stored[0].body, "This is a comment");
    assert_eq!(stored[0].author, "test-agent");
    assert_eq!(stored[0].event_hash, "blake3:bbb");
}
1248
/// Linking records a dependency on the target; unlinking with the same link
/// type removes it.
#[test]
fn project_link_and_unlink() {
    let conn = test_db();
    let projector = Projector::new(&conn);

    let blocker = make_create("bn-001", "Blocker", "aaa", 1000);
    projector.project_event(&blocker).unwrap();
    let blocked = make_create("bn-002", "Blocked", "bbb", 1001);
    projector.project_event(&blocked).unwrap();

    let link_payload = LinkData {
        target: "bn-001".into(),
        link_type: "blocks".into(),
        extra: BTreeMap::new(),
    };
    let link = make_event(
        EventType::Link,
        "bn-002",
        EventData::Link(link_payload),
        "ccc",
        2000,
    );
    projector.project_event(&link).unwrap();

    let after_link = query::get_dependencies(&conn, "bn-002").unwrap();
    assert_eq!(after_link.len(), 1);
    assert_eq!(after_link[0].depends_on_item_id, "bn-001");

    let unlink_payload = UnlinkData {
        target: "bn-001".into(),
        link_type: Some("blocks".into()),
        extra: BTreeMap::new(),
    };
    let unlink = make_event(
        EventType::Unlink,
        "bn-002",
        EventData::Unlink(unlink_payload),
        "ddd",
        3000,
    );
    projector.project_event(&unlink).unwrap();

    let after_unlink = query::get_dependencies(&conn, "bn-002").unwrap();
    assert!(after_unlink.is_empty());
}
1299
/// A `Delete` event soft-deletes the item instead of removing its row.
///
/// After the delete, `query::get_item` with `false` as its last argument is expected to
/// return `None`, while passing `true` still yields the row, flagged as deleted and
/// stamped with the delete event's timestamp (`2000`).
#[test]
fn project_delete_soft_deletes() {
    let conn = test_db();
    let projector = Projector::new(&conn);

    let created = make_create("bn-001", "Item", "aaa", 1000);
    projector.project_event(&created).unwrap();

    let payload = EventData::Delete(DeleteData {
        reason: Some("Duplicate".into()),
        extra: BTreeMap::new(),
    });
    let deleted = make_event(EventType::Delete, "bn-001", payload, "bbb", 2000);
    projector.project_event(&deleted).unwrap();

    // Default lookup must no longer see the item.
    let visible = query::get_item(&conn, "bn-001", false).unwrap();
    assert!(visible.is_none());

    // The row itself survives and carries the deletion markers.
    let tombstone = query::get_item(&conn, "bn-001", true).unwrap().unwrap();
    assert!(tombstone.is_deleted);
    assert_eq!(tombstone.deleted_at_us, Some(2000));
}
1331
/// A `Compact` event stores its summary text in the item's `compact_summary` field.
#[test]
fn project_compact_sets_summary() {
    let conn = test_db();
    let projector = Projector::new(&conn);

    let created = make_create("bn-001", "Item", "aaa", 1000);
    projector.project_event(&created).unwrap();

    let payload = EventData::Compact(CompactData {
        summary: "TL;DR: auth fix".into(),
        extra: BTreeMap::new(),
    });
    let compact_event = make_event(EventType::Compact, "bn-001", payload, "bbb", 2000);
    projector.project_event(&compact_event).unwrap();

    let stored = query::get_item(&conn, "bn-001", false).unwrap().unwrap();
    let summary = stored.compact_summary.as_deref();
    assert_eq!(summary, Some("TL;DR: auth fix"));
}
1359
/// A `Snapshot` event writes its state into the `snapshot_json` column of `items`.
///
/// The column is read back with raw SQL and parsed, then the `title` key of the
/// parsed JSON is compared with the value from the event.
#[test]
fn project_snapshot_stores_json() {
    let conn = test_db();
    let projector = Projector::new(&conn);

    let created = make_create("bn-001", "Item", "aaa", 1000);
    projector.project_event(&created).unwrap();

    let payload = EventData::Snapshot(SnapshotData {
        state: serde_json::json!({"id": "bn-001", "title": "Snapshotted"}),
        extra: BTreeMap::new(),
    });
    let snapshot_event = make_event(EventType::Snapshot, "bn-001", payload, "bbb", 2000);
    projector.project_event(&snapshot_event).unwrap();

    let stored_json: String = conn
        .query_row(
            "SELECT snapshot_json FROM items WHERE item_id = 'bn-001'",
            [],
            |r| r.get(0),
        )
        .unwrap();

    let decoded: serde_json::Value = serde_json::from_str(&stored_json).unwrap();
    assert_eq!(decoded["title"], "Snapshotted");
}
1394
/// A `Redact` event targeting a comment records the redaction and blanks the comment body.
///
/// Expectations: one `event_redactions` row for the comment's hash, and the comment
/// itself still present but with its body replaced by `[redacted]`.
#[test]
fn project_redact_records_and_blanks_comment() {
    let conn = test_db();
    let projector = Projector::new(&conn);

    let created = make_create("bn-001", "Item", "aaa", 1000);
    projector.project_event(&created).unwrap();

    // The comment that will later be redacted; its hash suffix is "comment_hash".
    let secret_payload = EventData::Comment(CommentData {
        body: "Secret password: hunter2".into(),
        extra: BTreeMap::new(),
    });
    let secret_comment = make_event(
        EventType::Comment,
        "bn-001",
        secret_payload,
        "comment_hash",
        2000,
    );
    projector.project_event(&secret_comment).unwrap();

    // The redaction refers to the comment through its full `blake3:` hash.
    let redact_payload = EventData::Redact(RedactData {
        target_hash: "blake3:comment_hash".into(),
        reason: "Accidental secret".into(),
        extra: BTreeMap::new(),
    });
    let redaction = make_event(
        EventType::Redact,
        "bn-001",
        redact_payload,
        "redact_hash",
        3000,
    );
    projector.project_event(&redaction).unwrap();

    let recorded: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM event_redactions WHERE target_event_hash = 'blake3:comment_hash'",
            [],
            |r| r.get(0),
        )
        .unwrap();
    assert_eq!(recorded, 1);

    let remaining = query::get_comments(&conn, "bn-001", None, None).unwrap();
    assert_eq!(remaining.len(), 1);
    assert_eq!(remaining[0].body, "[redacted]");
}
1449
/// Projecting the same event twice applies it only once.
///
/// `project_event` is expected to return `true` for the first application and `false`
/// for the repeat, leaving a single row in `items`.
#[test]
fn duplicate_events_are_skipped() {
    let conn = test_db();
    let projector = Projector::new(&conn);

    let event = make_create("bn-001", "Item", "aaa", 1000);

    let first_pass = projector.project_event(&event).unwrap();
    assert!(first_pass);

    let second_pass = projector.project_event(&event).unwrap();
    assert!(!second_pass);

    let item_rows: i64 = conn
        .query_row("SELECT COUNT(*) FROM items", [], |r| r.get(0))
        .unwrap();
    assert_eq!(item_rows, 1);
}
1469
/// Re-running the same batch reports every event as a duplicate.
///
/// The first `project_batch` call should count two projected events; the second call
/// over the identical events should count zero projected and two duplicates.
#[test]
fn batch_dedup_counts() {
    let conn = test_db();
    let projector = Projector::new(&conn);

    // One slice, borrowed for both runs, so the two batches are identical.
    let batch = [
        make_create("bn-001", "Item 1", "aaa", 1000),
        make_create("bn-002", "Item 2", "bbb", 1001),
    ];

    let first_run = projector.project_batch(&batch).unwrap();
    assert_eq!(first_run.projected, 2);
    assert_eq!(first_run.duplicates, 0);

    let second_run = projector.project_batch(&batch).unwrap();
    assert_eq!(second_run.projected, 0);
    assert_eq!(second_run.duplicates, 2);
}
1490
/// Batch projection and one-event-at-a-time projection must converge on the same state.
///
/// The same six-event history (create, move, assign, comment, title update, move) is
/// applied to two fresh databases: once through `project_batch` and once through
/// repeated `project_event` calls. The item's title, state and update timestamp are
/// compared, followed by the assignees and comments of both databases.
#[test]
fn incremental_matches_full_replay() {
    let events = vec![
        make_create("bn-001", "Auth bug", "h1", 1000),
        make_event(
            EventType::Move,
            "bn-001",
            EventData::Move(MoveData {
                state: State::Doing,
                reason: None,
                extra: BTreeMap::new(),
            }),
            "h2",
            2000,
        ),
        make_event(
            EventType::Assign,
            "bn-001",
            EventData::Assign(AssignData {
                agent: "alice".into(),
                action: AssignAction::Assign,
                extra: BTreeMap::new(),
            }),
            "h3",
            3000,
        ),
        make_event(
            EventType::Comment,
            "bn-001",
            EventData::Comment(CommentData {
                body: "Working on it".into(),
                extra: BTreeMap::new(),
            }),
            "h4",
            4000,
        ),
        make_event(
            EventType::Update,
            "bn-001",
            EventData::Update(UpdateData {
                field: "title".into(),
                value: serde_json::json!("Auth bug (fixed)"),
                extra: BTreeMap::new(),
            }),
            "h5",
            5000,
        ),
        make_event(
            EventType::Move,
            "bn-001",
            EventData::Move(MoveData {
                state: State::Done,
                reason: Some("Shipped".into()),
                extra: BTreeMap::new(),
            }),
            "h6",
            6000,
        ),
    ];

    // Path 1: the whole history in a single batch.
    let conn_full = test_db();
    let proj_full = Projector::new(&conn_full);
    proj_full.project_batch(&events).unwrap();

    // Path 2: the same history, one event per call.
    let conn_inc = test_db();
    let proj_inc = Projector::new(&conn_inc);
    for event in &events {
        proj_inc.project_event(event).unwrap();
    }

    let item_full = query::get_item(&conn_full, "bn-001", false)
        .unwrap()
        .unwrap();
    let item_inc = query::get_item(&conn_inc, "bn-001", false)
        .unwrap()
        .unwrap();

    assert_eq!(item_full.title, item_inc.title);
    assert_eq!(item_full.state, item_inc.state);
    assert_eq!(item_full.updated_at_us, item_inc.updated_at_us);

    let assignees_full = query::get_assignees(&conn_full, "bn-001").unwrap();
    let assignees_inc = query::get_assignees(&conn_inc, "bn-001").unwrap();
    assert_eq!(assignees_full.len(), assignees_inc.len());
    // Equal lengths alone would not catch a wrong agent, so compare the rows pairwise.
    for (full, inc) in assignees_full.iter().zip(&assignees_inc) {
        assert_eq!(full.agent, inc.agent);
    }

    let comments_full = query::get_comments(&conn_full, "bn-001", None, None).unwrap();
    let comments_inc = query::get_comments(&conn_inc, "bn-001", None, None).unwrap();
    assert_eq!(comments_full.len(), comments_inc.len());
    // Same reasoning for comments: content, author and originating hash must agree.
    for (full, inc) in comments_full.iter().zip(&comments_inc) {
        assert_eq!(full.body, inc.body);
        assert_eq!(full.author, inc.author);
        assert_eq!(full.event_hash, inc.event_hash);
    }
}
1587
/// Clearing the projection and replaying the same events rebuilds the same rows.
///
/// The `items` row count is checked after the first projection (2), after
/// `clear_projection` (0) and after the replay (2); finally the dependency created by
/// the `Link` event must be present again.
#[test]
fn clear_and_replay_produces_same_result() {
    let conn = test_db();
    let projector = Projector::new(&conn);

    // Counts the rows currently in `items`.
    let item_rows = || -> i64 {
        conn.query_row("SELECT COUNT(*) FROM items", [], |r| r.get(0))
            .unwrap()
    };

    let link_payload = EventData::Link(LinkData {
        target: "bn-001".into(),
        link_type: "blocks".into(),
        extra: BTreeMap::new(),
    });
    let history = vec![
        make_create("bn-001", "Item 1", "h1", 1000),
        make_create("bn-002", "Item 2", "h2", 1001),
        make_event(EventType::Link, "bn-002", link_payload, "h3", 2000),
    ];

    projector.project_batch(&history).unwrap();
    assert_eq!(item_rows(), 2);

    clear_projection(&conn).unwrap();
    assert_eq!(item_rows(), 0);

    projector.project_batch(&history).unwrap();
    assert_eq!(item_rows(), 2);

    let rebuilt_deps = query::get_dependencies(&conn, "bn-002").unwrap();
    assert_eq!(rebuilt_deps.len(), 1);
}
1636
/// An event for an item that was never created still yields a queryable item row.
///
/// A comment is projected for `bn-ghost` with no prior create event; the resulting
/// row is expected to have an empty title and the `open` state.
#[test]
fn events_on_missing_item_create_placeholder() {
    let conn = test_db();
    let projector = Projector::new(&conn);

    let payload = EventData::Comment(CommentData {
        body: "Comment on missing item".into(),
        extra: BTreeMap::new(),
    });
    let orphan_comment = make_event(EventType::Comment, "bn-ghost", payload, "ccc", 2000);
    projector.project_event(&orphan_comment).unwrap();

    let placeholder = query::get_item(&conn, "bn-ghost", false).unwrap().unwrap();
    assert_eq!(placeholder.title, "");
    assert_eq!(placeholder.state, "open");
}
1664
/// A created item can be found through `query::search` by a word from its title.
///
/// The search term is lower-case while the title word is capitalised, so the match is
/// expected to be case-insensitive.
#[test]
fn project_create_populates_fts() {
    let conn = test_db();
    let projector = Projector::new(&conn);

    let created = make_create("bn-001", "Authentication timeout", "aaa", 1000);
    projector.project_event(&created).unwrap();

    let matches = query::search(&conn, "authentication", 10).unwrap();
    assert_eq!(matches.len(), 1);
    assert_eq!(matches[0].item_id, "bn-001");
}
1686
/// Updating an item's title replaces the searchable text for that item.
///
/// After the title changes from "Old title" to "Authorization failure", a search for
/// the old wording must find nothing and a search for the new wording must find one hit.
#[test]
fn project_update_title_updates_fts() {
    let conn = test_db();
    let projector = Projector::new(&conn);

    let created = make_create("bn-001", "Old title", "aaa", 1000);
    projector.project_event(&created).unwrap();

    let payload = EventData::Update(UpdateData {
        field: "title".into(),
        value: serde_json::json!("Authorization failure"),
        extra: BTreeMap::new(),
    });
    let retitle = make_event(EventType::Update, "bn-001", payload, "bbb", 2000);
    projector.project_event(&retitle).unwrap();

    let stale_hits = query::search(&conn, "Old", 10).unwrap();
    assert!(stale_hits.is_empty());

    let fresh_hits = query::search(&conn, "authorization", 10).unwrap();
    assert_eq!(fresh_hits.len(), 1);
}
1716
/// `project_batch` over three distinct create events reports three projected events
/// and no duplicates or errors.
#[test]
fn batch_reports_correct_stats() {
    let conn = test_db();
    let projector = Projector::new(&conn);

    let batch = [
        make_create("bn-001", "Item 1", "h1", 1000),
        make_create("bn-002", "Item 2", "h2", 1001),
        make_create("bn-003", "Item 3", "h3", 1002),
    ];

    let outcome = projector.project_batch(&batch).unwrap();
    assert_eq!(outcome.projected, 3);
    assert_eq!(outcome.duplicates, 0);
    assert_eq!(outcome.errors, 0);
}
1737
/// Runs one item through every event type in a single batch and checks the end state.
///
/// The history holds two creates plus update, move, assign, comment, link, unlink,
/// compact, snapshot, redact and delete events for `bn-001` (twelve events in total).
/// Afterwards the item must be soft-deleted with its updated title, `doing` state,
/// compact summary and a stored snapshot; its single comment must be redacted, its
/// dependency list empty, and exactly one redaction recorded.
#[test]
fn full_lifecycle_all_event_types() {
    let conn = test_db();
    let projector = Projector::new(&conn);

    let history = vec![
        // The item under test and the item it will temporarily link to.
        make_create("bn-001", "Auth bug", "h01", 1000),
        make_create("bn-002", "Dep item", "h02", 1001),
        make_event(
            EventType::Update,
            "bn-001",
            EventData::Update(UpdateData {
                field: "title".into(),
                value: serde_json::json!("Auth timeout bug"),
                extra: BTreeMap::new(),
            }),
            "h03",
            2000,
        ),
        make_event(
            EventType::Move,
            "bn-001",
            EventData::Move(MoveData {
                state: State::Doing,
                reason: None,
                extra: BTreeMap::new(),
            }),
            "h04",
            3000,
        ),
        make_event(
            EventType::Assign,
            "bn-001",
            EventData::Assign(AssignData {
                agent: "alice".into(),
                action: AssignAction::Assign,
                extra: BTreeMap::new(),
            }),
            "h05",
            4000,
        ),
        // This comment (hash suffix "h06") is the target of the redaction below.
        make_event(
            EventType::Comment,
            "bn-001",
            EventData::Comment(CommentData {
                body: "Found root cause".into(),
                extra: BTreeMap::new(),
            }),
            "h06",
            5000,
        ),
        make_event(
            EventType::Link,
            "bn-001",
            EventData::Link(LinkData {
                target: "bn-002".into(),
                link_type: "blocks".into(),
                extra: BTreeMap::new(),
            }),
            "h07",
            6000,
        ),
        make_event(
            EventType::Unlink,
            "bn-001",
            EventData::Unlink(UnlinkData {
                target: "bn-002".into(),
                link_type: Some("blocks".into()),
                extra: BTreeMap::new(),
            }),
            "h08",
            7000,
        ),
        make_event(
            EventType::Compact,
            "bn-001",
            EventData::Compact(CompactData {
                summary: "Auth token refresh race".into(),
                extra: BTreeMap::new(),
            }),
            "h09",
            8000,
        ),
        make_event(
            EventType::Snapshot,
            "bn-001",
            EventData::Snapshot(SnapshotData {
                state: serde_json::json!({"id": "bn-001", "resolved": true}),
                extra: BTreeMap::new(),
            }),
            "h10",
            9000,
        ),
        make_event(
            EventType::Redact,
            "bn-001",
            EventData::Redact(RedactData {
                target_hash: "blake3:h06".into(),
                reason: "Contained secret".into(),
                extra: BTreeMap::new(),
            }),
            "h11",
            10000,
        ),
        make_event(
            EventType::Delete,
            "bn-001",
            EventData::Delete(DeleteData {
                reason: Some("Duplicate".into()),
                extra: BTreeMap::new(),
            }),
            "h12",
            11000,
        ),
    ];

    let outcome = projector.project_batch(&history).unwrap();
    assert_eq!(outcome.projected, 12);
    assert_eq!(outcome.duplicates, 0);
    assert_eq!(outcome.errors, 0);

    // The item was deleted at the end, so it has to be fetched with the flag set.
    let final_item = query::get_item(&conn, "bn-001", true).unwrap().unwrap();
    assert_eq!(final_item.title, "Auth timeout bug");
    assert_eq!(final_item.state, "doing");
    assert!(final_item.is_deleted);
    assert_eq!(
        final_item.compact_summary.as_deref(),
        Some("Auth token refresh race")
    );

    let stored_snapshot: Option<String> = conn
        .query_row(
            "SELECT snapshot_json FROM items WHERE item_id = 'bn-001'",
            [],
            |r| r.get(0),
        )
        .unwrap();
    assert!(stored_snapshot.is_some());

    let remaining_comments = query::get_comments(&conn, "bn-001", None, None).unwrap();
    assert_eq!(remaining_comments.len(), 1);
    assert_eq!(remaining_comments[0].body, "[redacted]");

    // Link followed by unlink leaves no dependency behind.
    let remaining_deps = query::get_dependencies(&conn, "bn-001").unwrap();
    assert!(remaining_deps.is_empty());

    let recorded_redactions: i64 = conn
        .query_row("SELECT COUNT(*) FROM event_redactions", [], |r| r.get(0))
        .unwrap();
    assert_eq!(recorded_redactions, 1);
}
1920
/// A create event that arrives after other events fills in the existing item row.
///
/// A comment for `bn-late` is projected first; the item is then expected to have an
/// empty title and the comment's timestamp (`1000`) as its creation time. Once the
/// create event (timestamp `900`) is projected, both the title and the creation time
/// must come from the create event.
#[test]
fn late_create_populates_placeholder() {
    let conn = test_db();
    let projector = Projector::new(&conn);

    let payload = EventData::Comment(CommentData {
        body: "Comment first".into(),
        extra: BTreeMap::new(),
    });
    let early_comment = make_event(EventType::Comment, "bn-late", payload, "h1", 1000);
    projector.project_event(&early_comment).unwrap();

    let before_create = query::get_item(&conn, "bn-late", false).unwrap().unwrap();
    assert_eq!(before_create.title, "");
    assert_eq!(before_create.created_at_us, 1000);

    // The create event carries an earlier timestamp than the comment already projected.
    let late_create = make_create("bn-late", "Real Title", "h2", 900);
    projector.project_event(&late_create).unwrap();

    let after_create = query::get_item(&conn, "bn-late", false).unwrap().unwrap();
    assert_eq!(after_create.title, "Real Title");
    assert_eq!(after_create.created_at_us, 900);
}
1952
/// A projector built on a freshly migrated in-memory database can project right away.
///
/// Unlike the other tests this one does not go through `test_db`: it opens the
/// connection and runs `migrations::migrate` itself, then relies on `Projector::new`
/// (which calls `ensure_tracking_table`) for any remaining setup.
#[test]
fn projector_new_creates_tracking_table_on_fresh_db() {
    let mut fresh_conn = Connection::open_in_memory().expect("open in-memory db");
    migrations::migrate(&mut fresh_conn).expect("migrate");

    let projector = Projector::new(&fresh_conn);

    let first_event = make_create("bn-fresh", "Fresh item", "h1", 1000);
    projector
        .project_event(&first_event)
        .expect("project_event should succeed on fresh DB");

    let stored = query::get_item(&fresh_conn, "bn-fresh", false)
        .unwrap()
        .unwrap();
    assert_eq!(stored.title, "Fresh item");
}
1972}