1use std::path::Path;
7use std::time::Instant;
8
9use anyhow::{Context, Result};
10
11use crate::db::{open_projection, project};
12use crate::event::Event;
13use crate::shard::ShardManager;
14use std::io;
15
/// Number of events buffered per projection batch when no valid override is set.
const DEFAULT_REBUILD_BATCH_SIZE: usize = 4_000;

/// Resolves how many events are buffered before each batch projection.
///
/// Reads the `BONES_REBUILD_BATCH_SIZE` environment variable. The value is
/// honoured only when it parses as a `usize` strictly greater than zero. If the
/// variable is unset, not valid Unicode, unparsable, or zero, the result is
/// [`DEFAULT_REBUILD_BATCH_SIZE`].
fn rebuild_batch_size() -> usize {
    match std::env::var("BONES_REBUILD_BATCH_SIZE") {
        Ok(raw) => match raw.parse::<usize>() {
            // A zero batch size would never trigger a flush, so it is rejected.
            Ok(size) if size > 0 => size,
            _ => DEFAULT_REBUILD_BATCH_SIZE,
        },
        Err(_) => DEFAULT_REBUILD_BATCH_SIZE,
    }
}
25
26fn configure_rebuild_pragmas(conn: &rusqlite::Connection) -> Result<()> {
27 conn.pragma_update(None, "temp_store", "MEMORY")
28 .context("PRAGMA temp_store = MEMORY")?;
29 conn.pragma_update(None, "cache_size", -131_072_i64)
30 .context("PRAGMA cache_size")?;
31 conn.pragma_update(None, "mmap_size", 268_435_456_i64)
32 .context("PRAGMA mmap_size")?;
33 Ok(())
34}
35
/// Summary returned by [`rebuild`] describing what a full rebuild did.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct RebuildReport {
    /// Number of events the projector reported as projected (duplicates are
    /// counted separately and are not included here).
    pub event_count: usize,
    /// Row count of the `items` table once the rebuild finished.
    pub item_count: usize,
    /// Time elapsed between the start of the rebuild and the final item count.
    pub elapsed: std::time::Duration,
    /// Number of shards listed by the shard manager.
    pub shard_count: usize,
    /// Whether the FTS5 index was rebuilt; [`rebuild`] always reports `true`.
    pub fts5_rebuilt: bool,
}
54
55fn check_sealed_shard_integrity(shard_mgr: &ShardManager) -> Result<()> {
61 let issues = shard_mgr
62 .validate_sealed_shards()
63 .map_err(|e| anyhow::anyhow!("sealed shard validation: {e}"))?;
64 if !issues.is_empty() {
65 for issue in &issues {
66 tracing::error!(
67 shard = %issue.shard_name,
68 problem = %issue.problem,
69 "sealed shard integrity check failed"
70 );
71 }
72 anyhow::bail!(
73 "sealed shard corrupted: {} (run `bn doctor` to diagnose)",
74 issues[0].problem
75 );
76 }
77 Ok(())
78}
79
80pub fn rebuild(events_dir: &Path, db_path: &Path) -> Result<RebuildReport> {
97 let start = Instant::now();
98
99 if db_path.exists() {
101 std::fs::remove_file(db_path)
102 .with_context(|| format!("remove existing projection db {}", db_path.display()))?;
103 let wal_path = db_path.with_extension("db-wal");
105 let shm_path = db_path.with_extension("db-shm");
106 let _ = std::fs::remove_file(wal_path);
107 let _ = std::fs::remove_file(shm_path);
108 }
109
110 let conn = open_projection(db_path).context("create fresh projection database")?;
112 configure_rebuild_pragmas(&conn).context("configure rebuild sqlite pragmas")?;
113 project::ensure_tracking_table(&conn).context("create tracking table")?;
114
115 let bones_dir = events_dir.parent().unwrap_or_else(|| Path::new("."));
117 let shard_mgr = ShardManager::new(bones_dir);
118
119 let shards = shard_mgr
120 .list_shards()
121 .map_err(|e| anyhow::anyhow!("list shards: {e}"))?;
122 let shard_count = shards.len();
123
124 check_sealed_shard_integrity(&shard_mgr)?;
125
126 let mut version_checked = false;
129 let mut shard_version = crate::event::parser::CURRENT_VERSION;
130 let mut line_no = 0;
131 let mut total_projected = 0;
132 let mut total_duplicates = 0;
133 let mut last_event_hash = None;
134 let mut total_byte_len = 0;
135
136 let batch_size = rebuild_batch_size();
137 let mut current_batch: Vec<Event> = Vec::with_capacity(batch_size);
138 let projector = project::Projector::new(&conn);
139
140 let shard_line_iter = shard_mgr.replay_lines()?;
141
142 for line_res in shard_line_iter {
143 let (offset, line): (usize, String) =
144 line_res.map_err(|e: io::Error| anyhow::anyhow!("read shard line: {e}"))?;
145 line_no += 1;
146 total_byte_len = offset + line.len();
147
148 if !version_checked && line.trim_start().starts_with("# bones event log v") {
149 version_checked = true;
150 shard_version = crate::event::parser::detect_version(&line)
151 .map_err(|msg| anyhow::anyhow!("version check failed at line {line_no}: {msg}"))?;
152 continue;
153 }
154
155 match crate::event::parser::parse_line(&line) {
156 Ok(crate::event::parser::ParsedLine::Event(event)) => {
157 let event = crate::event::migrate_event(*event, shard_version)
158 .map_err(|e| anyhow::anyhow!("migration failed at line {line_no}: {e}"))?;
159
160 last_event_hash = Some(event.event_hash.clone());
161 current_batch.push(event);
162
163 if current_batch.len() >= batch_size {
164 let stats = projector
165 .project_batch(¤t_batch)
166 .context("project batch during rebuild")?;
167 total_projected += stats.projected;
168 total_duplicates += stats.duplicates;
169 current_batch.clear();
170 }
171 }
172 Ok(
173 crate::event::parser::ParsedLine::Comment(_)
174 | crate::event::parser::ParsedLine::Blank,
175 ) => {}
176 Err(crate::event::parser::ParseError::InvalidEventType(raw)) => {
177 tracing::warn!(line = line_no, event_type = %raw, "skipping unknown event type");
178 }
179 Err(e) => anyhow::bail!("parse error at line {line_no}: {e}"),
180 }
181 }
182
183 if !current_batch.is_empty() {
185 let stats = projector
186 .project_batch(¤t_batch)
187 .context("project final batch during rebuild")?;
188 total_projected += stats.projected;
189 total_duplicates += stats.duplicates;
190 }
191
192 let byte_offset_i64 = i64::try_from(total_byte_len).unwrap_or(i64::MAX);
194 crate::db::query::update_projection_cursor(&conn, byte_offset_i64, last_event_hash.as_deref())
195 .context("update projection cursor after rebuild")?;
196
197 let item_count: i64 = conn
199 .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
200 .context("count items after rebuild")?;
201
202 let elapsed = start.elapsed();
203
204 tracing::info!(
205 event_count = total_projected,
206 duplicates = total_duplicates,
207 batch_size,
208 item_count,
209 shard_count,
210 elapsed_ms = elapsed.as_millis(),
211 "projection rebuild complete"
212 );
213
214 Ok(RebuildReport {
215 event_count: total_projected,
216 item_count: usize::try_from(item_count).unwrap_or(0),
217 elapsed,
218 shard_count,
219 fts5_rebuilt: true, })
221}
222
#[cfg(test)]
mod tests {
    //! Tests that run [`rebuild`] against a temporary bones directory.

