Skip to main content

bones_core/db/
project.rs

1//! Event replay → `SQLite` projection pipeline.
2//!
3//! The [`Projector`] replays events from the TSJSON event log and upserts
4//! the resulting state into the `SQLite` projection database. It handles all
5//! 11 event types and supports both incremental (single-event) and full
6//! rebuild modes.
7//!
8//! # Deduplication
9//!
10//! Events are deduplicated by `event_hash`. When a duplicate hash is
11//! encountered (e.g. from git merge duplicating lines in shard files),
12//! the event is silently skipped.
13//!
14//! # Cursor
15//!
16//! After projecting a batch, the caller can persist the byte offset and
17//! last event hash via [`super::query::update_projection_cursor`] for
18//! incremental replay on next startup.
19
20use anyhow::{Context, Result};
21use rusqlite::{Connection, params};
22
23use crate::db::query;
24use crate::event::Event;
25use crate::event::data::{AssignAction, EventData};
26use crate::event::types::EventType;
27use crate::shard::ShardManager;
28
29// ---------------------------------------------------------------------------
30// ProjectionStats
31// ---------------------------------------------------------------------------
32
33/// Statistics returned after a projection run.
34#[derive(Debug, Clone, Default, PartialEq, Eq)]
35pub struct ProjectionStats {
36    /// Number of events successfully projected.
37    pub projected: usize,
38    /// Number of duplicate events skipped.
39    pub duplicates: usize,
40    /// Number of events that caused errors (logged and skipped).
41    pub errors: usize,
42}
43
44// ---------------------------------------------------------------------------
45// Projector
46// ---------------------------------------------------------------------------
47
48/// Replays events into the `SQLite` projection.
49///
50/// Create a `Projector` with a connection, then call [`project_event`] for
51/// each event or [`project_batch`] for a slice.
52pub struct Projector<'conn> {
53    conn: &'conn Connection,
54    has_agent_column: bool,
55}
56
57impl<'conn> Projector<'conn> {
58    /// Create a new projector backed by the given connection.
59    ///
60    /// Ensures the `projected_events` tracking table exists before any
61    /// projection work is attempted.
62    pub fn new(conn: &'conn Connection) -> Self {
63        // Best-effort: if the DDL fails (e.g. read-only DB) we'll still
64        // attempt projection and let it fail at the INSERT instead.
65        let _ = ensure_tracking_table(conn);
66        let has_agent_column = projected_events_has_agent_column(conn).unwrap_or(false);
67        Self {
68            conn,
69            has_agent_column,
70        }
71    }
72
73    /// Project a batch of events, returning aggregate statistics.
74    ///
75    /// Events are applied inside a single transaction for performance.
76    /// Duplicate events (same `event_hash`) are silently skipped.
77    ///
78    /// # Errors
79    ///
80    /// Returns an error if the transaction fails to commit. Individual event
81    /// projection errors are counted in `stats.errors` but do not abort the
82    /// batch.
83    pub fn project_batch(&self, events: &[Event]) -> Result<ProjectionStats> {
84        let mut stats = ProjectionStats::default();
85
86        // Use a savepoint so we can commit all-or-nothing
87        self.conn
88            .execute_batch("BEGIN IMMEDIATE")
89            .context("begin projection transaction")?;
90
91        for event in events {
92            match self.project_event_inner(event) {
93                Ok(ProjectResult::Projected) => stats.projected += 1,
94                Ok(ProjectResult::Duplicate) => stats.duplicates += 1,
95                Err(e) => {
96                    tracing::warn!(
97                        event_hash = %event.event_hash,
98                        event_type = %event.event_type,
99                        item_id = %event.item_id,
100                        error = %e,
101                        "skipping event due to projection error"
102                    );
103                    stats.errors += 1;
104                }
105            }
106        }
107
108        self.conn
109            .execute_batch("COMMIT")
110            .context("commit projection transaction")?;
111
112        if stats.errors > 0 {
113            let reason = format!(
114                "project_batch encountered {} projection errors while applying {} events",
115                stats.errors,
116                events.len()
117            );
118            if let Err(err) = crate::db::mark_projection_dirty_from_connection(self.conn, &reason) {
119                tracing::warn!(error = %err, "failed to mark projection dirty after batch errors");
120            }
121        }
122
123        Ok(stats)
124    }
125
126    /// Project a single event (outside of any managed transaction).
127    ///
128    /// Returns `true` if the event was projected, `false` if it was a
129    /// duplicate.
130    ///
131    /// # Errors
132    ///
133    /// Returns an error if the projection fails.
134    pub fn project_event(&self, event: &Event) -> Result<bool> {
135        let projected = match self.project_event_inner(event) {
136            Ok(ProjectResult::Projected) => true,
137            Ok(ProjectResult::Duplicate) => false,
138            Err(err) => {
139                let reason = format!(
140                    "project_event failed hash={} type={}",
141                    event.event_hash, event.event_type
142                );
143                if let Err(mark_err) =
144                    crate::db::mark_projection_dirty_from_connection(self.conn, &reason)
145                {
146                    tracing::warn!(
147                        error = %mark_err,
148                        event_hash = %event.event_hash,
149                        "failed to mark projection dirty after single-event projection failure"
150                    );
151                }
152                return Err(err);
153            }
154        };
155
156        if let Err(err) = self.update_cursor_to_event_log_end(&event.event_hash) {
157            tracing::warn!(
158                event_hash = %event.event_hash,
159                error = %err,
160                "failed to update projection cursor after single-event projection"
161            );
162            let reason = format!(
163                "cursor update failed after projecting hash={} error={err}",
164                event.event_hash
165            );
166            let _ = crate::db::mark_projection_dirty_from_connection(self.conn, &reason);
167        }
168
169        Ok(projected)
170    }
171
172    // -----------------------------------------------------------------------
173    // Internal dispatch
174    // -----------------------------------------------------------------------
175
176    fn project_event_inner(&self, event: &Event) -> Result<ProjectResult> {
177        // Dedup check: skip if event_hash already projected
178        if self.is_event_projected(&event.event_hash)? {
179            return Ok(ProjectResult::Duplicate);
180        }
181
182        match event.event_type {
183            EventType::Create => self.project_create(event)?,
184            EventType::Update => self.project_update(event)?,
185            EventType::Move => self.project_move(event)?,
186            EventType::Assign => self.project_assign(event)?,
187            EventType::Comment => self.project_comment(event)?,
188            EventType::Link => self.project_link(event)?,
189            EventType::Unlink => self.project_unlink(event)?,
190            EventType::Delete => self.project_delete(event)?,
191            EventType::Compact => self.project_compact(event)?,
192            EventType::Snapshot => self.project_snapshot(event)?,
193            EventType::Redact => self.project_redact(event)?,
194        }
195
196        // Record that this event hash has been projected
197        self.record_projected_hash(&event.event_hash, event)?;
198
199        Ok(ProjectResult::Projected)
200    }
201
202    // -----------------------------------------------------------------------
203    // Dedup tracking
204    // -----------------------------------------------------------------------
205
206    fn is_event_projected(&self, event_hash: &str) -> Result<bool> {
207        // Check item_comments table for comment events (unique on event_hash)
208        // and the event_redactions table for redact events.
209        // For a general dedup check, we use projection_meta's tracking.
210        // Simple approach: check if item_comments has this hash OR if
211        // event_redactions has it as target. For general dedup, we use
212        // a lightweight check in the projected_events tracking.
213        let exists: bool = self
214            .conn
215            .query_row(
216                "SELECT EXISTS(SELECT 1 FROM projected_events WHERE event_hash = ?1)",
217                params![event_hash],
218                |row| row.get(0),
219            )
220            .unwrap_or(false);
221        Ok(exists)
222    }
223
224    fn is_event_redacted(&self, event_hash: &str) -> Result<bool> {
225        let exists: bool = self
226            .conn
227            .query_row(
228                "SELECT EXISTS(SELECT 1 FROM event_redactions WHERE target_event_hash = ?1)",
229                params![event_hash],
230                |row| row.get(0),
231            )
232            .unwrap_or(false);
233        Ok(exists)
234    }
235
236    fn record_projected_hash(&self, event_hash: &str, event: &Event) -> Result<()> {
237        if self.has_agent_column {
238            self.conn
239                .execute(
240                    "INSERT OR IGNORE INTO projected_events (event_hash, item_id, event_type, projected_at_us, agent) \
241                     VALUES (?1, ?2, ?3, ?4, ?5)",
242                    params![
243                        event_hash,
244                        event.item_id.as_str(),
245                        event.event_type.as_str(),
246                        event.wall_ts_us,
247                        event.agent.as_str(),
248                    ],
249                )
250                .context("record projected event hash")?;
251            return Ok(());
252        }
253
254        self.conn
255            .execute(
256                "INSERT OR IGNORE INTO projected_events (event_hash, item_id, event_type, projected_at_us) \
257                 VALUES (?1, ?2, ?3, ?4)",
258                params![
259                    event_hash,
260                    event.item_id.as_str(),
261                    event.event_type.as_str(),
262                    event.wall_ts_us,
263                ],
264            )
265            .context("record projected event hash")?;
266        Ok(())
267    }
268
269    fn update_cursor_to_event_log_end(&self, event_hash: &str) -> Result<()> {
270        let main_db_file: String = self
271            .conn
272            .query_row(
273                "SELECT file FROM pragma_database_list WHERE name = 'main'",
274                [],
275                |row| row.get(0),
276            )
277            .context("read main database file from pragma_database_list")?;
278
279        if main_db_file.trim().is_empty() {
280            // In-memory projections used in tests have no on-disk log context.
281            return Ok(());
282        }
283
284        let db_path = std::path::Path::new(&main_db_file);
285        let Some(bones_dir) = db_path.parent() else {
286            return Ok(());
287        };
288
289        let shard_mgr = ShardManager::new(bones_dir);
290        let total_len = shard_mgr
291            .total_content_len()
292            .map_err(|e| anyhow::anyhow!("read event-log size for cursor update: {e}"))?;
293        let total_len_i64 = i64::try_from(total_len).unwrap_or(i64::MAX);
294
295        query::update_projection_cursor(self.conn, total_len_i64, Some(event_hash))
296            .context("write projection cursor after single-event projection")?;
297
298        Ok(())
299    }
300
301    // -----------------------------------------------------------------------
302    // Event type handlers
303    // -----------------------------------------------------------------------
304
305    fn project_create(&self, event: &Event) -> Result<()> {
306        let EventData::Create(ref data) = event.data else {
307            anyhow::bail!("expected Create data for item.create event");
308        };
309
310        let is_redacted = self.is_event_redacted(&event.event_hash)?;
311        let title = if is_redacted { "" } else { &data.title };
312        let description = if is_redacted {
313            None
314        } else {
315            data.description.as_deref()
316        };
317        let labels_str = if is_redacted {
318            String::new()
319        } else {
320            data.labels.join(" ")
321        };
322
323        self.conn
324            .execute(
325                "INSERT INTO items (
326                    item_id, title, description, kind, state, urgency,
327                    size, parent_id, is_deleted, search_labels,
328                    created_at_us, updated_at_us
329                ) VALUES (?1, ?2, ?3, ?4, 'open', ?5, ?6, ?7, 0, ?8, ?9, ?10)
330                ON CONFLICT(item_id) DO UPDATE SET
331                    title = CASE
332                        WHEN items.title = '' THEN excluded.title
333                        ELSE items.title
334                    END,
335                    description = COALESCE(items.description, excluded.description),
336                    kind = CASE
337                        WHEN items.kind = 'task' THEN excluded.kind
338                        ELSE items.kind
339                    END,
340                    urgency = CASE
341                        WHEN items.urgency = 'default' THEN excluded.urgency
342                        ELSE items.urgency
343                    END,
344                    size = COALESCE(items.size, excluded.size),
345                    parent_id = COALESCE(items.parent_id, excluded.parent_id),
346                    created_at_us = MIN(items.created_at_us, excluded.created_at_us),
347                    search_labels = CASE
348                        WHEN items.search_labels = '' THEN excluded.search_labels
349                        ELSE items.search_labels
350                    END,
351                    updated_at_us = MAX(items.updated_at_us, excluded.updated_at_us)
352                WHERE NOT EXISTS (
353                    SELECT 1 FROM projected_events
354                    WHERE item_id = excluded.item_id AND event_type = 'item.create'
355                )",
356                params![
357                    event.item_id.as_str(),
358                    title,
359                    description,
360                    data.kind.to_string(),
361                    data.urgency.to_string(),
362                    data.size.map(|s| s.to_string()),
363                    data.parent.as_deref(),
364                    labels_str,
365                    event.wall_ts_us,
366                    event.wall_ts_us,
367                ],
368            )
369            .with_context(|| format!("project create for {}", event.item_id))?;
370
371        // Insert initial labels
372        if !is_redacted {
373            for label in &data.labels {
374                self.conn
375                    .execute(
376                        "INSERT OR IGNORE INTO item_labels (item_id, label, created_at_us)
377                         VALUES (?1, ?2, ?3)",
378                        params![event.item_id.as_str(), label, event.wall_ts_us],
379                    )
380                    .with_context(|| format!("insert label '{label}' for {}", event.item_id))?;
381            }
382            self.refresh_search_labels(event.item_id.as_str(), event.wall_ts_us)?;
383        }
384
385        Ok(())
386    }
387
388    #[allow(clippy::too_many_lines)]
389    fn project_update(&self, event: &Event) -> Result<()> {
390        let EventData::Update(ref data) = event.data else {
391            anyhow::bail!("expected Update data for item.update event");
392        };
393
394        self.ensure_item_exists(event)?;
395
396        let is_redacted = self.is_event_redacted(&event.event_hash)?;
397
398        match data.field.as_str() {
399            "title" => {
400                let value = if is_redacted {
401                    "[redacted]"
402                } else {
403                    data.value.as_str().unwrap_or_default()
404                };
405                self.conn.execute(
406                    "UPDATE items SET title = ?1, updated_at_us = ?2 WHERE item_id = ?3",
407                    params![value, event.wall_ts_us, event.item_id.as_str()],
408                )?;
409            }
410            "description" => {
411                let value = if is_redacted {
412                    Some("[redacted]".to_string())
413                } else {
414                    data.value.as_str().map(String::from)
415                };
416                self.conn.execute(
417                    "UPDATE items SET description = ?1, updated_at_us = ?2 WHERE item_id = ?3",
418                    params![value, event.wall_ts_us, event.item_id.as_str()],
419                )?;
420            }
421            "kind" => {
422                let value = data.value.as_str().unwrap_or("task");
423                self.conn.execute(
424                    "UPDATE items SET kind = ?1, updated_at_us = ?2 WHERE item_id = ?3",
425                    params![value, event.wall_ts_us, event.item_id.as_str()],
426                )?;
427            }
428            "size" => {
429                let value = data.value.as_str().map(String::from);
430                self.conn.execute(
431                    "UPDATE items SET size = ?1, updated_at_us = ?2 WHERE item_id = ?3",
432                    params![value, event.wall_ts_us, event.item_id.as_str()],
433                )?;
434            }
435            "urgency" => {
436                let value = data.value.as_str().unwrap_or("default");
437                self.conn.execute(
438                    "UPDATE items SET urgency = ?1, updated_at_us = ?2 WHERE item_id = ?3",
439                    params![value, event.wall_ts_us, event.item_id.as_str()],
440                )?;
441            }
442            "parent" => {
443                let value = data.value.as_str().map(String::from);
444                self.conn.execute(
445                    "UPDATE items SET parent_id = ?1, updated_at_us = ?2 WHERE item_id = ?3",
446                    params![value, event.wall_ts_us, event.item_id.as_str()],
447                )?;
448            }
449            "labels" => {
450                // Labels update: supports both legacy array replacement
451                // and new add/remove action format (CRDT-friendly).
452                let mut changed = false;
453
454                if let Some(labels) = data.value.as_array() {
455                    // Legacy: replace entire label set
456                    self.conn.execute(
457                        "DELETE FROM item_labels WHERE item_id = ?1",
458                        params![event.item_id.as_str()],
459                    )?;
460
461                    for label_val in labels {
462                        if let Some(label) = label_val.as_str() {
463                            self.conn.execute(
464                                "INSERT OR IGNORE INTO item_labels (item_id, label, created_at_us)
465                                 VALUES (?1, ?2, ?3)",
466                                params![event.item_id.as_str(), label, event.wall_ts_us],
467                            )?;
468                        }
469                    }
470                    changed = true;
471                } else if let Some(obj) = data.value.as_object() {
472                    // New: add/remove single label
473                    let action = obj.get("action").and_then(|v| v.as_str()).unwrap_or("");
474                    let label = obj.get("label").and_then(|v| v.as_str()).unwrap_or("");
475
476                    if !label.is_empty() {
477                        match action {
478                            "add" => {
479                                self.conn.execute(
480                                    "INSERT OR IGNORE INTO item_labels (item_id, label, created_at_us)
481                                     VALUES (?1, ?2, ?3)",
482                                    params![event.item_id.as_str(), label, event.wall_ts_us],
483                                )?;
484                                changed = true;
485                            }
486                            "remove" => {
487                                self.conn.execute(
488                                    "DELETE FROM item_labels WHERE item_id = ?1 AND label = ?2",
489                                    params![event.item_id.as_str(), label],
490                                )?;
491                                changed = true;
492                            }
493                            _ => {}
494                        }
495                    }
496                }
497
498                if changed {
499                    self.refresh_search_labels(event.item_id.as_str(), event.wall_ts_us)?;
500                }
501            }
502            _ => {
503                // Unknown field — just bump updated_at
504                tracing::debug!(
505                    field = %data.field,
506                    item_id = %event.item_id,
507                    "ignoring update for unknown field"
508                );
509                self.conn.execute(
510                    "UPDATE items SET updated_at_us = ?1 WHERE item_id = ?2",
511                    params![event.wall_ts_us, event.item_id.as_str()],
512                )?;
513            }
514        }
515
516        Ok(())
517    }
518
519    fn project_move(&self, event: &Event) -> Result<()> {
520        let EventData::Move(ref data) = event.data else {
521            anyhow::bail!("expected Move data for item.move event");
522        };
523
524        self.ensure_item_exists(event)?;
525
526        self.conn
527            .execute(
528                "UPDATE items SET state = ?1, updated_at_us = ?2 WHERE item_id = ?3",
529                params![
530                    data.state.to_string(),
531                    event.wall_ts_us,
532                    event.item_id.as_str(),
533                ],
534            )
535            .with_context(|| format!("project move for {}", event.item_id))?;
536
537        Ok(())
538    }
539
540    fn project_assign(&self, event: &Event) -> Result<()> {
541        let EventData::Assign(ref data) = event.data else {
542            anyhow::bail!("expected Assign data for item.assign event");
543        };
544
545        self.ensure_item_exists(event)?;
546
547        match data.action {
548            AssignAction::Assign => {
549                self.conn
550                    .execute(
551                        "INSERT OR IGNORE INTO item_assignees (item_id, agent, created_at_us)
552                         VALUES (?1, ?2, ?3)",
553                        params![event.item_id.as_str(), data.agent, event.wall_ts_us],
554                    )
555                    .with_context(|| format!("assign {} to {}", data.agent, event.item_id))?;
556            }
557            AssignAction::Unassign => {
558                self.conn
559                    .execute(
560                        "DELETE FROM item_assignees WHERE item_id = ?1 AND agent = ?2",
561                        params![event.item_id.as_str(), data.agent],
562                    )
563                    .with_context(|| format!("unassign {} from {}", data.agent, event.item_id))?;
564            }
565        }
566
567        // Bump updated_at
568        self.conn.execute(
569            "UPDATE items SET updated_at_us = ?1 WHERE item_id = ?2",
570            params![event.wall_ts_us, event.item_id.as_str()],
571        )?;
572
573        Ok(())
574    }
575
576    fn project_comment(&self, event: &Event) -> Result<()> {
577        let EventData::Comment(ref data) = event.data else {
578            anyhow::bail!("expected Comment data for item.comment event");
579        };
580
581        self.ensure_item_exists(event)?;
582
583        let is_redacted = self.is_event_redacted(&event.event_hash)?;
584        let body = if is_redacted {
585            "[redacted]"
586        } else {
587            &data.body
588        };
589
590        self.conn
591            .execute(
592                "INSERT OR IGNORE INTO item_comments (item_id, event_hash, author, body, created_at_us)
593                 VALUES (?1, ?2, ?3, ?4, ?5)",
594                params![
595                    event.item_id.as_str(),
596                    event.event_hash,
597                    event.agent,
598                    body,
599                    event.wall_ts_us,
600                ],
601            )
602            .with_context(|| format!("project comment for {}", event.item_id))?;
603
604        // Bump updated_at
605        self.conn.execute(
606            "UPDATE items SET updated_at_us = ?1 WHERE item_id = ?2",
607            params![event.wall_ts_us, event.item_id.as_str()],
608        )?;
609
610        Ok(())
611    }
612
613    fn project_link(&self, event: &Event) -> Result<()> {
614        let EventData::Link(ref data) = event.data else {
615            anyhow::bail!("expected Link data for item.link event");
616        };
617
618        self.ensure_item_exists(event)?;
619
620        self.conn
621            .execute(
622                "INSERT OR IGNORE INTO item_dependencies (item_id, depends_on_item_id, link_type, created_at_us)
623                 VALUES (?1, ?2, ?3, ?4)",
624                params![
625                    event.item_id.as_str(),
626                    data.target,
627                    data.link_type,
628                    event.wall_ts_us,
629                ],
630            )
631            .with_context(|| {
632                format!("project link {} -> {}", event.item_id, data.target)
633            })?;
634
635        // Bump updated_at
636        self.conn.execute(
637            "UPDATE items SET updated_at_us = ?1 WHERE item_id = ?2",
638            params![event.wall_ts_us, event.item_id.as_str()],
639        )?;
640
641        Ok(())
642    }
643
644    fn project_unlink(&self, event: &Event) -> Result<()> {
645        let EventData::Unlink(ref data) = event.data else {
646            anyhow::bail!("expected Unlink data for item.unlink event");
647        };
648
649        self.ensure_item_exists(event)?;
650
651        if let Some(ref link_type) = data.link_type {
652            self.conn
653                .execute(
654                    "DELETE FROM item_dependencies \
655                     WHERE item_id = ?1 AND depends_on_item_id = ?2 AND link_type = ?3",
656                    params![event.item_id.as_str(), data.target, link_type],
657                )
658                .with_context(|| format!("unlink {} -/-> {}", event.item_id, data.target))?;
659        } else {
660            // No link_type: remove all links to target
661            self.conn
662                .execute(
663                    "DELETE FROM item_dependencies \
664                     WHERE item_id = ?1 AND depends_on_item_id = ?2",
665                    params![event.item_id.as_str(), data.target],
666                )
667                .with_context(|| format!("unlink all {} -/-> {}", event.item_id, data.target))?;
668        }
669
670        // Bump updated_at
671        self.conn.execute(
672            "UPDATE items SET updated_at_us = ?1 WHERE item_id = ?2",
673            params![event.wall_ts_us, event.item_id.as_str()],
674        )?;
675
676        Ok(())
677    }
678
679    fn project_delete(&self, event: &Event) -> Result<()> {
680        self.ensure_item_exists(event)?;
681
682        self.conn
683            .execute(
684                "UPDATE items SET is_deleted = 1, deleted_at_us = ?1, updated_at_us = ?1 \
685                 WHERE item_id = ?2",
686                params![event.wall_ts_us, event.item_id.as_str()],
687            )
688            .with_context(|| format!("project delete for {}", event.item_id))?;
689
690        Ok(())
691    }
692
693    fn project_compact(&self, event: &Event) -> Result<()> {
694        let EventData::Compact(ref data) = event.data else {
695            anyhow::bail!("expected Compact data for item.compact event");
696        };
697
698        self.ensure_item_exists(event)?;
699
700        let is_redacted = self.is_event_redacted(&event.event_hash)?;
701        let summary = if is_redacted {
702            "[redacted]"
703        } else {
704            data.summary.as_str()
705        };
706
707        self.conn
708            .execute(
709                "UPDATE items SET compact_summary = ?1, updated_at_us = ?2 WHERE item_id = ?3",
710                params![summary, event.wall_ts_us, event.item_id.as_str()],
711            )
712            .with_context(|| format!("project compact for {}", event.item_id))?;
713
714        Ok(())
715    }
716
717    fn project_snapshot(&self, event: &Event) -> Result<()> {
718        let EventData::Snapshot(ref data) = event.data else {
719            anyhow::bail!("expected Snapshot data for item.snapshot event");
720        };
721
722        self.ensure_item_exists(event)?;
723
724        let json_str = serde_json::to_string(&data.state).context("serialize snapshot state")?;
725
726        self.conn
727            .execute(
728                "UPDATE items SET snapshot_json = ?1, updated_at_us = ?2 WHERE item_id = ?3",
729                params![json_str, event.wall_ts_us, event.item_id.as_str()],
730            )
731            .with_context(|| format!("project snapshot for {}", event.item_id))?;
732
733        Ok(())
734    }
735
736    fn project_redact(&self, event: &Event) -> Result<()> {
737        let EventData::Redact(ref data) = event.data else {
738            anyhow::bail!("expected Redact data for item.redact event");
739        };
740
741        // Insert redaction record
742        self.conn
743            .execute(
744                "INSERT OR IGNORE INTO event_redactions \
745                 (target_event_hash, item_id, reason, redacted_by, redacted_at_us) \
746                 VALUES (?1, ?2, ?3, ?4, ?5)",
747                params![
748                    data.target_hash,
749                    event.item_id.as_str(),
750                    data.reason,
751                    event.agent,
752                    event.wall_ts_us,
753                ],
754            )
755            .with_context(|| {
756                format!(
757                    "project redact for {} targeting {}",
758                    event.item_id, data.target_hash
759                )
760            })?;
761
762        // Redact the comment body if the target hash is a comment event
763        self.conn
764            .execute(
765                "UPDATE item_comments SET body = '[redacted]' WHERE event_hash = ?1",
766                params![data.target_hash],
767            )
768            .context("redact comment body")?;
769
770        Ok(())
771    }
772
773    // -----------------------------------------------------------------------
774    // Helpers
775    // -----------------------------------------------------------------------
776
777    /// Ensure the item exists in the projection. If not, create a placeholder
778    /// row so that subsequent operations (UPDATE, foreign keys) succeed.
779    ///
780    /// This handles out-of-order event replay and missing create events.
781    fn ensure_item_exists(&self, event: &Event) -> Result<()> {
782        let exists: bool = self
783            .conn
784            .query_row(
785                "SELECT EXISTS(SELECT 1 FROM items WHERE item_id = ?1)",
786                params![event.item_id.as_str()],
787                |row| row.get(0),
788            )
789            .context("check item exists")?;
790
791        if !exists {
792            self.conn
793                .execute(
794                    "INSERT INTO items (
795                        item_id, title, kind, state, urgency,
796                        is_deleted, search_labels, created_at_us, updated_at_us
797                    ) VALUES (?1, '', 'task', 'open', 'default', 0, '', ?2, ?2)",
798                    params![event.item_id.as_str(), event.wall_ts_us],
799                )
800                .with_context(|| format!("create placeholder item for {}", event.item_id))?;
801        }
802
803        Ok(())
804    }
805
806    fn refresh_search_labels(&self, item_id: &str, updated_at_us: i64) -> Result<()> {
807        let mut stmt = self
808            .conn
809            .prepare("SELECT label FROM item_labels WHERE item_id = ?1 ORDER BY label")?;
810        let label_rows = stmt.query_map(params![item_id], |row| row.get::<_, String>(0))?;
811
812        let mut label_strings = Vec::new();
813        for label_res in label_rows {
814            label_strings.push(label_res?);
815        }
816
817        let search_labels = label_strings.join(" ");
818        self.conn.execute(
819            "UPDATE items
820             SET search_labels = ?1,
821                 updated_at_us = MAX(updated_at_us, ?2)
822             WHERE item_id = ?3",
823            params![search_labels, updated_at_us, item_id],
824        )?;
825        Ok(())
826    }
827}
828
829enum ProjectResult {
830    Projected,
831    Duplicate,
832}
833
834// ---------------------------------------------------------------------------
835// Schema addition: projected_events tracking table
836// ---------------------------------------------------------------------------
837
838/// SQL to create the `projected_events` tracking table.
839///
840/// This is applied as part of the projection setup, not as a schema migration,
841/// because it is projection-internal bookkeeping.
842pub const PROJECTED_EVENTS_DDL: &str = "\
843CREATE TABLE IF NOT EXISTS projected_events (
844    event_hash TEXT PRIMARY KEY,
845    item_id TEXT NOT NULL,
846    event_type TEXT NOT NULL,
847    projected_at_us INTEGER NOT NULL,
848    agent TEXT NOT NULL DEFAULT ''
849);
850CREATE INDEX IF NOT EXISTS idx_projected_events_item
851    ON projected_events(item_id);
852CREATE INDEX IF NOT EXISTS idx_projected_events_agent
853    ON projected_events(agent);
854";
855
856/// Ensure the `projected_events` tracking table exists.
857///
858/// Call this once after opening the projection database and before
859/// projecting events.
860///
861/// # Errors
862///
863/// Returns an error if executing the DDL fails.
864pub fn ensure_tracking_table(conn: &Connection) -> Result<()> {
865    conn.execute_batch(PROJECTED_EVENTS_DDL)
866        .context("create projected_events tracking table")?;
867
868    if !projected_events_has_agent_column(conn)? {
869        conn.execute(
870            "ALTER TABLE projected_events ADD COLUMN agent TEXT NOT NULL DEFAULT ''",
871            [],
872        )
873        .context("add agent column to projected_events")?;
874    }
875
876    conn.execute_batch(
877        "CREATE INDEX IF NOT EXISTS idx_projected_events_agent ON projected_events(agent);",
878    )
879    .context("create projected_events_agent index")?;
880
881    Ok(())
882}
883
884fn projected_events_has_agent_column(conn: &Connection) -> Result<bool> {
885    let mut stmt = conn
886        .prepare("PRAGMA table_info(projected_events)")
887        .context("inspect projected_events schema")?;
888    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
889
890    for row in rows {
891        let name = row.context("read projected_events column")?;
892        if name == "agent" {
893            return Ok(true);
894        }
895    }
896
897    Ok(false)
898}
899
900/// Drop all projection data for a full rebuild.
901///
902/// Clears all items, edge tables, comments, redactions, FTS index, and the
903/// projected events tracking table. Schema structure is preserved.
904///
905/// # Errors
906///
907/// Returns an error if the truncation fails.
908pub fn clear_projection(conn: &Connection) -> Result<()> {
909    conn.execute_batch(
910        "DELETE FROM event_redactions;
911         DELETE FROM item_comments;
912         DELETE FROM item_dependencies;
913         DELETE FROM item_assignees;
914         DELETE FROM item_labels;
915         DELETE FROM items;
916         DELETE FROM projected_events;
917         UPDATE projection_meta SET last_event_offset = 0, last_event_hash = NULL WHERE id = 1;",
918    )
919    .context("clear projection tables")?;
920    Ok(())
921}
922
923// ---------------------------------------------------------------------------
924// Tests
925// ---------------------------------------------------------------------------
926
927#[cfg(test)]
928mod tests {
929    use super::*;
930    use crate::db::{migrations, query};
931    use crate::event::data::*;
932    use crate::event::types::EventType;
933    use crate::event::writer;
934    use crate::model::item::{Kind, Size, State, Urgency};
935    use crate::model::item_id::ItemId;
936    use crate::shard::ShardManager;
937    use rusqlite::Connection;
938    use std::collections::BTreeMap;
939    use tempfile::TempDir;
940
941    fn test_db() -> Connection {
942        let mut conn = Connection::open_in_memory().expect("open in-memory db");
943        migrations::migrate(&mut conn).expect("migrate");
944        ensure_tracking_table(&conn).expect("create tracking table");
945        conn
946    }
947
948    fn make_event(
949        event_type: EventType,
950        item_id: &str,
951        data: EventData,
952        hash: &str,
953        ts: i64,
954    ) -> Event {
955        Event {
956            wall_ts_us: ts,
957            agent: "test-agent".into(),
958            itc: "itc:AQ".into(),
959            parents: vec![],
960            event_type,
961            item_id: ItemId::new_unchecked(item_id),
962            data,
963            event_hash: format!("blake3:{hash}"),
964        }
965    }
966
967    fn make_create(id: &str, title: &str, hash: &str, ts: i64) -> Event {
968        make_event(
969            EventType::Create,
970            id,
971            EventData::Create(CreateData {
972                title: title.into(),
973                kind: Kind::Task,
974                size: Some(Size::M),
975                urgency: Urgency::Default,
976                labels: vec!["backend".into(), "auth".into()],
977                parent: None,
978                causation: None,
979                description: Some("A detailed description".into()),
980                extra: BTreeMap::new(),
981            }),
982            hash,
983            ts,
984        )
985    }
986
987    #[test]
988    fn project_event_updates_projection_cursor_for_file_backed_db() {
989        let dir = TempDir::new().expect("tempdir");
990        let bones_dir = dir.path().join(".bones");
991        std::fs::create_dir_all(&bones_dir).expect("create .bones");
992
993        let shard_mgr = ShardManager::new(&bones_dir);
994        shard_mgr.init().expect("init shard manager");
995
996        let db_path = bones_dir.join("bones.db");
997        let mut conn = Connection::open(&db_path).expect("open projection db");
998        migrations::migrate(&mut conn).expect("migrate");
999        ensure_tracking_table(&conn).expect("create tracking table");
1000
1001        let projector = Projector::new(&conn);
1002        let mut event = make_create("bn-001", "Cursor update", "h1", 1000);
1003        let line = writer::write_event(&mut event).expect("serialize event");
1004        shard_mgr
1005            .append(&line, false, std::time::Duration::from_secs(5))
1006            .expect("append event line");
1007
1008        projector.project_event(&event).expect("project event");
1009
1010        let (offset, hash) = query::get_projection_cursor(&conn).expect("read projection cursor");
1011        let expected_offset =
1012            i64::try_from(shard_mgr.total_content_len().expect("content len")).unwrap_or(i64::MAX);
1013
1014        assert_eq!(offset, expected_offset);
1015        assert_eq!(hash.as_deref(), Some(event.event_hash.as_str()));
1016    }
1017
1018    // -----------------------------------------------------------------------
1019    // Create
1020    // -----------------------------------------------------------------------
1021
1022    #[test]
1023    fn project_create_inserts_item() {
1024        let conn = test_db();
1025        let projector = Projector::new(&conn);
1026        let event = make_create("bn-001", "Fix auth timeout", "aaa", 1000);
1027
1028        let result = projector.project_event(&event).unwrap();
1029        assert!(result, "should return true for new projection");
1030
1031        let item = query::get_item(&conn, "bn-001", false).unwrap().unwrap();
1032        assert_eq!(item.title, "Fix auth timeout");
1033        assert_eq!(item.kind, "task");
1034        assert_eq!(item.state, "open");
1035        assert_eq!(item.urgency, "default");
1036        assert_eq!(item.size.as_deref(), Some("m"));
1037        assert_eq!(item.description.as_deref(), Some("A detailed description"));
1038        assert_eq!(item.created_at_us, 1000);
1039        assert_eq!(item.updated_at_us, 1000);
1040    }
1041
1042    #[test]
1043    fn project_create_inserts_labels() {
1044        let conn = test_db();
1045        let projector = Projector::new(&conn);
1046        let event = make_create("bn-001", "Fix auth", "aaa", 1000);
1047        projector.project_event(&event).unwrap();
1048
1049        let labels = query::get_labels(&conn, "bn-001").unwrap();
1050        assert_eq!(labels.len(), 2);
1051        assert_eq!(labels[0].label, "auth");
1052        assert_eq!(labels[1].label, "backend");
1053    }
1054
1055    // -----------------------------------------------------------------------
1056    // Update
1057    // -----------------------------------------------------------------------
1058
1059    #[test]
1060    fn project_update_title() {
1061        let conn = test_db();
1062        let projector = Projector::new(&conn);
1063        projector
1064            .project_event(&make_create("bn-001", "Old title", "aaa", 1000))
1065            .unwrap();
1066
1067        let update = make_event(
1068            EventType::Update,
1069            "bn-001",
1070            EventData::Update(UpdateData {
1071                field: "title".into(),
1072                value: serde_json::json!("New title"),
1073                extra: BTreeMap::new(),
1074            }),
1075            "bbb",
1076            2000,
1077        );
1078        projector.project_event(&update).unwrap();
1079
1080        let item = query::get_item(&conn, "bn-001", false).unwrap().unwrap();
1081        assert_eq!(item.title, "New title");
1082        assert_eq!(item.updated_at_us, 2000);
1083    }
1084
1085    #[test]
1086    fn project_update_description() {
1087        let conn = test_db();
1088        let projector = Projector::new(&conn);
1089        projector
1090            .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1091            .unwrap();
1092
1093        let update = make_event(
1094            EventType::Update,
1095            "bn-001",
1096            EventData::Update(UpdateData {
1097                field: "description".into(),
1098                value: serde_json::json!("Updated description"),
1099                extra: BTreeMap::new(),
1100            }),
1101            "bbb",
1102            2000,
1103        );
1104        projector.project_event(&update).unwrap();
1105
1106        let item = query::get_item(&conn, "bn-001", false).unwrap().unwrap();
1107        assert_eq!(item.description.as_deref(), Some("Updated description"));
1108    }
1109
1110    #[test]
1111    fn project_update_labels() {
1112        let conn = test_db();
1113        let projector = Projector::new(&conn);
1114        projector
1115            .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1116            .unwrap();
1117
1118        let update = make_event(
1119            EventType::Update,
1120            "bn-001",
1121            EventData::Update(UpdateData {
1122                field: "labels".into(),
1123                value: serde_json::json!(["frontend", "urgent"]),
1124                extra: BTreeMap::new(),
1125            }),
1126            "bbb",
1127            2000,
1128        );
1129        projector.project_event(&update).unwrap();
1130
1131        let labels = query::get_labels(&conn, "bn-001").unwrap();
1132        assert_eq!(labels.len(), 2);
1133        assert_eq!(labels[0].label, "frontend");
1134        assert_eq!(labels[1].label, "urgent");
1135    }
1136
1137    #[test]
1138    fn project_update_unknown_field_bumps_updated() {
1139        let conn = test_db();
1140        let projector = Projector::new(&conn);
1141        projector
1142            .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1143            .unwrap();
1144
1145        let update = make_event(
1146            EventType::Update,
1147            "bn-001",
1148            EventData::Update(UpdateData {
1149                field: "future_field".into(),
1150                value: serde_json::json!("whatever"),
1151                extra: BTreeMap::new(),
1152            }),
1153            "bbb",
1154            2000,
1155        );
1156        projector.project_event(&update).unwrap();
1157
1158        let item = query::get_item(&conn, "bn-001", false).unwrap().unwrap();
1159        assert_eq!(item.updated_at_us, 2000);
1160    }
1161
1162    // -----------------------------------------------------------------------
1163    // Move
1164    // -----------------------------------------------------------------------
1165
1166    #[test]
1167    fn project_move_updates_state() {
1168        let conn = test_db();
1169        let projector = Projector::new(&conn);
1170        projector
1171            .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1172            .unwrap();
1173
1174        let mv = make_event(
1175            EventType::Move,
1176            "bn-001",
1177            EventData::Move(MoveData {
1178                state: State::Doing,
1179                reason: Some("Starting work".into()),
1180                extra: BTreeMap::new(),
1181            }),
1182            "bbb",
1183            2000,
1184        );
1185        projector.project_event(&mv).unwrap();
1186
1187        let item = query::get_item(&conn, "bn-001", false).unwrap().unwrap();
1188        assert_eq!(item.state, "doing");
1189    }
1190
1191    // -----------------------------------------------------------------------
1192    // Assign / Unassign
1193    // -----------------------------------------------------------------------
1194
1195    #[test]
1196    fn project_assign_and_unassign() {
1197        let conn = test_db();
1198        let projector = Projector::new(&conn);
1199        projector
1200            .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1201            .unwrap();
1202
1203        // Assign alice
1204        let assign = make_event(
1205            EventType::Assign,
1206            "bn-001",
1207            EventData::Assign(AssignData {
1208                agent: "alice".into(),
1209                action: AssignAction::Assign,
1210                extra: BTreeMap::new(),
1211            }),
1212            "bbb",
1213            2000,
1214        );
1215        projector.project_event(&assign).unwrap();
1216
1217        let assignees = query::get_assignees(&conn, "bn-001").unwrap();
1218        assert_eq!(assignees.len(), 1);
1219        assert_eq!(assignees[0].agent, "alice");
1220
1221        // Unassign alice
1222        let unassign = make_event(
1223            EventType::Assign,
1224            "bn-001",
1225            EventData::Assign(AssignData {
1226                agent: "alice".into(),
1227                action: AssignAction::Unassign,
1228                extra: BTreeMap::new(),
1229            }),
1230            "ccc",
1231            3000,
1232        );
1233        projector.project_event(&unassign).unwrap();
1234
1235        let assignees = query::get_assignees(&conn, "bn-001").unwrap();
1236        assert!(assignees.is_empty());
1237    }
1238
1239    // -----------------------------------------------------------------------
1240    // Comment
1241    // -----------------------------------------------------------------------
1242
1243    #[test]
1244    fn project_comment_inserts_row() {
1245        let conn = test_db();
1246        let projector = Projector::new(&conn);
1247        projector
1248            .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1249            .unwrap();
1250
1251        let comment = make_event(
1252            EventType::Comment,
1253            "bn-001",
1254            EventData::Comment(CommentData {
1255                body: "This is a comment".into(),
1256                extra: BTreeMap::new(),
1257            }),
1258            "bbb",
1259            2000,
1260        );
1261        projector.project_event(&comment).unwrap();
1262
1263        let comments = query::get_comments(&conn, "bn-001", None, None).unwrap();
1264        assert_eq!(comments.len(), 1);
1265        assert_eq!(comments[0].body, "This is a comment");
1266        assert_eq!(comments[0].author, "test-agent");
1267        assert_eq!(comments[0].event_hash, "blake3:bbb");
1268    }
1269
1270    // -----------------------------------------------------------------------
1271    // Link / Unlink
1272    // -----------------------------------------------------------------------
1273
1274    #[test]
1275    fn project_link_and_unlink() {
1276        let conn = test_db();
1277        let projector = Projector::new(&conn);
1278        projector
1279            .project_event(&make_create("bn-001", "Blocker", "aaa", 1000))
1280            .unwrap();
1281        projector
1282            .project_event(&make_create("bn-002", "Blocked", "bbb", 1001))
1283            .unwrap();
1284
1285        // Link
1286        let link = make_event(
1287            EventType::Link,
1288            "bn-002",
1289            EventData::Link(LinkData {
1290                target: "bn-001".into(),
1291                link_type: "blocks".into(),
1292                extra: BTreeMap::new(),
1293            }),
1294            "ccc",
1295            2000,
1296        );
1297        projector.project_event(&link).unwrap();
1298
1299        let deps = query::get_dependencies(&conn, "bn-002").unwrap();
1300        assert_eq!(deps.len(), 1);
1301        assert_eq!(deps[0].depends_on_item_id, "bn-001");
1302
1303        // Unlink
1304        let unlink = make_event(
1305            EventType::Unlink,
1306            "bn-002",
1307            EventData::Unlink(UnlinkData {
1308                target: "bn-001".into(),
1309                link_type: Some("blocks".into()),
1310                extra: BTreeMap::new(),
1311            }),
1312            "ddd",
1313            3000,
1314        );
1315        projector.project_event(&unlink).unwrap();
1316
1317        let deps = query::get_dependencies(&conn, "bn-002").unwrap();
1318        assert!(deps.is_empty());
1319    }
1320
1321    // -----------------------------------------------------------------------
1322    // Delete
1323    // -----------------------------------------------------------------------
1324
1325    #[test]
1326    fn project_delete_soft_deletes() {
1327        let conn = test_db();
1328        let projector = Projector::new(&conn);
1329        projector
1330            .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1331            .unwrap();
1332
1333        let delete = make_event(
1334            EventType::Delete,
1335            "bn-001",
1336            EventData::Delete(DeleteData {
1337                reason: Some("Duplicate".into()),
1338                extra: BTreeMap::new(),
1339            }),
1340            "bbb",
1341            2000,
1342        );
1343        projector.project_event(&delete).unwrap();
1344
1345        // Not visible without include_deleted
1346        assert!(query::get_item(&conn, "bn-001", false).unwrap().is_none());
1347        // Visible with include_deleted
1348        let item = query::get_item(&conn, "bn-001", true).unwrap().unwrap();
1349        assert!(item.is_deleted);
1350        assert_eq!(item.deleted_at_us, Some(2000));
1351    }
1352
1353    // -----------------------------------------------------------------------
1354    // Compact
1355    // -----------------------------------------------------------------------
1356
1357    #[test]
1358    fn project_compact_sets_summary() {
1359        let conn = test_db();
1360        let projector = Projector::new(&conn);
1361        projector
1362            .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1363            .unwrap();
1364
1365        let compact = make_event(
1366            EventType::Compact,
1367            "bn-001",
1368            EventData::Compact(CompactData {
1369                summary: "TL;DR: auth fix".into(),
1370                extra: BTreeMap::new(),
1371            }),
1372            "bbb",
1373            2000,
1374        );
1375        projector.project_event(&compact).unwrap();
1376
1377        let item = query::get_item(&conn, "bn-001", false).unwrap().unwrap();
1378        assert_eq!(item.compact_summary.as_deref(), Some("TL;DR: auth fix"));
1379    }
1380
1381    // -----------------------------------------------------------------------
1382    // Snapshot
1383    // -----------------------------------------------------------------------
1384
1385    #[test]
1386    fn project_snapshot_stores_json() {
1387        let conn = test_db();
1388        let projector = Projector::new(&conn);
1389        projector
1390            .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1391            .unwrap();
1392
1393        let snapshot = make_event(
1394            EventType::Snapshot,
1395            "bn-001",
1396            EventData::Snapshot(SnapshotData {
1397                state: serde_json::json!({"id": "bn-001", "title": "Snapshotted"}),
1398                extra: BTreeMap::new(),
1399            }),
1400            "bbb",
1401            2000,
1402        );
1403        projector.project_event(&snapshot).unwrap();
1404
1405        let row: String = conn
1406            .query_row(
1407                "SELECT snapshot_json FROM items WHERE item_id = 'bn-001'",
1408                [],
1409                |row| row.get(0),
1410            )
1411            .unwrap();
1412        let parsed: serde_json::Value = serde_json::from_str(&row).unwrap();
1413        assert_eq!(parsed["title"], "Snapshotted");
1414    }
1415
1416    // -----------------------------------------------------------------------
1417    // Redact
1418    // -----------------------------------------------------------------------
1419
1420    #[test]
1421    fn project_redact_records_and_blanks_comment() {
1422        let conn = test_db();
1423        let projector = Projector::new(&conn);
1424        projector
1425            .project_event(&make_create("bn-001", "Item", "aaa", 1000))
1426            .unwrap();
1427
1428        // First add a comment
1429        let comment = make_event(
1430            EventType::Comment,
1431            "bn-001",
1432            EventData::Comment(CommentData {
1433                body: "Secret password: hunter2".into(),
1434                extra: BTreeMap::new(),
1435            }),
1436            "comment_hash",
1437            2000,
1438        );
1439        projector.project_event(&comment).unwrap();
1440
1441        // Redact it
1442        let redact = make_event(
1443            EventType::Redact,
1444            "bn-001",
1445            EventData::Redact(RedactData {
1446                target_hash: "blake3:comment_hash".into(),
1447                reason: "Accidental secret".into(),
1448                extra: BTreeMap::new(),
1449            }),
1450            "redact_hash",
1451            3000,
1452        );
1453        projector.project_event(&redact).unwrap();
1454
1455        // Check redaction record
1456        let count: i64 = conn
1457            .query_row(
1458                "SELECT COUNT(*) FROM event_redactions WHERE target_event_hash = 'blake3:comment_hash'",
1459                [],
1460                |row| row.get(0),
1461            )
1462            .unwrap();
1463        assert_eq!(count, 1);
1464
1465        // Check comment body is redacted
1466        let comments = query::get_comments(&conn, "bn-001", None, None).unwrap();
1467        assert_eq!(comments.len(), 1);
1468        assert_eq!(comments[0].body, "[redacted]");
1469    }
1470
1471    // -----------------------------------------------------------------------
1472    // Dedup
1473    // -----------------------------------------------------------------------
1474
1475    #[test]
1476    fn duplicate_events_are_skipped() {
1477        let conn = test_db();
1478        let projector = Projector::new(&conn);
1479
1480        let event = make_create("bn-001", "Item", "aaa", 1000);
1481        assert!(projector.project_event(&event).unwrap()); // first time
1482        assert!(!projector.project_event(&event).unwrap()); // duplicate
1483
1484        // Only one item created
1485        let count: i64 = conn
1486            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
1487            .unwrap();
1488        assert_eq!(count, 1);
1489    }
1490
1491    #[test]
1492    fn batch_dedup_counts() {
1493        let conn = test_db();
1494        let projector = Projector::new(&conn);
1495
1496        let event1 = make_create("bn-001", "Item 1", "aaa", 1000);
1497        let event2 = make_create("bn-002", "Item 2", "bbb", 1001);
1498
1499        // Project first batch
1500        let stats1 = projector
1501            .project_batch(&[event1.clone(), event2.clone()])
1502            .unwrap();
1503        assert_eq!(stats1.projected, 2);
1504        assert_eq!(stats1.duplicates, 0);
1505
1506        // Replay same batch — all duplicates
1507        let stats2 = projector.project_batch(&[event1, event2]).unwrap();
1508        assert_eq!(stats2.projected, 0);
1509        assert_eq!(stats2.duplicates, 2);
1510    }
1511
1512    // -----------------------------------------------------------------------
1513    // Full replay: incremental matches full rebuild
1514    // -----------------------------------------------------------------------
1515
1516    #[test]
1517    fn incremental_matches_full_replay() {
1518        let events = vec![
1519            make_create("bn-001", "Auth bug", "h1", 1000),
1520            make_event(
1521                EventType::Move,
1522                "bn-001",
1523                EventData::Move(MoveData {
1524                    state: State::Doing,
1525                    reason: None,
1526                    extra: BTreeMap::new(),
1527                }),
1528                "h2",
1529                2000,
1530            ),
1531            make_event(
1532                EventType::Assign,
1533                "bn-001",
1534                EventData::Assign(AssignData {
1535                    agent: "alice".into(),
1536                    action: AssignAction::Assign,
1537                    extra: BTreeMap::new(),
1538                }),
1539                "h3",
1540                3000,
1541            ),
1542            make_event(
1543                EventType::Comment,
1544                "bn-001",
1545                EventData::Comment(CommentData {
1546                    body: "Working on it".into(),
1547                    extra: BTreeMap::new(),
1548                }),
1549                "h4",
1550                4000,
1551            ),
1552            make_event(
1553                EventType::Update,
1554                "bn-001",
1555                EventData::Update(UpdateData {
1556                    field: "title".into(),
1557                    value: serde_json::json!("Auth bug (fixed)"),
1558                    extra: BTreeMap::new(),
1559                }),
1560                "h5",
1561                5000,
1562            ),
1563            make_event(
1564                EventType::Move,
1565                "bn-001",
1566                EventData::Move(MoveData {
1567                    state: State::Done,
1568                    reason: Some("Shipped".into()),
1569                    extra: BTreeMap::new(),
1570                }),
1571                "h6",
1572                6000,
1573            ),
1574        ];
1575
1576        // Full replay
1577        let conn_full = test_db();
1578        let proj_full = Projector::new(&conn_full);
1579        proj_full.project_batch(&events).unwrap();
1580
1581        // Incremental: one by one
1582        let conn_inc = test_db();
1583        let proj_inc = Projector::new(&conn_inc);
1584        for event in &events {
1585            proj_inc.project_event(event).unwrap();
1586        }
1587
1588        // Compare results
1589        let item_full = query::get_item(&conn_full, "bn-001", false)
1590            .unwrap()
1591            .unwrap();
1592        let item_inc = query::get_item(&conn_inc, "bn-001", false)
1593            .unwrap()
1594            .unwrap();
1595
1596        assert_eq!(item_full.title, item_inc.title);
1597        assert_eq!(item_full.state, item_inc.state);
1598        assert_eq!(item_full.updated_at_us, item_inc.updated_at_us);
1599
1600        let assignees_full = query::get_assignees(&conn_full, "bn-001").unwrap();
1601        let assignees_inc = query::get_assignees(&conn_inc, "bn-001").unwrap();
1602        assert_eq!(assignees_full.len(), assignees_inc.len());
1603
1604        let comments_full = query::get_comments(&conn_full, "bn-001", None, None).unwrap();
1605        let comments_inc = query::get_comments(&conn_inc, "bn-001", None, None).unwrap();
1606        assert_eq!(comments_full.len(), comments_inc.len());
1607    }
1608
1609    // -----------------------------------------------------------------------
1610    // Full rebuild (clear + replay)
1611    // -----------------------------------------------------------------------
1612
1613    #[test]
1614    fn clear_and_replay_produces_same_result() {
1615        let conn = test_db();
1616        let projector = Projector::new(&conn);
1617
1618        let events = vec![
1619            make_create("bn-001", "Item 1", "h1", 1000),
1620            make_create("bn-002", "Item 2", "h2", 1001),
1621            make_event(
1622                EventType::Link,
1623                "bn-002",
1624                EventData::Link(LinkData {
1625                    target: "bn-001".into(),
1626                    link_type: "blocks".into(),
1627                    extra: BTreeMap::new(),
1628                }),
1629                "h3",
1630                2000,
1631            ),
1632        ];
1633
1634        // First pass
1635        projector.project_batch(&events).unwrap();
1636        let count1: i64 = conn
1637            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
1638            .unwrap();
1639        assert_eq!(count1, 2);
1640
1641        // Clear and replay
1642        clear_projection(&conn).unwrap();
1643        let count_after_clear: i64 = conn
1644            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
1645            .unwrap();
1646        assert_eq!(count_after_clear, 0);
1647
1648        projector.project_batch(&events).unwrap();
1649        let count2: i64 = conn
1650            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
1651            .unwrap();
1652        assert_eq!(count2, 2);
1653
1654        let deps = query::get_dependencies(&conn, "bn-002").unwrap();
1655        assert_eq!(deps.len(), 1);
1656    }
1657
1658    // -----------------------------------------------------------------------
1659    // Placeholder item creation
1660    // -----------------------------------------------------------------------
1661
1662    #[test]
1663    fn events_on_missing_item_create_placeholder() {
1664        let conn = test_db();
1665        let projector = Projector::new(&conn);
1666
1667        // Comment on an item that hasn't been created yet
1668        let comment = make_event(
1669            EventType::Comment,
1670            "bn-ghost",
1671            EventData::Comment(CommentData {
1672                body: "Comment on missing item".into(),
1673                extra: BTreeMap::new(),
1674            }),
1675            "ccc",
1676            2000,
1677        );
1678        projector.project_event(&comment).unwrap();
1679
1680        // Item exists as placeholder
1681        let item = query::get_item(&conn, "bn-ghost", false).unwrap().unwrap();
1682        assert_eq!(item.title, ""); // placeholder has empty title
1683        assert_eq!(item.state, "open");
1684    }
1685
1686    // -----------------------------------------------------------------------
1687    // FTS integration
1688    // -----------------------------------------------------------------------
1689
1690    #[test]
1691    fn project_create_populates_fts() {
1692        let conn = test_db();
1693        let projector = Projector::new(&conn);
1694        projector
1695            .project_event(&make_create(
1696                "bn-001",
1697                "Authentication timeout",
1698                "aaa",
1699                1000,
1700            ))
1701            .unwrap();
1702
1703        let hits = query::search(&conn, "authentication", 10).unwrap();
1704        assert_eq!(hits.len(), 1);
1705        assert_eq!(hits[0].item_id, "bn-001");
1706    }
1707
1708    #[test]
1709    fn project_update_title_updates_fts() {
1710        let conn = test_db();
1711        let projector = Projector::new(&conn);
1712        projector
1713            .project_event(&make_create("bn-001", "Old title", "aaa", 1000))
1714            .unwrap();
1715
1716        let update = make_event(
1717            EventType::Update,
1718            "bn-001",
1719            EventData::Update(UpdateData {
1720                field: "title".into(),
1721                value: serde_json::json!("Authorization failure"),
1722                extra: BTreeMap::new(),
1723            }),
1724            "bbb",
1725            2000,
1726        );
1727        projector.project_event(&update).unwrap();
1728
1729        // Old title not found
1730        let hits_old = query::search(&conn, "Old", 10).unwrap();
1731        assert!(hits_old.is_empty());
1732
1733        // New title found
1734        let hits_new = query::search(&conn, "authorization", 10).unwrap();
1735        assert_eq!(hits_new.len(), 1);
1736    }
1737
1738    // -----------------------------------------------------------------------
1739    // Batch stats
1740    // -----------------------------------------------------------------------
1741
1742    #[test]
1743    fn batch_reports_correct_stats() {
1744        let conn = test_db();
1745        let projector = Projector::new(&conn);
1746
1747        let events = vec![
1748            make_create("bn-001", "Item 1", "h1", 1000),
1749            make_create("bn-002", "Item 2", "h2", 1001),
1750            make_create("bn-003", "Item 3", "h3", 1002),
1751        ];
1752
1753        let stats = projector.project_batch(&events).unwrap();
1754        assert_eq!(stats.projected, 3);
1755        assert_eq!(stats.duplicates, 0);
1756        assert_eq!(stats.errors, 0);
1757    }
1758
1759    // -----------------------------------------------------------------------
1760    // All 11 event types in sequence
1761    // -----------------------------------------------------------------------
1762
1763    #[test]
1764    fn full_lifecycle_all_event_types() {
1765        let conn = test_db();
1766        let projector = Projector::new(&conn);
1767
1768        let mut events = vec![
1769            // 1. Create
1770            make_create("bn-001", "Auth bug", "h01", 1000),
1771            // Create a second item for linking
1772            make_create("bn-002", "Dep item", "h02", 1001),
1773        ];
1774
1775        // 2. Update title
1776        events.push(make_event(
1777            EventType::Update,
1778            "bn-001",
1779            EventData::Update(UpdateData {
1780                field: "title".into(),
1781                value: serde_json::json!("Auth timeout bug"),
1782                extra: BTreeMap::new(),
1783            }),
1784            "h03",
1785            2000,
1786        ));
1787
1788        // 3. Move to doing
1789        events.push(make_event(
1790            EventType::Move,
1791            "bn-001",
1792            EventData::Move(MoveData {
1793                state: State::Doing,
1794                reason: None,
1795                extra: BTreeMap::new(),
1796            }),
1797            "h04",
1798            3000,
1799        ));
1800
1801        // 4. Assign
1802        events.push(make_event(
1803            EventType::Assign,
1804            "bn-001",
1805            EventData::Assign(AssignData {
1806                agent: "alice".into(),
1807                action: AssignAction::Assign,
1808                extra: BTreeMap::new(),
1809            }),
1810            "h05",
1811            4000,
1812        ));
1813
1814        // 5. Comment
1815        events.push(make_event(
1816            EventType::Comment,
1817            "bn-001",
1818            EventData::Comment(CommentData {
1819                body: "Found root cause".into(),
1820                extra: BTreeMap::new(),
1821            }),
1822            "h06",
1823            5000,
1824        ));
1825
1826        // 6. Link
1827        events.push(make_event(
1828            EventType::Link,
1829            "bn-001",
1830            EventData::Link(LinkData {
1831                target: "bn-002".into(),
1832                link_type: "blocks".into(),
1833                extra: BTreeMap::new(),
1834            }),
1835            "h07",
1836            6000,
1837        ));
1838
1839        // 7. Unlink
1840        events.push(make_event(
1841            EventType::Unlink,
1842            "bn-001",
1843            EventData::Unlink(UnlinkData {
1844                target: "bn-002".into(),
1845                link_type: Some("blocks".into()),
1846                extra: BTreeMap::new(),
1847            }),
1848            "h08",
1849            7000,
1850        ));
1851
1852        // 8. Compact
1853        events.push(make_event(
1854            EventType::Compact,
1855            "bn-001",
1856            EventData::Compact(CompactData {
1857                summary: "Auth token refresh race".into(),
1858                extra: BTreeMap::new(),
1859            }),
1860            "h09",
1861            8000,
1862        ));
1863
1864        // 9. Snapshot
1865        events.push(make_event(
1866            EventType::Snapshot,
1867            "bn-001",
1868            EventData::Snapshot(SnapshotData {
1869                state: serde_json::json!({"id": "bn-001", "resolved": true}),
1870                extra: BTreeMap::new(),
1871            }),
1872            "h10",
1873            9000,
1874        ));
1875
1876        // 10. Redact the comment
1877        events.push(make_event(
1878            EventType::Redact,
1879            "bn-001",
1880            EventData::Redact(RedactData {
1881                target_hash: "blake3:h06".into(),
1882                reason: "Contained secret".into(),
1883                extra: BTreeMap::new(),
1884            }),
1885            "h11",
1886            10000,
1887        ));
1888
1889        // 11. Delete
1890        events.push(make_event(
1891            EventType::Delete,
1892            "bn-001",
1893            EventData::Delete(DeleteData {
1894                reason: Some("Duplicate".into()),
1895                extra: BTreeMap::new(),
1896            }),
1897            "h12",
1898            11000,
1899        ));
1900
1901        let stats = projector.project_batch(&events).unwrap();
1902        assert_eq!(stats.projected, 12); // 2 creates + 10 mutations
1903        assert_eq!(stats.duplicates, 0);
1904        assert_eq!(stats.errors, 0);
1905
1906        // Verify final state
1907        let item = query::get_item(&conn, "bn-001", true).unwrap().unwrap();
1908        assert_eq!(item.title, "Auth timeout bug");
1909        assert_eq!(item.state, "doing");
1910        assert!(item.is_deleted);
1911        assert_eq!(
1912            item.compact_summary.as_deref(),
1913            Some("Auth token refresh race")
1914        );
1915        let snapshot: Option<String> = conn
1916            .query_row(
1917                "SELECT snapshot_json FROM items WHERE item_id = 'bn-001'",
1918                [],
1919                |row| row.get(0),
1920            )
1921            .unwrap();
1922        assert!(snapshot.is_some());
1923
1924        // Comment was redacted
1925        let comments = query::get_comments(&conn, "bn-001", None, None).unwrap();
1926        assert_eq!(comments.len(), 1);
1927        assert_eq!(comments[0].body, "[redacted]");
1928
1929        // Dependencies were unlinked
1930        let deps = query::get_dependencies(&conn, "bn-001").unwrap();
1931        assert!(deps.is_empty());
1932
1933        // Redaction record exists
1934        let redaction_count: i64 = conn
1935            .query_row("SELECT COUNT(*) FROM event_redactions", [], |row| {
1936                row.get(0)
1937            })
1938            .unwrap();
1939        assert_eq!(redaction_count, 1);
1940    }
1941
1942    #[test]
1943    fn late_create_populates_placeholder() {
1944        let conn = test_db();
1945        let projector = Projector::new(&conn);
1946
1947        // 1. Comment triggers placeholder creation
1948        let comment = make_event(
1949            EventType::Comment,
1950            "bn-late",
1951            EventData::Comment(CommentData {
1952                body: "Comment first".into(),
1953                extra: BTreeMap::new(),
1954            }),
1955            "h1",
1956            1000,
1957        );
1958        projector.project_event(&comment).unwrap();
1959
1960        let item = query::get_item(&conn, "bn-late", false).unwrap().unwrap();
1961        assert_eq!(item.title, "");
1962        assert_eq!(item.created_at_us, 1000);
1963
1964        // 2. Late Create arrives
1965        let create = make_create("bn-late", "Real Title", "h2", 900);
1966
1967        projector.project_event(&create).unwrap();
1968
1969        let item = query::get_item(&conn, "bn-late", false).unwrap().unwrap();
1970        assert_eq!(item.title, "Real Title");
1971        assert_eq!(item.created_at_us, 900);
1972    }
1973
1974    #[test]
1975    fn late_create_backfills_placeholder_after_field_update() {
1976        let conn = test_db();
1977        let projector = Projector::new(&conn);
1978
1979        let update = make_event(
1980            EventType::Update,
1981            "bn-late",
1982            EventData::Update(UpdateData {
1983                field: "title".into(),
1984                value: serde_json::json!("Updated before create"),
1985                extra: BTreeMap::new(),
1986            }),
1987            "h1",
1988            1000,
1989        );
1990        projector.project_event(&update).unwrap();
1991
1992        let create = make_create("bn-late", "Initial title", "h2", 900);
1993        projector.project_event(&create).unwrap();
1994
1995        let item = query::get_item(&conn, "bn-late", false).unwrap().unwrap();
1996        assert_eq!(item.title, "Updated before create");
1997        assert_eq!(item.kind, "task");
1998        assert_eq!(item.size.as_deref(), Some("m"));
1999        assert_eq!(item.description.as_deref(), Some("A detailed description"));
2000        assert_eq!(item.created_at_us, 900);
2001        assert_eq!(item.updated_at_us, 1000);
2002
2003        let labels = query::get_labels(&conn, "bn-late").unwrap();
2004        assert_eq!(labels.len(), 2);
2005        assert_eq!(item.search_labels, "auth backend");
2006    }
2007
2008    /// Regression: Projector::new() must create the projected_events table
2009    /// on a migrated DB that never had ensure_tracking_table() called.
2010    /// Without this, fresh installs fail with "record projected event hash".
2011    #[test]
2012    fn projector_new_creates_tracking_table_on_fresh_db() {
2013        let mut conn = Connection::open_in_memory().expect("open in-memory db");
2014        migrations::migrate(&mut conn).expect("migrate");
2015        // Do NOT call ensure_tracking_table — simulate fresh install.
2016
2017        // Projector::new should create the table automatically.
2018        let projector = Projector::new(&conn);
2019        let event = make_create("bn-fresh", "Fresh item", "h1", 1000);
2020        projector
2021            .project_event(&event)
2022            .expect("project_event should succeed on fresh DB");
2023
2024        let item = query::get_item(&conn, "bn-fresh", false).unwrap().unwrap();
2025        assert_eq!(item.title, "Fresh item");
2026    }
2027}