1use crate::shell::cli::workspace_path;
5use crate::store::{Store, cold_parquet, hot_log::HotLog};
6use anyhow::{Context, Result};
7use std::collections::BTreeMap;
8use std::path::Path;
9
10pub fn cmd_migrate_v2(workspace: Option<&Path>, allow_skew: bool) -> Result<()> {
11 let ws = workspace_path(workspace)?;
12 let root = ws.join(".kaizen");
13 let db_path = root.join("kaizen.db");
14 let backup = root.join("kaizen.db.v1.bak");
15 if db_path.exists() && !backup.exists() {
16 std::fs::copy(&db_path, &backup)
17 .with_context(|| format!("backup SQLite DB: {}", backup.display()))?;
18 }
19 let store = Store::open(&db_path)?;
20 let events = workspace_events(&store, &ws.to_string_lossy())?;
21 validate_timestamps(&events, allow_skew)?;
22 let mut hot = HotLog::open(&root)?;
23 for event in &events {
24 hot.append(event)?;
25 }
26 let parts = cold_parquet::write_daily_events(&root, &events)?;
27 store.sync_state_set_u64("storage_schema_v", 2)?;
28 println!(
29 "migrated v2: {} events, {} cold partitions, backup {}",
30 events.len(),
31 parts.len(),
32 backup.display()
33 );
34 Ok(())
35}
36
37pub fn cmd_migrate_v1(workspace: Option<&Path>) -> Result<()> {
38 let ws = workspace_path(workspace)?;
39 let root = ws.join(".kaizen");
40 let store = Store::open(&root.join("kaizen.db"))?;
41 let mut rows = BTreeMap::new();
42 for (_, event) in HotLog::replay(&root).unwrap_or_default() {
43 rows.insert((event.session_id.clone(), event.seq), event);
44 }
45 for event in cold_parquet::read_events_dir(&root).unwrap_or_default() {
46 rows.insert((event.session_id.clone(), event.seq), event);
47 }
48 for event in rows.values() {
49 store.append_event(event)?;
50 }
51 store.sync_state_set_u64("storage_schema_v", 1)?;
52 println!("migrated v1: {} events restored into SQLite", rows.len());
53 Ok(())
54}
55
56fn workspace_events(store: &Store, workspace: &str) -> Result<Vec<crate::core::event::Event>> {
57 let mut out = Vec::new();
58 for session in store.list_sessions(workspace)? {
59 out.extend(store.list_events_for_session(&session.id)?);
60 }
61 out.sort_by(|a, b| (a.ts_ms, &a.session_id, a.seq).cmp(&(b.ts_ms, &b.session_id, b.seq)));
62 Ok(out)
63}
64
65fn validate_timestamps(events: &[crate::core::event::Event], allow_skew: bool) -> Result<()> {
66 if allow_skew {
67 return Ok(());
68 }
69 let now = std::time::SystemTime::now()
70 .duration_since(std::time::UNIX_EPOCH)?
71 .as_millis() as u64;
72 let max = now.saturating_add(86_400_000);
73 if let Some(event) = events.iter().find(|e| e.ts_ms > max) {
74 anyhow::bail!(
75 "future ts_ms {} in session {} seq {} (pass --allow-skew to keep)",
76 event.ts_ms,
77 event.session_id,
78 event.seq
79 );
80 }
81 Ok(())
82}