Skip to main content

bones_core/db/
project.rs

1//! Event replay → `SQLite` projection pipeline.
2//!
3//! The [`Projector`] replays events from the TSJSON event log and upserts
4//! the resulting state into the `SQLite` projection database. It handles all
5//! 11 event types and supports both incremental (single-event) and full
6//! rebuild modes.
7//!
8//! # Deduplication
9//!
10//! Events are deduplicated by `event_hash`. When a duplicate hash is
11//! encountered (e.g. from git merge duplicating lines in shard files),
12//! the event is silently skipped.
13//!
14//! # Cursor
15//!
16//! After projecting a batch, the caller can persist the byte offset and
17//! last event hash via [`super::query::update_projection_cursor`] for
18//! incremental replay on next startup.
19
20use anyhow::{Context, Result};
21use rusqlite::{Connection, params};
22
23use crate::db::query;
24use crate::event::Event;
25use crate::event::data::{AssignAction, EventData};
26use crate::event::types::EventType;
27use crate::shard::ShardManager;
28
29// ---------------------------------------------------------------------------
30// ProjectionStats
31// ---------------------------------------------------------------------------
32
33/// Statistics returned after a projection run.
34#[derive(Debug, Clone, Default, PartialEq, Eq)]
35pub struct ProjectionStats {
36    /// Number of events successfully projected.
37    pub projected: usize,
38    /// Number of duplicate events skipped.
39    pub duplicates: usize,
40    /// Number of events that caused errors (logged and skipped).
41    pub errors: usize,
42}
43
44// ---------------------------------------------------------------------------
45// Projector
46// ---------------------------------------------------------------------------
47
48/// Replays events into the `SQLite` projection.
49///
50/// Create a `Projector` with a connection, then call [`project_event`] for
51/// each event or [`project_batch`] for a slice.
52pub struct Projector<'conn> {
53    conn: &'conn Connection,
54    has_agent_column: bool,
55}
56
57impl<'conn> Projector<'conn> {
58    /// Create a new projector backed by the given connection.
59    ///
60    /// Ensures the `projected_events` tracking table exists before any
61    /// projection work is attempted.
62    pub fn new(conn: &'conn Connection) -> Self {
63        // Best-effort: if the DDL fails (e.g. read-only DB) we'll still
64        // attempt projection and let it fail at the INSERT instead.
65        let _ = ensure_tracking_table(conn);
66        let has_agent_column = projected_events_has_agent_column(conn).unwrap_or(false);
67        Self {
68            conn,
69            has_agent_column,
70        }
71    }
72
73    /// Project a batch of events, returning aggregate statistics.
74    ///
75    /// Events are applied inside a single transaction for performance.
76    /// Duplicate events (same `event_hash`) are silently skipped.
77    ///
78    /// # Errors
79    ///
80    /// Returns an error if the transaction fails to commit. Individual event
81    /// projection errors are counted in `stats.errors` but do not abort the
82    /// batch.
83    pub fn project_batch(&self, events: &[Event]) -> Result<ProjectionStats> {
84        let mut stats = ProjectionStats::default();
85
86        // Use a savepoint so we can commit all-or-nothing
87        self.conn
88            .execute_batch("BEGIN IMMEDIATE")
89            .context("begin projection transaction")?;
90
91        for event in events {
92            match self.project_event_inner(event) {
93                Ok(ProjectResult::Projected) => stats.projected += 1,
94                Ok(ProjectResult::Duplicate) => stats.duplicates += 1,
95                Err(e) => {
96                    tracing::warn!(
97                        event_hash = %event.event_hash,
98                        event_type = %event.event_type,
99                        item_id = %event.item_id,
100                        error = %e,
101                        "skipping event due to projection error"
102                    );
103                    stats.errors += 1;
104                }
105            }
106        }
107
108        self.conn
109            .execute_batch("COMMIT")
110            .context("commit projection transaction")?;
111
112        if stats.errors > 0 {
113            let reason = format!(
114                "project_batch encountered {} projection errors while applying {} events",
115                stats.errors,
116                events.len()
117            );
118            if let Err(err) = crate::db::mark_projection_dirty_from_connection(self.conn, &reason) {
119                tracing::warn!(error = %err, "failed to mark projection dirty after batch errors");
120            }
121        }
122
123        Ok(stats)
124    }
125
126    /// Project a single event (outside of any managed transaction).
127    ///
128    /// Returns `true` if the event was projected, `false` if it was a
129    /// duplicate.
130    ///
131    /// # Errors
132    ///
133    /// Returns an error if the projection fails.
134    pub fn project_event(&self, event: &Event) -> Result<bool> {
135        let projected = match self.project_event_inner(event) {
136            Ok(ProjectResult::Projected) => true,
137            Ok(ProjectResult::Duplicate) => false,
138            Err(err) => {
139                let reason = format!(
140                    "project_event failed hash={} type={}",
141                    event.event_hash, event.event_type
142                );
143                if let Err(mark_err) =
144                    crate::db::mark_projection_dirty_from_connection(self.conn, &reason)
145                {
146                    tracing::warn!(
147                        error = %mark_err,
148                        event_hash = %event.event_hash,
149                        "failed to mark projection dirty after single-event projection failure"
150                    );
151                }
152                return Err(err);
153            }
154        };
155
156        if let Err(err) = self.update_cursor_to_event_log_end(&event.event_hash) {
157            tracing::warn!(
158                event_hash = %event.event_hash,
159                error = %err,
160                "failed to update projection cursor after single-event projection"
161            );
162            let reason = format!(
163                "cursor update failed after projecting hash={} error={err}",
164                event.event_hash
165            );
166            let _ = crate::db::mark_projection_dirty_from_connection(self.conn, &reason);
167        }
168
169        Ok(projected)
170    }
171
172    // -----------------------------------------------------------------------
173    // Internal dispatch
174    // -----------------------------------------------------------------------
175
176    fn project_event_inner(&self, event: &Event) -> Result<ProjectResult> {
177        // Dedup check: skip if event_hash already projected
178        if self.is_event_projected(&event.event_hash)? {
179            return Ok(ProjectResult::Duplicate);
180        }
181
182        match event.event_type {
183            EventType::Create => self.project_create(event)?,
184            EventType::Update => self.project_update(event)?,
185            EventType::Move => self.project_move(event)?,
186            EventType::Assign => self.project_assign(event)?,
187            EventType::Comment => self.project_comment(event)?,
188            EventType::Link => self.project_link(event)?,
189            EventType::Unlink => self.project_unlink(event)?,
190            EventType::Delete => self.project_delete(event)?,
191            EventType::Compact => self.project_compact(event)?,
192            EventType::Snapshot => self.project_snapshot(event)?,
193            EventType::Redact => self.project_redact(event)?,
194        }
195
196        // Record that this event hash has been projected
197        self.record_projected_hash(&event.event_hash, event)?;
198
199        Ok(ProjectResult::Projected)
200    }
201
202    // -----------------------------------------------------------------------
203    // Dedup tracking
204    // -----------------------------------------------------------------------
205
206    fn is_event_projected(&self, event_hash: &str) -> Result<bool> {
207        // Check item_comments table for comment events (unique on event_hash)
208        // and the event_redactions table for redact events.
209        // For a general dedup check, we use projection_meta's tracking.
210        // Simple approach: check if item_comments has this hash OR if
211        // event_redactions has it as target. For general dedup, we use
212        // a lightweight check in the projected_events tracking.
213        let exists: bool = self
214            .conn
215            .query_row(
216                "SELECT EXISTS(SELECT 1 FROM projected_events WHERE event_hash = ?1)",
217                params![event_hash],
218                |row| row.get(0),
219            )
220            .unwrap_or(false);
221        Ok(exists)
222    }
223
224    fn is_event_redacted(&self, event_hash: &str) -> Result<bool> {
225        let exists: bool = self
226            .conn
227            .query_row(
228                "SELECT EXISTS(SELECT 1 FROM event_redactions WHERE target_event_hash = ?1)",
229                params![event_hash],
230                |row| row.get(0),
231            )
232            .unwrap_or(false);
233        Ok(exists)
234    }
235
236    fn record_projected_hash(&self, event_hash: &str, event: &Event) -> Result<()> {
237        if self.has_agent_column {
238            self.conn
239                .execute(
240                    "INSERT OR IGNORE INTO projected_events (event_hash, item_id, event_type, projected_at_us, agent) \
241                     VALUES (?1, ?2, ?3, ?4, ?5)",
242                    params![
243                        event_hash,
244                        event.item_id.as_str(),
245                        event.event_type.as_str(),
246                        event.wall_ts_us,
247                        event.agent.as_str(),
248                    ],
249                )
250                .context("record projected event hash")?;
251            return Ok(());
252        }
253
254        self.conn
255            .execute(
256                "INSERT OR IGNORE INTO projected_events (event_hash, item_id, event_type, projected_at_us) \
257                 VALUES (?1, ?2, ?3, ?4)",
258                params![
259                    event_hash,
260                    event.item_id.as_str(),
261                    event.event_type.as_str(),
262                    event.wall_ts_us,
263                ],
264            )
265            .context("record projected event hash")?;
266        Ok(())
267    }
268
269    fn update_cursor_to_event_log_end(&self, event_hash: &str) -> Result<()> {
270        let main_db_file: String = self
271            .conn
272            .query_row(
273                "SELECT file FROM pragma_database_list WHERE name = 'main'",
274                [],
275                |row| row.get(0),
276            )
277            .context("read main database file from pragma_database_list")?;
278
279        if main_db_file.trim().is_empty() {
280            // In-memory projections used in tests have no on-disk log context.
281            return Ok(());
282        }
283
284        let db_path = std::path::Path::new(&main_db_file);
285        let Some(bones_dir) = db_path.parent() else {
286            return Ok(());
287        };
288
289        let shard_mgr = ShardManager::new(bones_dir);
290        let total_len = shard_mgr
291            .total_content_len()
292            .map_err(|e| anyhow::anyhow!("read event-log size for cursor update: {e}"))?;
293        let total_len_i64 = i64::try_from(total_len).unwrap_or(i64::MAX);
294
295        query::update_projection_cursor(self.conn, total_len_i64, Some(event_hash))
296            .context("write projection cursor after single-event projection")?;
297
298        Ok(())
299    }
300
301    // -----------------------------------------------------------------------
302    // Event type handlers
303    // -----------------------------------------------------------------------
304
305    fn project_create(&self, event: &Event) -> Result<()> {
306        let EventData::Create(ref data) = event.data else {
307            anyhow::bail!("expected Create data for item.create event");
308        };
309
310        let is_redacted = self.is_event_redacted(&event.event_hash)?;
311        let title = if is_redacted { "" } else { &data.title };
312        let description = if is_redacted {
313            None
314        } else {
315            data.description.as_deref()
316        };
317        let labels_str = if is_redacted {
318            String::new()
319        } else {
320            data.labels.join(" ")
321        };
322
323        self.conn
324            .execute(
325                "INSERT INTO items (
326                    item_id, title, description, kind, state, urgency,
327                    size, parent_id, is_deleted, search_labels,
328                    created_at_us, updated_at_us
329                ) VALUES (?1, ?2, ?3, ?4, 'open', ?5, ?6, ?7, 0, ?8, ?9, ?10)
330                ON CONFLICT(item_id) DO UPDATE SET
331                    title = excluded.title,
332                    description = excluded.description,
333                    kind = excluded.kind,
334                    urgency = excluded.urgency,
335                    size = excluded.size,
336                    parent_id = excluded.parent_id,
337                    created_at_us = excluded.created_at_us,
338                    search_labels = excluded.search_labels,
339                    updated_at_us = excluded.updated_at_us
340                WHERE items.title = ''",
341                params![
342                    event.item_id.as_str(),
343                    title,
344                    description,
345                    data.kind.to_string(),
346                    data.urgency.to_string(),
347                    data.size.map(|s| s.to_string()),
348                    data.parent.as_deref(),
349                    labels_str,
350                    event.wall_ts_us,
351                    event.wall_ts_us,
352                ],
353            )
354            .with_context(|| format!("project create for {}", event.item_id))?;
355
356        // Insert initial labels
357        if !is_redacted {
358            for label in &data.labels {
359                self.conn
360                    .execute(
361                        "INSERT OR IGNORE INTO item_labels (item_id, label, created_at_us)
362                         VALUES (?1, ?2, ?3)",
363                        params![event.item_id.as_str(), label, event.wall_ts_us],
364                    )
365                    .with_context(|| format!("insert label '{label}' for {}", event.item_id))?;
366            }
367        }
368
369        Ok(())
370    }
371
372    #[allow(clippy::too_many_lines)]
373    fn project_update(&self, event: &Event) -> Result<()> {
374        let EventData::Update(ref data) = event.data else {
375            anyhow::bail!("expected Update data for item.update event");
376        };
377
378        self.ensure_item_exists(event)?;
379
380        let is_redacted = self.is_event_redacted(&event.event_hash)?;
381
382        match data.field.as_str() {
383            "title" => {
384                let value = if is_redacted {
385                    "[redacted]"
386                } else {
387                    data.value.as_str().unwrap_or_default()
388                };
389                self.conn.execute(
390                    "UPDATE items SET title = ?1, updated_at_us = ?2 WHERE item_id = ?3",
391                    params![value, event.wall_ts_us, event.item_id.as_str()],
392                )?;
393            }
394            "description" => {
395                let value = if is_redacted {
396                    Some("[redacted]".to_string())
397                } else {
398                    data.value.as_str().map(String::from)
399                };
400                self.conn.execute(
401                    "UPDATE items SET description = ?1, updated_at_us = ?2 WHERE item_id = ?3",
402                    params![value, event.wall_ts_us, event.item_id.as_str()],
403                )?;
404            }
405            "kind" => {
406                let value = data.value.as_str().unwrap_or("task");
407                self.conn.execute(
408                    "UPDATE items SET kind = ?1, updated_at_us = ?2 WHERE item_id = ?3",
409                    params![value, event.wall_ts_us, event.item_id.as_str()],
410                )?;
411            }
412            "size" => {
413                let value = data.value.as_str().map(String::from);
414                self.conn.execute(
415                    "UPDATE items SET size = ?1, updated_at_us = ?2 WHERE item_id = ?3",
416                    params![value, event.wall_ts_us, event.item_id.as_str()],
417                )?;
418            }
419            "urgency" => {
420                let value = data.value.as_str().unwrap_or("default");
421                self.conn.execute(
422                    "UPDATE items SET urgency = ?1, updated_at_us = ?2 WHERE item_id = ?3",
423                    params![value, event.wall_ts_us, event.item_id.as_str()],
424                )?;
425            }
426            "parent" => {
427                let value = data.value.as_str().map(String::from);
428                self.conn.execute(
429                    "UPDATE items SET parent_id = ?1, updated_at_us = ?2 WHERE item_id = ?3",
430                    params![value, event.wall_ts_us, event.item_id.as_str()],
431                )?;
432            }
433            "labels" => {
434                // Labels update: supports both legacy array replacement
435                // and new add/remove action format (CRDT-friendly).
436                let mut changed = false;
437
438                if let Some(labels) = data.value.as_array() {
439                    // Legacy: replace entire label set
440                    self.conn.execute(
441                        "DELETE FROM item_labels WHERE item_id = ?1",
442                        params![event.item_id.as_str()],
443                    )?;
444
445                    for label_val in labels {
446                        if let Some(label) = label_val.as_str() {
447                            self.conn.execute(
448                                "INSERT OR IGNORE INTO item_labels (item_id, label, created_at_us)
449                                 VALUES (?1, ?2, ?3)",
450                                params![event.item_id.as_str(), label, event.wall_ts_us],
451                            )?;
452                        }
453                    }
454                    changed = true;
455                } else if let Some(obj) = data.value.as_object() {
456                    // New: add/remove single label
457                    let action = obj.get("action").and_then(|v| v.as_str()).unwrap_or("");
458                    let label = obj.get("label").and_then(|v| v.as_str()).unwrap_or("");
459
460                    if !label.is_empty() {
461                        match action {
462                            "add" => {
463                                self.conn.execute(
464                                    "INSERT OR IGNORE INTO item_labels (item_id, label, created_at_us)
465                                     VALUES (?1, ?2, ?3)",
466                                    params![event.item_id.as_str(), label, event.wall_ts_us],
467                                )?;
468                                changed = true;
469                            }
470                            "remove" => {
471                                self.conn.execute(
472                                    "DELETE FROM item_labels WHERE item_id = ?1 AND label = ?2",
473                                    params![event.item_id.as_str(), label],
474                                )?;
475                                changed = true;
476                            }
477                            _ => {}
478                        }
479                    }
480                }
481
482                if changed {
483                    // Reconstruct search_labels from item_labels table to keep FTS in sync.
484                    let mut stmt = self.conn.prepare(
485                        "SELECT label FROM item_labels WHERE item_id = ?1 ORDER BY label",
486                    )?;
487                    let label_rows = stmt.query_map(params![event.item_id.as_str()], |row| {
488                        row.get::<_, String>(0)
489                    })?;
490
491                    let mut label_strings = Vec::new();
492                    for label_res in label_rows {
493                        label_strings.push(label_res?);
494                    }
495
496                    let search_labels = label_strings.join(" ");
497                    self.conn.execute(
498                        "UPDATE items SET search_labels = ?1, updated_at_us = ?2 WHERE item_id = ?3",
499                        params![search_labels, event.wall_ts_us, event.item_id.as_str()],
500                    )?;
501                }
502            }
503            _ => {
504                // Unknown field — just bump updated_at
505                tracing::debug!(
506                    field = %data.field,
507                    item_id = %event.item_id,
508                    "ignoring update for unknown field"
509                );
510                self.conn.execute(
511                    "UPDATE items SET updated_at_us = ?1 WHERE item_id = ?2",
512                    params![event.wall_ts_us, event.item_id.as_str()],
513                )?;
514            }
515        }
516
517        Ok(())
518    }
519
520    fn project_move(&self, event: &Event) -> Result<()> {
521        let EventData::Move(ref data) = event.data else {
522            anyhow::bail!("expected Move data for item.move event");
523        };
524
525        self.ensure_item_exists(event)?;
526
527        self.conn
528            .execute(
529                "UPDATE items SET state = ?1, updated_at_us = ?2 WHERE item_id = ?3",
530                params![
531                    data.state.to_string(),
532                    event.wall_ts_us,
533                    event.item_id.as_str(),
534                ],
535            )
536            .with_context(|| format!("project move for {}", event.item_id))?;
537
538        Ok(())
539    }
540
541    fn project_assign(&self, event: &Event) -> Result<()> {
542        let EventData::Assign(ref data) = event.data else {
543            anyhow::bail!("expected Assign data for item.assign event");
544        };
545
546        self.ensure_item_exists(event)?;
547
548        match data.action {
549            AssignAction::Assign => {
550                self.conn
551                    .execute(
552                        "INSERT OR IGNORE INTO item_assignees (item_id, agent, created_at_us)
553                         VALUES (?1, ?2, ?3)",
554                        params![event.item_id.as_str(), data.agent, event.wall_ts_us],
555                    )
556                    .with_context(|| format!("assign {} to {}", data.agent, event.item_id))?;
557            }
558            AssignAction::Unassign => {
559                self.conn
560                    .execute(
561                        "DELETE FROM item_assignees WHERE item_id = ?1 AND agent = ?2",
562                        params![event.item_id.as_str(), data.agent],
563                    )
564                    .with_context(|| format!("unassign {} from {}", data.agent, event.item_id))?;
565            }
566        }
567
568        // Bump updated_at
569        self.conn.execute(
570            "UPDATE items SET updated_at_us = ?1 WHERE item_id = ?2",
571            params![event.wall_ts_us, event.item_id.as_str()],
572        )?;
573
574        Ok(())
575    }
576
577    fn project_comment(&self, event: &Event) -> Result<()> {
578        let EventData::Comment(ref data) = event.data else {
579            anyhow::bail!("expected Comment data for item.comment event");
580        };
581
582        self.ensure_item_exists(event)?;
583
584        let is_redacted = self.is_event_redacted(&event.event_hash)?;
585        let body = if is_redacted {
586            "[redacted]"
587        } else {
588            &data.body
589        };
590
591        self.conn
592            .execute(
593                "INSERT OR IGNORE INTO item_comments (item_id, event_hash, author, body, created_at_us)
594                 VALUES (?1, ?2, ?3, ?4, ?5)",
595                params![
596                    event.item_id.as_str(),
597                    event.event_hash,
598                    event.agent,
599                    body,
600                    event.wall_ts_us,
601                ],
602            )
603            .with_context(|| format!("project comment for {}", event.item_id))?;
604
605        // Bump updated_at
606        self.conn.execute(
607            "UPDATE items SET updated_at_us = ?1 WHERE item_id = ?2",
608            params![event.wall_ts_us, event.item_id.as_str()],
609        )?;
610
611        Ok(())
612    }
613
614    fn project_link(&self, event: &Event) -> Result<()> {
615        let EventData::Link(ref data) = event.data else {
616            anyhow::bail!("expected Link data for item.link event");
617        };
618
619        self.ensure_item_exists(event)?;
620
621        self.conn
622            .execute(
623                "INSERT OR IGNORE INTO item_dependencies (item_id, depends_on_item_id, link_type, created_at_us)
624                 VALUES (?1, ?2, ?3, ?4)",
625                params![
626                    event.item_id.as_str(),
627                    data.target,
628                    data.link_type,
629                    event.wall_ts_us,
630                ],
631            )
632            .with_context(|| {
633                format!("project link {} -> {}", event.item_id, data.target)
634            })?;
635
636        // Bump updated_at
637        self.conn.execute(
638            "UPDATE items SET updated_at_us = ?1 WHERE item_id = ?2",
639            params![event.wall_ts_us, event.item_id.as_str()],
640        )?;
641
642        Ok(())
643    }
644
645    fn project_unlink(&self, event: &Event) -> Result<()> {
646        let EventData::Unlink(ref data) = event.data else {
647            anyhow::bail!("expected Unlink data for item.unlink event");
648        };
649
650        self.ensure_item_exists(event)?;
651
652        if let Some(ref link_type) = data.link_type {
653            self.conn
654                .execute(
655                    "DELETE FROM item_dependencies \
656                     WHERE item_id = ?1 AND depends_on_item_id = ?2 AND link_type = ?3",
657                    params![event.item_id.as_str(), data.target, link_type],
658                )
659                .with_context(|| format!("unlink {} -/-> {}", event.item_id, data.target))?;
660        } else {
661            // No link_type: remove all links to target
662            self.conn
663                .execute(
664                    "DELETE FROM item_dependencies \
665                     WHERE item_id = ?1 AND depends_on_item_id = ?2",
666                    params![event.item_id.as_str(), data.target],
667                )
668                .with_context(|| format!("unlink all {} -/-> {}", event.item_id, data.target))?;
669        }
670
671        // Bump updated_at
672        self.conn.execute(
673            "UPDATE items SET updated_at_us = ?1 WHERE item_id = ?2",
674            params![event.wall_ts_us, event.item_id.as_str()],
675        )?;
676
677        Ok(())
678    }
679
680    fn project_delete(&self, event: &Event) -> Result<()> {
681        self.ensure_item_exists(event)?;
682
683        self.conn
684            .execute(
685                "UPDATE items SET is_deleted = 1, deleted_at_us = ?1, updated_at_us = ?1 \
686                 WHERE item_id = ?2",
687                params![event.wall_ts_us, event.item_id.as_str()],
688            )
689            .with_context(|| format!("project delete for {}", event.item_id))?;
690
691        Ok(())
692    }
693
694    fn project_compact(&self, event: &Event) -> Result<()> {
695        let EventData::Compact(ref data) = event.data else {
696            anyhow::bail!("expected Compact data for item.compact event");
697        };
698
699        self.ensure_item_exists(event)?;
700
701        let is_redacted = self.is_event_redacted(&event.event_hash)?;
702        let summary = if is_redacted {
703            "[redacted]"
704        } else {
705            data.summary.as_str()
706        };
707
708        self.conn
709            .execute(
710                "UPDATE items SET compact_summary = ?1, updated_at_us = ?2 WHERE item_id = ?3",
711                params![summary, event.wall_ts_us, event.item_id.as_str()],
712            )
713            .with_context(|| format!("project compact for {}", event.item_id))?;
714
715        Ok(())
716    }
717
718    fn project_snapshot(&self, event: &Event) -> Result<()> {
719        let EventData::Snapshot(ref data) = event.data else {
720            anyhow::bail!("expected Snapshot data for item.snapshot event");
721        };
722
723        self.ensure_item_exists(event)?;
724
725        let json_str = serde_json::to_string(&data.state).context("serialize snapshot state")?;
726
727        self.conn
728            .execute(
729                "UPDATE items SET snapshot_json = ?1, updated_at_us = ?2 WHERE item_id = ?3",
730                params![json_str, event.wall_ts_us, event.item_id.as_str()],
731            )
732            .with_context(|| format!("project snapshot for {}", event.item_id))?;
733
734        Ok(())
735    }
736
737    fn project_redact(&self, event: &Event) -> Result<()> {
738        let EventData::Redact(ref data) = event.data else {
739            anyhow::bail!("expected Redact data for item.redact event");
740        };
741
742        // Insert redaction record
743        self.conn
744            .execute(
745                "INSERT OR IGNORE INTO event_redactions \
746                 (target_event_hash, item_id, reason, redacted_by, redacted_at_us) \
747                 VALUES (?1, ?2, ?3, ?4, ?5)",
748                params![
749                    data.target_hash,
750                    event.item_id.as_str(),
751                    data.reason,
752                    event.agent,
753                    event.wall_ts_us,
754                ],
755            )
756            .with_context(|| {
757                format!(
758                    "project redact for {} targeting {}",
759                    event.item_id, data.target_hash
760                )
761            })?;
762
763        // Redact the comment body if the target hash is a comment event
764        self.conn
765            .execute(
766                "UPDATE item_comments SET body = '[redacted]' WHERE event_hash = ?1",
767                params![data.target_hash],
768            )
769            .context("redact comment body")?;
770
771        Ok(())
772    }
773
774    // -----------------------------------------------------------------------
775    // Helpers
776    // -----------------------------------------------------------------------
777
778    /// Ensure the item exists in the projection. If not, create a placeholder
779    /// row so that subsequent operations (UPDATE, foreign keys) succeed.
780    ///
781    /// This handles out-of-order event replay and missing create events.
782    fn ensure_item_exists(&self, event: &Event) -> Result<()> {
783        let exists: bool = self
784            .conn
785            .query_row(
786                "SELECT EXISTS(SELECT 1 FROM items WHERE item_id = ?1)",
787                params![event.item_id.as_str()],
788                |row| row.get(0),
789            )
790            .context("check item exists")?;
791
792        if !exists {
793            self.conn
794                .execute(
795                    "INSERT INTO items (
796                        item_id, title, kind, state, urgency,
797                        is_deleted, search_labels, created_at_us, updated_at_us
798                    ) VALUES (?1, '', 'task', 'open', 'default', 0, '', ?2, ?2)",
799                    params![event.item_id.as_str(), event.wall_ts_us],
800                )
801                .with_context(|| format!("create placeholder item for {}", event.item_id))?;
802        }
803
804        Ok(())
805    }
806}
807
808enum ProjectResult {
809    Projected,
810    Duplicate,
811}
812
813// ---------------------------------------------------------------------------
814// Schema addition: projected_events tracking table
815// ---------------------------------------------------------------------------
816
817/// SQL to create the `projected_events` tracking table.
818///
819/// This is applied as part of the projection setup, not as a schema migration,
820/// because it is projection-internal bookkeeping.
821pub const PROJECTED_EVENTS_DDL: &str = "\
822CREATE TABLE IF NOT EXISTS projected_events (
823    event_hash TEXT PRIMARY KEY,
824    item_id TEXT NOT NULL,
825    event_type TEXT NOT NULL,
826    projected_at_us INTEGER NOT NULL,
827    agent TEXT NOT NULL DEFAULT ''
828);
829CREATE INDEX IF NOT EXISTS idx_projected_events_item
830    ON projected_events(item_id);
831CREATE INDEX IF NOT EXISTS idx_projected_events_agent
832    ON projected_events(agent);
833";
834
835/// Ensure the `projected_events` tracking table exists.
836///
837/// Call this once after opening the projection database and before
838/// projecting events.
839///
840/// # Errors
841///
842/// Returns an error if executing the DDL fails.
843pub fn ensure_tracking_table(conn: &Connection) -> Result<()> {
844    conn.execute_batch(PROJECTED_EVENTS_DDL)
845        .context("create projected_events tracking table")?;
846
847    if !projected_events_has_agent_column(conn)? {
848        conn.execute(
849            "ALTER TABLE projected_events ADD COLUMN agent TEXT NOT NULL DEFAULT ''",
850            [],
851        )
852        .context("add agent column to projected_events")?;
853    }
854
855    conn.execute_batch(
856        "CREATE INDEX IF NOT EXISTS idx_projected_events_agent ON projected_events(agent);",
857    )
858    .context("create projected_events_agent index")?;
859
860    Ok(())
861}
862
863fn projected_events_has_agent_column(conn: &Connection) -> Result<bool> {
864    let mut stmt = conn
865        .prepare("PRAGMA table_info(projected_events)")
866        .context("inspect projected_events schema")?;
867    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
868
869    for row in rows {
870        let name = row.context("read projected_events column")?;
871        if name == "agent" {
872            return Ok(true);
873        }
874    }
875
876    Ok(false)
877}
878
879/// Drop all projection data for a full rebuild.
880///
881/// Clears all items, edge tables, comments, redactions, FTS index, and the
882/// projected events tracking table. Schema structure is preserved.
883///
884/// # Errors
885///
886/// Returns an error if the truncation fails.
887pub fn clear_projection(conn: &Connection) -> Result<()> {
888    conn.execute_batch(
889        "DELETE FROM event_redactions;
890         DELETE FROM item_comments;
891         DELETE FROM item_dependencies;
892         DELETE FROM item_assignees;
893         DELETE FROM item_labels;
894         DELETE FROM items;
895         DELETE FROM projected_events;
896         UPDATE projection_meta SET last_event_offset = 0, last_event_hash = NULL WHERE id = 1;",
897    )
898    .context("clear projection tables")?;
899    Ok(())
900}
901
902// ---------------------------------------------------------------------------
903// Tests
904// ---------------------------------------------------------------------------
905
906#[cfg(test)]
907mod tests {
908    use super::*;
909    use crate::db::{migrations, query};
910    use crate::event::data::*;
911    use crate::event::types::EventType;
912    use crate::event::writer;
913    use crate::model::item::{Kind, Size, State, Urgency};
914    use crate::model::item_id::ItemId;
915    use crate::shard::ShardManager;
916    use rusqlite::Connection;
917    use std::collections::BTreeMap;
918    use tempfile::TempDir;
919
920    fn test_db() -> Connection {
921        let mut conn = Connection::open_in_memory().expect("open in-memory db");
922        migrations::migrate(&mut conn).expect("migrate");
923        ensure_tracking_table(&conn).expect("create tracking table");
924        conn
925    }
926
927    fn make_event(
928        event_type: EventType,
929        item_id: &str,
930        data: EventData,
931        hash: &str,
932        ts: i64,
933    ) -> Event {
934        Event {
935            wall_ts_us: ts,
936            agent: "test-agent".into(),
937            itc: "itc:AQ".into(),
938            parents: vec![],
939            event_type,
940            item_id: ItemId::new_unchecked(item_id),
941            data,
942            event_hash: format!("blake3:{hash}"),
943        }
944    }
945
946    fn make_create(id: &str, title: &str, hash: &str, ts: i64) -> Event {
947        make_event(
948            EventType::Create,
949            id,
950            EventData::Create(CreateData {
951                title: title.into(),
952                kind: Kind::Task,
953                size: Some(Size::M),
954                urgency: Urgency::Default,
955                labels: vec!["backend".into(), "auth".into()],
956                parent: None,
957                causation: None,
958                description: Some("A detailed description".into()),
959                extra: BTreeMap::new(),
960            }),
961            hash,
962            ts,
963        )
964    }
965
966    #[test]
967    fn project_event_updates_projection_cursor_for_file_backed_db() {
968        let dir = TempDir::new().expect("tempdir");
969        let bones_dir = dir.path().join(".bones");
970        std::fs::create_dir_all(&bones_dir).expect("create .bones");
971
972        let shard_mgr = ShardManager::new(&bones_dir);
973        shard_mgr.init().expect("init shard manager");
974
975        let db_path = bones_dir.join("bones.db");
976        let mut conn = Connection::open(&db_path).expect("open projection db");
977        migrations::migrate(&mut conn).expect("migrate");
978        ensure_tracking_table(&conn).expect("create tracking table");
979
980        let projector = Projector::new(&conn);
981        let mut event = make_create("bn-001", "Cursor update", "h1", 1000);
982        let line = writer::write_event(&mut event).expect("serialize event");
983        shard_mgr
984            .append(&line, false, std::time::Duration::from_secs(5))
985            .expect("append event line");
986
987        projector.project_event(&event).expect("project event");
988
989        let (offset, hash) = query::get_projection_cursor(&conn).expect("read projection cursor");
990        let expected_offset =
991            i64::try_from(shard_mgr.total_content_len().expect("content len")).unwrap_or(i64::MAX);
992
993        assert_eq!(offset, expected_offset);
994        assert_eq!(hash.as_deref(), Some(event.event_hash.as_str()));
995    }
996
997    // -----------------------------------------------------------------------
998    // Create
999    // -----------------------------------------------------------------------
1000
1001    #[test]
1002    fn project_create_inserts_item() {
1003        let conn = test_db();
1004        let projector = Projector::new(&conn);
1005        let event = make_create("bn-001", "Fix auth timeout", "aaa", 1000);
1006
1007        let result = projector.project_event(&event).unwrap();
1008        assert!(result, "should return true for new projection");
1009
1010        let item = query::get_item(&conn, "bn-001", false).unwrap().unwrap();
1011        assert_eq!(item.title, "Fix auth timeout");
1012        assert_eq!(item.kind, "task");
1013        assert_eq!(item.state, "open");
1014        assert_eq!(item.urgency, "default");
1015        assert_eq!(item.size.as_deref(), Some("m"));
1016        assert_eq!(item.description.as_deref(), Some("A detailed description"));
1017        assert_eq!(item.created_at_us, 1000);
1018        assert_eq!(item.updated_at_us, 1000);
1019    }
1020
1021    #[test]
1022    fn project_create_inserts_labels() {
1023        let conn = test_db();
1024        let projector = Projector::new(&conn);
1025        let event = make_create("bn-001", "Fix auth", "aaa", 1000);
1026        projector.project_event(&event).unwrap();
1027
1028        let labels = query::get_labels(&conn, "bn-001").unwrap();
1029        assert_eq!(labels.len(), 2);
1030        assert_eq!(labels[0].label, "auth");
1031        assert_eq!(labels[1].label, "backend");
1032    }
1033
1034    // -----------------------------------------------------------------------
1035    // Update
1036    // -----------------------------------------------------------------------
1037
1038    #[test]
1039    fn project_update_title() {
1040        let conn = test_db();
1041        let projector = Projector::new(&conn);
1042        projector
1043            .project_event(&make_create("bn-001", "Old title", "aaa", 1000))
1044            .unwrap();
1045
1046        let update = make_event(
1047            EventType::Update,
1048            "bn-001",
1049            EventData::Update(UpdateData {
1050                field: "title".into(),
1051                value: serde_json::json!("New title"),
1052                extra: BTreeMap::new(),
1053            }),
1054            "bbb",
1055            2000,
1056        );
1057        projector.project_event(&update).unwrap();
1058
1059        let item = query::get_item(&conn, "bn-001", false).unwrap().unwrap();
1060        assert_eq!(item.title, "New title");
1061        assert_eq!(item.updated_at_us, 2000);
1062    }
1063
1064    #[test]
1065    fn project_update_description() {
1066        let conn = test_db();
1067        let projector = Projector::new(&conn);
1068        projector
1069            .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1070            .unwrap();
1071
1072        let update = make_event(
1073            EventType::Update,
1074            "bn-001",
1075            EventData::Update(UpdateData {
1076                field: "description".into(),
1077                value: serde_json::json!("Updated description"),
1078                extra: BTreeMap::new(),
1079            }),
1080            "bbb",
1081            2000,
1082        );
1083        projector.project_event(&update).unwrap();
1084
1085        let item = query::get_item(&conn, "bn-001", false).unwrap().unwrap();
1086        assert_eq!(item.description.as_deref(), Some("Updated description"));
1087    }
1088
1089    #[test]
1090    fn project_update_labels() {
1091        let conn = test_db();
1092        let projector = Projector::new(&conn);
1093        projector
1094            .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1095            .unwrap();
1096
1097        let update = make_event(
1098            EventType::Update,
1099            "bn-001",
1100            EventData::Update(UpdateData {
1101                field: "labels".into(),
1102                value: serde_json::json!(["frontend", "urgent"]),
1103                extra: BTreeMap::new(),
1104            }),
1105            "bbb",
1106            2000,
1107        );
1108        projector.project_event(&update).unwrap();
1109
1110        let labels = query::get_labels(&conn, "bn-001").unwrap();
1111        assert_eq!(labels.len(), 2);
1112        assert_eq!(labels[0].label, "frontend");
1113        assert_eq!(labels[1].label, "urgent");
1114    }
1115
1116    #[test]
1117    fn project_update_unknown_field_bumps_updated() {
1118        let conn = test_db();
1119        let projector = Projector::new(&conn);
1120        projector
1121            .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1122            .unwrap();
1123
1124        let update = make_event(
1125            EventType::Update,
1126            "bn-001",
1127            EventData::Update(UpdateData {
1128                field: "future_field".into(),
1129                value: serde_json::json!("whatever"),
1130                extra: BTreeMap::new(),
1131            }),
1132            "bbb",
1133            2000,
1134        );
1135        projector.project_event(&update).unwrap();
1136
1137        let item = query::get_item(&conn, "bn-001", false).unwrap().unwrap();
1138        assert_eq!(item.updated_at_us, 2000);
1139    }
1140
1141    // -----------------------------------------------------------------------
1142    // Move
1143    // -----------------------------------------------------------------------
1144
1145    #[test]
1146    fn project_move_updates_state() {
1147        let conn = test_db();
1148        let projector = Projector::new(&conn);
1149        projector
1150            .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1151            .unwrap();
1152
1153        let mv = make_event(
1154            EventType::Move,
1155            "bn-001",
1156            EventData::Move(MoveData {
1157                state: State::Doing,
1158                reason: Some("Starting work".into()),
1159                extra: BTreeMap::new(),
1160            }),
1161            "bbb",
1162            2000,
1163        );
1164        projector.project_event(&mv).unwrap();
1165
1166        let item = query::get_item(&conn, "bn-001", false).unwrap().unwrap();
1167        assert_eq!(item.state, "doing");
1168    }
1169
1170    // -----------------------------------------------------------------------
1171    // Assign / Unassign
1172    // -----------------------------------------------------------------------
1173
1174    #[test]
1175    fn project_assign_and_unassign() {
1176        let conn = test_db();
1177        let projector = Projector::new(&conn);
1178        projector
1179            .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1180            .unwrap();
1181
1182        // Assign alice
1183        let assign = make_event(
1184            EventType::Assign,
1185            "bn-001",
1186            EventData::Assign(AssignData {
1187                agent: "alice".into(),
1188                action: AssignAction::Assign,
1189                extra: BTreeMap::new(),
1190            }),
1191            "bbb",
1192            2000,
1193        );
1194        projector.project_event(&assign).unwrap();
1195
1196        let assignees = query::get_assignees(&conn, "bn-001").unwrap();
1197        assert_eq!(assignees.len(), 1);
1198        assert_eq!(assignees[0].agent, "alice");
1199
1200        // Unassign alice
1201        let unassign = make_event(
1202            EventType::Assign,
1203            "bn-001",
1204            EventData::Assign(AssignData {
1205                agent: "alice".into(),
1206                action: AssignAction::Unassign,
1207                extra: BTreeMap::new(),
1208            }),
1209            "ccc",
1210            3000,
1211        );
1212        projector.project_event(&unassign).unwrap();
1213
1214        let assignees = query::get_assignees(&conn, "bn-001").unwrap();
1215        assert!(assignees.is_empty());
1216    }
1217
1218    // -----------------------------------------------------------------------
1219    // Comment
1220    // -----------------------------------------------------------------------
1221
1222    #[test]
1223    fn project_comment_inserts_row() {
1224        let conn = test_db();
1225        let projector = Projector::new(&conn);
1226        projector
1227            .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1228            .unwrap();
1229
1230        let comment = make_event(
1231            EventType::Comment,
1232            "bn-001",
1233            EventData::Comment(CommentData {
1234                body: "This is a comment".into(),
1235                extra: BTreeMap::new(),
1236            }),
1237            "bbb",
1238            2000,
1239        );
1240        projector.project_event(&comment).unwrap();
1241
1242        let comments = query::get_comments(&conn, "bn-001", None, None).unwrap();
1243        assert_eq!(comments.len(), 1);
1244        assert_eq!(comments[0].body, "This is a comment");
1245        assert_eq!(comments[0].author, "test-agent");
1246        assert_eq!(comments[0].event_hash, "blake3:bbb");
1247    }
1248
1249    // -----------------------------------------------------------------------
1250    // Link / Unlink
1251    // -----------------------------------------------------------------------
1252
1253    #[test]
1254    fn project_link_and_unlink() {
1255        let conn = test_db();
1256        let projector = Projector::new(&conn);
1257        projector
1258            .project_event(&make_create("bn-001", "Blocker", "aaa", 1000))
1259            .unwrap();
1260        projector
1261            .project_event(&make_create("bn-002", "Blocked", "bbb", 1001))
1262            .unwrap();
1263
1264        // Link
1265        let link = make_event(
1266            EventType::Link,
1267            "bn-002",
1268            EventData::Link(LinkData {
1269                target: "bn-001".into(),
1270                link_type: "blocks".into(),
1271                extra: BTreeMap::new(),
1272            }),
1273            "ccc",
1274            2000,
1275        );
1276        projector.project_event(&link).unwrap();
1277
1278        let deps = query::get_dependencies(&conn, "bn-002").unwrap();
1279        assert_eq!(deps.len(), 1);
1280        assert_eq!(deps[0].depends_on_item_id, "bn-001");
1281
1282        // Unlink
1283        let unlink = make_event(
1284            EventType::Unlink,
1285            "bn-002",
1286            EventData::Unlink(UnlinkData {
1287                target: "bn-001".into(),
1288                link_type: Some("blocks".into()),
1289                extra: BTreeMap::new(),
1290            }),
1291            "ddd",
1292            3000,
1293        );
1294        projector.project_event(&unlink).unwrap();
1295
1296        let deps = query::get_dependencies(&conn, "bn-002").unwrap();
1297        assert!(deps.is_empty());
1298    }
1299
1300    // -----------------------------------------------------------------------
1301    // Delete
1302    // -----------------------------------------------------------------------
1303
1304    #[test]
1305    fn project_delete_soft_deletes() {
1306        let conn = test_db();
1307        let projector = Projector::new(&conn);
1308        projector
1309            .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1310            .unwrap();
1311
1312        let delete = make_event(
1313            EventType::Delete,
1314            "bn-001",
1315            EventData::Delete(DeleteData {
1316                reason: Some("Duplicate".into()),
1317                extra: BTreeMap::new(),
1318            }),
1319            "bbb",
1320            2000,
1321        );
1322        projector.project_event(&delete).unwrap();
1323
1324        // Not visible without include_deleted
1325        assert!(query::get_item(&conn, "bn-001", false).unwrap().is_none());
1326        // Visible with include_deleted
1327        let item = query::get_item(&conn, "bn-001", true).unwrap().unwrap();
1328        assert!(item.is_deleted);
1329        assert_eq!(item.deleted_at_us, Some(2000));
1330    }
1331
1332    // -----------------------------------------------------------------------
1333    // Compact
1334    // -----------------------------------------------------------------------
1335
1336    #[test]
1337    fn project_compact_sets_summary() {
1338        let conn = test_db();
1339        let projector = Projector::new(&conn);
1340        projector
1341            .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1342            .unwrap();
1343
1344        let compact = make_event(
1345            EventType::Compact,
1346            "bn-001",
1347            EventData::Compact(CompactData {
1348                summary: "TL;DR: auth fix".into(),
1349                extra: BTreeMap::new(),
1350            }),
1351            "bbb",
1352            2000,
1353        );
1354        projector.project_event(&compact).unwrap();
1355
1356        let item = query::get_item(&conn, "bn-001", false).unwrap().unwrap();
1357        assert_eq!(item.compact_summary.as_deref(), Some("TL;DR: auth fix"));
1358    }
1359
1360    // -----------------------------------------------------------------------
1361    // Snapshot
1362    // -----------------------------------------------------------------------
1363
1364    #[test]
1365    fn project_snapshot_stores_json() {
1366        let conn = test_db();
1367        let projector = Projector::new(&conn);
1368        projector
1369            .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1370            .unwrap();
1371
1372        let snapshot = make_event(
1373            EventType::Snapshot,
1374            "bn-001",
1375            EventData::Snapshot(SnapshotData {
1376                state: serde_json::json!({"id": "bn-001", "title": "Snapshotted"}),
1377                extra: BTreeMap::new(),
1378            }),
1379            "bbb",
1380            2000,
1381        );
1382        projector.project_event(&snapshot).unwrap();
1383
1384        let row: String = conn
1385            .query_row(
1386                "SELECT snapshot_json FROM items WHERE item_id = 'bn-001'",
1387                [],
1388                |row| row.get(0),
1389            )
1390            .unwrap();
1391        let parsed: serde_json::Value = serde_json::from_str(&row).unwrap();
1392        assert_eq!(parsed["title"], "Snapshotted");
1393    }
1394
1395    // -----------------------------------------------------------------------
1396    // Redact
1397    // -----------------------------------------------------------------------
1398
1399    #[test]
1400    fn project_redact_records_and_blanks_comment() {
1401        let conn = test_db();
1402        let projector = Projector::new(&conn);
1403        projector
1404            .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1405            .unwrap();
1406
1407        // First add a comment
1408        let comment = make_event(
1409            EventType::Comment,
1410            "bn-001",
1411            EventData::Comment(CommentData {
1412                body: "Secret password: hunter2".into(),
1413                extra: BTreeMap::new(),
1414            }),
1415            "comment_hash",
1416            2000,
1417        );
1418        projector.project_event(&comment).unwrap();
1419
1420        // Redact it
1421        let redact = make_event(
1422            EventType::Redact,
1423            "bn-001",
1424            EventData::Redact(RedactData {
1425                target_hash: "blake3:comment_hash".into(),
1426                reason: "Accidental secret".into(),
1427                extra: BTreeMap::new(),
1428            }),
1429            "redact_hash",
1430            3000,
1431        );
1432        projector.project_event(&redact).unwrap();
1433
1434        // Check redaction record
1435        let count: i64 = conn
1436            .query_row(
1437                "SELECT COUNT(*) FROM event_redactions WHERE target_event_hash = 'blake3:comment_hash'",
1438                [],
1439                |row| row.get(0),
1440            )
1441            .unwrap();
1442        assert_eq!(count, 1);
1443
1444        // Check comment body is redacted
1445        let comments = query::get_comments(&conn, "bn-001", None, None).unwrap();
1446        assert_eq!(comments.len(), 1);
1447        assert_eq!(comments[0].body, "[redacted]");
1448    }
1449
1450    // -----------------------------------------------------------------------
1451    // Dedup
1452    // -----------------------------------------------------------------------
1453
1454    #[test]
1455    fn duplicate_events_are_skipped() {
1456        let conn = test_db();
1457        let projector = Projector::new(&conn);
1458
1459        let event = make_create("bn-001", "Item", "aaa", 1000);
1460        assert!(projector.project_event(&event).unwrap()); // first time
1461        assert!(!projector.project_event(&event).unwrap()); // duplicate
1462
1463        // Only one item created
1464        let count: i64 = conn
1465            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
1466            .unwrap();
1467        assert_eq!(count, 1);
1468    }
1469
1470    #[test]
1471    fn batch_dedup_counts() {
1472        let conn = test_db();
1473        let projector = Projector::new(&conn);
1474
1475        let event1 = make_create("bn-001", "Item 1", "aaa", 1000);
1476        let event2 = make_create("bn-002", "Item 2", "bbb", 1001);
1477
1478        // Project first batch
1479        let stats1 = projector
1480            .project_batch(&[event1.clone(), event2.clone()])
1481            .unwrap();
1482        assert_eq!(stats1.projected, 2);
1483        assert_eq!(stats1.duplicates, 0);
1484
1485        // Replay same batch — all duplicates
1486        let stats2 = projector.project_batch(&[event1, event2]).unwrap();
1487        assert_eq!(stats2.projected, 0);
1488        assert_eq!(stats2.duplicates, 2);
1489    }
1490
1491    // -----------------------------------------------------------------------
1492    // Full replay: incremental matches full rebuild
1493    // -----------------------------------------------------------------------
1494
1495    #[test]
1496    fn incremental_matches_full_replay() {
1497        let events = vec![
1498            make_create("bn-001", "Auth bug", "h1", 1000),
1499            make_event(
1500                EventType::Move,
1501                "bn-001",
1502                EventData::Move(MoveData {
1503                    state: State::Doing,
1504                    reason: None,
1505                    extra: BTreeMap::new(),
1506                }),
1507                "h2",
1508                2000,
1509            ),
1510            make_event(
1511                EventType::Assign,
1512                "bn-001",
1513                EventData::Assign(AssignData {
1514                    agent: "alice".into(),
1515                    action: AssignAction::Assign,
1516                    extra: BTreeMap::new(),
1517                }),
1518                "h3",
1519                3000,
1520            ),
1521            make_event(
1522                EventType::Comment,
1523                "bn-001",
1524                EventData::Comment(CommentData {
1525                    body: "Working on it".into(),
1526                    extra: BTreeMap::new(),
1527                }),
1528                "h4",
1529                4000,
1530            ),
1531            make_event(
1532                EventType::Update,
1533                "bn-001",
1534                EventData::Update(UpdateData {
1535                    field: "title".into(),
1536                    value: serde_json::json!("Auth bug (fixed)"),
1537                    extra: BTreeMap::new(),
1538                }),
1539                "h5",
1540                5000,
1541            ),
1542            make_event(
1543                EventType::Move,
1544                "bn-001",
1545                EventData::Move(MoveData {
1546                    state: State::Done,
1547                    reason: Some("Shipped".into()),
1548                    extra: BTreeMap::new(),
1549                }),
1550                "h6",
1551                6000,
1552            ),
1553        ];
1554
1555        // Full replay
1556        let conn_full = test_db();
1557        let proj_full = Projector::new(&conn_full);
1558        proj_full.project_batch(&events).unwrap();
1559
1560        // Incremental: one by one
1561        let conn_inc = test_db();
1562        let proj_inc = Projector::new(&conn_inc);
1563        for event in &events {
1564            proj_inc.project_event(event).unwrap();
1565        }
1566
1567        // Compare results
1568        let item_full = query::get_item(&conn_full, "bn-001", false)
1569            .unwrap()
1570            .unwrap();
1571        let item_inc = query::get_item(&conn_inc, "bn-001", false)
1572            .unwrap()
1573            .unwrap();
1574
1575        assert_eq!(item_full.title, item_inc.title);
1576        assert_eq!(item_full.state, item_inc.state);
1577        assert_eq!(item_full.updated_at_us, item_inc.updated_at_us);
1578
1579        let assignees_full = query::get_assignees(&conn_full, "bn-001").unwrap();
1580        let assignees_inc = query::get_assignees(&conn_inc, "bn-001").unwrap();
1581        assert_eq!(assignees_full.len(), assignees_inc.len());
1582
1583        let comments_full = query::get_comments(&conn_full, "bn-001", None, None).unwrap();
1584        let comments_inc = query::get_comments(&conn_inc, "bn-001", None, None).unwrap();
1585        assert_eq!(comments_full.len(), comments_inc.len());
1586    }
1587
1588    // -----------------------------------------------------------------------
1589    // Full rebuild (clear + replay)
1590    // -----------------------------------------------------------------------
1591
1592    #[test]
1593    fn clear_and_replay_produces_same_result() {
1594        let conn = test_db();
1595        let projector = Projector::new(&conn);
1596
1597        let events = vec![
1598            make_create("bn-001", "Item 1", "h1", 1000),
1599            make_create("bn-002", "Item 2", "h2", 1001),
1600            make_event(
1601                EventType::Link,
1602                "bn-002",
1603                EventData::Link(LinkData {
1604                    target: "bn-001".into(),
1605                    link_type: "blocks".into(),
1606                    extra: BTreeMap::new(),
1607                }),
1608                "h3",
1609                2000,
1610            ),
1611        ];
1612
1613        // First pass
1614        projector.project_batch(&events).unwrap();
1615        let count1: i64 = conn
1616            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
1617            .unwrap();
1618        assert_eq!(count1, 2);
1619
1620        // Clear and replay
1621        clear_projection(&conn).unwrap();
1622        let count_after_clear: i64 = conn
1623            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
1624            .unwrap();
1625        assert_eq!(count_after_clear, 0);
1626
1627        projector.project_batch(&events).unwrap();
1628        let count2: i64 = conn
1629            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
1630            .unwrap();
1631        assert_eq!(count2, 2);
1632
1633        let deps = query::get_dependencies(&conn, "bn-002").unwrap();
1634        assert_eq!(deps.len(), 1);
1635    }
1636
1637    // -----------------------------------------------------------------------
1638    // Placeholder item creation
1639    // -----------------------------------------------------------------------
1640
1641    #[test]
1642    fn events_on_missing_item_create_placeholder() {
1643        let conn = test_db();
1644        let projector = Projector::new(&conn);
1645
1646        // Comment on an item that hasn't been created yet
1647        let comment = make_event(
1648            EventType::Comment,
1649            "bn-ghost",
1650            EventData::Comment(CommentData {
1651                body: "Comment on missing item".into(),
1652                extra: BTreeMap::new(),
1653            }),
1654            "ccc",
1655            2000,
1656        );
1657        projector.project_event(&comment).unwrap();
1658
1659        // Item exists as placeholder
1660        let item = query::get_item(&conn, "bn-ghost", false).unwrap().unwrap();
1661        assert_eq!(item.title, ""); // placeholder has empty title
1662        assert_eq!(item.state, "open");
1663    }
1664
1665    // -----------------------------------------------------------------------
1666    // FTS integration
1667    // -----------------------------------------------------------------------
1668
1669    #[test]
1670    fn project_create_populates_fts() {
1671        let conn = test_db();
1672        let projector = Projector::new(&conn);
1673        projector
1674            .project_event(&make_create(
1675                "bn-001",
1676                "Authentication timeout",
1677                "aaa",
1678                1000,
1679            ))
1680            .unwrap();
1681
1682        let hits = query::search(&conn, "authentication", 10).unwrap();
1683        assert_eq!(hits.len(), 1);
1684        assert_eq!(hits[0].item_id, "bn-001");
1685    }
1686
1687    #[test]
1688    fn project_update_title_updates_fts() {
1689        let conn = test_db();
1690        let projector = Projector::new(&conn);
1691        projector
1692            .project_event(&make_create("bn-001", "Old title", "aaa", 1000))
1693            .unwrap();
1694
1695        let update = make_event(
1696            EventType::Update,
1697            "bn-001",
1698            EventData::Update(UpdateData {
1699                field: "title".into(),
1700                value: serde_json::json!("Authorization failure"),
1701                extra: BTreeMap::new(),
1702            }),
1703            "bbb",
1704            2000,
1705        );
1706        projector.project_event(&update).unwrap();
1707
1708        // Old title not found
1709        let hits_old = query::search(&conn, "Old", 10).unwrap();
1710        assert!(hits_old.is_empty());
1711
1712        // New title found
1713        let hits_new = query::search(&conn, "authorization", 10).unwrap();
1714        assert_eq!(hits_new.len(), 1);
1715    }
1716
1717    // -----------------------------------------------------------------------
1718    // Batch stats
1719    // -----------------------------------------------------------------------
1720
1721    #[test]
1722    fn batch_reports_correct_stats() {
1723        let conn = test_db();
1724        let projector = Projector::new(&conn);
1725
1726        let events = vec![
1727            make_create("bn-001", "Item 1", "h1", 1000),
1728            make_create("bn-002", "Item 2", "h2", 1001),
1729            make_create("bn-003", "Item 3", "h3", 1002),
1730        ];
1731
1732        let stats = projector.project_batch(&events).unwrap();
1733        assert_eq!(stats.projected, 3);
1734        assert_eq!(stats.duplicates, 0);
1735        assert_eq!(stats.errors, 0);
1736    }
1737
1738    // -----------------------------------------------------------------------
1739    // All 11 event types in sequence
1740    // -----------------------------------------------------------------------
1741
1742    #[test]
1743    fn full_lifecycle_all_event_types() {
1744        let conn = test_db();
1745        let projector = Projector::new(&conn);
1746
1747        let mut events = vec![
1748            // 1. Create
1749            make_create("bn-001", "Auth bug", "h01", 1000),
1750            // Create a second item for linking
1751            make_create("bn-002", "Dep item", "h02", 1001),
1752        ];
1753
1754        // 2. Update title
1755        events.push(make_event(
1756            EventType::Update,
1757            "bn-001",
1758            EventData::Update(UpdateData {
1759                field: "title".into(),
1760                value: serde_json::json!("Auth timeout bug"),
1761                extra: BTreeMap::new(),
1762            }),
1763            "h03",
1764            2000,
1765        ));
1766
1767        // 3. Move to doing
1768        events.push(make_event(
1769            EventType::Move,
1770            "bn-001",
1771            EventData::Move(MoveData {
1772                state: State::Doing,
1773                reason: None,
1774                extra: BTreeMap::new(),
1775            }),
1776            "h04",
1777            3000,
1778        ));
1779
1780        // 4. Assign
1781        events.push(make_event(
1782            EventType::Assign,
1783            "bn-001",
1784            EventData::Assign(AssignData {
1785                agent: "alice".into(),
1786                action: AssignAction::Assign,
1787                extra: BTreeMap::new(),
1788            }),
1789            "h05",
1790            4000,
1791        ));
1792
1793        // 5. Comment
1794        events.push(make_event(
1795            EventType::Comment,
1796            "bn-001",
1797            EventData::Comment(CommentData {
1798                body: "Found root cause".into(),
1799                extra: BTreeMap::new(),
1800            }),
1801            "h06",
1802            5000,
1803        ));
1804
1805        // 6. Link
1806        events.push(make_event(
1807            EventType::Link,
1808            "bn-001",
1809            EventData::Link(LinkData {
1810                target: "bn-002".into(),
1811                link_type: "blocks".into(),
1812                extra: BTreeMap::new(),
1813            }),
1814            "h07",
1815            6000,
1816        ));
1817
1818        // 7. Unlink
1819        events.push(make_event(
1820            EventType::Unlink,
1821            "bn-001",
1822            EventData::Unlink(UnlinkData {
1823                target: "bn-002".into(),
1824                link_type: Some("blocks".into()),
1825                extra: BTreeMap::new(),
1826            }),
1827            "h08",
1828            7000,
1829        ));
1830
1831        // 8. Compact
1832        events.push(make_event(
1833            EventType::Compact,
1834            "bn-001",
1835            EventData::Compact(CompactData {
1836                summary: "Auth token refresh race".into(),
1837                extra: BTreeMap::new(),
1838            }),
1839            "h09",
1840            8000,
1841        ));
1842
1843        // 9. Snapshot
1844        events.push(make_event(
1845            EventType::Snapshot,
1846            "bn-001",
1847            EventData::Snapshot(SnapshotData {
1848                state: serde_json::json!({"id": "bn-001", "resolved": true}),
1849                extra: BTreeMap::new(),
1850            }),
1851            "h10",
1852            9000,
1853        ));
1854
1855        // 10. Redact the comment
1856        events.push(make_event(
1857            EventType::Redact,
1858            "bn-001",
1859            EventData::Redact(RedactData {
1860                target_hash: "blake3:h06".into(),
1861                reason: "Contained secret".into(),
1862                extra: BTreeMap::new(),
1863            }),
1864            "h11",
1865            10000,
1866        ));
1867
1868        // 11. Delete
1869        events.push(make_event(
1870            EventType::Delete,
1871            "bn-001",
1872            EventData::Delete(DeleteData {
1873                reason: Some("Duplicate".into()),
1874                extra: BTreeMap::new(),
1875            }),
1876            "h12",
1877            11000,
1878        ));
1879
1880        let stats = projector.project_batch(&events).unwrap();
1881        assert_eq!(stats.projected, 12); // 2 creates + 10 mutations
1882        assert_eq!(stats.duplicates, 0);
1883        assert_eq!(stats.errors, 0);
1884
1885        // Verify final state
1886        let item = query::get_item(&conn, "bn-001", true).unwrap().unwrap();
1887        assert_eq!(item.title, "Auth timeout bug");
1888        assert_eq!(item.state, "doing");
1889        assert!(item.is_deleted);
1890        assert_eq!(
1891            item.compact_summary.as_deref(),
1892            Some("Auth token refresh race")
1893        );
1894        let snapshot: Option<String> = conn
1895            .query_row(
1896                "SELECT snapshot_json FROM items WHERE item_id = 'bn-001'",
1897                [],
1898                |row| row.get(0),
1899            )
1900            .unwrap();
1901        assert!(snapshot.is_some());
1902
1903        // Comment was redacted
1904        let comments = query::get_comments(&conn, "bn-001", None, None).unwrap();
1905        assert_eq!(comments.len(), 1);
1906        assert_eq!(comments[0].body, "[redacted]");
1907
1908        // Dependencies were unlinked
1909        let deps = query::get_dependencies(&conn, "bn-001").unwrap();
1910        assert!(deps.is_empty());
1911
1912        // Redaction record exists
1913        let redaction_count: i64 = conn
1914            .query_row("SELECT COUNT(*) FROM event_redactions", [], |row| {
1915                row.get(0)
1916            })
1917            .unwrap();
1918        assert_eq!(redaction_count, 1);
1919    }
1920
1921    #[test]
1922    fn late_create_populates_placeholder() {
1923        let conn = test_db();
1924        let projector = Projector::new(&conn);
1925
1926        // 1. Comment triggers placeholder creation
1927        let comment = make_event(
1928            EventType::Comment,
1929            "bn-late",
1930            EventData::Comment(CommentData {
1931                body: "Comment first".into(),
1932                extra: BTreeMap::new(),
1933            }),
1934            "h1",
1935            1000,
1936        );
1937        projector.project_event(&comment).unwrap();
1938
1939        let item = query::get_item(&conn, "bn-late", false).unwrap().unwrap();
1940        assert_eq!(item.title, "");
1941        assert_eq!(item.created_at_us, 1000);
1942
1943        // 2. Late Create arrives
1944        let create = make_create("bn-late", "Real Title", "h2", 900);
1945
1946        projector.project_event(&create).unwrap();
1947
1948        let item = query::get_item(&conn, "bn-late", false).unwrap().unwrap();
1949        assert_eq!(item.title, "Real Title");
1950        assert_eq!(item.created_at_us, 900);
1951    }
1952
1953    /// Regression: Projector::new() must create the projected_events table
1954    /// on a migrated DB that never had ensure_tracking_table() called.
1955    /// Without this, fresh installs fail with "record projected event hash".
1956    #[test]
1957    fn projector_new_creates_tracking_table_on_fresh_db() {
1958        let mut conn = Connection::open_in_memory().expect("open in-memory db");
1959        migrations::migrate(&mut conn).expect("migrate");
1960        // Do NOT call ensure_tracking_table — simulate fresh install.
1961
1962        // Projector::new should create the table automatically.
1963        let projector = Projector::new(&conn);
1964        let event = make_create("bn-fresh", "Fresh item", "h1", 1000);
1965        projector
1966            .project_event(&event)
1967            .expect("project_event should succeed on fresh DB");
1968
1969        let item = query::get_item(&conn, "bn-fresh", false).unwrap().unwrap();
1970        assert_eq!(item.title, "Fresh item");
1971    }
1972}