    use super::*;
    use crate::event::Event;
    use crate::event::data::*;
    use crate::event::types::EventType;
    use crate::event::writer;
    use crate::model::item::{Kind, Size, Urgency};
    use crate::model::item_id::ItemId;
    use crate::shard::ShardManager;
    use std::collections::BTreeMap;
    use tempfile::TempDir;

    /// Creates a temporary bones directory with an initialised shard manager.
    ///
    /// The `TempDir` is returned so the directory outlives the test body.
    fn setup_bones_dir() -> (TempDir, ShardManager) {
        let tmp = TempDir::new().expect("create tempdir");
        let manager = ShardManager::new(tmp.path());
        manager.ensure_dirs().expect("ensure dirs");
        manager.init().expect("init shard");
        (tmp, manager)
    }

    /// Returns `(events_dir, db_path)` for the default layout inside `dir`.
    fn project_paths(dir: &TempDir) -> (std::path::PathBuf, std::path::PathBuf) {
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");
        (events_dir, db_path)
    }

    /// Runs the writer over `event` so its `event_hash` is filled in.
    fn with_hash(mut event: Event) -> Event {
        writer::write_event(&mut event).expect("compute hash");
        event
    }

    /// Counts the rows currently present in the `items` table.
    fn count_items(conn: &rusqlite::Connection) -> i64 {
        conn.query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
            .unwrap()
    }

