Skip to main content

systemprompt_cli/commands/infrastructure/db/
mod.rs

1//! `db` CLI command group: schema inspection, queries, and migration tooling.
2//!
3//! [`execute`] runs commands against a freshly opened [`AppContext`], while
4//! [`execute_with_db`] reuses a caller-supplied [`DatabaseContext`] for the
5//! standalone (profile-less) path. Subcommands cover ad-hoc queries, schema
6//! introspection, migration apply/down/repair/squash, and the schema doctor.
7
8mod admin;
9mod admin_migrate;
10mod admin_migrate_down;
11mod admin_migrate_mark_applied;
12mod admin_migrate_plan;
13mod admin_migrate_repair;
14mod admin_migrate_status;
15mod admin_migrations;
16mod admin_squash;
17mod commands;
18mod doctor;
19mod helpers;
20mod introspect;
21mod query;
22mod schema;
23mod types;
24
25use anyhow::{Context, Result, bail};
26use std::sync::Arc;
27use systemprompt_database::{DatabaseAdminService, QueryExecutor};
28use systemprompt_runtime::{AppContext, DatabaseContext};
29
30use crate::cli_settings::CliConfig;
31use crate::shared::render_result;
32
33pub use commands::{DbCommands, MigrationsCommands};
34pub use types::*;
35
36struct DatabaseTool {
37    ctx: AppContext,
38    admin_service: DatabaseAdminService,
39    query_executor: QueryExecutor,
40}
41
42impl DatabaseTool {
43    async fn new() -> Result<Self> {
44        let ctx = AppContext::new()
45            .await
46            .context("Failed to connect to database. Check your profile configuration.")?;
47        let pool = ctx.db_pool().write_pool_arc()?;
48        let admin_service = DatabaseAdminService::new(Arc::clone(&pool));
49        let query_executor = QueryExecutor::new(pool);
50        Ok(Self {
51            ctx,
52            admin_service,
53            query_executor,
54        })
55    }
56}
57
58pub async fn execute(cmd: DbCommands, config: &CliConfig) -> Result<()> {
59    if let DbCommands::Migrate {
60        allow_checksum_drift,
61    } = cmd
62    {
63        return admin::execute_migrate(config, allow_checksum_drift).await;
64    }
65
66    if let DbCommands::MigrateDown { extension, count } = cmd {
67        return admin::execute_migrate_down(config, &extension, count).await;
68    }
69
70    if let DbCommands::MigrateRepair {
71        extension,
72        apply,
73        json,
74    } = cmd
75    {
76        return admin::execute_migrate_repair(
77            config,
78            admin::RepairArgs {
79                extension: extension.as_deref(),
80                apply,
81                json,
82            },
83        )
84        .await;
85    }
86
87    if let DbCommands::MigrateMarkApplied {
88        extension,
89        version,
90        json,
91    } = cmd
92    {
93        return admin::execute_migrate_mark_applied(
94            config,
95            admin::MarkAppliedArgs {
96                extension: &extension,
97                version,
98                json,
99            },
100        )
101        .await;
102    }
103
104    if let DbCommands::MigrateSquash {
105        extension,
106        through,
107        apply,
108    } = cmd
109    {
110        return admin_squash::execute_squash(
111            config,
112            admin_squash::SquashArgs {
113                extension: &extension,
114                through,
115                apply,
116            },
117        )
118        .await;
119    }
120
121    let db = DatabaseTool::new().await?;
122
123    match cmd {
124        DbCommands::Query {
125            sql,
126            limit,
127            offset,
128            format: _,
129        } => {
130            let params = query::QueryParams {
131                sql: &sql,
132                limit,
133                offset,
134            };
135            let result = query::execute_query(&db.query_executor, &params, config).await?;
136            render_result(&result);
137            Ok(())
138        },
139        DbCommands::Execute { sql, format: _ } => {
140            let result = query::execute_write(&db.query_executor, &sql, config).await?;
141            render_result(&result);
142            Ok(())
143        },
144        DbCommands::Tables { filter } => {
145            schema::execute_tables(&db.admin_service, filter, config).await
146        },
147        DbCommands::Describe { table_name } => {
148            schema::execute_describe(&db.admin_service, &table_name, config).await
149        },
150        DbCommands::Info => schema::execute_info(&db.admin_service, config).await,
151        DbCommands::Migrate { .. }
152        | DbCommands::MigrateDown { .. }
153        | DbCommands::MigrateSquash { .. }
154        | DbCommands::MigrateRepair { .. }
155        | DbCommands::MigrateMarkApplied { .. } => unreachable!(),
156        DbCommands::Migrations { cmd } => admin::execute_migrations(&db.ctx, cmd, config).await,
157        DbCommands::MigratePlan { extension, json } => {
158            admin::execute_migrate_plan(&db.ctx, extension.as_deref(), json, config).await
159        },
160        DbCommands::MigrateStatus { extension, json } => {
161            admin::execute_migrate_status(&db.ctx, extension.as_deref(), json, config).await
162        },
163        DbCommands::AssignAdmin { user } => {
164            admin::execute_assign_admin(&db.ctx, &user, config).await
165        },
166        DbCommands::Status => admin::execute_status(&db.admin_service, config).await,
167        DbCommands::Validate => schema::execute_validate(&db.admin_service, config).await,
168        DbCommands::Count { table_name } => {
169            schema::execute_count(&db.admin_service, &table_name, config).await
170        },
171        DbCommands::Indexes { table } => {
172            introspect::execute_indexes(&db.admin_service, table, config).await
173        },
174        DbCommands::Size => introspect::execute_size(&db.admin_service, config).await,
175        DbCommands::Doctor => doctor::execute_doctor(db.ctx.db_pool(), config).await,
176    }
177}
178
179pub async fn execute_with_db(
180    cmd: DbCommands,
181    db_ctx: &DatabaseContext,
182    config: &CliConfig,
183) -> Result<()> {
184    let pool = db_ctx
185        .db_pool()
186        .write_pool_arc()
187        .context("Database must be PostgreSQL")?;
188    let admin_service = DatabaseAdminService::new(Arc::clone(&pool));
189    let query_executor = QueryExecutor::new(pool);
190
191    match cmd {
192        DbCommands::Query {
193            sql,
194            limit,
195            offset,
196            format: _,
197        } => {
198            let params = query::QueryParams {
199                sql: &sql,
200                limit,
201                offset,
202            };
203            let result = query::execute_query(&query_executor, &params, config).await?;
204            render_result(&result);
205            Ok(())
206        },
207        DbCommands::Execute { sql, format: _ } => {
208            let result = query::execute_write(&query_executor, &sql, config).await?;
209            render_result(&result);
210            Ok(())
211        },
212        DbCommands::Tables { filter } => {
213            schema::execute_tables(&admin_service, filter, config).await
214        },
215        DbCommands::Describe { table_name } => {
216            schema::execute_describe(&admin_service, &table_name, config).await
217        },
218        DbCommands::Info => schema::execute_info(&admin_service, config).await,
219        DbCommands::Migrate {
220            allow_checksum_drift,
221        } => admin::execute_migrate_standalone(db_ctx, config, allow_checksum_drift).await,
222        DbCommands::MigrateDown { extension, count } => {
223            admin::execute_migrate_down_standalone(db_ctx, config, &extension, count).await
224        },
225        DbCommands::MigrateSquash {
226            extension,
227            through,
228            apply,
229        } => {
230            admin_squash::execute_squash_standalone(
231                db_ctx,
232                config,
233                admin_squash::SquashArgs {
234                    extension: &extension,
235                    through,
236                    apply,
237                },
238            )
239            .await
240        },
241        DbCommands::Migrations { cmd } => {
242            admin::execute_migrations_standalone(db_ctx, cmd, config).await
243        },
244        DbCommands::MigratePlan { extension, json } => {
245            admin::execute_migrate_plan_standalone(db_ctx, extension.as_deref(), json, config).await
246        },
247        DbCommands::MigrateStatus { extension, json } => {
248            admin::execute_migrate_status_standalone(db_ctx, extension.as_deref(), json, config)
249                .await
250        },
251        DbCommands::MigrateRepair {
252            extension,
253            apply,
254            json,
255        } => {
256            admin::execute_migrate_repair_standalone(
257                db_ctx,
258                config,
259                admin::RepairArgs {
260                    extension: extension.as_deref(),
261                    apply,
262                    json,
263                },
264            )
265            .await
266        },
267        DbCommands::MigrateMarkApplied {
268            extension,
269            version,
270            json,
271        } => {
272            admin::execute_migrate_mark_applied_standalone(
273                db_ctx,
274                config,
275                admin::MarkAppliedArgs {
276                    extension: &extension,
277                    version,
278                    json,
279                },
280            )
281            .await
282        },
283        DbCommands::AssignAdmin { .. } => {
284            bail!("assign-admin requires full profile context")
285        },
286        DbCommands::Status => admin::execute_status(&admin_service, config).await,
287        DbCommands::Validate => schema::execute_validate(&admin_service, config).await,
288        DbCommands::Count { table_name } => {
289            schema::execute_count(&admin_service, &table_name, config).await
290        },
291        DbCommands::Indexes { table } => {
292            introspect::execute_indexes(&admin_service, table, config).await
293        },
294        DbCommands::Size => introspect::execute_size(&admin_service, config).await,
295        DbCommands::Doctor => doctor::execute_doctor(db_ctx.db_pool(), config).await,
296    }
297}