1use anyhow::{Context, Result};
21use rusqlite::{Connection, params};
22
23use crate::db::query;
24use crate::event::Event;
25use crate::event::data::{AssignAction, EventData};
26use crate::event::types::EventType;
27use crate::shard::ShardManager;
28
29#[derive(Debug, Clone, Default, PartialEq, Eq)]
35pub struct ProjectionStats {
36 pub projected: usize,
38 pub duplicates: usize,
40 pub errors: usize,
42}
43
44pub struct Projector<'conn> {
53 conn: &'conn Connection,
54 has_agent_column: bool,
55}
56
57impl<'conn> Projector<'conn> {
58 pub fn new(conn: &'conn Connection) -> Self {
63 let _ = ensure_tracking_table(conn);
66 let has_agent_column = projected_events_has_agent_column(conn).unwrap_or(false);
67 Self {
68 conn,
69 has_agent_column,
70 }
71 }
72
73 pub fn project_batch(&self, events: &[Event]) -> Result<ProjectionStats> {
84 let mut stats = ProjectionStats::default();
85
86 self.conn
88 .execute_batch("BEGIN IMMEDIATE")
89 .context("begin projection transaction")?;
90
91 for event in events {
92 match self.project_event_inner(event) {
93 Ok(ProjectResult::Projected) => stats.projected += 1,
94 Ok(ProjectResult::Duplicate) => stats.duplicates += 1,
95 Err(e) => {
96 tracing::warn!(
97 event_hash = %event.event_hash,
98 event_type = %event.event_type,
99 item_id = %event.item_id,
100 error = %e,
101 "skipping event due to projection error"
102 );
103 stats.errors += 1;
104 }
105 }
106 }
107
108 self.conn
109 .execute_batch("COMMIT")
110 .context("commit projection transaction")?;
111
112 if stats.errors > 0 {
113 let reason = format!(
114 "project_batch encountered {} projection errors while applying {} events",
115 stats.errors,
116 events.len()
117 );
118 if let Err(err) = crate::db::mark_projection_dirty_from_connection(self.conn, &reason) {
119 tracing::warn!(error = %err, "failed to mark projection dirty after batch errors");
120 }
121 }
122
123 Ok(stats)
124 }
125
126 pub fn project_event(&self, event: &Event) -> Result<bool> {
135 let projected = match self.project_event_inner(event) {
136 Ok(ProjectResult::Projected) => true,
137 Ok(ProjectResult::Duplicate) => false,
138 Err(err) => {
139 let reason = format!(
140 "project_event failed hash={} type={}",
141 event.event_hash, event.event_type
142 );
143 if let Err(mark_err) =
144 crate::db::mark_projection_dirty_from_connection(self.conn, &reason)
145 {
146 tracing::warn!(
147 error = %mark_err,
148 event_hash = %event.event_hash,
149 "failed to mark projection dirty after single-event projection failure"
150 );
151 }
152 return Err(err);
153 }
154 };
155
156 if let Err(err) = self.update_cursor_to_event_log_end(&event.event_hash) {
157 tracing::warn!(
158 event_hash = %event.event_hash,
159 error = %err,
160 "failed to update projection cursor after single-event projection"
161 );
162 let reason = format!(
163 "cursor update failed after projecting hash={} error={err}",
164 event.event_hash
165 );
166 let _ = crate::db::mark_projection_dirty_from_connection(self.conn, &reason);
167 }
168
169 Ok(projected)
170 }
171
172 fn project_event_inner(&self, event: &Event) -> Result<ProjectResult> {
177 if self.is_event_projected(&event.event_hash)? {
179 return Ok(ProjectResult::Duplicate);
180 }
181
182 match event.event_type {
183 EventType::Create => self.project_create(event)?,
184 EventType::Update => self.project_update(event)?,
185 EventType::Move => self.project_move(event)?,
186 EventType::Assign => self.project_assign(event)?,
187 EventType::Comment => self.project_comment(event)?,
188 EventType::Link => self.project_link(event)?,
189 EventType::Unlink => self.project_unlink(event)?,
190 EventType::Delete => self.project_delete(event)?,
191 EventType::Compact => self.project_compact(event)?,
192 EventType::Snapshot => self.project_snapshot(event)?,
193 EventType::Redact => self.project_redact(event)?,
194 }
195
196 self.record_projected_hash(&event.event_hash, event)?;
198
199 Ok(ProjectResult::Projected)
200 }
201
202 fn is_event_projected(&self, event_hash: &str) -> Result<bool> {
207 let exists: bool = self
214 .conn
215 .query_row(
216 "SELECT EXISTS(SELECT 1 FROM projected_events WHERE event_hash = ?1)",
217 params![event_hash],
218 |row| row.get(0),
219 )
220 .unwrap_or(false);
221 Ok(exists)
222 }
223
224 fn is_event_redacted(&self, event_hash: &str) -> Result<bool> {
225 let exists: bool = self
226 .conn
227 .query_row(
228 "SELECT EXISTS(SELECT 1 FROM event_redactions WHERE target_event_hash = ?1)",
229 params![event_hash],
230 |row| row.get(0),
231 )
232 .unwrap_or(false);
233 Ok(exists)
234 }
235
236 fn record_projected_hash(&self, event_hash: &str, event: &Event) -> Result<()> {
237 if self.has_agent_column {
238 self.conn
239 .execute(
240 "INSERT OR IGNORE INTO projected_events (event_hash, item_id, event_type, projected_at_us, agent) \
241 VALUES (?1, ?2, ?3, ?4, ?5)",
242 params![
243 event_hash,
244 event.item_id.as_str(),
245 event.event_type.as_str(),
246 event.wall_ts_us,
247 event.agent.as_str(),
248 ],
249 )
250 .context("record projected event hash")?;
251 return Ok(());
252 }
253
254 self.conn
255 .execute(
256 "INSERT OR IGNORE INTO projected_events (event_hash, item_id, event_type, projected_at_us) \
257 VALUES (?1, ?2, ?3, ?4)",
258 params![
259 event_hash,
260 event.item_id.as_str(),
261 event.event_type.as_str(),
262 event.wall_ts_us,
263 ],
264 )
265 .context("record projected event hash")?;
266 Ok(())
267 }
268
269 fn update_cursor_to_event_log_end(&self, event_hash: &str) -> Result<()> {
270 let main_db_file: String = self
271 .conn
272 .query_row(
273 "SELECT file FROM pragma_database_list WHERE name = 'main'",
274 [],
275 |row| row.get(0),
276 )
277 .context("read main database file from pragma_database_list")?;
278
279 if main_db_file.trim().is_empty() {
280 return Ok(());
282 }
283
284 let db_path = std::path::Path::new(&main_db_file);
285 let Some(bones_dir) = db_path.parent() else {
286 return Ok(());
287 };
288
289 let shard_mgr = ShardManager::new(bones_dir);
290 let total_len = shard_mgr
291 .total_content_len()
292 .map_err(|e| anyhow::anyhow!("read event-log size for cursor update: {e}"))?;
293 let total_len_i64 = i64::try_from(total_len).unwrap_or(i64::MAX);
294
295 query::update_projection_cursor(self.conn, total_len_i64, Some(event_hash))
296 .context("write projection cursor after single-event projection")?;
297
298 Ok(())
299 }
300
301 fn project_create(&self, event: &Event) -> Result<()> {
306 let EventData::Create(ref data) = event.data else {
307 anyhow::bail!("expected Create data for item.create event");
308 };
309
310 let is_redacted = self.is_event_redacted(&event.event_hash)?;
311 let title = if is_redacted { "" } else { &data.title };
312 let description = if is_redacted {
313 None
314 } else {
315 data.description.as_deref()
316 };
317 let labels_str = if is_redacted {
318 String::new()
319 } else {
320 data.labels.join(" ")
321 };
322
323 self.conn
324 .execute(
325 "INSERT INTO items (
326 item_id, title, description, kind, state, urgency,
327 size, parent_id, is_deleted, search_labels,
328 created_at_us, updated_at_us
329 ) VALUES (?1, ?2, ?3, ?4, 'open', ?5, ?6, ?7, 0, ?8, ?9, ?10)
330 ON CONFLICT(item_id) DO UPDATE SET
331 title = CASE
332 WHEN items.title = '' THEN excluded.title
333 ELSE items.title
334 END,
335 description = COALESCE(items.description, excluded.description),
336 kind = CASE
337 WHEN items.kind = 'task' THEN excluded.kind
338 ELSE items.kind
339 END,
340 urgency = CASE
341 WHEN items.urgency = 'default' THEN excluded.urgency
342 ELSE items.urgency
343 END,
344 size = COALESCE(items.size, excluded.size),
345 parent_id = COALESCE(items.parent_id, excluded.parent_id),
346 created_at_us = MIN(items.created_at_us, excluded.created_at_us),
347 search_labels = CASE
348 WHEN items.search_labels = '' THEN excluded.search_labels
349 ELSE items.search_labels
350 END,
351 updated_at_us = MAX(items.updated_at_us, excluded.updated_at_us)
352 WHERE NOT EXISTS (
353 SELECT 1 FROM projected_events
354 WHERE item_id = excluded.item_id AND event_type = 'item.create'
355 )",
356 params![
357 event.item_id.as_str(),
358 title,
359 description,
360 data.kind.to_string(),
361 data.urgency.to_string(),
362 data.size.map(|s| s.to_string()),
363 data.parent.as_deref(),
364 labels_str,
365 event.wall_ts_us,
366 event.wall_ts_us,
367 ],
368 )
369 .with_context(|| format!("project create for {}", event.item_id))?;
370
371 if !is_redacted {
373 for label in &data.labels {
374 self.conn
375 .execute(
376 "INSERT OR IGNORE INTO item_labels (item_id, label, created_at_us)
377 VALUES (?1, ?2, ?3)",
378 params![event.item_id.as_str(), label, event.wall_ts_us],
379 )
380 .with_context(|| format!("insert label '{label}' for {}", event.item_id))?;
381 }
382 self.refresh_search_labels(event.item_id.as_str(), event.wall_ts_us)?;
383 }
384
385 Ok(())
386 }
387
388 #[allow(clippy::too_many_lines)]
389 fn project_update(&self, event: &Event) -> Result<()> {
390 let EventData::Update(ref data) = event.data else {
391 anyhow::bail!("expected Update data for item.update event");
392 };
393
394 self.ensure_item_exists(event)?;
395
396 let is_redacted = self.is_event_redacted(&event.event_hash)?;
397
398 match data.field.as_str() {
399 "title" => {
400 let value = if is_redacted {
401 "[redacted]"
402 } else {
403 data.value.as_str().unwrap_or_default()
404 };
405 self.conn.execute(
406 "UPDATE items SET title = ?1, updated_at_us = ?2 WHERE item_id = ?3",
407 params![value, event.wall_ts_us, event.item_id.as_str()],
408 )?;
409 }
410 "description" => {
411 let value = if is_redacted {
412 Some("[redacted]".to_string())
413 } else {
414 data.value.as_str().map(String::from)
415 };
416 self.conn.execute(
417 "UPDATE items SET description = ?1, updated_at_us = ?2 WHERE item_id = ?3",
418 params![value, event.wall_ts_us, event.item_id.as_str()],
419 )?;
420 }
421 "kind" => {
422 let value = data.value.as_str().unwrap_or("task");
423 self.conn.execute(
424 "UPDATE items SET kind = ?1, updated_at_us = ?2 WHERE item_id = ?3",
425 params![value, event.wall_ts_us, event.item_id.as_str()],
426 )?;
427 }
428 "size" => {
429 let value = data.value.as_str().map(String::from);
430 self.conn.execute(
431 "UPDATE items SET size = ?1, updated_at_us = ?2 WHERE item_id = ?3",
432 params![value, event.wall_ts_us, event.item_id.as_str()],
433 )?;
434 }
435 "urgency" => {
436 let value = data.value.as_str().unwrap_or("default");
437 self.conn.execute(
438 "UPDATE items SET urgency = ?1, updated_at_us = ?2 WHERE item_id = ?3",
439 params![value, event.wall_ts_us, event.item_id.as_str()],
440 )?;
441 }
442 "parent" => {
443 let value = data.value.as_str().map(String::from);
444 self.conn.execute(
445 "UPDATE items SET parent_id = ?1, updated_at_us = ?2 WHERE item_id = ?3",
446 params![value, event.wall_ts_us, event.item_id.as_str()],
447 )?;
448 }
449 "labels" => {
450 let mut changed = false;
453
454 if let Some(labels) = data.value.as_array() {
455 self.conn.execute(
457 "DELETE FROM item_labels WHERE item_id = ?1",
458 params![event.item_id.as_str()],
459 )?;
460
461 for label_val in labels {
462 if let Some(label) = label_val.as_str() {
463 self.conn.execute(
464 "INSERT OR IGNORE INTO item_labels (item_id, label, created_at_us)
465 VALUES (?1, ?2, ?3)",
466 params![event.item_id.as_str(), label, event.wall_ts_us],
467 )?;
468 }
469 }
470 changed = true;
471 } else if let Some(obj) = data.value.as_object() {
472 let action = obj.get("action").and_then(|v| v.as_str()).unwrap_or("");
474 let label = obj.get("label").and_then(|v| v.as_str()).unwrap_or("");
475
476 if !label.is_empty() {
477 match action {
478 "add" => {
479 self.conn.execute(
480 "INSERT OR IGNORE INTO item_labels (item_id, label, created_at_us)
481 VALUES (?1, ?2, ?3)",
482 params![event.item_id.as_str(), label, event.wall_ts_us],
483 )?;
484 changed = true;
485 }
486 "remove" => {
487 self.conn.execute(
488 "DELETE FROM item_labels WHERE item_id = ?1 AND label = ?2",
489 params![event.item_id.as_str(), label],
490 )?;
491 changed = true;
492 }
493 _ => {}
494 }
495 }
496 }
497
498 if changed {
499 self.refresh_search_labels(event.item_id.as_str(), event.wall_ts_us)?;
500 }
501 }
502 _ => {
503 tracing::debug!(
505 field = %data.field,
506 item_id = %event.item_id,
507 "ignoring update for unknown field"
508 );
509 self.conn.execute(
510 "UPDATE items SET updated_at_us = ?1 WHERE item_id = ?2",
511 params![event.wall_ts_us, event.item_id.as_str()],
512 )?;
513 }
514 }
515
516 Ok(())
517 }
518
519 fn project_move(&self, event: &Event) -> Result<()> {
520 let EventData::Move(ref data) = event.data else {
521 anyhow::bail!("expected Move data for item.move event");
522 };
523
524 self.ensure_item_exists(event)?;
525
526 self.conn
527 .execute(
528 "UPDATE items SET state = ?1, updated_at_us = ?2 WHERE item_id = ?3",
529 params![
530 data.state.to_string(),
531 event.wall_ts_us,
532 event.item_id.as_str(),
533 ],
534 )
535 .with_context(|| format!("project move for {}", event.item_id))?;
536
537 Ok(())
538 }
539
540 fn project_assign(&self, event: &Event) -> Result<()> {
541 let EventData::Assign(ref data) = event.data else {
542 anyhow::bail!("expected Assign data for item.assign event");
543 };
544
545 self.ensure_item_exists(event)?;
546
547 match data.action {
548 AssignAction::Assign => {
549 self.conn
550 .execute(
551 "INSERT OR IGNORE INTO item_assignees (item_id, agent, created_at_us)
552 VALUES (?1, ?2, ?3)",
553 params![event.item_id.as_str(), data.agent, event.wall_ts_us],
554 )
555 .with_context(|| format!("assign {} to {}", data.agent, event.item_id))?;
556 }
557 AssignAction::Unassign => {
558 self.conn
559 .execute(
560 "DELETE FROM item_assignees WHERE item_id = ?1 AND agent = ?2",
561 params![event.item_id.as_str(), data.agent],
562 )
563 .with_context(|| format!("unassign {} from {}", data.agent, event.item_id))?;
564 }
565 }
566
567 self.conn.execute(
569 "UPDATE items SET updated_at_us = ?1 WHERE item_id = ?2",
570 params![event.wall_ts_us, event.item_id.as_str()],
571 )?;
572
573 Ok(())
574 }
575
576 fn project_comment(&self, event: &Event) -> Result<()> {
577 let EventData::Comment(ref data) = event.data else {
578 anyhow::bail!("expected Comment data for item.comment event");
579 };
580
581 self.ensure_item_exists(event)?;
582
583 let is_redacted = self.is_event_redacted(&event.event_hash)?;
584 let body = if is_redacted {
585 "[redacted]"
586 } else {
587 &data.body
588 };
589
590 self.conn
591 .execute(
592 "INSERT OR IGNORE INTO item_comments (item_id, event_hash, author, body, created_at_us)
593 VALUES (?1, ?2, ?3, ?4, ?5)",
594 params![
595 event.item_id.as_str(),
596 event.event_hash,
597 event.agent,
598 body,
599 event.wall_ts_us,
600 ],
601 )
602 .with_context(|| format!("project comment for {}", event.item_id))?;
603
604 self.conn.execute(
606 "UPDATE items SET updated_at_us = ?1 WHERE item_id = ?2",
607 params![event.wall_ts_us, event.item_id.as_str()],
608 )?;
609
610 Ok(())
611 }
612
613 fn project_link(&self, event: &Event) -> Result<()> {
614 let EventData::Link(ref data) = event.data else {
615 anyhow::bail!("expected Link data for item.link event");
616 };
617
618 self.ensure_item_exists(event)?;
619
620 self.conn
621 .execute(
622 "INSERT OR IGNORE INTO item_dependencies (item_id, depends_on_item_id, link_type, created_at_us)
623 VALUES (?1, ?2, ?3, ?4)",
624 params![
625 event.item_id.as_str(),
626 data.target,
627 data.link_type,
628 event.wall_ts_us,
629 ],
630 )
631 .with_context(|| {
632 format!("project link {} -> {}", event.item_id, data.target)
633 })?;
634
635 self.conn.execute(
637 "UPDATE items SET updated_at_us = ?1 WHERE item_id = ?2",
638 params![event.wall_ts_us, event.item_id.as_str()],
639 )?;
640
641 Ok(())
642 }
643
644 fn project_unlink(&self, event: &Event) -> Result<()> {
645 let EventData::Unlink(ref data) = event.data else {
646 anyhow::bail!("expected Unlink data for item.unlink event");
647 };
648
649 self.ensure_item_exists(event)?;
650
651 if let Some(ref link_type) = data.link_type {
652 self.conn
653 .execute(
654 "DELETE FROM item_dependencies \
655 WHERE item_id = ?1 AND depends_on_item_id = ?2 AND link_type = ?3",
656 params![event.item_id.as_str(), data.target, link_type],
657 )
658 .with_context(|| format!("unlink {} -/-> {}", event.item_id, data.target))?;
659 } else {
660 self.conn
662 .execute(
663 "DELETE FROM item_dependencies \
664 WHERE item_id = ?1 AND depends_on_item_id = ?2",
665 params![event.item_id.as_str(), data.target],
666 )
667 .with_context(|| format!("unlink all {} -/-> {}", event.item_id, data.target))?;
668 }
669
670 self.conn.execute(
672 "UPDATE items SET updated_at_us = ?1 WHERE item_id = ?2",
673 params![event.wall_ts_us, event.item_id.as_str()],
674 )?;
675
676 Ok(())
677 }
678
679 fn project_delete(&self, event: &Event) -> Result<()> {
680 self.ensure_item_exists(event)?;
681
682 self.conn
683 .execute(
684 "UPDATE items SET is_deleted = 1, deleted_at_us = ?1, updated_at_us = ?1 \
685 WHERE item_id = ?2",
686 params![event.wall_ts_us, event.item_id.as_str()],
687 )
688 .with_context(|| format!("project delete for {}", event.item_id))?;
689
690 Ok(())
691 }
692
693 fn project_compact(&self, event: &Event) -> Result<()> {
694 let EventData::Compact(ref data) = event.data else {
695 anyhow::bail!("expected Compact data for item.compact event");
696 };
697
698 self.ensure_item_exists(event)?;
699
700 let is_redacted = self.is_event_redacted(&event.event_hash)?;
701 let summary = if is_redacted {
702 "[redacted]"
703 } else {
704 data.summary.as_str()
705 };
706
707 self.conn
708 .execute(
709 "UPDATE items SET compact_summary = ?1, updated_at_us = ?2 WHERE item_id = ?3",
710 params![summary, event.wall_ts_us, event.item_id.as_str()],
711 )
712 .with_context(|| format!("project compact for {}", event.item_id))?;
713
714 Ok(())
715 }
716
717 fn project_snapshot(&self, event: &Event) -> Result<()> {
718 let EventData::Snapshot(ref data) = event.data else {
719 anyhow::bail!("expected Snapshot data for item.snapshot event");
720 };
721
722 self.ensure_item_exists(event)?;
723
724 let json_str = serde_json::to_string(&data.state).context("serialize snapshot state")?;
725
726 self.conn
727 .execute(
728 "UPDATE items SET snapshot_json = ?1, updated_at_us = ?2 WHERE item_id = ?3",
729 params![json_str, event.wall_ts_us, event.item_id.as_str()],
730 )
731 .with_context(|| format!("project snapshot for {}", event.item_id))?;
732
733 Ok(())
734 }
735
736 fn project_redact(&self, event: &Event) -> Result<()> {
737 let EventData::Redact(ref data) = event.data else {
738 anyhow::bail!("expected Redact data for item.redact event");
739 };
740
741 self.conn
743 .execute(
744 "INSERT OR IGNORE INTO event_redactions \
745 (target_event_hash, item_id, reason, redacted_by, redacted_at_us) \
746 VALUES (?1, ?2, ?3, ?4, ?5)",
747 params![
748 data.target_hash,
749 event.item_id.as_str(),
750 data.reason,
751 event.agent,
752 event.wall_ts_us,
753 ],
754 )
755 .with_context(|| {
756 format!(
757 "project redact for {} targeting {}",
758 event.item_id, data.target_hash
759 )
760 })?;
761
762 self.conn
764 .execute(
765 "UPDATE item_comments SET body = '[redacted]' WHERE event_hash = ?1",
766 params![data.target_hash],
767 )
768 .context("redact comment body")?;
769
770 Ok(())
771 }
772
773 fn ensure_item_exists(&self, event: &Event) -> Result<()> {
782 let exists: bool = self
783 .conn
784 .query_row(
785 "SELECT EXISTS(SELECT 1 FROM items WHERE item_id = ?1)",
786 params![event.item_id.as_str()],
787 |row| row.get(0),
788 )
789 .context("check item exists")?;
790
791 if !exists {
792 self.conn
793 .execute(
794 "INSERT INTO items (
795 item_id, title, kind, state, urgency,
796 is_deleted, search_labels, created_at_us, updated_at_us
797 ) VALUES (?1, '', 'task', 'open', 'default', 0, '', ?2, ?2)",
798 params![event.item_id.as_str(), event.wall_ts_us],
799 )
800 .with_context(|| format!("create placeholder item for {}", event.item_id))?;
801 }
802
803 Ok(())
804 }
805
806 fn refresh_search_labels(&self, item_id: &str, updated_at_us: i64) -> Result<()> {
807 let mut stmt = self
808 .conn
809 .prepare("SELECT label FROM item_labels WHERE item_id = ?1 ORDER BY label")?;
810 let label_rows = stmt.query_map(params![item_id], |row| row.get::<_, String>(0))?;
811
812 let mut label_strings = Vec::new();
813 for label_res in label_rows {
814 label_strings.push(label_res?);
815 }
816
817 let search_labels = label_strings.join(" ");
818 self.conn.execute(
819 "UPDATE items
820 SET search_labels = ?1,
821 updated_at_us = MAX(updated_at_us, ?2)
822 WHERE item_id = ?3",
823 params![search_labels, updated_at_us, item_id],
824 )?;
825 Ok(())
826 }
827}
828
829enum ProjectResult {
830 Projected,
831 Duplicate,
832}
833
834pub const PROJECTED_EVENTS_DDL: &str = "\
843CREATE TABLE IF NOT EXISTS projected_events (
844 event_hash TEXT PRIMARY KEY,
845 item_id TEXT NOT NULL,
846 event_type TEXT NOT NULL,
847 projected_at_us INTEGER NOT NULL,
848 agent TEXT NOT NULL DEFAULT ''
849);
850CREATE INDEX IF NOT EXISTS idx_projected_events_item
851 ON projected_events(item_id);
852CREATE INDEX IF NOT EXISTS idx_projected_events_agent
853 ON projected_events(agent);
854";
855
856pub fn ensure_tracking_table(conn: &Connection) -> Result<()> {
865 conn.execute_batch(PROJECTED_EVENTS_DDL)
866 .context("create projected_events tracking table")?;
867
868 if !projected_events_has_agent_column(conn)? {
869 conn.execute(
870 "ALTER TABLE projected_events ADD COLUMN agent TEXT NOT NULL DEFAULT ''",
871 [],
872 )
873 .context("add agent column to projected_events")?;
874 }
875
876 conn.execute_batch(
877 "CREATE INDEX IF NOT EXISTS idx_projected_events_agent ON projected_events(agent);",
878 )
879 .context("create projected_events_agent index")?;
880
881 Ok(())
882}
883
884fn projected_events_has_agent_column(conn: &Connection) -> Result<bool> {
885 let mut stmt = conn
886 .prepare("PRAGMA table_info(projected_events)")
887 .context("inspect projected_events schema")?;
888 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
889
890 for row in rows {
891 let name = row.context("read projected_events column")?;
892 if name == "agent" {
893 return Ok(true);
894 }
895 }
896
897 Ok(false)
898}
899
900pub fn clear_projection(conn: &Connection) -> Result<()> {
909 conn.execute_batch(
910 "DELETE FROM event_redactions;
911 DELETE FROM item_comments;
912 DELETE FROM item_dependencies;
913 DELETE FROM item_assignees;
914 DELETE FROM item_labels;
915 DELETE FROM items;
916 DELETE FROM projected_events;
917 UPDATE projection_meta SET last_event_offset = 0, last_event_hash = NULL WHERE id = 1;",
918 )
919 .context("clear projection tables")?;
920 Ok(())
921}
922
923#[cfg(test)]
928mod tests {
929 use super::*;
930 use crate::db::{migrations, query};
931 use crate::event::data::*;
932 use crate::event::types::EventType;
933 use crate::event::writer;
934 use crate::model::item::{Kind, Size, State, Urgency};
935 use crate::model::item_id::ItemId;
936 use crate::shard::ShardManager;
937 use rusqlite::Connection;
938 use std::collections::BTreeMap;
939 use tempfile::TempDir;
940
941 fn test_db() -> Connection {
942 let mut conn = Connection::open_in_memory().expect("open in-memory db");
943 migrations::migrate(&mut conn).expect("migrate");
944 ensure_tracking_table(&conn).expect("create tracking table");
945 conn
946 }
947
948 fn make_event(
949 event_type: EventType,
950 item_id: &str,
951 data: EventData,
952 hash: &str,
953 ts: i64,
954 ) -> Event {
955 Event {
956 wall_ts_us: ts,
957 agent: "test-agent".into(),
958 itc: "itc:AQ".into(),
959 parents: vec![],
960 event_type,
961 item_id: ItemId::new_unchecked(item_id),
962 data,
963 event_hash: format!("blake3:{hash}"),
964 }
965 }
966
967 fn make_create(id: &str, title: &str, hash: &str, ts: i64) -> Event {
968 make_event(
969 EventType::Create,
970 id,
971 EventData::Create(CreateData {
972 title: title.into(),
973 kind: Kind::Task,
974 size: Some(Size::M),
975 urgency: Urgency::Default,
976 labels: vec!["backend".into(), "auth".into()],
977 parent: None,
978 causation: None,
979 description: Some("A detailed description".into()),
980 extra: BTreeMap::new(),
981 }),
982 hash,
983 ts,
984 )
985 }
986
987 #[test]
988 fn project_event_updates_projection_cursor_for_file_backed_db() {
989 let dir = TempDir::new().expect("tempdir");
990 let bones_dir = dir.path().join(".bones");
991 std::fs::create_dir_all(&bones_dir).expect("create .bones");
992
993 let shard_mgr = ShardManager::new(&bones_dir);
994 shard_mgr.init().expect("init shard manager");
995
996 let db_path = bones_dir.join("bones.db");
997 let mut conn = Connection::open(&db_path).expect("open projection db");
998 migrations::migrate(&mut conn).expect("migrate");
999 ensure_tracking_table(&conn).expect("create tracking table");
1000
1001 let projector = Projector::new(&conn);
1002 let mut event = make_create("bn-001", "Cursor update", "h1", 1000);
1003 let line = writer::write_event(&mut event).expect("serialize event");
1004 shard_mgr
1005 .append(&line, false, std::time::Duration::from_secs(5))
1006 .expect("append event line");
1007
1008 projector.project_event(&event).expect("project event");
1009
1010 let (offset, hash) = query::get_projection_cursor(&conn).expect("read projection cursor");
1011 let expected_offset =
1012 i64::try_from(shard_mgr.total_content_len().expect("content len")).unwrap_or(i64::MAX);
1013
1014 assert_eq!(offset, expected_offset);
1015 assert_eq!(hash.as_deref(), Some(event.event_hash.as_str()));
1016 }
1017
1018 #[test]
1023 fn project_create_inserts_item() {
1024 let conn = test_db();
1025 let projector = Projector::new(&conn);
1026 let event = make_create("bn-001", "Fix auth timeout", "aaa", 1000);
1027
1028 let result = projector.project_event(&event).unwrap();
1029 assert!(result, "should return true for new projection");
1030
1031 let item = query::get_item(&conn, "bn-001", false).unwrap().unwrap();
1032 assert_eq!(item.title, "Fix auth timeout");
1033 assert_eq!(item.kind, "task");
1034 assert_eq!(item.state, "open");
1035 assert_eq!(item.urgency, "default");
1036 assert_eq!(item.size.as_deref(), Some("m"));
1037 assert_eq!(item.description.as_deref(), Some("A detailed description"));
1038 assert_eq!(item.created_at_us, 1000);
1039 assert_eq!(item.updated_at_us, 1000);
1040 }
1041
1042 #[test]
1043 fn project_create_inserts_labels() {
1044 let conn = test_db();
1045 let projector = Projector::new(&conn);
1046 let event = make_create("bn-001", "Fix auth", "aaa", 1000);
1047 projector.project_event(&event).unwrap();
1048
1049 let labels = query::get_labels(&conn, "bn-001").unwrap();
1050 assert_eq!(labels.len(), 2);
1051 assert_eq!(labels[0].label, "auth");
1052 assert_eq!(labels[1].label, "backend");
1053 }
1054
1055 #[test]
1060 fn project_update_title() {
1061 let conn = test_db();
1062 let projector = Projector::new(&conn);
1063 projector
1064 .project_event(&make_create("bn-001", "Old title", "aaa", 1000))
1065 .unwrap();
1066
1067 let update = make_event(
1068 EventType::Update,
1069 "bn-001",
1070 EventData::Update(UpdateData {
1071 field: "title".into(),
1072 value: serde_json::json!("New title"),
1073 extra: BTreeMap::new(),
1074 }),
1075 "bbb",
1076 2000,
1077 );
1078 projector.project_event(&update).unwrap();
1079
1080 let item = query::get_item(&conn, "bn-001", false).unwrap().unwrap();
1081 assert_eq!(item.title, "New title");
1082 assert_eq!(item.updated_at_us, 2000);
1083 }
1084
1085 #[test]
1086 fn project_update_description() {
1087 let conn = test_db();
1088 let projector = Projector::new(&conn);
1089 projector
1090 .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1091 .unwrap();
1092
1093 let update = make_event(
1094 EventType::Update,
1095 "bn-001",
1096 EventData::Update(UpdateData {
1097 field: "description".into(),
1098 value: serde_json::json!("Updated description"),
1099 extra: BTreeMap::new(),
1100 }),
1101 "bbb",
1102 2000,
1103 );
1104 projector.project_event(&update).unwrap();
1105
1106 let item = query::get_item(&conn, "bn-001", false).unwrap().unwrap();
1107 assert_eq!(item.description.as_deref(), Some("Updated description"));
1108 }
1109
1110 #[test]
1111 fn project_update_labels() {
1112 let conn = test_db();
1113 let projector = Projector::new(&conn);
1114 projector
1115 .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1116 .unwrap();
1117
1118 let update = make_event(
1119 EventType::Update,
1120 "bn-001",
1121 EventData::Update(UpdateData {
1122 field: "labels".into(),
1123 value: serde_json::json!(["frontend", "urgent"]),
1124 extra: BTreeMap::new(),
1125 }),
1126 "bbb",
1127 2000,
1128 );
1129 projector.project_event(&update).unwrap();
1130
1131 let labels = query::get_labels(&conn, "bn-001").unwrap();
1132 assert_eq!(labels.len(), 2);
1133 assert_eq!(labels[0].label, "frontend");
1134 assert_eq!(labels[1].label, "urgent");
1135 }
1136
1137 #[test]
1138 fn project_update_unknown_field_bumps_updated() {
1139 let conn = test_db();
1140 let projector = Projector::new(&conn);
1141 projector
1142 .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1143 .unwrap();
1144
1145 let update = make_event(
1146 EventType::Update,
1147 "bn-001",
1148 EventData::Update(UpdateData {
1149 field: "future_field".into(),
1150 value: serde_json::json!("whatever"),
1151 extra: BTreeMap::new(),
1152 }),
1153 "bbb",
1154 2000,
1155 );
1156 projector.project_event(&update).unwrap();
1157
1158 let item = query::get_item(&conn, "bn-001", false).unwrap().unwrap();
1159 assert_eq!(item.updated_at_us, 2000);
1160 }
1161
1162 #[test]
1167 fn project_move_updates_state() {
1168 let conn = test_db();
1169 let projector = Projector::new(&conn);
1170 projector
1171 .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1172 .unwrap();
1173
1174 let mv = make_event(
1175 EventType::Move,
1176 "bn-001",
1177 EventData::Move(MoveData {
1178 state: State::Doing,
1179 reason: Some("Starting work".into()),
1180 extra: BTreeMap::new(),
1181 }),
1182 "bbb",
1183 2000,
1184 );
1185 projector.project_event(&mv).unwrap();
1186
1187 let item = query::get_item(&conn, "bn-001", false).unwrap().unwrap();
1188 assert_eq!(item.state, "doing");
1189 }
1190
1191 #[test]
1196 fn project_assign_and_unassign() {
1197 let conn = test_db();
1198 let projector = Projector::new(&conn);
1199 projector
1200 .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1201 .unwrap();
1202
1203 let assign = make_event(
1205 EventType::Assign,
1206 "bn-001",
1207 EventData::Assign(AssignData {
1208 agent: "alice".into(),
1209 action: AssignAction::Assign,
1210 extra: BTreeMap::new(),
1211 }),
1212 "bbb",
1213 2000,
1214 );
1215 projector.project_event(&assign).unwrap();
1216
1217 let assignees = query::get_assignees(&conn, "bn-001").unwrap();
1218 assert_eq!(assignees.len(), 1);
1219 assert_eq!(assignees[0].agent, "alice");
1220
1221 let unassign = make_event(
1223 EventType::Assign,
1224 "bn-001",
1225 EventData::Assign(AssignData {
1226 agent: "alice".into(),
1227 action: AssignAction::Unassign,
1228 extra: BTreeMap::new(),
1229 }),
1230 "ccc",
1231 3000,
1232 );
1233 projector.project_event(&unassign).unwrap();
1234
1235 let assignees = query::get_assignees(&conn, "bn-001").unwrap();
1236 assert!(assignees.is_empty());
1237 }
1238
1239 #[test]
1244 fn project_comment_inserts_row() {
1245 let conn = test_db();
1246 let projector = Projector::new(&conn);
1247 projector
1248 .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1249 .unwrap();
1250
1251 let comment = make_event(
1252 EventType::Comment,
1253 "bn-001",
1254 EventData::Comment(CommentData {
1255 body: "This is a comment".into(),
1256 extra: BTreeMap::new(),
1257 }),
1258 "bbb",
1259 2000,
1260 );
1261 projector.project_event(&comment).unwrap();
1262
1263 let comments = query::get_comments(&conn, "bn-001", None, None).unwrap();
1264 assert_eq!(comments.len(), 1);
1265 assert_eq!(comments[0].body, "This is a comment");
1266 assert_eq!(comments[0].author, "test-agent");
1267 assert_eq!(comments[0].event_hash, "blake3:bbb");
1268 }
1269
1270 #[test]
1275 fn project_link_and_unlink() {
1276 let conn = test_db();
1277 let projector = Projector::new(&conn);
1278 projector
1279 .project_event(&make_create("bn-001", "Blocker", "aaa", 1000))
1280 .unwrap();
1281 projector
1282 .project_event(&make_create("bn-002", "Blocked", "bbb", 1001))
1283 .unwrap();
1284
1285 let link = make_event(
1287 EventType::Link,
1288 "bn-002",
1289 EventData::Link(LinkData {
1290 target: "bn-001".into(),
1291 link_type: "blocks".into(),
1292 extra: BTreeMap::new(),
1293 }),
1294 "ccc",
1295 2000,
1296 );
1297 projector.project_event(&link).unwrap();
1298
1299 let deps = query::get_dependencies(&conn, "bn-002").unwrap();
1300 assert_eq!(deps.len(), 1);
1301 assert_eq!(deps[0].depends_on_item_id, "bn-001");
1302
1303 let unlink = make_event(
1305 EventType::Unlink,
1306 "bn-002",
1307 EventData::Unlink(UnlinkData {
1308 target: "bn-001".into(),
1309 link_type: Some("blocks".into()),
1310 extra: BTreeMap::new(),
1311 }),
1312 "ddd",
1313 3000,
1314 );
1315 projector.project_event(&unlink).unwrap();
1316
1317 let deps = query::get_dependencies(&conn, "bn-002").unwrap();
1318 assert!(deps.is_empty());
1319 }
1320
1321 #[test]
1326 fn project_delete_soft_deletes() {
1327 let conn = test_db();
1328 let projector = Projector::new(&conn);
1329 projector
1330 .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1331 .unwrap();
1332
1333 let delete = make_event(
1334 EventType::Delete,
1335 "bn-001",
1336 EventData::Delete(DeleteData {
1337 reason: Some("Duplicate".into()),
1338 extra: BTreeMap::new(),
1339 }),
1340 "bbb",
1341 2000,
1342 );
1343 projector.project_event(&delete).unwrap();
1344
1345 assert!(query::get_item(&conn, "bn-001", false).unwrap().is_none());
1347 let item = query::get_item(&conn, "bn-001", true).unwrap().unwrap();
1349 assert!(item.is_deleted);
1350 assert_eq!(item.deleted_at_us, Some(2000));
1351 }
1352
1353 #[test]
1358 fn project_compact_sets_summary() {
1359 let conn = test_db();
1360 let projector = Projector::new(&conn);
1361 projector
1362 .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1363 .unwrap();
1364
1365 let compact = make_event(
1366 EventType::Compact,
1367 "bn-001",
1368 EventData::Compact(CompactData {
1369 summary: "TL;DR: auth fix".into(),
1370 extra: BTreeMap::new(),
1371 }),
1372 "bbb",
1373 2000,
1374 );
1375 projector.project_event(&compact).unwrap();
1376
1377 let item = query::get_item(&conn, "bn-001", false).unwrap().unwrap();
1378 assert_eq!(item.compact_summary.as_deref(), Some("TL;DR: auth fix"));
1379 }
1380
1381 #[test]
1386 fn project_snapshot_stores_json() {
1387 let conn = test_db();
1388 let projector = Projector::new(&conn);
1389 projector
1390 .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1391 .unwrap();
1392
1393 let snapshot = make_event(
1394 EventType::Snapshot,
1395 "bn-001",
1396 EventData::Snapshot(SnapshotData {
1397 state: serde_json::json!({"id": "bn-001", "title": "Snapshotted"}),
1398 extra: BTreeMap::new(),
1399 }),
1400 "bbb",
1401 2000,
1402 );
1403 projector.project_event(&snapshot).unwrap();
1404
1405 let row: String = conn
1406 .query_row(
1407 "SELECT snapshot_json FROM items WHERE item_id = 'bn-001'",
1408 [],
1409 |row| row.get(0),
1410 )
1411 .unwrap();
1412 let parsed: serde_json::Value = serde_json::from_str(&row).unwrap();
1413 assert_eq!(parsed["title"], "Snapshotted");
1414 }
1415
1416 #[test]
1421 fn project_redact_records_and_blanks_comment() {
1422 let conn = test_db();
1423 let projector = Projector::new(&conn);
1424 projector
1425 .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1426 .unwrap();
1427
1428 let comment = make_event(
1430 EventType::Comment,
1431 "bn-001",
1432 EventData::Comment(CommentData {
1433 body: "Secret password: hunter2".into(),
1434 extra: BTreeMap::new(),
1435 }),
1436 "comment_hash",
1437 2000,
1438 );
1439 projector.project_event(&comment).unwrap();
1440
1441 let redact = make_event(
1443 EventType::Redact,
1444 "bn-001",
1445 EventData::Redact(RedactData {
1446 target_hash: "blake3:comment_hash".into(),
1447 reason: "Accidental secret".into(),
1448 extra: BTreeMap::new(),
1449 }),
1450 "redact_hash",
1451 3000,
1452 );
1453 projector.project_event(&redact).unwrap();
1454
1455 let count: i64 = conn
1457 .query_row(
1458 "SELECT COUNT(*) FROM event_redactions WHERE target_event_hash = 'blake3:comment_hash'",
1459 [],
1460 |row| row.get(0),
1461 )
1462 .unwrap();
1463 assert_eq!(count, 1);
1464
1465 let comments = query::get_comments(&conn, "bn-001", None, None).unwrap();
1467 assert_eq!(comments.len(), 1);
1468 assert_eq!(comments[0].body, "[redacted]");
1469 }
1470
1471 #[test]
1476 fn duplicate_events_are_skipped() {
1477 let conn = test_db();
1478 let projector = Projector::new(&conn);
1479
1480 let event = make_create("bn-001", "Item", "aaa", 1000);
1481 assert!(projector.project_event(&event).unwrap()); assert!(!projector.project_event(&event).unwrap()); let count: i64 = conn
1486 .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
1487 .unwrap();
1488 assert_eq!(count, 1);
1489 }
1490
1491 #[test]
1492 fn batch_dedup_counts() {
1493 let conn = test_db();
1494 let projector = Projector::new(&conn);
1495
1496 let event1 = make_create("bn-001", "Item 1", "aaa", 1000);
1497 let event2 = make_create("bn-002", "Item 2", "bbb", 1001);
1498
1499 let stats1 = projector
1501 .project_batch(&[event1.clone(), event2.clone()])
1502 .unwrap();
1503 assert_eq!(stats1.projected, 2);
1504 assert_eq!(stats1.duplicates, 0);
1505
1506 let stats2 = projector.project_batch(&[event1, event2]).unwrap();
1508 assert_eq!(stats2.projected, 0);
1509 assert_eq!(stats2.duplicates, 2);
1510 }
1511
1512 #[test]
1517 fn incremental_matches_full_replay() {
1518 let events = vec![
1519 make_create("bn-001", "Auth bug", "h1", 1000),
1520 make_event(
1521 EventType::Move,
1522 "bn-001",
1523 EventData::Move(MoveData {
1524 state: State::Doing,
1525 reason: None,
1526 extra: BTreeMap::new(),
1527 }),
1528 "h2",
1529 2000,
1530 ),
1531 make_event(
1532 EventType::Assign,
1533 "bn-001",
1534 EventData::Assign(AssignData {
1535 agent: "alice".into(),
1536 action: AssignAction::Assign,
1537 extra: BTreeMap::new(),
1538 }),
1539 "h3",
1540 3000,
1541 ),
1542 make_event(
1543 EventType::Comment,
1544 "bn-001",
1545 EventData::Comment(CommentData {
1546 body: "Working on it".into(),
1547 extra: BTreeMap::new(),
1548 }),
1549 "h4",
1550 4000,
1551 ),
1552 make_event(
1553 EventType::Update,
1554 "bn-001",
1555 EventData::Update(UpdateData {
1556 field: "title".into(),
1557 value: serde_json::json!("Auth bug (fixed)"),
1558 extra: BTreeMap::new(),
1559 }),
1560 "h5",
1561 5000,
1562 ),
1563 make_event(
1564 EventType::Move,
1565 "bn-001",
1566 EventData::Move(MoveData {
1567 state: State::Done,
1568 reason: Some("Shipped".into()),
1569 extra: BTreeMap::new(),
1570 }),
1571 "h6",
1572 6000,
1573 ),
1574 ];
1575
1576 let conn_full = test_db();
1578 let proj_full = Projector::new(&conn_full);
1579 proj_full.project_batch(&events).unwrap();
1580
1581 let conn_inc = test_db();
1583 let proj_inc = Projector::new(&conn_inc);
1584 for event in &events {
1585 proj_inc.project_event(event).unwrap();
1586 }
1587
1588 let item_full = query::get_item(&conn_full, "bn-001", false)
1590 .unwrap()
1591 .unwrap();
1592 let item_inc = query::get_item(&conn_inc, "bn-001", false)
1593 .unwrap()
1594 .unwrap();
1595
1596 assert_eq!(item_full.title, item_inc.title);
1597 assert_eq!(item_full.state, item_inc.state);
1598 assert_eq!(item_full.updated_at_us, item_inc.updated_at_us);
1599
1600 let assignees_full = query::get_assignees(&conn_full, "bn-001").unwrap();
1601 let assignees_inc = query::get_assignees(&conn_inc, "bn-001").unwrap();
1602 assert_eq!(assignees_full.len(), assignees_inc.len());
1603
1604 let comments_full = query::get_comments(&conn_full, "bn-001", None, None).unwrap();
1605 let comments_inc = query::get_comments(&conn_inc, "bn-001", None, None).unwrap();
1606 assert_eq!(comments_full.len(), comments_inc.len());
1607 }
1608
1609 #[test]
1614 fn clear_and_replay_produces_same_result() {
1615 let conn = test_db();
1616 let projector = Projector::new(&conn);
1617
1618 let events = vec![
1619 make_create("bn-001", "Item 1", "h1", 1000),
1620 make_create("bn-002", "Item 2", "h2", 1001),
1621 make_event(
1622 EventType::Link,
1623 "bn-002",
1624 EventData::Link(LinkData {
1625 target: "bn-001".into(),
1626 link_type: "blocks".into(),
1627 extra: BTreeMap::new(),
1628 }),
1629 "h3",
1630 2000,
1631 ),
1632 ];
1633
1634 projector.project_batch(&events).unwrap();
1636 let count1: i64 = conn
1637 .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
1638 .unwrap();
1639 assert_eq!(count1, 2);
1640
1641 clear_projection(&conn).unwrap();
1643 let count_after_clear: i64 = conn
1644 .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
1645 .unwrap();
1646 assert_eq!(count_after_clear, 0);
1647
1648 projector.project_batch(&events).unwrap();
1649 let count2: i64 = conn
1650 .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
1651 .unwrap();
1652 assert_eq!(count2, 2);
1653
1654 let deps = query::get_dependencies(&conn, "bn-002").unwrap();
1655 assert_eq!(deps.len(), 1);
1656 }
1657
1658 #[test]
1663 fn events_on_missing_item_create_placeholder() {
1664 let conn = test_db();
1665 let projector = Projector::new(&conn);
1666
1667 let comment = make_event(
1669 EventType::Comment,
1670 "bn-ghost",
1671 EventData::Comment(CommentData {
1672 body: "Comment on missing item".into(),
1673 extra: BTreeMap::new(),
1674 }),
1675 "ccc",
1676 2000,
1677 );
1678 projector.project_event(&comment).unwrap();
1679
1680 let item = query::get_item(&conn, "bn-ghost", false).unwrap().unwrap();
1682 assert_eq!(item.title, ""); assert_eq!(item.state, "open");
1684 }
1685
1686 #[test]
1691 fn project_create_populates_fts() {
1692 let conn = test_db();
1693 let projector = Projector::new(&conn);
1694 projector
1695 .project_event(&make_create(
1696 "bn-001",
1697 "Authentication timeout",
1698 "aaa",
1699 1000,
1700 ))
1701 .unwrap();
1702
1703 let hits = query::search(&conn, "authentication", 10).unwrap();
1704 assert_eq!(hits.len(), 1);
1705 assert_eq!(hits[0].item_id, "bn-001");
1706 }
1707
1708 #[test]
1709 fn project_update_title_updates_fts() {
1710 let conn = test_db();
1711 let projector = Projector::new(&conn);
1712 projector
1713 .project_event(&make_create("bn-001", "Old title", "aaa", 1000))
1714 .unwrap();
1715
1716 let update = make_event(
1717 EventType::Update,
1718 "bn-001",
1719 EventData::Update(UpdateData {
1720 field: "title".into(),
1721 value: serde_json::json!("Authorization failure"),
1722 extra: BTreeMap::new(),
1723 }),
1724 "bbb",
1725 2000,
1726 );
1727 projector.project_event(&update).unwrap();
1728
1729 let hits_old = query::search(&conn, "Old", 10).unwrap();
1731 assert!(hits_old.is_empty());
1732
1733 let hits_new = query::search(&conn, "authorization", 10).unwrap();
1735 assert_eq!(hits_new.len(), 1);
1736 }
1737
1738 #[test]
1743 fn batch_reports_correct_stats() {
1744 let conn = test_db();
1745 let projector = Projector::new(&conn);
1746
1747 let events = vec![
1748 make_create("bn-001", "Item 1", "h1", 1000),
1749 make_create("bn-002", "Item 2", "h2", 1001),
1750 make_create("bn-003", "Item 3", "h3", 1002),
1751 ];
1752
1753 let stats = projector.project_batch(&events).unwrap();
1754 assert_eq!(stats.projected, 3);
1755 assert_eq!(stats.duplicates, 0);
1756 assert_eq!(stats.errors, 0);
1757 }
1758
1759 #[test]
1764 fn full_lifecycle_all_event_types() {
1765 let conn = test_db();
1766 let projector = Projector::new(&conn);
1767
1768 let mut events = vec![
1769 make_create("bn-001", "Auth bug", "h01", 1000),
1771 make_create("bn-002", "Dep item", "h02", 1001),
1773 ];
1774
1775 events.push(make_event(
1777 EventType::Update,
1778 "bn-001",
1779 EventData::Update(UpdateData {
1780 field: "title".into(),
1781 value: serde_json::json!("Auth timeout bug"),
1782 extra: BTreeMap::new(),
1783 }),
1784 "h03",
1785 2000,
1786 ));
1787
1788 events.push(make_event(
1790 EventType::Move,
1791 "bn-001",
1792 EventData::Move(MoveData {
1793 state: State::Doing,
1794 reason: None,
1795 extra: BTreeMap::new(),
1796 }),
1797 "h04",
1798 3000,
1799 ));
1800
1801 events.push(make_event(
1803 EventType::Assign,
1804 "bn-001",
1805 EventData::Assign(AssignData {
1806 agent: "alice".into(),
1807 action: AssignAction::Assign,
1808 extra: BTreeMap::new(),
1809 }),
1810 "h05",
1811 4000,
1812 ));
1813
1814 events.push(make_event(
1816 EventType::Comment,
1817 "bn-001",
1818 EventData::Comment(CommentData {
1819 body: "Found root cause".into(),
1820 extra: BTreeMap::new(),
1821 }),
1822 "h06",
1823 5000,
1824 ));
1825
1826 events.push(make_event(
1828 EventType::Link,
1829 "bn-001",
1830 EventData::Link(LinkData {
1831 target: "bn-002".into(),
1832 link_type: "blocks".into(),
1833 extra: BTreeMap::new(),
1834 }),
1835 "h07",
1836 6000,
1837 ));
1838
1839 events.push(make_event(
1841 EventType::Unlink,
1842 "bn-001",
1843 EventData::Unlink(UnlinkData {
1844 target: "bn-002".into(),
1845 link_type: Some("blocks".into()),
1846 extra: BTreeMap::new(),
1847 }),
1848 "h08",
1849 7000,
1850 ));
1851
1852 events.push(make_event(
1854 EventType::Compact,
1855 "bn-001",
1856 EventData::Compact(CompactData {
1857 summary: "Auth token refresh race".into(),
1858 extra: BTreeMap::new(),
1859 }),
1860 "h09",
1861 8000,
1862 ));
1863
1864 events.push(make_event(
1866 EventType::Snapshot,
1867 "bn-001",
1868 EventData::Snapshot(SnapshotData {
1869 state: serde_json::json!({"id": "bn-001", "resolved": true}),
1870 extra: BTreeMap::new(),
1871 }),
1872 "h10",
1873 9000,
1874 ));
1875
1876 events.push(make_event(
1878 EventType::Redact,
1879 "bn-001",
1880 EventData::Redact(RedactData {
1881 target_hash: "blake3:h06".into(),
1882 reason: "Contained secret".into(),
1883 extra: BTreeMap::new(),
1884 }),
1885 "h11",
1886 10000,
1887 ));
1888
1889 events.push(make_event(
1891 EventType::Delete,
1892 "bn-001",
1893 EventData::Delete(DeleteData {
1894 reason: Some("Duplicate".into()),
1895 extra: BTreeMap::new(),
1896 }),
1897 "h12",
1898 11000,
1899 ));
1900
1901 let stats = projector.project_batch(&events).unwrap();
1902 assert_eq!(stats.projected, 12); assert_eq!(stats.duplicates, 0);
1904 assert_eq!(stats.errors, 0);
1905
1906 let item = query::get_item(&conn, "bn-001", true).unwrap().unwrap();
1908 assert_eq!(item.title, "Auth timeout bug");
1909 assert_eq!(item.state, "doing");
1910 assert!(item.is_deleted);
1911 assert_eq!(
1912 item.compact_summary.as_deref(),
1913 Some("Auth token refresh race")
1914 );
1915 let snapshot: Option<String> = conn
1916 .query_row(
1917 "SELECT snapshot_json FROM items WHERE item_id = 'bn-001'",
1918 [],
1919 |row| row.get(0),
1920 )
1921 .unwrap();
1922 assert!(snapshot.is_some());
1923
1924 let comments = query::get_comments(&conn, "bn-001", None, None).unwrap();
1926 assert_eq!(comments.len(), 1);
1927 assert_eq!(comments[0].body, "[redacted]");
1928
1929 let deps = query::get_dependencies(&conn, "bn-001").unwrap();
1931 assert!(deps.is_empty());
1932
1933 let redaction_count: i64 = conn
1935 .query_row("SELECT COUNT(*) FROM event_redactions", [], |row| {
1936 row.get(0)
1937 })
1938 .unwrap();
1939 assert_eq!(redaction_count, 1);
1940 }
1941
1942 #[test]
1943 fn late_create_populates_placeholder() {
1944 let conn = test_db();
1945 let projector = Projector::new(&conn);
1946
1947 let comment = make_event(
1949 EventType::Comment,
1950 "bn-late",
1951 EventData::Comment(CommentData {
1952 body: "Comment first".into(),
1953 extra: BTreeMap::new(),
1954 }),
1955 "h1",
1956 1000,
1957 );
1958 projector.project_event(&comment).unwrap();
1959
1960 let item = query::get_item(&conn, "bn-late", false).unwrap().unwrap();
1961 assert_eq!(item.title, "");
1962 assert_eq!(item.created_at_us, 1000);
1963
1964 let create = make_create("bn-late", "Real Title", "h2", 900);
1966
1967 projector.project_event(&create).unwrap();
1968
1969 let item = query::get_item(&conn, "bn-late", false).unwrap().unwrap();
1970 assert_eq!(item.title, "Real Title");
1971 assert_eq!(item.created_at_us, 900);
1972 }
1973
1974 #[test]
1975 fn late_create_backfills_placeholder_after_field_update() {
1976 let conn = test_db();
1977 let projector = Projector::new(&conn);
1978
1979 let update = make_event(
1980 EventType::Update,
1981 "bn-late",
1982 EventData::Update(UpdateData {
1983 field: "title".into(),
1984 value: serde_json::json!("Updated before create"),
1985 extra: BTreeMap::new(),
1986 }),
1987 "h1",
1988 1000,
1989 );
1990 projector.project_event(&update).unwrap();
1991
1992 let create = make_create("bn-late", "Initial title", "h2", 900);
1993 projector.project_event(&create).unwrap();
1994
1995 let item = query::get_item(&conn, "bn-late", false).unwrap().unwrap();
1996 assert_eq!(item.title, "Updated before create");
1997 assert_eq!(item.kind, "task");
1998 assert_eq!(item.size.as_deref(), Some("m"));
1999 assert_eq!(item.description.as_deref(), Some("A detailed description"));
2000 assert_eq!(item.created_at_us, 900);
2001 assert_eq!(item.updated_at_us, 1000);
2002
2003 let labels = query::get_labels(&conn, "bn-late").unwrap();
2004 assert_eq!(labels.len(), 2);
2005 assert_eq!(item.search_labels, "auth backend");
2006 }
2007
2008 #[test]
2012 fn projector_new_creates_tracking_table_on_fresh_db() {
2013 let mut conn = Connection::open_in_memory().expect("open in-memory db");
2014 migrations::migrate(&mut conn).expect("migrate");
2015 let projector = Projector::new(&conn);
2019 let event = make_create("bn-fresh", "Fresh item", "h1", 1000);
2020 projector
2021 .project_event(&event)
2022 .expect("project_event should succeed on fresh DB");
2023
2024 let item = query::get_item(&conn, "bn-fresh", false).unwrap().unwrap();
2025 assert_eq!(item.title, "Fresh item");
2026 }
2027}