    /// Lists item titles ordered by item id.
    fn titles_by_item_id(conn: &rusqlite::Connection) -> Vec<String> {
        let mut stmt = conn
            .prepare("SELECT title FROM items ORDER BY item_id")
            .unwrap();
        let titles: Vec<String> = stmt
            .query_map([], |row| row.get(0))
            .unwrap()
            .map(|r| r.unwrap())
            .collect();
        titles
    }

    /// Builds a hashed `Create` event for item `id` with the given title.
    fn make_create_event(id: &str, title: &str, ts: i64) -> Event {
        let payload = CreateData {
            title: title.into(),
            kind: Kind::Task,
            size: Some(Size::M),
            urgency: Urgency::Default,
            labels: vec!["test".into()],
            parent: None,
            causation: None,
            description: Some(format!("Description for {title}")),
            extra: BTreeMap::new(),
        };

        with_hash(Event {
            wall_ts_us: ts,
            agent: "test-agent".into(),
            itc: "itc:AQ".into(),
            parents: vec![],
            event_type: EventType::Create,
            item_id: ItemId::new_unchecked(id),
            data: EventData::Create(payload),
            event_hash: String::new(),
        })
    }

    /// Builds a hashed `Move` event for item `id` whose parent is `parent_hash`.
    fn make_move_event(
        id: &str,
        state: crate::model::item::State,
        ts: i64,
        parent_hash: &str,
    ) -> Event {
        let payload = MoveData {
            state,
            reason: None,
            extra: BTreeMap::new(),
        };

        with_hash(Event {
            wall_ts_us: ts,
            agent: "test-agent".into(),
            itc: "itc:AQ".into(),
            parents: vec![parent_hash.into()],
            event_type: EventType::Move,
            item_id: ItemId::new_unchecked(id),
            data: EventData::Move(payload),
            event_hash: String::new(),
        })
    }

    /// Serialises `event` and appends it to the currently active shard.
    fn append_event(shard_mgr: &ShardManager, event: &Event) {
        let serialized = writer::write_line(event).expect("serialize event");
        let (year, month) = shard_mgr.active_shard().unwrap().unwrap();
        shard_mgr
            .append_raw(year, month, &serialized)
            .expect("append event");
    }

    #[test]
    fn rebuild_empty_event_log() {
        let (dir, _shard_mgr) = setup_bones_dir();
        let (events_dir, db_path) = project_paths(&dir);

        let report = rebuild(&events_dir, &db_path).unwrap();
        assert_eq!(report.event_count, 0);
        assert_eq!(report.item_count, 0);
        assert_eq!(report.shard_count, 1);
        assert!(report.fts5_rebuilt);

        // The database must exist and contain an empty `items` table.
        let conn = open_projection(&db_path).unwrap();
        assert_eq!(count_items(&conn), 0);
    }

