Skip to main content

systemprompt_cli/commands/infrastructure/db/
mod.rs

1mod admin;
2mod admin_migrate;
3mod admin_migrate_down;
4mod admin_migrate_mark_applied;
5mod admin_migrate_plan;
6mod admin_migrate_repair;
7mod admin_migrate_status;
8mod admin_migrations;
9mod admin_squash;
10mod commands;
11mod doctor;
12mod helpers;
13mod introspect;
14mod query;
15mod schema;
16mod types;
17
18use anyhow::{Context, Result, bail};
19use std::sync::Arc;
20use systemprompt_database::{DatabaseAdminService, QueryExecutor};
21use systemprompt_runtime::{AppContext, DatabaseContext};
22
23use crate::cli_settings::CliConfig;
24use crate::shared::render_result;
25
26pub use commands::{DbCommands, MigrationsCommands};
27pub use types::*;
28
29struct DatabaseTool {
30    ctx: AppContext,
31    admin_service: DatabaseAdminService,
32    query_executor: QueryExecutor,
33}
34
35impl DatabaseTool {
36    async fn new() -> Result<Self> {
37        let ctx = AppContext::new()
38            .await
39            .context("Failed to connect to database. Check your profile configuration.")?;
40        let pool = ctx.db_pool().write_pool_arc()?;
41        let admin_service = DatabaseAdminService::new(Arc::clone(&pool));
42        let query_executor = QueryExecutor::new(pool);
43        Ok(Self {
44            ctx,
45            admin_service,
46            query_executor,
47        })
48    }
49}
50
51pub async fn execute(cmd: DbCommands, config: &CliConfig) -> Result<()> {
52    if let DbCommands::Migrate {
53        allow_checksum_drift,
54    } = cmd
55    {
56        return admin::execute_migrate(config, allow_checksum_drift).await;
57    }
58
59    if let DbCommands::MigrateDown { extension, count } = cmd {
60        return admin::execute_migrate_down(config, &extension, count).await;
61    }
62
63    if let DbCommands::MigrateRepair {
64        extension,
65        apply,
66        json,
67    } = cmd
68    {
69        return admin::execute_migrate_repair(
70            config,
71            admin::RepairArgs {
72                extension: extension.as_deref(),
73                apply,
74                json,
75            },
76        )
77        .await;
78    }
79
80    if let DbCommands::MigrateMarkApplied {
81        extension,
82        version,
83        json,
84    } = cmd
85    {
86        return admin::execute_migrate_mark_applied(
87            config,
88            admin::MarkAppliedArgs {
89                extension: &extension,
90                version,
91                json,
92            },
93        )
94        .await;
95    }
96
97    if let DbCommands::MigrateSquash {
98        extension,
99        through,
100        apply,
101    } = cmd
102    {
103        return admin_squash::execute_squash(
104            config,
105            admin_squash::SquashArgs {
106                extension: &extension,
107                through,
108                apply,
109            },
110        )
111        .await;
112    }
113
114    let db = DatabaseTool::new().await?;
115
116    match cmd {
117        DbCommands::Query {
118            sql,
119            limit,
120            offset,
121            format: _,
122        } => {
123            let params = query::QueryParams {
124                sql: &sql,
125                limit,
126                offset,
127            };
128            let result = query::execute_query(&db.query_executor, &params, config).await?;
129            render_result(&result);
130            Ok(())
131        },
132        DbCommands::Execute { sql, format: _ } => {
133            let result = query::execute_write(&db.query_executor, &sql, config).await?;
134            render_result(&result);
135            Ok(())
136        },
137        DbCommands::Tables { filter } => {
138            schema::execute_tables(&db.admin_service, filter, config).await
139        },
140        DbCommands::Describe { table_name } => {
141            schema::execute_describe(&db.admin_service, &table_name, config).await
142        },
143        DbCommands::Info => schema::execute_info(&db.admin_service, config).await,
144        DbCommands::Migrate { .. }
145        | DbCommands::MigrateDown { .. }
146        | DbCommands::MigrateSquash { .. }
147        | DbCommands::MigrateRepair { .. }
148        | DbCommands::MigrateMarkApplied { .. } => unreachable!(),
149        DbCommands::Migrations { cmd } => admin::execute_migrations(&db.ctx, cmd, config).await,
150        DbCommands::MigratePlan { extension, json } => {
151            admin::execute_migrate_plan(&db.ctx, extension.as_deref(), json, config).await
152        },
153        DbCommands::MigrateStatus { extension, json } => {
154            admin::execute_migrate_status(&db.ctx, extension.as_deref(), json, config).await
155        },
156        DbCommands::AssignAdmin { user } => {
157            admin::execute_assign_admin(&db.ctx, &user, config).await
158        },
159        DbCommands::Status => admin::execute_status(&db.admin_service, config).await,
160        DbCommands::Validate => schema::execute_validate(&db.admin_service, config).await,
161        DbCommands::Count { table_name } => {
162            schema::execute_count(&db.admin_service, &table_name, config).await
163        },
164        DbCommands::Indexes { table } => {
165            introspect::execute_indexes(&db.admin_service, table, config).await
166        },
167        DbCommands::Size => introspect::execute_size(&db.admin_service, config).await,
168        DbCommands::Doctor => doctor::execute_doctor(db.ctx.db_pool(), config).await,
169    }
170}
171
172pub async fn execute_with_db(
173    cmd: DbCommands,
174    db_ctx: &DatabaseContext,
175    config: &CliConfig,
176) -> Result<()> {
177    let pool = db_ctx
178        .db_pool()
179        .write_pool_arc()
180        .context("Database must be PostgreSQL")?;
181    let admin_service = DatabaseAdminService::new(Arc::clone(&pool));
182    let query_executor = QueryExecutor::new(pool);
183
184    match cmd {
185        DbCommands::Query {
186            sql,
187            limit,
188            offset,
189            format: _,
190        } => {
191            let params = query::QueryParams {
192                sql: &sql,
193                limit,
194                offset,
195            };
196            let result = query::execute_query(&query_executor, &params, config).await?;
197            render_result(&result);
198            Ok(())
199        },
200        DbCommands::Execute { sql, format: _ } => {
201            let result = query::execute_write(&query_executor, &sql, config).await?;
202            render_result(&result);
203            Ok(())
204        },
205        DbCommands::Tables { filter } => {
206            schema::execute_tables(&admin_service, filter, config).await
207        },
208        DbCommands::Describe { table_name } => {
209            schema::execute_describe(&admin_service, &table_name, config).await
210        },
211        DbCommands::Info => schema::execute_info(&admin_service, config).await,
212        DbCommands::Migrate {
213            allow_checksum_drift,
214        } => admin::execute_migrate_standalone(db_ctx, config, allow_checksum_drift).await,
215        DbCommands::MigrateDown { extension, count } => {
216            admin::execute_migrate_down_standalone(db_ctx, config, &extension, count).await
217        },
218        DbCommands::MigrateSquash {
219            extension,
220            through,
221            apply,
222        } => {
223            admin_squash::execute_squash_standalone(
224                db_ctx,
225                config,
226                admin_squash::SquashArgs {
227                    extension: &extension,
228                    through,
229                    apply,
230                },
231            )
232            .await
233        },
234        DbCommands::Migrations { cmd } => {
235            admin::execute_migrations_standalone(db_ctx, cmd, config).await
236        },
237        DbCommands::MigratePlan { extension, json } => {
238            admin::execute_migrate_plan_standalone(db_ctx, extension.as_deref(), json, config).await
239        },
240        DbCommands::MigrateStatus { extension, json } => {
241            admin::execute_migrate_status_standalone(db_ctx, extension.as_deref(), json, config)
242                .await
243        },
244        DbCommands::MigrateRepair {
245            extension,
246            apply,
247            json,
248        } => {
249            admin::execute_migrate_repair_standalone(
250                db_ctx,
251                config,
252                admin::RepairArgs {
253                    extension: extension.as_deref(),
254                    apply,
255                    json,
256                },
257            )
258            .await
259        },
260        DbCommands::MigrateMarkApplied {
261            extension,
262            version,
263            json,
264        } => {
265            admin::execute_migrate_mark_applied_standalone(
266                db_ctx,
267                config,
268                admin::MarkAppliedArgs {
269                    extension: &extension,
270                    version,
271                    json,
272                },
273            )
274            .await
275        },
276        DbCommands::AssignAdmin { .. } => {
277            bail!("assign-admin requires full profile context")
278        },
279        DbCommands::Status => admin::execute_status(&admin_service, config).await,
280        DbCommands::Validate => schema::execute_validate(&admin_service, config).await,
281        DbCommands::Count { table_name } => {
282            schema::execute_count(&admin_service, &table_name, config).await
283        },
284        DbCommands::Indexes { table } => {
285            introspect::execute_indexes(&admin_service, table, config).await
286        },
287        DbCommands::Size => introspect::execute_size(&admin_service, config).await,
288        DbCommands::Doctor => doctor::execute_doctor(db_ctx.db_pool(), config).await,
289    }
290}