Skip to main content

systemprompt_cli/commands/infrastructure/db/
mod.rs

1mod admin;
2mod admin_migrate;
3mod admin_migrate_down;
4mod admin_migrate_plan;
5mod admin_migrate_status;
6mod admin_migrations;
7mod admin_squash;
8mod commands;
9mod doctor;
10mod helpers;
11mod introspect;
12mod query;
13mod schema;
14mod types;
15
16use anyhow::{Context, Result, bail};
17use std::sync::Arc;
18use systemprompt_database::{DatabaseAdminService, QueryExecutor};
19use systemprompt_runtime::{AppContext, DatabaseContext};
20
21use crate::cli_settings::CliConfig;
22use crate::shared::render_result;
23
24pub use commands::{DbCommands, MigrationsCommands};
25pub use types::*;
26
27struct DatabaseTool {
28    ctx: AppContext,
29    admin_service: DatabaseAdminService,
30    query_executor: QueryExecutor,
31}
32
33impl DatabaseTool {
34    async fn new() -> Result<Self> {
35        let ctx = AppContext::new()
36            .await
37            .context("Failed to connect to database. Check your profile configuration.")?;
38        let pool = ctx.db_pool().write_pool_arc()?;
39        let admin_service = DatabaseAdminService::new(Arc::clone(&pool));
40        let query_executor = QueryExecutor::new(pool);
41        Ok(Self {
42            ctx,
43            admin_service,
44            query_executor,
45        })
46    }
47}
48
49pub async fn execute(cmd: DbCommands, config: &CliConfig) -> Result<()> {
50    if let DbCommands::Migrate {
51        allow_checksum_drift,
52    } = cmd
53    {
54        return admin::execute_migrate(config, allow_checksum_drift).await;
55    }
56
57    if let DbCommands::MigrateDown { extension, count } = cmd {
58        return admin::execute_migrate_down(config, &extension, count).await;
59    }
60
61    if let DbCommands::MigrateSquash {
62        extension,
63        through,
64        apply,
65    } = cmd
66    {
67        return admin_squash::execute_squash(
68            config,
69            admin_squash::SquashArgs {
70                extension: &extension,
71                through,
72                apply,
73            },
74        )
75        .await;
76    }
77
78    let db = DatabaseTool::new().await?;
79
80    match cmd {
81        DbCommands::Query {
82            sql,
83            limit,
84            offset,
85            format: _,
86        } => {
87            let params = query::QueryParams {
88                sql: &sql,
89                limit,
90                offset,
91            };
92            let result = query::execute_query(&db.query_executor, &params, config).await?;
93            render_result(&result);
94            Ok(())
95        },
96        DbCommands::Execute { sql, format: _ } => {
97            let result = query::execute_write(&db.query_executor, &sql, config).await?;
98            render_result(&result);
99            Ok(())
100        },
101        DbCommands::Tables { filter } => {
102            schema::execute_tables(&db.admin_service, filter, config).await
103        },
104        DbCommands::Describe { table_name } => {
105            schema::execute_describe(&db.admin_service, &table_name, config).await
106        },
107        DbCommands::Info => schema::execute_info(&db.admin_service, config).await,
108        DbCommands::Migrate { .. }
109        | DbCommands::MigrateDown { .. }
110        | DbCommands::MigrateSquash { .. } => unreachable!(),
111        DbCommands::Migrations { cmd } => admin::execute_migrations(&db.ctx, cmd, config).await,
112        DbCommands::MigratePlan { extension, json } => {
113            admin::execute_migrate_plan(&db.ctx, extension.as_deref(), json, config).await
114        },
115        DbCommands::MigrateStatus { extension, json } => {
116            admin::execute_migrate_status(&db.ctx, extension.as_deref(), json, config).await
117        },
118        DbCommands::AssignAdmin { user } => {
119            admin::execute_assign_admin(&db.ctx, &user, config).await
120        },
121        DbCommands::Status => admin::execute_status(&db.admin_service, config).await,
122        DbCommands::Validate => schema::execute_validate(&db.admin_service, config).await,
123        DbCommands::Count { table_name } => {
124            schema::execute_count(&db.admin_service, &table_name, config).await
125        },
126        DbCommands::Indexes { table } => {
127            introspect::execute_indexes(&db.admin_service, table, config).await
128        },
129        DbCommands::Size => introspect::execute_size(&db.admin_service, config).await,
130        DbCommands::Doctor => doctor::execute_doctor(db.ctx.db_pool(), config).await,
131    }
132}
133
134pub async fn execute_with_db(
135    cmd: DbCommands,
136    db_ctx: &DatabaseContext,
137    config: &CliConfig,
138) -> Result<()> {
139    let pool = db_ctx
140        .db_pool()
141        .write_pool_arc()
142        .context("Database must be PostgreSQL")?;
143    let admin_service = DatabaseAdminService::new(Arc::clone(&pool));
144    let query_executor = QueryExecutor::new(pool);
145
146    match cmd {
147        DbCommands::Query {
148            sql,
149            limit,
150            offset,
151            format: _,
152        } => {
153            let params = query::QueryParams {
154                sql: &sql,
155                limit,
156                offset,
157            };
158            let result = query::execute_query(&query_executor, &params, config).await?;
159            render_result(&result);
160            Ok(())
161        },
162        DbCommands::Execute { sql, format: _ } => {
163            let result = query::execute_write(&query_executor, &sql, config).await?;
164            render_result(&result);
165            Ok(())
166        },
167        DbCommands::Tables { filter } => {
168            schema::execute_tables(&admin_service, filter, config).await
169        },
170        DbCommands::Describe { table_name } => {
171            schema::execute_describe(&admin_service, &table_name, config).await
172        },
173        DbCommands::Info => schema::execute_info(&admin_service, config).await,
174        DbCommands::Migrate {
175            allow_checksum_drift,
176        } => admin::execute_migrate_standalone(db_ctx, config, allow_checksum_drift).await,
177        DbCommands::MigrateDown { extension, count } => {
178            admin::execute_migrate_down_standalone(db_ctx, config, &extension, count).await
179        },
180        DbCommands::MigrateSquash {
181            extension,
182            through,
183            apply,
184        } => {
185            admin_squash::execute_squash_standalone(
186                db_ctx,
187                config,
188                admin_squash::SquashArgs {
189                    extension: &extension,
190                    through,
191                    apply,
192                },
193            )
194            .await
195        },
196        DbCommands::Migrations { cmd } => {
197            admin::execute_migrations_standalone(db_ctx, cmd, config).await
198        },
199        DbCommands::MigratePlan { extension, json } => {
200            admin::execute_migrate_plan_standalone(db_ctx, extension.as_deref(), json, config).await
201        },
202        DbCommands::MigrateStatus { extension, json } => {
203            admin::execute_migrate_status_standalone(db_ctx, extension.as_deref(), json, config)
204                .await
205        },
206        DbCommands::AssignAdmin { .. } => {
207            bail!("assign-admin requires full profile context")
208        },
209        DbCommands::Status => admin::execute_status(&admin_service, config).await,
210        DbCommands::Validate => schema::execute_validate(&admin_service, config).await,
211        DbCommands::Count { table_name } => {
212            schema::execute_count(&admin_service, &table_name, config).await
213        },
214        DbCommands::Indexes { table } => {
215            introspect::execute_indexes(&admin_service, table, config).await
216        },
217        DbCommands::Size => introspect::execute_size(&admin_service, config).await,
218        DbCommands::Doctor => doctor::execute_doctor(db_ctx.db_pool(), config).await,
219    }
220}