1use std::path::Path;
7use std::time::Instant;
8
9use anyhow::{Context, Result};
10
11use crate::db::{open_projection, project};
12use crate::event::Event;
13use crate::shard::ShardManager;
14use std::io;
15
/// Summary of one completed projection rebuild, as returned by [`rebuild`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RebuildReport {
    /// Number of events the projector reported as projected. Events the
    /// projector reported as duplicates are tallied separately and are not
    /// included here.
    pub event_count: usize,
    /// Row count of the `items` table once the rebuild finished.
    pub item_count: usize,
    /// Wall-clock time measured from the start of the rebuild up to just
    /// before the report was assembled.
    pub elapsed: std::time::Duration,
    /// Number of shards returned by the shard manager's listing.
    pub shard_count: usize,
    /// FTS5 rebuild flag. [`rebuild`] currently always sets this to `true`.
    pub fts5_rebuilt: bool,
}
34
35pub fn rebuild(events_dir: &Path, db_path: &Path) -> Result<RebuildReport> {
56 let start = Instant::now();
57
58 if db_path.exists() {
60 std::fs::remove_file(db_path)
61 .with_context(|| format!("remove existing projection db {}", db_path.display()))?;
62 let wal_path = db_path.with_extension("db-wal");
64 let shm_path = db_path.with_extension("db-shm");
65 let _ = std::fs::remove_file(wal_path);
66 let _ = std::fs::remove_file(shm_path);
67 }
68
69 let conn = open_projection(db_path).context("create fresh projection database")?;
71 project::ensure_tracking_table(&conn).context("create tracking table")?;
72
73 let bones_dir = events_dir.parent().unwrap_or_else(|| Path::new("."));
75 let shard_mgr = ShardManager::new(bones_dir);
76
77 let shards = shard_mgr
78 .list_shards()
79 .map_err(|e| anyhow::anyhow!("list shards: {e}"))?;
80 let shard_count = shards.len();
81
82 let mut version_checked = false;
85 let mut shard_version = crate::event::parser::CURRENT_VERSION;
86 let mut line_no = 0;
87 let mut total_projected = 0;
88 let mut total_duplicates = 0;
89 let mut last_event_hash = None;
90 let mut total_byte_len = 0;
91
92 let mut current_batch: Vec<Event> = Vec::with_capacity(1000);
93 let projector = project::Projector::new(&conn);
94
95 let shard_line_iter = shard_mgr.replay_lines()?;
96
97 for line_res in shard_line_iter {
98 let (offset, line): (usize, String) =
99 line_res.map_err(|e: io::Error| anyhow::anyhow!("read shard line: {e}"))?;
100 line_no += 1;
101 total_byte_len = offset + line.len();
102
103 if !version_checked && line.trim_start().starts_with("# bones event log v") {
104 version_checked = true;
105 shard_version = crate::event::parser::detect_version(&line)
106 .map_err(|msg| anyhow::anyhow!("version check failed at line {line_no}: {msg}"))?;
107 continue;
108 }
109
110 match crate::event::parser::parse_line(&line) {
111 Ok(crate::event::parser::ParsedLine::Event(event)) => {
112 let event = crate::event::migrate_event(*event, shard_version)
113 .map_err(|e| anyhow::anyhow!("migration failed at line {line_no}: {e}"))?;
114
115 last_event_hash = Some(event.event_hash.clone());
116 current_batch.push(event);
117
118 if current_batch.len() >= 1000 {
119 let stats = projector
120 .project_batch(¤t_batch)
121 .context("project batch during rebuild")?;
122 total_projected += stats.projected;
123 total_duplicates += stats.duplicates;
124 current_batch.clear();
125 }
126 }
127 Ok(
128 crate::event::parser::ParsedLine::Comment(_)
129 | crate::event::parser::ParsedLine::Blank,
130 ) => {}
131 Err(crate::event::parser::ParseError::InvalidEventType(raw)) => {
132 tracing::warn!(line = line_no, event_type = %raw, "skipping unknown event type");
133 }
134 Err(e) => anyhow::bail!("parse error at line {line_no}: {e}"),
135 }
136 }
137
138 if !current_batch.is_empty() {
140 let stats = projector
141 .project_batch(¤t_batch)
142 .context("project final batch during rebuild")?;
143 total_projected += stats.projected;
144 total_duplicates += stats.duplicates;
145 }
146
147 let byte_offset_i64 = i64::try_from(total_byte_len).unwrap_or(i64::MAX);
149 crate::db::query::update_projection_cursor(&conn, byte_offset_i64, last_event_hash.as_deref())
150 .context("update projection cursor after rebuild")?;
151
152 let item_count: i64 = conn
154 .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
155 .context("count items after rebuild")?;
156
157 let elapsed = start.elapsed();
158
159 tracing::info!(
160 event_count = total_projected,
161 duplicates = total_duplicates,
162 item_count,
163 shard_count,
164 elapsed_ms = elapsed.as_millis(),
165 "projection rebuild complete"
166 );
167
168 Ok(RebuildReport {
169 event_count: total_projected,
170 item_count: usize::try_from(item_count).unwrap_or(0),
171 elapsed,
172 shard_count,
173 fts5_rebuilt: true, })
175}
176
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::Event;
    use crate::event::data::*;
    use crate::event::types::EventType;
    use crate::event::writer;
    use crate::model::item::{Kind, Size, Urgency};
    use crate::model::item_id::ItemId;
    use crate::shard::ShardManager;
    use std::collections::BTreeMap;
    use tempfile::TempDir;

    /// Create a temporary directory and a `ShardManager` over it, with its
    /// directories ensured and the shard initialised.
    fn setup_bones_dir() -> (TempDir, ShardManager) {
        let tmp = TempDir::new().expect("create tempdir");
        let manager = ShardManager::new(tmp.path());
        manager.ensure_dirs().expect("ensure dirs");
        manager.init().expect("init shard");
        (tmp, manager)
    }

    /// Return `(db_path, events_dir)` for the default layout inside `dir`.
    fn project_paths(dir: &TempDir) -> (std::path::PathBuf, std::path::PathBuf) {
        let root = dir.path();
        (root.join("bones.db"), root.join("events"))
    }

    /// Run the event through `writer::write_event` so its hash is filled in.
    fn with_hash(mut event: Event) -> Event {
        writer::write_event(&mut event).expect("compute hash");
        event
    }

    /// Build a hashed `Create` event for item `id` with the given title and
    /// wall-clock timestamp.
    fn make_create_event(id: &str, title: &str, ts: i64) -> Event {
        let payload = CreateData {
            title: title.into(),
            kind: Kind::Task,
            size: Some(Size::M),
            urgency: Urgency::Default,
            labels: vec!["test".into()],
            parent: None,
            causation: None,
            description: Some(format!("Description for {title}")),
            extra: BTreeMap::new(),
        };

        with_hash(Event {
            wall_ts_us: ts,
            agent: "test-agent".into(),
            itc: "itc:AQ".into(),
            parents: vec![],
            event_type: EventType::Create,
            item_id: ItemId::new_unchecked(id),
            data: EventData::Create(payload),
            event_hash: String::new(),
        })
    }

    /// Build a hashed `Move` event for item `id` whose only parent is
    /// `parent_hash`.
    fn make_move_event(
        id: &str,
        state: crate::model::item::State,
        ts: i64,
        parent_hash: &str,
    ) -> Event {
        let payload = MoveData {
            state,
            reason: None,
            extra: BTreeMap::new(),
        };

        with_hash(Event {
            wall_ts_us: ts,
            agent: "test-agent".into(),
            itc: "itc:AQ".into(),
            parents: vec![parent_hash.into()],
            event_type: EventType::Move,
            item_id: ItemId::new_unchecked(id),
            data: EventData::Move(payload),
            event_hash: String::new(),
        })
    }

    /// Serialise `event` and append it to the currently active shard.
    fn append_event(shard_mgr: &ShardManager, event: &Event) {
        let serialized = writer::write_line(event).expect("serialize event");
        let (year, month) = shard_mgr.active_shard().unwrap().unwrap();
        shard_mgr
            .append_raw(year, month, &serialized)
            .expect("append event");
    }

    /// Read every item title from the projection at `db_path`, ordered by
    /// item id.
    fn titles_in(db_path: &Path) -> Vec<String> {
        let conn = open_projection(db_path).unwrap();
        let mut stmt = conn
            .prepare("SELECT title FROM items ORDER BY item_id")
            .unwrap();
        let titles: Vec<String> = stmt
            .query_map([], |row| row.get(0))
            .unwrap()
            .map(|r| r.unwrap())
            .collect();
        titles
    }

    /// An initialised but empty log produces an empty projection.
    #[test]
    fn rebuild_empty_event_log() {
        let (dir, _shard_mgr) = setup_bones_dir();
        let (db_path, events_dir) = project_paths(&dir);

        let report = rebuild(&events_dir, &db_path).unwrap();
        assert_eq!(report.event_count, 0);
        assert_eq!(report.item_count, 0);
        assert_eq!(report.shard_count, 1);
        assert!(report.fts5_rebuilt);

        let conn = open_projection(&db_path).unwrap();
        let rows: i64 = conn
            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
            .unwrap();
        assert_eq!(rows, 0);
    }

    /// Two creates plus a move yield two items, one of them in `doing`.
    #[test]
    fn rebuild_with_events() {
        let (dir, shard_mgr) = setup_bones_dir();
        let (db_path, events_dir) = project_paths(&dir);

        let first = make_create_event("bn-001", "First item", 1000);
        let second = make_create_event("bn-002", "Second item", 1001);
        let moved = make_move_event(
            "bn-001",
            crate::model::item::State::Doing,
            2000,
            &first.event_hash,
        );

        for event in [&first, &second, &moved] {
            append_event(&shard_mgr, event);
        }

        let report = rebuild(&events_dir, &db_path).unwrap();
        assert_eq!(report.event_count, 3);
        assert_eq!(report.item_count, 2);

        let conn = open_projection(&db_path).unwrap();
        let state: String = conn
            .query_row(
                "SELECT state FROM items WHERE item_id = 'bn-001'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(state, "doing");
    }

    /// A second rebuild over the same path replaces the earlier database.
    #[test]
    fn rebuild_replaces_existing_db() {
        let (dir, shard_mgr) = setup_bones_dir();
        let (db_path, events_dir) = project_paths(&dir);

        append_event(&shard_mgr, &make_create_event("bn-001", "Item 1", 1000));

        let first_run = rebuild(&events_dir, &db_path).unwrap();
        assert_eq!(first_run.event_count, 1);
        assert_eq!(first_run.item_count, 1);

        append_event(&shard_mgr, &make_create_event("bn-002", "Item 2", 1001));

        let second_run = rebuild(&events_dir, &db_path).unwrap();
        assert_eq!(second_run.event_count, 2);
        assert_eq!(second_run.item_count, 2);
    }

    /// Rebuilding the same log into two databases gives matching results.
    #[test]
    fn rebuild_is_deterministic() {
        let (dir, shard_mgr) = setup_bones_dir();
        let events_dir = dir.path().join("events");

        let first = make_create_event("bn-001", "Deterministic test", 1000);
        let second = make_create_event("bn-002", "Another item", 1001);
        append_event(&shard_mgr, &first);
        append_event(&shard_mgr, &second);

        let db_path_a = dir.path().join("bones_a.db");
        let db_path_b = dir.path().join("bones_b.db");

        let report_a = rebuild(&events_dir, &db_path_a).unwrap();
        let report_b = rebuild(&events_dir, &db_path_b).unwrap();

        assert_eq!(report_a.event_count, report_b.event_count);
        assert_eq!(report_a.item_count, report_b.item_count);

        let titles_a = titles_in(&db_path_a);
        let titles_b = titles_in(&db_path_b);

        assert_eq!(titles_a, titles_b);
    }

    /// After a rebuild the `items_fts` table matches a word from the title.
    #[test]
    fn rebuild_populates_fts() {
        let (dir, shard_mgr) = setup_bones_dir();
        let (db_path, events_dir) = project_paths(&dir);

        append_event(
            &shard_mgr,
            &make_create_event("bn-001", "Authentication timeout fix", 1000),
        );

        rebuild(&events_dir, &db_path).unwrap();

        let conn = open_projection(&db_path).unwrap();
        let matches: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM items_fts WHERE items_fts MATCH 'authentication'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(matches, 1);
    }

    /// The projection cursor carries a non-zero offset and a hash afterwards.
    #[test]
    fn rebuild_updates_projection_cursor() {
        let (dir, shard_mgr) = setup_bones_dir();
        let (db_path, events_dir) = project_paths(&dir);

        append_event(&shard_mgr, &make_create_event("bn-001", "Item", 1000));

        rebuild(&events_dir, &db_path).unwrap();

        let conn = open_projection(&db_path).unwrap();
        let (offset, hash) = crate::db::query::get_projection_cursor(&conn).unwrap();
        assert!(offset > 0, "cursor offset should be non-zero after rebuild");
        assert!(hash.is_some(), "cursor hash should be set after rebuild");
    }

    /// The same event appended twice is counted once.
    #[test]
    fn rebuild_handles_duplicate_events() {
        let (dir, shard_mgr) = setup_bones_dir();
        let (db_path, events_dir) = project_paths(&dir);

        let event = make_create_event("bn-001", "Item", 1000);
        for _ in 0..2 {
            append_event(&shard_mgr, &event);
        }

        let report = rebuild(&events_dir, &db_path).unwrap();
        assert_eq!(report.event_count, 1);
        assert_eq!(report.item_count, 1);
    }

    /// Item ids using the `bd-` prefix are projected like any other id.
    #[test]
    fn rebuild_with_bd_prefix_events() {
        let (dir, shard_mgr) = setup_bones_dir();
        let (db_path, events_dir) = project_paths(&dir);

        let parent_item = make_create_event("bd-9mx", "Parent item", 1000);
        let child_item = make_create_event("bd-4kz", "Child item", 1001);

        append_event(&shard_mgr, &parent_item);
        append_event(&shard_mgr, &child_item);

        let report = rebuild(&events_dir, &db_path).unwrap();
        assert_eq!(
            report.event_count, 2,
            "should project 2 events with bd- prefix"
        );
        assert_eq!(report.item_count, 2, "should have 2 items with bd- prefix");
    }

    /// Rebuilding 100 create events must finish in under one second.
    #[test]
    fn rebuild_performance_reasonable() {
        let (dir, shard_mgr) = setup_bones_dir();
        let (db_path, events_dir) = project_paths(&dir);

        for i in 0..100_u32 {
            let id = format!("bn-{i:04x}");
            let title = format!("Item {i}");
            let event = make_create_event(&id, &title, i64::from(i) * 1000);
            append_event(&shard_mgr, &event);
        }

        let report = rebuild(&events_dir, &db_path).unwrap();
        assert_eq!(report.event_count, 100);
        assert_eq!(report.item_count, 100);

        let elapsed_ms = report.elapsed.as_millis();
        assert!(
            elapsed_ms < 1000,
            "rebuild of 100 items took {}ms, expected <1000ms",
            elapsed_ms
        );
    }
}