Skip to main content

kaizen/shell/
migrate.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! `kaizen migrate`: reversible SQLite ↔ tiered storage bootstrap.
3
4use crate::shell::cli::workspace_path;
5use crate::store::SessionFilter;
6use crate::store::{Store, cold_parquet, hot_log::HotLog};
7use anyhow::{Context, Result};
8use std::collections::BTreeMap;
9use std::path::Path;
10
11const MIGRATE_SESSION_PAGE: usize = 512;
12const MIGRATE_PARQUET_CHUNK: usize = 8192;
13
14pub fn cmd_migrate_v2(workspace: Option<&Path>, allow_skew: bool) -> Result<()> {
15    let ws = workspace_path(workspace)?;
16    let root = crate::core::paths::project_data_dir(&ws)?;
17    let db_path = root.join("kaizen.db");
18    let backup = root.join("kaizen.db.v1.bak");
19    if db_path.exists() && !backup.exists() {
20        std::fs::copy(&db_path, &backup)
21            .with_context(|| format!("backup SQLite DB: {}", backup.display()))?;
22    }
23    let store = Store::open(&db_path)?;
24    let workspace = ws.to_string_lossy().to_string();
25    validate_timestamps(&store, &workspace, allow_skew)?;
26    let mut hot = HotLog::open(&root)?;
27    let mut cold = cold_parquet::DailyEventWriter::new(&root, MIGRATE_PARQUET_CHUNK);
28    let mut total = 0_usize;
29    for_workspace_events(&store, &workspace, |event| {
30        hot.append(event)?;
31        cold.push(event.clone())?;
32        total += 1;
33        Ok(())
34    })?;
35    hot.flush()?;
36    let parts = cold.finish()?;
37    store.sync_state_set_u64("storage_schema_v", 2)?;
38    println!(
39        "migrated v2: {} events, {} cold partitions, backup {}",
40        total,
41        parts.len(),
42        backup.display()
43    );
44    Ok(())
45}
46
47fn for_workspace_events<F>(store: &Store, workspace: &str, mut f: F) -> Result<()>
48where
49    F: FnMut(&crate::core::event::Event) -> Result<()>,
50{
51    let mut offset = 0;
52    loop {
53        let page = store.list_sessions_page(
54            workspace,
55            offset,
56            MIGRATE_SESSION_PAGE,
57            SessionFilter::default(),
58        )?;
59        for session in page.rows {
60            for event in store.list_events_for_session(&session.id)? {
61                f(&event)?;
62            }
63        }
64        let Some(next) = page.next_offset else {
65            break;
66        };
67        offset = next;
68    }
69    Ok(())
70}
71
72pub fn cmd_migrate_v1(workspace: Option<&Path>) -> Result<()> {
73    let ws = workspace_path(workspace)?;
74    let root = crate::core::paths::project_data_dir(&ws)?;
75    let store = Store::open(&root.join("kaizen.db"))?;
76    let mut rows = BTreeMap::new();
77    for (_, event) in HotLog::replay(&root).unwrap_or_default() {
78        rows.insert((event.session_id.clone(), event.seq), event);
79    }
80    for event in cold_parquet::read_events_dir(&root).unwrap_or_default() {
81        rows.insert((event.session_id.clone(), event.seq), event);
82    }
83    for event in rows.values() {
84        store.append_event(event)?;
85    }
86    store.sync_state_set_u64("storage_schema_v", 1)?;
87    println!("migrated v1: {} events restored into SQLite", rows.len());
88    Ok(())
89}
90
91fn validate_timestamps(store: &Store, workspace: &str, allow_skew: bool) -> Result<()> {
92    if allow_skew {
93        return Ok(());
94    }
95    let now = std::time::SystemTime::now()
96        .duration_since(std::time::UNIX_EPOCH)?
97        .as_millis() as u64;
98    let max = now.saturating_add(86_400_000);
99    for_workspace_events(store, workspace, |event| {
100        if event.ts_ms <= max {
101            return Ok(());
102        }
103        anyhow::bail!(
104            "future ts_ms {} in session {} seq {} (pass --allow-skew to keep)",
105            event.ts_ms,
106            event.session_id,
107            event.seq
108        )
109    })?;
110    Ok(())
111}