Skip to main content

bones_core/db/
project.rs

1//! Event replay → `SQLite` projection pipeline.
2//!
3//! The [`Projector`] replays events from the TSJSON event log and upserts
4//! the resulting state into the `SQLite` projection database. It handles all
5//! 11 event types and supports both incremental (single-event) and full
6//! rebuild modes.
7//!
8//! # Deduplication
9//!
10//! Events are deduplicated by `event_hash`. When a duplicate hash is
11//! encountered (e.g. from git merge duplicating lines in shard files),
12//! the event is silently skipped.
13//!
14//! # Cursor
15//!
16//! After projecting a batch, the caller can persist the byte offset and
17//! last event hash via [`super::query::update_projection_cursor`] for
18//! incremental replay on next startup.
19
20use anyhow::{Context, Result};
21use rusqlite::{Connection, params};
22
23use crate::db::query;
24use crate::event::Event;
25use crate::event::data::{AssignAction, EventData};
26use crate::event::types::EventType;
27use crate::shard::ShardManager;
28
29// ---------------------------------------------------------------------------
30// ProjectionStats
31// ---------------------------------------------------------------------------
32
/// Statistics returned after a projection run.
///
/// Produced by [`Projector::project_batch`]; each event in the batch is
/// counted in exactly one of the three fields.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ProjectionStats {
    /// Number of events successfully projected.
    pub projected: usize,
    /// Number of duplicate events skipped (their `event_hash` was already
    /// recorded as projected).
    pub duplicates: usize,
    /// Number of events that caused errors (logged and skipped).
    pub errors: usize,
}
43
44// ---------------------------------------------------------------------------
45// Projector
46// ---------------------------------------------------------------------------
47
/// Replays events into the `SQLite` projection.
///
/// Create a `Projector` with a connection, then call
/// [`Projector::project_event`] for each event or
/// [`Projector::project_batch`] for a slice.
pub struct Projector<'conn> {
    /// Borrowed connection to the projection database.
    conn: &'conn Connection,
    /// Whether the `projected_events` table has an `agent` column. Probed
    /// once in [`Projector::new`] and used to choose the INSERT shape when
    /// recording projected event hashes.
    has_agent_column: bool,
}
56
57impl<'conn> Projector<'conn> {
58    /// Create a new projector backed by the given connection.
59    ///
60    /// Ensures the `projected_events` tracking table exists before any
61    /// projection work is attempted.
62    pub fn new(conn: &'conn Connection) -> Self {
63        // Best-effort: if the DDL fails (e.g. read-only DB) we'll still
64        // attempt projection and let it fail at the INSERT instead.
65        let _ = ensure_tracking_table(conn);
66        let has_agent_column = projected_events_has_agent_column(conn).unwrap_or(false);
67        Self {
68            conn,
69            has_agent_column,
70        }
71    }
72
73    /// Project a batch of events, returning aggregate statistics.
74    ///
75    /// Events are applied inside a single transaction for performance.
76    /// Duplicate events (same `event_hash`) are silently skipped.
77    ///
78    /// # Errors
79    ///
80    /// Returns an error if the transaction fails to commit. Individual event
81    /// projection errors are counted in `stats.errors` but do not abort the
82    /// batch.
83    pub fn project_batch(&self, events: &[Event]) -> Result<ProjectionStats> {
84        let mut stats = ProjectionStats::default();
85
86        // Use a savepoint so we can commit all-or-nothing
87        self.conn
88            .execute_batch("BEGIN IMMEDIATE")
89            .context("begin projection transaction")?;
90
91        for event in events {
92            match self.project_event_inner(event) {
93                Ok(ProjectResult::Projected) => stats.projected += 1,
94                Ok(ProjectResult::Duplicate) => stats.duplicates += 1,
95                Err(e) => {
96                    tracing::warn!(
97                        event_hash = %event.event_hash,
98                        event_type = %event.event_type,
99                        item_id = %event.item_id,
100                        error = %e,
101                        "skipping event due to projection error"
102                    );
103                    stats.errors += 1;
104                }
105            }
106        }
107
108        self.conn
109            .execute_batch("COMMIT")
110            .context("commit projection transaction")?;
111
112        Ok(stats)
113    }
114
115    /// Project a single event (outside of any managed transaction).
116    ///
117    /// Returns `true` if the event was projected, `false` if it was a
118    /// duplicate.
119    ///
120    /// # Errors
121    ///
122    /// Returns an error if the projection fails.
123    pub fn project_event(&self, event: &Event) -> Result<bool> {
124        let projected = match self.project_event_inner(event)? {
125            ProjectResult::Projected => true,
126            ProjectResult::Duplicate => false,
127        };
128
129        if let Err(err) = self.update_cursor_to_event_log_end(&event.event_hash) {
130            tracing::warn!(
131                event_hash = %event.event_hash,
132                error = %err,
133                "failed to update projection cursor after single-event projection"
134            );
135        }
136
137        Ok(projected)
138    }
139
140    // -----------------------------------------------------------------------
141    // Internal dispatch
142    // -----------------------------------------------------------------------
143
144    fn project_event_inner(&self, event: &Event) -> Result<ProjectResult> {
145        // Dedup check: skip if event_hash already projected
146        if self.is_event_projected(&event.event_hash)? {
147            return Ok(ProjectResult::Duplicate);
148        }
149
150        match event.event_type {
151            EventType::Create => self.project_create(event)?,
152            EventType::Update => self.project_update(event)?,
153            EventType::Move => self.project_move(event)?,
154            EventType::Assign => self.project_assign(event)?,
155            EventType::Comment => self.project_comment(event)?,
156            EventType::Link => self.project_link(event)?,
157            EventType::Unlink => self.project_unlink(event)?,
158            EventType::Delete => self.project_delete(event)?,
159            EventType::Compact => self.project_compact(event)?,
160            EventType::Snapshot => self.project_snapshot(event)?,
161            EventType::Redact => self.project_redact(event)?,
162        }
163
164        // Record that this event hash has been projected
165        self.record_projected_hash(&event.event_hash, event)?;
166
167        Ok(ProjectResult::Projected)
168    }
169
170    // -----------------------------------------------------------------------
171    // Dedup tracking
172    // -----------------------------------------------------------------------
173
174    fn is_event_projected(&self, event_hash: &str) -> Result<bool> {
175        // Check item_comments table for comment events (unique on event_hash)
176        // and the event_redactions table for redact events.
177        // For a general dedup check, we use projection_meta's tracking.
178        // Simple approach: check if item_comments has this hash OR if
179        // event_redactions has it as target. For general dedup, we use
180        // a lightweight check in the projected_events tracking.
181        let exists: bool = self
182            .conn
183            .query_row(
184                "SELECT EXISTS(SELECT 1 FROM projected_events WHERE event_hash = ?1)",
185                params![event_hash],
186                |row| row.get(0),
187            )
188            .unwrap_or(false);
189        Ok(exists)
190    }
191
192    fn is_event_redacted(&self, event_hash: &str) -> Result<bool> {
193        let exists: bool = self
194            .conn
195            .query_row(
196                "SELECT EXISTS(SELECT 1 FROM event_redactions WHERE target_event_hash = ?1)",
197                params![event_hash],
198                |row| row.get(0),
199            )
200            .unwrap_or(false);
201        Ok(exists)
202    }
203
204    fn record_projected_hash(&self, event_hash: &str, event: &Event) -> Result<()> {
205        if self.has_agent_column {
206            self.conn
207                .execute(
208                    "INSERT OR IGNORE INTO projected_events (event_hash, item_id, event_type, projected_at_us, agent) \
209                     VALUES (?1, ?2, ?3, ?4, ?5)",
210                    params![
211                        event_hash,
212                        event.item_id.as_str(),
213                        event.event_type.as_str(),
214                        event.wall_ts_us,
215                        event.agent.as_str(),
216                    ],
217                )
218                .context("record projected event hash")?;
219            return Ok(());
220        }
221
222        self.conn
223            .execute(
224                "INSERT OR IGNORE INTO projected_events (event_hash, item_id, event_type, projected_at_us) \
225                 VALUES (?1, ?2, ?3, ?4)",
226                params![
227                    event_hash,
228                    event.item_id.as_str(),
229                    event.event_type.as_str(),
230                    event.wall_ts_us,
231                ],
232            )
233            .context("record projected event hash")?;
234        Ok(())
235    }
236
237    fn update_cursor_to_event_log_end(&self, event_hash: &str) -> Result<()> {
238        let main_db_file: String = self
239            .conn
240            .query_row(
241                "SELECT file FROM pragma_database_list WHERE name = 'main'",
242                [],
243                |row| row.get(0),
244            )
245            .context("read main database file from pragma_database_list")?;
246
247        if main_db_file.trim().is_empty() {
248            // In-memory projections used in tests have no on-disk log context.
249            return Ok(());
250        }
251
252        let db_path = std::path::Path::new(&main_db_file);
253        let Some(bones_dir) = db_path.parent() else {
254            return Ok(());
255        };
256
257        let shard_mgr = ShardManager::new(bones_dir);
258        let total_len = shard_mgr
259            .total_content_len()
260            .map_err(|e| anyhow::anyhow!("read event-log size for cursor update: {e}"))?;
261        let total_len_i64 = i64::try_from(total_len).unwrap_or(i64::MAX);
262
263        query::update_projection_cursor(self.conn, total_len_i64, Some(event_hash))
264            .context("write projection cursor after single-event projection")?;
265
266        Ok(())
267    }
268
269    // -----------------------------------------------------------------------
270    // Event type handlers
271    // -----------------------------------------------------------------------
272
273    fn project_create(&self, event: &Event) -> Result<()> {
274        let EventData::Create(ref data) = event.data else {
275            anyhow::bail!("expected Create data for item.create event");
276        };
277
278        let is_redacted = self.is_event_redacted(&event.event_hash)?;
279        let title = if is_redacted { "" } else { &data.title };
280        let description = if is_redacted {
281            None
282        } else {
283            data.description.as_deref()
284        };
285        let labels_str = if is_redacted {
286            String::new()
287        } else {
288            data.labels.join(" ")
289        };
290
291        self.conn
292            .execute(
293                "INSERT INTO items (
294                    item_id, title, description, kind, state, urgency,
295                    size, parent_id, is_deleted, search_labels,
296                    created_at_us, updated_at_us
297                ) VALUES (?1, ?2, ?3, ?4, 'open', ?5, ?6, ?7, 0, ?8, ?9, ?10)
298                ON CONFLICT(item_id) DO UPDATE SET
299                    title = excluded.title,
300                    description = excluded.description,
301                    kind = excluded.kind,
302                    urgency = excluded.urgency,
303                    size = excluded.size,
304                    parent_id = excluded.parent_id,
305                    created_at_us = excluded.created_at_us,
306                    search_labels = excluded.search_labels,
307                    updated_at_us = excluded.updated_at_us
308                WHERE items.title = ''",
309                params![
310                    event.item_id.as_str(),
311                    title,
312                    description,
313                    data.kind.to_string(),
314                    data.urgency.to_string(),
315                    data.size.map(|s| s.to_string()),
316                    data.parent.as_deref(),
317                    labels_str,
318                    event.wall_ts_us,
319                    event.wall_ts_us,
320                ],
321            )
322            .with_context(|| format!("project create for {}", event.item_id))?;
323
324        // Insert initial labels
325        if !is_redacted {
326            for label in &data.labels {
327                self.conn
328                    .execute(
329                        "INSERT OR IGNORE INTO item_labels (item_id, label, created_at_us)
330                         VALUES (?1, ?2, ?3)",
331                        params![event.item_id.as_str(), label, event.wall_ts_us],
332                    )
333                    .with_context(|| format!("insert label '{label}' for {}", event.item_id))?;
334            }
335        }
336
337        Ok(())
338    }
339
340    #[allow(clippy::too_many_lines)]
341    fn project_update(&self, event: &Event) -> Result<()> {
342        let EventData::Update(ref data) = event.data else {
343            anyhow::bail!("expected Update data for item.update event");
344        };
345
346        self.ensure_item_exists(event)?;
347
348        let is_redacted = self.is_event_redacted(&event.event_hash)?;
349
350        match data.field.as_str() {
351            "title" => {
352                let value = if is_redacted {
353                    "[redacted]"
354                } else {
355                    data.value.as_str().unwrap_or_default()
356                };
357                self.conn.execute(
358                    "UPDATE items SET title = ?1, updated_at_us = ?2 WHERE item_id = ?3",
359                    params![value, event.wall_ts_us, event.item_id.as_str()],
360                )?;
361            }
362            "description" => {
363                let value = if is_redacted {
364                    Some("[redacted]".to_string())
365                } else {
366                    data.value.as_str().map(String::from)
367                };
368                self.conn.execute(
369                    "UPDATE items SET description = ?1, updated_at_us = ?2 WHERE item_id = ?3",
370                    params![value, event.wall_ts_us, event.item_id.as_str()],
371                )?;
372            }
373            "kind" => {
374                let value = data.value.as_str().unwrap_or("task");
375                self.conn.execute(
376                    "UPDATE items SET kind = ?1, updated_at_us = ?2 WHERE item_id = ?3",
377                    params![value, event.wall_ts_us, event.item_id.as_str()],
378                )?;
379            }
380            "size" => {
381                let value = data.value.as_str().map(String::from);
382                self.conn.execute(
383                    "UPDATE items SET size = ?1, updated_at_us = ?2 WHERE item_id = ?3",
384                    params![value, event.wall_ts_us, event.item_id.as_str()],
385                )?;
386            }
387            "urgency" => {
388                let value = data.value.as_str().unwrap_or("default");
389                self.conn.execute(
390                    "UPDATE items SET urgency = ?1, updated_at_us = ?2 WHERE item_id = ?3",
391                    params![value, event.wall_ts_us, event.item_id.as_str()],
392                )?;
393            }
394            "parent" => {
395                let value = data.value.as_str().map(String::from);
396                self.conn.execute(
397                    "UPDATE items SET parent_id = ?1, updated_at_us = ?2 WHERE item_id = ?3",
398                    params![value, event.wall_ts_us, event.item_id.as_str()],
399                )?;
400            }
401            "labels" => {
402                // Labels update: supports both legacy array replacement
403                // and new add/remove action format (CRDT-friendly).
404                let mut changed = false;
405
406                if let Some(labels) = data.value.as_array() {
407                    // Legacy: replace entire label set
408                    self.conn.execute(
409                        "DELETE FROM item_labels WHERE item_id = ?1",
410                        params![event.item_id.as_str()],
411                    )?;
412
413                    for label_val in labels {
414                        if let Some(label) = label_val.as_str() {
415                            self.conn.execute(
416                                "INSERT OR IGNORE INTO item_labels (item_id, label, created_at_us)
417                                 VALUES (?1, ?2, ?3)",
418                                params![event.item_id.as_str(), label, event.wall_ts_us],
419                            )?;
420                        }
421                    }
422                    changed = true;
423                } else if let Some(obj) = data.value.as_object() {
424                    // New: add/remove single label
425                    let action = obj.get("action").and_then(|v| v.as_str()).unwrap_or("");
426                    let label = obj.get("label").and_then(|v| v.as_str()).unwrap_or("");
427
428                    if !label.is_empty() {
429                        match action {
430                            "add" => {
431                                self.conn.execute(
432                                    "INSERT OR IGNORE INTO item_labels (item_id, label, created_at_us)
433                                     VALUES (?1, ?2, ?3)",
434                                    params![event.item_id.as_str(), label, event.wall_ts_us],
435                                )?;
436                                changed = true;
437                            }
438                            "remove" => {
439                                self.conn.execute(
440                                    "DELETE FROM item_labels WHERE item_id = ?1 AND label = ?2",
441                                    params![event.item_id.as_str(), label],
442                                )?;
443                                changed = true;
444                            }
445                            _ => {}
446                        }
447                    }
448                }
449
450                if changed {
451                    // Reconstruct search_labels from item_labels table to keep FTS in sync.
452                    let mut stmt = self.conn.prepare(
453                        "SELECT label FROM item_labels WHERE item_id = ?1 ORDER BY label",
454                    )?;
455                    let label_rows = stmt.query_map(params![event.item_id.as_str()], |row| {
456                        row.get::<_, String>(0)
457                    })?;
458
459                    let mut label_strings = Vec::new();
460                    for label_res in label_rows {
461                        label_strings.push(label_res?);
462                    }
463
464                    let search_labels = label_strings.join(" ");
465                    self.conn.execute(
466                        "UPDATE items SET search_labels = ?1, updated_at_us = ?2 WHERE item_id = ?3",
467                        params![search_labels, event.wall_ts_us, event.item_id.as_str()],
468                    )?;
469                }
470            }
471            _ => {
472                // Unknown field — just bump updated_at
473                tracing::debug!(
474                    field = %data.field,
475                    item_id = %event.item_id,
476                    "ignoring update for unknown field"
477                );
478                self.conn.execute(
479                    "UPDATE items SET updated_at_us = ?1 WHERE item_id = ?2",
480                    params![event.wall_ts_us, event.item_id.as_str()],
481                )?;
482            }
483        }
484
485        Ok(())
486    }
487
488    fn project_move(&self, event: &Event) -> Result<()> {
489        let EventData::Move(ref data) = event.data else {
490            anyhow::bail!("expected Move data for item.move event");
491        };
492
493        self.ensure_item_exists(event)?;
494
495        self.conn
496            .execute(
497                "UPDATE items SET state = ?1, updated_at_us = ?2 WHERE item_id = ?3",
498                params![
499                    data.state.to_string(),
500                    event.wall_ts_us,
501                    event.item_id.as_str(),
502                ],
503            )
504            .with_context(|| format!("project move for {}", event.item_id))?;
505
506        Ok(())
507    }
508
509    fn project_assign(&self, event: &Event) -> Result<()> {
510        let EventData::Assign(ref data) = event.data else {
511            anyhow::bail!("expected Assign data for item.assign event");
512        };
513
514        self.ensure_item_exists(event)?;
515
516        match data.action {
517            AssignAction::Assign => {
518                self.conn
519                    .execute(
520                        "INSERT OR IGNORE INTO item_assignees (item_id, agent, created_at_us)
521                         VALUES (?1, ?2, ?3)",
522                        params![event.item_id.as_str(), data.agent, event.wall_ts_us],
523                    )
524                    .with_context(|| format!("assign {} to {}", data.agent, event.item_id))?;
525            }
526            AssignAction::Unassign => {
527                self.conn
528                    .execute(
529                        "DELETE FROM item_assignees WHERE item_id = ?1 AND agent = ?2",
530                        params![event.item_id.as_str(), data.agent],
531                    )
532                    .with_context(|| format!("unassign {} from {}", data.agent, event.item_id))?;
533            }
534        }
535
536        // Bump updated_at
537        self.conn.execute(
538            "UPDATE items SET updated_at_us = ?1 WHERE item_id = ?2",
539            params![event.wall_ts_us, event.item_id.as_str()],
540        )?;
541
542        Ok(())
543    }
544
545    fn project_comment(&self, event: &Event) -> Result<()> {
546        let EventData::Comment(ref data) = event.data else {
547            anyhow::bail!("expected Comment data for item.comment event");
548        };
549
550        self.ensure_item_exists(event)?;
551
552        let is_redacted = self.is_event_redacted(&event.event_hash)?;
553        let body = if is_redacted {
554            "[redacted]"
555        } else {
556            &data.body
557        };
558
559        self.conn
560            .execute(
561                "INSERT OR IGNORE INTO item_comments (item_id, event_hash, author, body, created_at_us)
562                 VALUES (?1, ?2, ?3, ?4, ?5)",
563                params![
564                    event.item_id.as_str(),
565                    event.event_hash,
566                    event.agent,
567                    body,
568                    event.wall_ts_us,
569                ],
570            )
571            .with_context(|| format!("project comment for {}", event.item_id))?;
572
573        // Bump updated_at
574        self.conn.execute(
575            "UPDATE items SET updated_at_us = ?1 WHERE item_id = ?2",
576            params![event.wall_ts_us, event.item_id.as_str()],
577        )?;
578
579        Ok(())
580    }
581
582    fn project_link(&self, event: &Event) -> Result<()> {
583        let EventData::Link(ref data) = event.data else {
584            anyhow::bail!("expected Link data for item.link event");
585        };
586
587        self.ensure_item_exists(event)?;
588
589        self.conn
590            .execute(
591                "INSERT OR IGNORE INTO item_dependencies (item_id, depends_on_item_id, link_type, created_at_us)
592                 VALUES (?1, ?2, ?3, ?4)",
593                params![
594                    event.item_id.as_str(),
595                    data.target,
596                    data.link_type,
597                    event.wall_ts_us,
598                ],
599            )
600            .with_context(|| {
601                format!("project link {} -> {}", event.item_id, data.target)
602            })?;
603
604        // Bump updated_at
605        self.conn.execute(
606            "UPDATE items SET updated_at_us = ?1 WHERE item_id = ?2",
607            params![event.wall_ts_us, event.item_id.as_str()],
608        )?;
609
610        Ok(())
611    }
612
613    fn project_unlink(&self, event: &Event) -> Result<()> {
614        let EventData::Unlink(ref data) = event.data else {
615            anyhow::bail!("expected Unlink data for item.unlink event");
616        };
617
618        self.ensure_item_exists(event)?;
619
620        if let Some(ref link_type) = data.link_type {
621            self.conn
622                .execute(
623                    "DELETE FROM item_dependencies \
624                     WHERE item_id = ?1 AND depends_on_item_id = ?2 AND link_type = ?3",
625                    params![event.item_id.as_str(), data.target, link_type],
626                )
627                .with_context(|| format!("unlink {} -/-> {}", event.item_id, data.target))?;
628        } else {
629            // No link_type: remove all links to target
630            self.conn
631                .execute(
632                    "DELETE FROM item_dependencies \
633                     WHERE item_id = ?1 AND depends_on_item_id = ?2",
634                    params![event.item_id.as_str(), data.target],
635                )
636                .with_context(|| format!("unlink all {} -/-> {}", event.item_id, data.target))?;
637        }
638
639        // Bump updated_at
640        self.conn.execute(
641            "UPDATE items SET updated_at_us = ?1 WHERE item_id = ?2",
642            params![event.wall_ts_us, event.item_id.as_str()],
643        )?;
644
645        Ok(())
646    }
647
648    fn project_delete(&self, event: &Event) -> Result<()> {
649        self.ensure_item_exists(event)?;
650
651        self.conn
652            .execute(
653                "UPDATE items SET is_deleted = 1, deleted_at_us = ?1, updated_at_us = ?1 \
654                 WHERE item_id = ?2",
655                params![event.wall_ts_us, event.item_id.as_str()],
656            )
657            .with_context(|| format!("project delete for {}", event.item_id))?;
658
659        Ok(())
660    }
661
662    fn project_compact(&self, event: &Event) -> Result<()> {
663        let EventData::Compact(ref data) = event.data else {
664            anyhow::bail!("expected Compact data for item.compact event");
665        };
666
667        self.ensure_item_exists(event)?;
668
669        let is_redacted = self.is_event_redacted(&event.event_hash)?;
670        let summary = if is_redacted {
671            "[redacted]"
672        } else {
673            data.summary.as_str()
674        };
675
676        self.conn
677            .execute(
678                "UPDATE items SET compact_summary = ?1, updated_at_us = ?2 WHERE item_id = ?3",
679                params![summary, event.wall_ts_us, event.item_id.as_str()],
680            )
681            .with_context(|| format!("project compact for {}", event.item_id))?;
682
683        Ok(())
684    }
685
686    fn project_snapshot(&self, event: &Event) -> Result<()> {
687        let EventData::Snapshot(ref data) = event.data else {
688            anyhow::bail!("expected Snapshot data for item.snapshot event");
689        };
690
691        self.ensure_item_exists(event)?;
692
693        let json_str = serde_json::to_string(&data.state).context("serialize snapshot state")?;
694
695        self.conn
696            .execute(
697                "UPDATE items SET snapshot_json = ?1, updated_at_us = ?2 WHERE item_id = ?3",
698                params![json_str, event.wall_ts_us, event.item_id.as_str()],
699            )
700            .with_context(|| format!("project snapshot for {}", event.item_id))?;
701
702        Ok(())
703    }
704
705    fn project_redact(&self, event: &Event) -> Result<()> {
706        let EventData::Redact(ref data) = event.data else {
707            anyhow::bail!("expected Redact data for item.redact event");
708        };
709
710        // Insert redaction record
711        self.conn
712            .execute(
713                "INSERT OR IGNORE INTO event_redactions \
714                 (target_event_hash, item_id, reason, redacted_by, redacted_at_us) \
715                 VALUES (?1, ?2, ?3, ?4, ?5)",
716                params![
717                    data.target_hash,
718                    event.item_id.as_str(),
719                    data.reason,
720                    event.agent,
721                    event.wall_ts_us,
722                ],
723            )
724            .with_context(|| {
725                format!(
726                    "project redact for {} targeting {}",
727                    event.item_id, data.target_hash
728                )
729            })?;
730
731        // Redact the comment body if the target hash is a comment event
732        self.conn
733            .execute(
734                "UPDATE item_comments SET body = '[redacted]' WHERE event_hash = ?1",
735                params![data.target_hash],
736            )
737            .context("redact comment body")?;
738
739        Ok(())
740    }
741
742    // -----------------------------------------------------------------------
743    // Helpers
744    // -----------------------------------------------------------------------
745
746    /// Ensure the item exists in the projection. If not, create a placeholder
747    /// row so that subsequent operations (UPDATE, foreign keys) succeed.
748    ///
749    /// This handles out-of-order event replay and missing create events.
750    fn ensure_item_exists(&self, event: &Event) -> Result<()> {
751        let exists: bool = self
752            .conn
753            .query_row(
754                "SELECT EXISTS(SELECT 1 FROM items WHERE item_id = ?1)",
755                params![event.item_id.as_str()],
756                |row| row.get(0),
757            )
758            .context("check item exists")?;
759
760        if !exists {
761            self.conn
762                .execute(
763                    "INSERT INTO items (
764                        item_id, title, kind, state, urgency,
765                        is_deleted, search_labels, created_at_us, updated_at_us
766                    ) VALUES (?1, '', 'task', 'open', 'default', 0, '', ?2, ?2)",
767                    params![event.item_id.as_str(), event.wall_ts_us],
768                )
769                .with_context(|| format!("create placeholder item for {}", event.item_id))?;
770        }
771
772        Ok(())
773    }
774}
775
/// Outcome of attempting to project a single event.
enum ProjectResult {
    /// The event was applied and its hash recorded as projected.
    Projected,
    /// The event's hash was already recorded; nothing was applied.
    Duplicate,
}
780
781// ---------------------------------------------------------------------------
782// Schema addition: projected_events tracking table
783// ---------------------------------------------------------------------------
784
/// SQL to create the `projected_events` tracking table.
///
/// This is applied as part of the projection setup, not as a schema migration,
/// because it is projection-internal bookkeeping.
///
/// The batch creates the table (keyed by `event_hash`) plus indexes on
/// `item_id` and `agent`. Every statement uses `IF NOT EXISTS`, so the
/// batch can be re-run. Note that the `agent` index statement needs the
/// `agent` column: against a pre-existing table that lacks that column the
/// batch is expected to fail at that statement.
pub const PROJECTED_EVENTS_DDL: &str = "\
CREATE TABLE IF NOT EXISTS projected_events (
    event_hash TEXT PRIMARY KEY,
    item_id TEXT NOT NULL,
    event_type TEXT NOT NULL,
    projected_at_us INTEGER NOT NULL,
    agent TEXT NOT NULL DEFAULT ''
);
CREATE INDEX IF NOT EXISTS idx_projected_events_item
    ON projected_events(item_id);
