Skip to main content

alaz_db/
pool.rs

1use std::collections::HashSet;
2
3use sqlx::postgres::PgPoolOptions;
4use sqlx::{PgPool, Row};
5
6pub async fn create_pool(database_url: &str) -> alaz_core::Result<PgPool> {
7    let pool = PgPoolOptions::new()
8        .max_connections(20)
9        .connect(database_url)
10        .await?;
11    Ok(pool)
12}
13
14/// All migrations in order. Each entry is (version_key, sql_content).
15const MIGRATIONS: &[(&str, &str)] = &[
16    ("001", include_str!("migrations/001_initial.sql")),
17    ("002", include_str!("migrations/002_vault.sql")),
18    ("003", include_str!("migrations/003_dedup_indexes.sql")),
19    ("004", include_str!("migrations/004_memory_decay.sql")),
20    ("005", include_str!("migrations/005_search_feedback.sql")),
21    ("006", include_str!("migrations/006_enhanced_features.sql")),
22    ("007", include_str!("migrations/007_source_tracking.sql")),
23    ("008", include_str!("migrations/008_simple_fts.sql")),
24    ("009", include_str!("migrations/009_wilson_score.sql")),
25    ("010", include_str!("migrations/010_episode_enrichment.sql")),
26    ("011", include_str!("migrations/011_pattern_usage.sql")),
27    ("012", include_str!("migrations/012_signal_weights.sql")),
28    (
29        "013",
30        include_str!("migrations/013_search_explanations.sql"),
31    ),
32    ("014", include_str!("migrations/014_git_activity.sql")),
33    ("015", include_str!("migrations/015_code_symbols.sql")),
34    ("016", include_str!("migrations/016_spaced_repetition.sql")),
35    ("017", include_str!("migrations/017_context_tracking.sql")),
36    ("018", include_str!("migrations/018_learning_analytics.sql")),
37];
38
39/// Full migration file names for display purposes.
40const MIGRATION_NAMES: &[(&str, &str)] = &[
41    ("001", "001_initial.sql"),
42    ("002", "002_vault.sql"),
43    ("003", "003_dedup_indexes.sql"),
44    ("004", "004_memory_decay.sql"),
45    ("005", "005_search_feedback.sql"),
46    ("006", "006_enhanced_features.sql"),
47    ("007", "007_source_tracking.sql"),
48    ("008", "008_simple_fts.sql"),
49    ("009", "009_wilson_score.sql"),
50    ("010", "010_episode_enrichment.sql"),
51    ("011", "011_pattern_usage.sql"),
52    ("012", "012_signal_weights.sql"),
53    ("013", "013_search_explanations.sql"),
54    ("014", "014_git_activity.sql"),
55    ("015", "015_code_symbols.sql"),
56    ("016", "016_spaced_repetition.sql"),
57    ("017", "017_context_tracking.sql"),
58    ("018", "018_learning_analytics.sql"),
59];
60
/// Migration status for a single migration file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MigrationInfo {
    /// Version key of the migration (e.g. `"001"`).
    pub version: String,
    /// File name of the migration, used for display.
    pub name: String,
    /// Whether the migration is considered applied.
    pub applied: bool,
}
68
69/// Ensure the `_alaz_migrations` tracking table exists.
70async fn ensure_tracking_table(pool: &PgPool) -> alaz_core::Result<()> {
71    let sql = include_str!("migrations/000_migration_tracking.sql");
72    sqlx::raw_sql(sql).execute(pool).await?;
73    Ok(())
74}
75
76/// Get the set of already-applied migration versions.
77async fn get_applied_versions(pool: &PgPool) -> alaz_core::Result<HashSet<String>> {
78    let rows = sqlx::query("SELECT version FROM _alaz_migrations")
79        .fetch_all(pool)
80        .await?;
81    let versions: HashSet<String> = rows.iter().map(|r| r.get("version")).collect();
82    Ok(versions)
83}
84
85/// Detect if this is a pre-tracking database (tables exist but no tracking table was populated).
86/// We check for the `knowledge_items` table as a sentinel — if it exists, all prior migrations
87/// were already applied before tracking was added.
88async fn is_pre_tracking_database(pool: &PgPool) -> alaz_core::Result<bool> {
89    let row = sqlx::query(
90        "SELECT EXISTS (
91            SELECT 1 FROM information_schema.tables
92            WHERE table_schema = 'public' AND table_name = 'knowledge_items'
93        ) AS exists",
94    )
95    .fetch_one(pool)
96    .await?;
97    Ok(row.get::<bool, _>("exists"))
98}
99
100/// Backfill all existing migrations as applied (for databases that predate tracking).
101async fn backfill_applied_versions(pool: &PgPool) -> alaz_core::Result<()> {
102    for (version, _) in MIGRATIONS {
103        sqlx::query("INSERT INTO _alaz_migrations (version) VALUES ($1) ON CONFLICT DO NOTHING")
104            .bind(version)
105            .execute(pool)
106            .await?;
107    }
108    tracing::info!(
109        "backfilled {} migrations as already applied (pre-tracking database)",
110        MIGRATIONS.len()
111    );
112    Ok(())
113}
114
115/// Run pending database migrations, returning the count of newly applied migrations.
116pub async fn run_migrations(pool: &PgPool) -> alaz_core::Result<usize> {
117    // Step 1: Ensure tracking table exists
118    ensure_tracking_table(pool).await?;
119
120    // Step 2: Handle pre-tracking databases (tables exist but no tracking records)
121    let applied = get_applied_versions(pool).await?;
122    if applied.is_empty() && is_pre_tracking_database(pool).await? {
123        backfill_applied_versions(pool).await?;
124        tracing::info!("migrations completed (0 new — all backfilled)");
125        return Ok(0);
126    }
127
128    // Step 3: Run pending migrations
129    let mut count = 0usize;
130    for (version, sql) in MIGRATIONS {
131        if applied.contains(*version) {
132            continue;
133        }
134
135        tracing::info!(version, "applying migration");
136        sqlx::raw_sql(sql).execute(pool).await?;
137
138        sqlx::query("INSERT INTO _alaz_migrations (version) VALUES ($1)")
139            .bind(version)
140            .execute(pool)
141            .await?;
142
143        count += 1;
144    }
145
146    tracing::info!(applied = count, "migrations completed");
147    Ok(count)
148}
149
150/// Return migration status (applied/pending) without executing anything.
151pub async fn migration_status(pool: &PgPool) -> alaz_core::Result<Vec<MigrationInfo>> {
152    ensure_tracking_table(pool).await?;
153    let applied = get_applied_versions(pool).await?;
154
155    let mut result = Vec::with_capacity(MIGRATION_NAMES.len());
156    for (version, name) in MIGRATION_NAMES {
157        result.push(MigrationInfo {
158            version: version.to_string(),
159            name: name.to_string(),
160            applied: applied.contains(*version),
161        });
162    }
163    Ok(result)
164}
165
166/// Dry-run: return versions that would be applied, without executing them.
167pub async fn migrations_pending(pool: &PgPool) -> alaz_core::Result<Vec<MigrationInfo>> {
168    ensure_tracking_table(pool).await?;
169    let applied = get_applied_versions(pool).await?;
170
171    let mut pending = Vec::new();
172    for (version, name) in MIGRATION_NAMES {
173        if !applied.contains(*version) {
174            pending.push(MigrationInfo {
175                version: version.to_string(),
176                name: name.to_string(),
177                applied: false,
178            });
179        }
180    }
181    Ok(pending)
182}