    #[test]
    fn rebuild_with_events() {
        let (dir, shard_mgr) = setup_bones_dir();
        let (events_dir, db_path) = project_paths(&dir);

        let first = make_create_event("bn-001", "First item", 1000);
        let second = make_create_event("bn-002", "Second item", 1001);
        let moved = make_move_event(
            "bn-001",
            crate::model::item::State::Doing,
            2000,
            &first.event_hash,
        );

        for event in [&first, &second, &moved] {
            append_event(&shard_mgr, event);
        }

        let report = rebuild(&events_dir, &db_path).unwrap();
        assert_eq!(report.event_count, 3);
        assert_eq!(report.item_count, 2);

        // The move event must be reflected in the projected state.
        let conn = open_projection(&db_path).unwrap();
        let state: String = conn
            .query_row(
                "SELECT state FROM items WHERE item_id = 'bn-001'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(state, "doing");
    }

    #[test]
    fn rebuild_replaces_existing_db() {
        let (dir, shard_mgr) = setup_bones_dir();
        let (events_dir, db_path) = project_paths(&dir);

        append_event(&shard_mgr, &make_create_event("bn-001", "Item 1", 1000));

        let first_run = rebuild(&events_dir, &db_path).unwrap();
        assert_eq!(first_run.event_count, 1);
        assert_eq!(first_run.item_count, 1);

        // A second rebuild over the same path starts from a clean database.
        append_event(&shard_mgr, &make_create_event("bn-002", "Item 2", 1001));

        let second_run = rebuild(&events_dir, &db_path).unwrap();
        assert_eq!(second_run.event_count, 2);
        assert_eq!(second_run.item_count, 2);
    }

    #[test]
    fn rebuild_is_deterministic() {
        let (dir, shard_mgr) = setup_bones_dir();
        let events_dir = dir.path().join("events");

        let create1 = make_create_event("bn-001", "Deterministic test", 1000);
        let create2 = make_create_event("bn-002", "Another item", 1001);
        append_event(&shard_mgr, &create1);
        append_event(&shard_mgr, &create2);

        // Two independent databases built from the same log.
        let db_path_a = dir.path().join("bones_a.db");
        let db_path_b = dir.path().join("bones_b.db");

        let report_a = rebuild(&events_dir, &db_path_a).unwrap();
        let report_b = rebuild(&events_dir, &db_path_b).unwrap();

        assert_eq!(report_a.event_count, report_b.event_count);
        assert_eq!(report_a.item_count, report_b.item_count);

        let conn_a = open_projection(&db_path_a).unwrap();
        let conn_b = open_projection(&db_path_b).unwrap();

        let titles_a = titles_by_item_id(&conn_a);
        let titles_b = titles_by_item_id(&conn_b);

        assert_eq!(titles_a, titles_b);
    }

    #[test]
    fn rebuild_populates_fts() {
        let (dir, shard_mgr) = setup_bones_dir();
        let (events_dir, db_path) = project_paths(&dir);

        let created = make_create_event("bn-001", "Authentication timeout fix", 1000);
        append_event(&shard_mgr, &created);

        rebuild(&events_dir, &db_path).unwrap();

        let conn = open_projection(&db_path).unwrap();
        let matches: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM items_fts WHERE items_fts MATCH 'authentication'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(matches, 1);
    }

    #[test]
    fn rebuild_updates_projection_cursor() {
        let (dir, shard_mgr) = setup_bones_dir();
        let (events_dir, db_path) = project_paths(&dir);

        let created = make_create_event("bn-001", "Item", 1000);
        append_event(&shard_mgr, &created);

        rebuild(&events_dir, &db_path).unwrap();

        let conn = open_projection(&db_path).unwrap();
        let (offset, hash) = crate::db::query::get_projection_cursor(&conn).unwrap();
        assert!(offset > 0, "cursor offset should be non-zero after rebuild");
        assert!(hash.is_some(), "cursor hash should be set after rebuild");
    }

    #[test]
    fn rebuild_handles_duplicate_events() {
        let (dir, shard_mgr) = setup_bones_dir();
        let (events_dir, db_path) = project_paths(&dir);

        // The very same event is written to the log twice.
        let created = make_create_event("bn-001", "Item", 1000);
        for _ in 0..2 {
            append_event(&shard_mgr, &created);
        }

        let report = rebuild(&events_dir, &db_path).unwrap();
        assert_eq!(report.event_count, 1);
        assert_eq!(report.item_count, 1);
    }

    #[test]
    fn rebuild_with_bd_prefix_events() {
        let (dir, shard_mgr) = setup_bones_dir();
        let (events_dir, db_path) = project_paths(&dir);

        let parent_item = make_create_event("bd-9mx", "Parent item", 1000);
        let child_item = make_create_event("bd-4kz", "Child item", 1001);

        append_event(&shard_mgr, &parent_item);
        append_event(&shard_mgr, &child_item);

        let report = rebuild(&events_dir, &db_path).unwrap();
        assert_eq!(
            report.event_count, 2,
            "should project 2 events with bd- prefix"
        );
        assert_eq!(report.item_count, 2, "should have 2 items with bd- prefix");
    }

    #[test]
    fn rebuild_performance_reasonable() {
        let (dir, shard_mgr) = setup_bones_dir();
        let (events_dir, db_path) = project_paths(&dir);

        (0..100_u32).for_each(|i| {
            let id = format!("bn-{i:04x}");
            let title = format!("Item {i}");
            let created = make_create_event(&id, &title, i64::from(i) * 1000);
            append_event(&shard_mgr, &created);
        });

        let report = rebuild(&events_dir, &db_path).unwrap();
        assert_eq!(report.event_count, 100);
        assert_eq!(report.item_count, 100);
        assert!(
            report.elapsed.as_millis() < 1000,
            "rebuild of 100 items took {}ms, expected <1000ms",
            report.elapsed.as_millis()
        );
    }
}