CREATE INDEX IF NOT EXISTS idx_projected_events_agent
    ON projected_events(agent);
";
802
803/// Ensure the `projected_events` tracking table exists.
804///
805/// Call this once after opening the projection database and before
806/// projecting events.
807///
808/// # Errors
809///
810/// Returns an error if executing the DDL fails.
811pub fn ensure_tracking_table(conn: &Connection) -> Result<()> {
812    conn.execute_batch(PROJECTED_EVENTS_DDL)
813        .context("create projected_events tracking table")?;
814
815    if !projected_events_has_agent_column(conn)? {
816        conn.execute(
817            "ALTER TABLE projected_events ADD COLUMN agent TEXT NOT NULL DEFAULT ''",
818            [],
819        )
820        .context("add agent column to projected_events")?;
821    }
822
823    conn.execute_batch(
824        "CREATE INDEX IF NOT EXISTS idx_projected_events_agent ON projected_events(agent);",
825    )
826    .context("create projected_events_agent index")?;
827
828    Ok(())
829}
830
831fn projected_events_has_agent_column(conn: &Connection) -> Result<bool> {
832    let mut stmt = conn
833        .prepare("PRAGMA table_info(projected_events)")
834        .context("inspect projected_events schema")?;
835    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
836
837    for row in rows {
838        let name = row.context("read projected_events column")?;
839        if name == "agent" {
840            return Ok(true);
841        }
842    }
843
844    Ok(false)
845}
846
847/// Drop all projection data for a full rebuild.
848///
849/// Clears all items, edge tables, comments, redactions, FTS index, and the
850/// projected events tracking table. Schema structure is preserved.
851///
852/// # Errors
853///
854/// Returns an error if the truncation fails.
855pub fn clear_projection(conn: &Connection) -> Result<()> {
856    conn.execute_batch(
857        "DELETE FROM event_redactions;
858         DELETE FROM item_comments;
859         DELETE FROM item_dependencies;
860         DELETE FROM item_assignees;
861         DELETE FROM item_labels;
862         DELETE FROM items;
863         DELETE FROM projected_events;
864         UPDATE projection_meta SET last_event_offset = 0, last_event_hash = NULL WHERE id = 1;",
865    )
866    .context("clear projection tables")?;
867    Ok(())
868}
869
870// ---------------------------------------------------------------------------
871// Tests
872// ---------------------------------------------------------------------------
873
874#[cfg(test)]
875mod tests {
876    use super::*;
877    use crate::db::{migrations, query};
878    use crate::event::data::*;
879    use crate::event::types::EventType;
880    use crate::event::writer;
881    use crate::model::item::{Kind, Size, State, Urgency};
882    use crate::model::item_id::ItemId;
883    use crate::shard::ShardManager;
884    use rusqlite::Connection;
885    use std::collections::BTreeMap;
886    use tempfile::TempDir;
887
888    fn test_db() -> Connection {
889        let mut conn = Connection::open_in_memory().expect("open in-memory db");
890        migrations::migrate(&mut conn).expect("migrate");
891        ensure_tracking_table(&conn).expect("create tracking table");
892        conn
893    }
894
895    fn make_event(
896        event_type: EventType,
897        item_id: &str,
898        data: EventData,
899        hash: &str,
900        ts: i64,
901    ) -> Event {
902        Event {
903            wall_ts_us: ts,
904            agent: "test-agent".into(),
905            itc: "itc:AQ".into(),
906            parents: vec![],
907            event_type,
908            item_id: ItemId::new_unchecked(item_id),
909            data,
910            event_hash: format!("blake3:{hash}"),
911        }
912    }
913
914    fn make_create(id: &str, title: &str, hash: &str, ts: i64) -> Event {
915        make_event(
916            EventType::Create,
917            id,
918            EventData::Create(CreateData {
919                title: title.into(),
920                kind: Kind::Task,
921                size: Some(Size::M),
922                urgency: Urgency::Default,
923                labels: vec!["backend".into(), "auth".into()],
924                parent: None,
925                causation: None,
926                description: Some("A detailed description".into()),
927                extra: BTreeMap::new(),
928            }),
929            hash,
930            ts,
931        )
932    }
933
934    #[test]
935    fn project_event_updates_projection_cursor_for_file_backed_db() {
936        let dir = TempDir::new().expect("tempdir");
937        let bones_dir = dir.path().join(".bones");
938        std::fs::create_dir_all(&bones_dir).expect("create .bones");
939
940        let shard_mgr = ShardManager::new(&bones_dir);
941        shard_mgr.init().expect("init shard manager");
942
943        let db_path = bones_dir.join("bones.db");
944        let mut conn = Connection::open(&db_path).expect("open projection db");
945        migrations::migrate(&mut conn).expect("migrate");
946        ensure_tracking_table(&conn).expect("create tracking table");
947
948        let projector = Projector::new(&conn);
949        let mut event = make_create("bn-001", "Cursor update", "h1", 1000);
950        let line = writer::write_event(&mut event).expect("serialize event");
951        shard_mgr
952            .append(&line, false, std::time::Duration::from_secs(5))
953            .expect("append event line");
954
955        projector.project_event(&event).expect("project event");
956
957        let (offset, hash) = query::get_projection_cursor(&conn).expect("read projection cursor");
958        let expected_offset =
959            i64::try_from(shard_mgr.total_content_len().expect("content len")).unwrap_or(i64::MAX);
960
961        assert_eq!(offset, expected_offset);
962        assert_eq!(hash.as_deref(), Some(event.event_hash.as_str()));
963    }
964
965    // -----------------------------------------------------------------------
966    // Create
967    // -----------------------------------------------------------------------
968
969    #[test]
970    fn project_create_inserts_item() {
971        let conn = test_db();
972        let projector = Projector::new(&conn);
973        let event = make_create("bn-001", "Fix auth timeout", "aaa", 1000);
974
975        let result = projector.project_event(&event).unwrap();
976        assert!(result, "should return true for new projection");
977
978        let item = query::get_item(&conn, "bn-001", false).unwrap().unwrap();
979        assert_eq!(item.title, "Fix auth timeout");
980        assert_eq!(item.kind, "task");
981        assert_eq!(item.state, "open");
982        assert_eq!(item.urgency, "default");
983        assert_eq!(item.size.as_deref(), Some("m"));
984        assert_eq!(item.description.as_deref(), Some("A detailed description"));
985        assert_eq!(item.created_at_us, 1000);
986        assert_eq!(item.updated_at_us, 1000);
987    }
988
989    #[test]
990    fn project_create_inserts_labels() {
991        let conn = test_db();
992        let projector = Projector::new(&conn);
993        let event = make_create("bn-001", "Fix auth", "aaa", 1000);
994        projector.project_event(&event).unwrap();
995
996        let labels = query::get_labels(&conn, "bn-001").unwrap();
997        assert_eq!(labels.len(), 2);
998        assert_eq!(labels[0].label, "auth");
999        assert_eq!(labels[1].label, "backend");
1000    }
1001
1002    // -----------------------------------------------------------------------
1003    // Update
1004    // -----------------------------------------------------------------------
1005
1006    #[test]
1007    fn project_update_title() {
1008        let conn = test_db();
1009        let projector = Projector::new(&conn);
1010        projector
1011            .project_event(&make_create("bn-001", "Old title", "aaa", 1000))
1012            .unwrap();
1013
1014        let update = make_event(
1015            EventType::Update,
1016            "bn-001",
1017            EventData::Update(UpdateData {
1018                field: "title".into(),
1019                value: serde_json::json!("New title"),
1020                extra: BTreeMap::new(),
1021            }),
1022            "bbb",
1023            2000,
1024        );
1025        projector.project_event(&update).unwrap();
1026
1027        let item = query::get_item(&conn, "bn-001", false).unwrap().unwrap();
1028        assert_eq!(item.title, "New title");
1029        assert_eq!(item.updated_at_us, 2000);
1030    }
1031
1032    #[test]
1033    fn project_update_description() {
1034        let conn = test_db();
1035        let projector = Projector::new(&conn);
1036        projector
1037            .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1038            .unwrap();
1039
1040        let update = make_event(
1041            EventType::Update,
1042            "bn-001",
1043            EventData::Update(UpdateData {
1044                field: "description".into(),
1045                value: serde_json::json!("Updated description"),
1046                extra: BTreeMap::new(),
1047            }),
1048            "bbb",
1049            2000,
1050        );
1051        projector.project_event(&update).unwrap();
1052
1053        let item = query::get_item(&conn, "bn-001", false).unwrap().unwrap();
1054        assert_eq!(item.description.as_deref(), Some("Updated description"));
1055    }
1056
1057    #[test]
1058    fn project_update_labels() {
1059        let conn = test_db();
1060        let projector = Projector::new(&conn);
1061        projector
1062            .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1063            .unwrap();
1064
1065        let update = make_event(
1066            EventType::Update,
1067            "bn-001",
1068            EventData::Update(UpdateData {
1069                field: "labels".into(),
1070                value: serde_json::json!(["frontend", "urgent"]),
1071                extra: BTreeMap::new(),
1072            }),
1073            "bbb",
1074            2000,
1075        );
1076        projector.project_event(&update).unwrap();
1077
1078        let labels = query::get_labels(&conn, "bn-001").unwrap();
1079        assert_eq!(labels.len(), 2);
1080        assert_eq!(labels[0].label, "frontend");
1081        assert_eq!(labels[1].label, "urgent");
1082    }
1083
1084    #[test]
1085    fn project_update_unknown_field_bumps_updated() {
1086        let conn = test_db();
1087        let projector = Projector::new(&conn);
1088        projector
1089            .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1090            .unwrap();
1091
1092        let update = make_event(
1093            EventType::Update,
1094            "bn-001",
1095            EventData::Update(UpdateData {
1096                field: "future_field".into(),
1097                value: serde_json::json!("whatever"),
1098                extra: BTreeMap::new(),
1099            }),
1100            "bbb",
1101            2000,
1102        );
1103        projector.project_event(&update).unwrap();
1104
1105        let item = query::get_item(&conn, "bn-001", false).unwrap().unwrap();
1106        assert_eq!(item.updated_at_us, 2000);
1107    }
1108
1109    // -----------------------------------------------------------------------
1110    // Move
1111    // -----------------------------------------------------------------------
1112
1113    #[test]
1114    fn project_move_updates_state() {
1115        let conn = test_db();
1116        let projector = Projector::new(&conn);
1117        projector
1118            .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1119            .unwrap();
1120
1121        let mv = make_event(
1122            EventType::Move,
1123            "bn-001",
1124            EventData::Move(MoveData {
1125                state: State::Doing,
1126                reason: Some("Starting work".into()),
1127                extra: BTreeMap::new(),
1128            }),
1129            "bbb",
1130            2000,
1131        );
1132        projector.project_event(&mv).unwrap();
1133
1134        let item = query::get_item(&conn, "bn-001", false).unwrap().unwrap();
1135        assert_eq!(item.state, "doing");
1136    }
1137
1138    // -----------------------------------------------------------------------
1139    // Assign / Unassign
1140    // -----------------------------------------------------------------------
1141
1142    #[test]
1143    fn project_assign_and_unassign() {
1144        let conn = test_db();
1145        let projector = Projector::new(&conn);
1146        projector
1147            .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1148            .unwrap();
1149
1150        // Assign alice
1151        let assign = make_event(
1152            EventType::Assign,
1153            "bn-001",
1154            EventData::Assign(AssignData {
1155                agent: "alice".into(),
1156                action: AssignAction::Assign,
1157                extra: BTreeMap::new(),
1158            }),
1159            "bbb",
1160            2000,
1161        );
1162        projector.project_event(&assign).unwrap();
1163
1164        let assignees = query::get_assignees(&conn, "bn-001").unwrap();
1165        assert_eq!(assignees.len(), 1);
1166        assert_eq!(assignees[0].agent, "alice");
1167
1168        // Unassign alice
1169        let unassign = make_event(
1170            EventType::Assign,
1171            "bn-001",
1172            EventData::Assign(AssignData {
1173                agent: "alice".into(),
1174                action: AssignAction::Unassign,
1175                extra: BTreeMap::new(),
1176            }),
1177            "ccc",
1178            3000,
1179        );
1180        projector.project_event(&unassign).unwrap();
1181
1182        let assignees = query::get_assignees(&conn, "bn-001").unwrap();
1183        assert!(assignees.is_empty());
1184    }
1185
1186    // -----------------------------------------------------------------------
1187    // Comment
1188    // -----------------------------------------------------------------------
1189
1190    #[test]
1191    fn project_comment_inserts_row() {
1192        let conn = test_db();
1193        let projector = Projector::new(&conn);
1194        projector
1195            .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1196            .unwrap();
1197
1198        let comment = make_event(
1199            EventType::Comment,
1200            "bn-001",
1201            EventData::Comment(CommentData {
1202                body: "This is a comment".into(),
1203                extra: BTreeMap::new(),
1204            }),
1205            "bbb",
1206            2000,
1207        );
1208        projector.project_event(&comment).unwrap();
1209
1210        let comments = query::get_comments(&conn, "bn-001", None, None).unwrap();
1211        assert_eq!(comments.len(), 1);
1212        assert_eq!(comments[0].body, "This is a comment");
1213        assert_eq!(comments[0].author, "test-agent");
1214        assert_eq!(comments[0].event_hash, "blake3:bbb");
1215    }
1216
1217    // -----------------------------------------------------------------------
1218    // Link / Unlink
1219    // -----------------------------------------------------------------------
1220
1221    #[test]
1222    fn project_link_and_unlink() {
1223        let conn = test_db();
1224        let projector = Projector::new(&conn);
1225        projector
1226            .project_event(&make_create("bn-001", "Blocker", "aaa", 1000))
1227            .unwrap();
1228        projector
1229            .project_event(&make_create("bn-002", "Blocked", "bbb", 1001))
1230            .unwrap();
1231
1232        // Link
1233        let link = make_event(
1234            EventType::Link,
1235            "bn-002",
1236            EventData::Link(LinkData {
1237                target: "bn-001".into(),
1238                link_type: "blocks".into(),
1239                extra: BTreeMap::new(),
1240            }),
1241            "ccc",
1242            2000,
1243        );
1244        projector.project_event(&link).unwrap();
1245
1246        let deps = query::get_dependencies(&conn, "bn-002").unwrap();
1247        assert_eq!(deps.len(), 1);
1248        assert_eq!(deps[0].depends_on_item_id, "bn-001");
1249
1250        // Unlink
1251        let unlink = make_event(
1252            EventType::Unlink,
1253            "bn-002",
1254            EventData::Unlink(UnlinkData {
1255                target: "bn-001".into(),
1256                link_type: Some("blocks".into()),
1257                extra: BTreeMap::new(),
1258            }),
1259            "ddd",
1260            3000,
1261        );
1262        projector.project_event(&unlink).unwrap();
1263
1264        let deps = query::get_dependencies(&conn, "bn-002").unwrap();
1265        assert!(deps.is_empty());
1266    }
1267
1268    // -----------------------------------------------------------------------
1269    // Delete
1270    // -----------------------------------------------------------------------
1271
1272    #[test]
1273    fn project_delete_soft_deletes() {
1274        let conn = test_db();
1275        let projector = Projector::new(&conn);
1276        projector
1277            .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1278            .unwrap();
1279
1280        let delete = make_event(
1281            EventType::Delete,
1282            "bn-001",
1283            EventData::Delete(DeleteData {
1284                reason: Some("Duplicate".into()),
1285                extra: BTreeMap::new(),
1286            }),
1287            "bbb",
1288            2000,
1289        );
1290        projector.project_event(&delete).unwrap();
1291
1292        // Not visible without include_deleted
1293        assert!(query::get_item(&conn, "bn-001", false).unwrap().is_none());
1294        // Visible with include_deleted
1295        let item = query::get_item(&conn, "bn-001", true).unwrap().unwrap();
1296        assert!(item.is_deleted);
1297        assert_eq!(item.deleted_at_us, Some(2000));
1298    }
1299
1300    // -----------------------------------------------------------------------
1301    // Compact
1302    // -----------------------------------------------------------------------
1303
1304    #[test]
1305    fn project_compact_sets_summary() {
1306        let conn = test_db();
1307        let projector = Projector::new(&conn);
1308        projector
1309            .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1310            .unwrap();
1311
1312        let compact = make_event(
1313            EventType::Compact,
1314            "bn-001",
1315            EventData::Compact(CompactData {
1316                summary: "TL;DR: auth fix".into(),
1317                extra: BTreeMap::new(),
1318            }),
1319            "bbb",
1320            2000,
1321        );
1322        projector.project_event(&compact).unwrap();
1323
1324        let item = query::get_item(&conn, "bn-001", false).unwrap().unwrap();
1325        assert_eq!(item.compact_summary.as_deref(), Some("TL;DR: auth fix"));
1326    }
1327
1328    // -----------------------------------------------------------------------
1329    // Snapshot
1330    // -----------------------------------------------------------------------
1331
1332    #[test]
1333    fn project_snapshot_stores_json() {
1334        let conn = test_db();
1335        let projector = Projector::new(&conn);
1336        projector
1337            .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1338            .unwrap();
1339
1340        let snapshot = make_event(
1341            EventType::Snapshot,
1342            "bn-001",
1343            EventData::Snapshot(SnapshotData {
1344                state: serde_json::json!({"id": "bn-001", "title": "Snapshotted"}),
1345                extra: BTreeMap::new(),
1346            }),
1347            "bbb",
1348            2000,
1349        );
1350        projector.project_event(&snapshot).unwrap();
1351
1352        let row: String = conn
1353            .query_row(
1354                "SELECT snapshot_json FROM items WHERE item_id = 'bn-001'",
1355                [],
1356                |row| row.get(0),
1357            )
1358            .unwrap();
1359        let parsed: serde_json::Value = serde_json::from_str(&row).unwrap();
1360        assert_eq!(parsed["title"], "Snapshotted");
1361    }
1362
1363    // -----------------------------------------------------------------------
1364    // Redact
1365    // -----------------------------------------------------------------------
1366
1367    #[test]
1368    fn project_redact_records_and_blanks_comment() {
1369        let conn = test_db();
1370        let projector = Projector::new(&conn);
1371        projector
1372            .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1373            .unwrap();
1374
1375        // First add a comment
1376        let comment = make_event(
1377            EventType::Comment,
1378            "bn-001",
1379            EventData::Comment(CommentData {
1380                body: "Secret password: hunter2".into(),
1381                extra: BTreeMap::new(),
1382            }),
1383            "comment_hash",
1384            2000,
1385        );
1386        projector.project_event(&comment).unwrap();
1387
1388        // Redact it
1389        let redact = make_event(
1390            EventType::Redact,
1391            "bn-001",
1392            EventData::Redact(RedactData {
1393                target_hash: "blake3:comment_hash".into(),
1394                reason: "Accidental secret".into(),
1395                extra: BTreeMap::new(),
1396            }),
1397            "redact_hash",
1398            3000,
1399        );
1400        projector.project_event(&redact).unwrap();
1401
1402        // Check redaction record
1403        let count: i64 = conn
1404            .query_row(
1405                "SELECT COUNT(*) FROM event_redactions WHERE target_event_hash = 'blake3:comment_hash'",
1406                [],
1407                |row| row.get(0),
1408            )
1409            .unwrap();
1410        assert_eq!(count, 1);
1411
1412        // Check comment body is redacted
1413        let comments = query::get_comments(&conn, "bn-001", None, None).unwrap();
1414        assert_eq!(comments.len(), 1);
1415        assert_eq!(comments[0].body, "[redacted]");
1416    }
1417
1418    // -----------------------------------------------------------------------
1419    // Dedup
1420    // -----------------------------------------------------------------------
1421
1422    #[test]
1423    fn duplicate_events_are_skipped() {
1424        let conn = test_db();
1425        let projector = Projector::new(&conn);
1426
1427        let event = make_create("bn-001", "Item", "aaa", 1000);
1428        assert!(projector.project_event(&event).unwrap()); // first time
1429        assert!(!projector.project_event(&event).unwrap()); // duplicate
1430
1431        // Only one item created
1432        let count: i64 = conn
1433            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
1434            .unwrap();
1435        assert_eq!(count, 1);
1436    }
1437
1438    #[test]
1439    fn batch_dedup_counts() {
1440        let conn = test_db();
1441        let projector = Projector::new(&conn);
1442
1443        let event1 = make_create("bn-001", "Item 1", "aaa", 1000);
1444        let event2 = make_create("bn-002", "Item 2", "bbb", 1001);
1445
1446        // Project first batch
1447        let stats1 = projector
1448            .project_batch(&[event1.clone(), event2.clone()])
1449            .unwrap();
1450        assert_eq!(stats1.projected, 2);
1451        assert_eq!(stats1.duplicates, 0);
1452
1453        // Replay same batch — all duplicates
1454        let stats2 = projector.project_batch(&[event1, event2]).unwrap();
1455        assert_eq!(stats2.projected, 0);
1456        assert_eq!(stats2.duplicates, 2);
1457    }
1458
1459    // -----------------------------------------------------------------------
1460    // Full replay: incremental matches full rebuild
1461    // -----------------------------------------------------------------------
1462
1463    #[test]
1464    fn incremental_matches_full_replay() {
1465        let events = vec![
1466            make_create("bn-001", "Auth bug", "h1", 1000),
1467            make_event(
1468                EventType::Move,
1469                "bn-001",
1470                EventData::Move(MoveData {
1471                    state: State::Doing,
1472                    reason: None,
1473                    extra: BTreeMap::new(),
1474                }),
1475                "h2",
1476                2000,
1477            ),
1478            make_event(
1479                EventType::Assign,
1480                "bn-001",
1481                EventData::Assign(AssignData {
1482                    agent: "alice".into(),
1483                    action: AssignAction::Assign,
1484                    extra: BTreeMap::new(),
1485                }),
1486                "h3",
1487                3000,
1488            ),
1489            make_event(
1490                EventType::Comment,
1491                "bn-001",
1492                EventData::Comment(CommentData {
1493                    body: "Working on it".into(),
1494                    extra: BTreeMap::new(),
1495                }),
1496                "h4",
1497                4000,
1498            ),
1499            make_event(
1500                EventType::Update,
1501                "bn-001",
1502                EventData::Update(UpdateData {
1503                    field: "title".into(),
1504                    value: serde_json::json!("Auth bug (fixed)"),
1505                    extra: BTreeMap::new(),
1506                }),
1507                "h5",
1508                5000,
1509            ),
1510            make_event(
1511                EventType::Move,
1512                "bn-001",
1513                EventData::Move(MoveData {
1514                    state: State::Done,
1515                    reason: Some("Shipped".into()),
1516                    extra: BTreeMap::new(),
1517                }),
1518                "h6",
1519                6000,
1520            ),
1521        ];
1522
1523        // Full replay
1524        let conn_full = test_db();
1525        let proj_full = Projector::new(&conn_full);
1526        proj_full.project_batch(&events).unwrap();
1527
1528        // Incremental: one by one
1529        let conn_inc = test_db();
1530        let proj_inc = Projector::new(&conn_inc);
1531        for event in &events {
1532            proj_inc.project_event(event).unwrap();
1533        }
1534
1535        // Compare results
1536        let item_full = query::get_item(&conn_full, "bn-001", false)
1537            .unwrap()
1538            .unwrap();
1539        let item_inc = query::get_item(&conn_inc, "bn-001", false)
1540            .unwrap()
1541            .unwrap();
1542
1543        assert_eq!(item_full.title, item_inc.title);
1544        assert_eq!(item_full.state, item_inc.state);
1545        assert_eq!(item_full.updated_at_us, item_inc.updated_at_us);
1546
1547        let assignees_full = query::get_assignees(&conn_full, "bn-001").unwrap();
1548        let assignees_inc = query::get_assignees(&conn_inc, "bn-001").unwrap();
1549        assert_eq!(assignees_full.len(), assignees_inc.len());
1550
1551        let comments_full = query::get_comments(&conn_full, "bn-001", None, None).unwrap();
1552        let comments_inc = query::get_comments(&conn_inc, "bn-001", None, None).unwrap();
1553        assert_eq!(comments_full.len(), comments_inc.len());
1554    }
1555
1556    // -----------------------------------------------------------------------
1557    // Full rebuild (clear + replay)
1558    // -----------------------------------------------------------------------
1559
1560    #[test]
1561    fn clear_and_replay_produces_same_result() {
1562        let conn = test_db();
1563        let projector = Projector::new(&conn);
1564
1565        let events = vec![
1566            make_create("bn-001", "Item 1", "h1", 1000),
1567            make_create("bn-002", "Item 2", "h2", 1001),
1568            make_event(
1569                EventType::Link,
1570                "bn-002",
1571                EventData::Link(LinkData {
1572                    target: "bn-001".into(),
1573                    link_type: "blocks".into(),
1574                    extra: BTreeMap::new(),
1575                }),
1576                "h3",
1577                2000,
1578            ),
1579        ];
1580
1581        // First pass
1582        projector.project_batch(&events).unwrap();
1583        let count1: i64 = conn
1584            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
1585            .unwrap();
1586        assert_eq!(count1, 2);
1587
1588        // Clear and replay
1589        clear_projection(&conn).unwrap();
1590        let count_after_clear: i64 = conn
1591            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
1592            .unwrap();
1593        assert_eq!(count_after_clear, 0);
1594
1595        projector.project_batch(&events).unwrap();
1596        let count2: i64 = conn
1597            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
1598            .unwrap();
1599        assert_eq!(count2, 2);
1600
1601        let deps = query::get_dependencies(&conn, "bn-002").unwrap();
1602        assert_eq!(deps.len(), 1);
1603    }
1604
1605    // -----------------------------------------------------------------------
1606    // Placeholder item creation
1607    // -----------------------------------------------------------------------
1608
1609    #[test]
1610    fn events_on_missing_item_create_placeholder() {
1611        let conn = test_db();
1612        let projector = Projector::new(&conn);
1613
1614        // Comment on an item that hasn't been created yet
1615        let comment = make_event(
1616            EventType::Comment,
1617            "bn-ghost",
1618            EventData::Comment(CommentData {
1619                body: "Comment on missing item".into(),
1620                extra: BTreeMap::new(),
1621            }),
1622            "ccc",
1623            2000,
1624        );
1625        projector.project_event(&comment).unwrap();
1626
1627        // Item exists as placeholder
1628        let item = query::get_item(&conn, "bn-ghost", false).unwrap().unwrap();
1629        assert_eq!(item.title, ""); // placeholder has empty title
1630        assert_eq!(item.state, "open");
1631    }
1632
1633    // -----------------------------------------------------------------------
1634    // FTS integration
1635    // -----------------------------------------------------------------------
1636
1637    #[test]
1638    fn project_create_populates_fts() {
1639        let conn = test_db();
1640        let projector = Projector::new(&conn);
1641        projector
1642            .project_event(&make_create(
1643                "bn-001",
1644                "Authentication timeout",
1645                "aaa",
1646                1000,
1647            ))
1648            .unwrap();
1649
1650        let hits = query::search(&conn, "authentication", 10).unwrap();
1651        assert_eq!(hits.len(), 1);
1652        assert_eq!(hits[0].item_id, "bn-001");
1653    }
1654
/// A title change delivered through an Update event must replace the
/// searchable text: the old wording stops matching and the new one matches.
#[test]
fn project_update_title_updates_fts() {
    let db = test_db();
    let proj = Projector::new(&db);

    let seed = make_create("bn-001", "Old title", "aaa", 1000);
    proj.project_event(&seed).unwrap();

    let new_title = UpdateData {
        field: "title".into(),
        value: serde_json::json!("Authorization failure"),
        extra: BTreeMap::new(),
    };
    let retitle = make_event(
        EventType::Update,
        "bn-001",
        EventData::Update(new_title),
        "bbb",
        2000,
    );
    proj.project_event(&retitle).unwrap();

    // A word that only appeared in the previous title no longer matches.
    let stale = query::search(&db, "Old", 10).unwrap();
    assert!(stale.is_empty());

    // A word from the replacement title matches exactly one item.
    let fresh = query::search(&db, "authorization", 10).unwrap();
    assert_eq!(fresh.len(), 1);
}
1684
1685    // -----------------------------------------------------------------------
1686    // Batch stats
1687    // -----------------------------------------------------------------------
1688
/// Three distinct Create events projected as one batch must be reported as
/// 3 projected, 0 duplicates and 0 errors.
#[test]
fn batch_reports_correct_stats() {
    let db = test_db();
    let proj = Projector::new(&db);

    // Each create uses its own item id and its own event hash.
    let creates = vec![
        make_create("bn-001", "Item 1", "h1", 1000),
        make_create("bn-002", "Item 2", "h2", 1001),
        make_create("bn-003", "Item 3", "h3", 1002),
    ];
    let outcome = proj.project_batch(&creates).unwrap();

    assert_eq!(outcome.projected, 3);
    assert_eq!(outcome.duplicates, 0);
    assert_eq!(outcome.errors, 0);
}
1705
1706    // -----------------------------------------------------------------------
1707    // All 11 event types in sequence
1708    // -----------------------------------------------------------------------
1709
/// Drives item "bn-001" through every event type in a single batch and then
/// checks the projected item row, its comments, its dependencies and the
/// redaction table.
#[test]
fn full_lifecycle_all_event_types() {
    let db = test_db();
    let proj = Projector::new(&db);

    // All mutations target "bn-001"; "bn-002" exists only so that the
    // link/unlink pair has something to point at.
    let lifecycle = vec![
        // 1. Create
        make_create("bn-001", "Auth bug", "h01", 1000),
        // Second item, used as the link target
        make_create("bn-002", "Dep item", "h02", 1001),
        // 2. Update title
        make_event(
            EventType::Update,
            "bn-001",
            EventData::Update(UpdateData {
                field: "title".into(),
                value: serde_json::json!("Auth timeout bug"),
                extra: BTreeMap::new(),
            }),
            "h03",
            2000,
        ),
        // 3. Move to doing
        make_event(
            EventType::Move,
            "bn-001",
            EventData::Move(MoveData {
                state: State::Doing,
                reason: None,
                extra: BTreeMap::new(),
            }),
            "h04",
            3000,
        ),
        // 4. Assign
        make_event(
            EventType::Assign,
            "bn-001",
            EventData::Assign(AssignData {
                agent: "alice".into(),
                action: AssignAction::Assign,
                extra: BTreeMap::new(),
            }),
            "h05",
            4000,
        ),
        // 5. Comment
        make_event(
            EventType::Comment,
            "bn-001",
            EventData::Comment(CommentData {
                body: "Found root cause".into(),
                extra: BTreeMap::new(),
            }),
            "h06",
            5000,
        ),
        // 6. Link
        make_event(
            EventType::Link,
            "bn-001",
            EventData::Link(LinkData {
                target: "bn-002".into(),
                link_type: "blocks".into(),
                extra: BTreeMap::new(),
            }),
            "h07",
            6000,
        ),
        // 7. Unlink
        make_event(
            EventType::Unlink,
            "bn-001",
            EventData::Unlink(UnlinkData {
                target: "bn-002".into(),
                link_type: Some("blocks".into()),
                extra: BTreeMap::new(),
            }),
            "h08",
            7000,
        ),
        // 8. Compact
        make_event(
            EventType::Compact,
            "bn-001",
            EventData::Compact(CompactData {
                summary: "Auth token refresh race".into(),
                extra: BTreeMap::new(),
            }),
            "h09",
            8000,
        ),
        // 9. Snapshot
        make_event(
            EventType::Snapshot,
            "bn-001",
            EventData::Snapshot(SnapshotData {
                state: serde_json::json!({"id": "bn-001", "resolved": true}),
                extra: BTreeMap::new(),
            }),
            "h10",
            9000,
        ),
        // 10. Redact the comment
        // NOTE(review): presumably `make_event` stores hashes with a
        // "blake3:" prefix, so this targets the "h06" comment — confirm.
        make_event(
            EventType::Redact,
            "bn-001",
            EventData::Redact(RedactData {
                target_hash: "blake3:h06".into(),
                reason: "Contained secret".into(),
                extra: BTreeMap::new(),
            }),
            "h11",
            10000,
        ),
        // 11. Delete
        make_event(
            EventType::Delete,
            "bn-001",
            EventData::Delete(DeleteData {
                reason: Some("Duplicate".into()),
                extra: BTreeMap::new(),
            }),
            "h12",
            11000,
        ),
    ];

    let outcome = proj.project_batch(&lifecycle).unwrap();
    assert_eq!(outcome.projected, 12); // 2 creates + 10 mutations
    assert_eq!(outcome.duplicates, 0);
    assert_eq!(outcome.errors, 0);

    // Final item row; `true` is passed because the item ends up deleted.
    let final_item = query::get_item(&db, "bn-001", true).unwrap().unwrap();
    assert_eq!(final_item.title, "Auth timeout bug");
    assert_eq!(final_item.state, "doing");
    assert!(final_item.is_deleted);
    assert_eq!(
        final_item.compact_summary.as_deref(),
        Some("Auth token refresh race")
    );

    // The snapshot column is read with raw SQL and must be non-NULL.
    let snapshot_sql = "SELECT snapshot_json FROM items WHERE item_id = 'bn-001'";
    let snapshot_json: Option<String> = db
        .query_row(snapshot_sql, [], |row| row.get(0))
        .unwrap();
    assert!(snapshot_json.is_some());

    // The single comment is still listed, with its body replaced.
    let remaining_comments = query::get_comments(&db, "bn-001", None, None).unwrap();
    assert_eq!(remaining_comments.len(), 1);
    assert_eq!(remaining_comments[0].body, "[redacted]");

    // The link added in step 6 was removed again in step 7.
    let remaining_deps = query::get_dependencies(&db, "bn-001").unwrap();
    assert!(remaining_deps.is_empty());

    // Exactly one redaction record was written.
    let redaction_sql = "SELECT COUNT(*) FROM event_redactions";
    let redaction_rows: i64 = db
        .query_row(redaction_sql, [], |row| row.get(0))
        .unwrap();
    assert_eq!(redaction_rows, 1);
}
1888
/// A Create that is projected after a placeholder already exists must fill
/// in the real title and replace `created_at_us` with the Create's own value.
#[test]
fn late_create_populates_placeholder() {
    let db = test_db();
    let proj = Projector::new(&db);

    // 1. A comment for the not-yet-created item is projected first.
    let payload = CommentData {
        body: "Comment first".into(),
        extra: BTreeMap::new(),
    };
    let early_comment = make_event(
        EventType::Comment,
        "bn-late",
        EventData::Comment(payload),
        "h1",
        1000,
    );
    proj.project_event(&early_comment).unwrap();

    // The row that exists now has no title and carries the comment's 1000.
    let before = query::get_item(&db, "bn-late", false).unwrap().unwrap();
    assert_eq!(before.title, "");
    assert_eq!(before.created_at_us, 1000);

    // 2. The Create is projected afterwards, built with the smaller value 900.
    let late_create = make_create("bn-late", "Real Title", "h2", 900);
    proj.project_event(&late_create).unwrap();

    let after = query::get_item(&db, "bn-late", false).unwrap().unwrap();
    assert_eq!(after.title, "Real Title");
    assert_eq!(after.created_at_us, 900);
}
1920
/// Regression test: on a database that has been migrated but never had
/// `ensure_tracking_table()` called, `Projector::new()` itself must create
/// the `projected_events` table. Without that, fresh installs fail with
/// "record projected event hash".
#[test]
fn projector_new_creates_tracking_table_on_fresh_db() {
    // Build the database by hand: migrations only, and deliberately no
    // call to ensure_tracking_table, to mimic a fresh install.
    let mut fresh_db = Connection::open_in_memory().expect("open in-memory db");
    migrations::migrate(&mut fresh_db).expect("migrate");

    // Constructing the projector is expected to set up the tracking table.
    let proj = Projector::new(&fresh_db);
    let first_event = make_create("bn-fresh", "Fresh item", "h1", 1000);
    proj.project_event(&first_event)
        .expect("project_event should succeed on fresh DB");

    let stored = query::get_item(&fresh_db, "bn-fresh", false)
        .unwrap()
        .unwrap();
    assert_eq!(stored.title, "Fresh item");
}
1940}