1use std::collections::HashSet;
2
3use sqlx::postgres::PgPoolOptions;
4use sqlx::{PgPool, Row};
5
6pub async fn create_pool(database_url: &str) -> alaz_core::Result<PgPool> {
7 let pool = PgPoolOptions::new()
8 .max_connections(20)
9 .connect(database_url)
10 .await?;
11 Ok(pool)
12}
13
14const MIGRATIONS: &[(&str, &str)] = &[
16 ("001", include_str!("migrations/001_initial.sql")),
17 ("002", include_str!("migrations/002_vault.sql")),
18 ("003", include_str!("migrations/003_dedup_indexes.sql")),
19 ("004", include_str!("migrations/004_memory_decay.sql")),
20 ("005", include_str!("migrations/005_search_feedback.sql")),
21 ("006", include_str!("migrations/006_enhanced_features.sql")),
22 ("007", include_str!("migrations/007_source_tracking.sql")),
23 ("008", include_str!("migrations/008_simple_fts.sql")),
24 ("009", include_str!("migrations/009_wilson_score.sql")),
25 ("010", include_str!("migrations/010_episode_enrichment.sql")),
26 ("011", include_str!("migrations/011_pattern_usage.sql")),
27 ("012", include_str!("migrations/012_signal_weights.sql")),
28 (
29 "013",
30 include_str!("migrations/013_search_explanations.sql"),
31 ),
32 ("014", include_str!("migrations/014_git_activity.sql")),
33 ("015", include_str!("migrations/015_code_symbols.sql")),
34 ("016", include_str!("migrations/016_spaced_repetition.sql")),
35 ("017", include_str!("migrations/017_context_tracking.sql")),
36 ("018", include_str!("migrations/018_learning_analytics.sql")),
37];
38
39const MIGRATION_NAMES: &[(&str, &str)] = &[
41 ("001", "001_initial.sql"),
42 ("002", "002_vault.sql"),
43 ("003", "003_dedup_indexes.sql"),
44 ("004", "004_memory_decay.sql"),
45 ("005", "005_search_feedback.sql"),
46 ("006", "006_enhanced_features.sql"),
47 ("007", "007_source_tracking.sql"),
48 ("008", "008_simple_fts.sql"),
49 ("009", "009_wilson_score.sql"),
50 ("010", "010_episode_enrichment.sql"),
51 ("011", "011_pattern_usage.sql"),
52 ("012", "012_signal_weights.sql"),
53 ("013", "013_search_explanations.sql"),
54 ("014", "014_git_activity.sql"),
55 ("015", "015_code_symbols.sql"),
56 ("016", "016_spaced_repetition.sql"),
57 ("017", "017_context_tracking.sql"),
58 ("018", "018_learning_analytics.sql"),
59];
60
/// Describes one known migration and whether it has been recorded as applied.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MigrationInfo {
    /// Version identifier of the migration, e.g. `"001"`.
    pub version: String,
    /// File name of the migration, e.g. `"001_initial.sql"`.
    pub name: String,
    /// `true` when the version is present in the `_alaz_migrations` table.
    pub applied: bool,
}
68
69async fn ensure_tracking_table(pool: &PgPool) -> alaz_core::Result<()> {
71 let sql = include_str!("migrations/000_migration_tracking.sql");
72 sqlx::raw_sql(sql).execute(pool).await?;
73 Ok(())
74}
75
76async fn get_applied_versions(pool: &PgPool) -> alaz_core::Result<HashSet<String>> {
78 let rows = sqlx::query("SELECT version FROM _alaz_migrations")
79 .fetch_all(pool)
80 .await?;
81 let versions: HashSet<String> = rows.iter().map(|r| r.get("version")).collect();
82 Ok(versions)
83}
84
85async fn is_pre_tracking_database(pool: &PgPool) -> alaz_core::Result<bool> {
89 let row = sqlx::query(
90 "SELECT EXISTS (
91 SELECT 1 FROM information_schema.tables
92 WHERE table_schema = 'public' AND table_name = 'knowledge_items'
93 ) AS exists",
94 )
95 .fetch_one(pool)
96 .await?;
97 Ok(row.get::<bool, _>("exists"))
98}
99
100async fn backfill_applied_versions(pool: &PgPool) -> alaz_core::Result<()> {
102 for (version, _) in MIGRATIONS {
103 sqlx::query("INSERT INTO _alaz_migrations (version) VALUES ($1) ON CONFLICT DO NOTHING")
104 .bind(version)
105 .execute(pool)
106 .await?;
107 }
108 tracing::info!(
109 "backfilled {} migrations as already applied (pre-tracking database)",
110 MIGRATIONS.len()
111 );
112 Ok(())
113}
114
115pub async fn run_migrations(pool: &PgPool) -> alaz_core::Result<usize> {
117 ensure_tracking_table(pool).await?;
119
120 let applied = get_applied_versions(pool).await?;
122 if applied.is_empty() && is_pre_tracking_database(pool).await? {
123 backfill_applied_versions(pool).await?;
124 tracing::info!("migrations completed (0 new — all backfilled)");
125 return Ok(0);
126 }
127
128 let mut count = 0usize;
130 for (version, sql) in MIGRATIONS {
131 if applied.contains(*version) {
132 continue;
133 }
134
135 tracing::info!(version, "applying migration");
136 sqlx::raw_sql(sql).execute(pool).await?;
137
138 sqlx::query("INSERT INTO _alaz_migrations (version) VALUES ($1)")
139 .bind(version)
140 .execute(pool)
141 .await?;
142
143 count += 1;
144 }
145
146 tracing::info!(applied = count, "migrations completed");
147 Ok(count)
148}
149
150pub async fn migration_status(pool: &PgPool) -> alaz_core::Result<Vec<MigrationInfo>> {
152 ensure_tracking_table(pool).await?;
153 let applied = get_applied_versions(pool).await?;
154
155 let mut result = Vec::with_capacity(MIGRATION_NAMES.len());
156 for (version, name) in MIGRATION_NAMES {
157 result.push(MigrationInfo {
158 version: version.to_string(),
159 name: name.to_string(),
160 applied: applied.contains(*version),
161 });
162 }
163 Ok(result)
164}
165
166pub async fn migrations_pending(pool: &PgPool) -> alaz_core::Result<Vec<MigrationInfo>> {
168 ensure_tracking_table(pool).await?;
169 let applied = get_applied_versions(pool).await?;
170
171 let mut pending = Vec::new();
172 for (version, name) in MIGRATION_NAMES {
173 if !applied.contains(*version) {
174 pending.push(MigrationInfo {
175 version: version.to_string(),
176 name: name.to_string(),
177 applied: false,
178 });
179 }
180 }
181 Ok(pending)
182}