Skip to main content

systemprompt_cli/commands/infrastructure/db/
mod.rs

1mod admin;
2mod admin_migrate;
3mod admin_migrate_down;
4mod admin_migrate_plan;
5mod admin_migrate_repair;
6mod admin_migrate_status;
7mod admin_migrations;
8mod admin_squash;
9mod commands;
10mod doctor;
11mod helpers;
12mod introspect;
13mod query;
14mod schema;
15mod types;
16
17use anyhow::{Context, Result, bail};
18use std::sync::Arc;
19use systemprompt_database::{DatabaseAdminService, QueryExecutor};
20use systemprompt_runtime::{AppContext, DatabaseContext};
21
22use crate::cli_settings::CliConfig;
23use crate::shared::render_result;
24
25pub use commands::{DbCommands, MigrationsCommands};
26pub use types::*;
27
28struct DatabaseTool {
29    ctx: AppContext,
30    admin_service: DatabaseAdminService,
31    query_executor: QueryExecutor,
32}
33
34impl DatabaseTool {
35    async fn new() -> Result<Self> {
36        let ctx = AppContext::new()
37            .await
38            .context("Failed to connect to database. Check your profile configuration.")?;
39        let pool = ctx.db_pool().write_pool_arc()?;
40        let admin_service = DatabaseAdminService::new(Arc::clone(&pool));
41        let query_executor = QueryExecutor::new(pool);
42        Ok(Self {
43            ctx,
44            admin_service,
45            query_executor,
46        })
47    }
48}
49
50pub async fn execute(cmd: DbCommands, config: &CliConfig) -> Result<()> {
51    if let DbCommands::Migrate {
52        allow_checksum_drift,
53    } = cmd
54    {
55        return admin::execute_migrate(config, allow_checksum_drift).await;
56    }
57
58    if let DbCommands::MigrateDown { extension, count } = cmd {
59        return admin::execute_migrate_down(config, &extension, count).await;
60    }
61
62    if let DbCommands::MigrateRepair {
63        extension,
64        apply,
65        json,
66    } = cmd
67    {
68        return admin::execute_migrate_repair(
69            config,
70            admin::RepairArgs {
71                extension: extension.as_deref(),
72                apply,
73                json,
74            },
75        )
76        .await;
77    }
78
79    if let DbCommands::MigrateSquash {
80        extension,
81        through,
82        apply,
83    } = cmd
84    {
85        return admin_squash::execute_squash(
86            config,
87            admin_squash::SquashArgs {
88                extension: &extension,
89                through,
90                apply,
91            },
92        )
93        .await;
94    }
95
96    let db = DatabaseTool::new().await?;
97
98    match cmd {
99        DbCommands::Query {
100            sql,
101            limit,
102            offset,
103            format: _,
104        } => {
105            let params = query::QueryParams {
106                sql: &sql,
107                limit,
108                offset,
109            };
110            let result = query::execute_query(&db.query_executor, &params, config).await?;
111            render_result(&result);
112            Ok(())
113        },
114        DbCommands::Execute { sql, format: _ } => {
115            let result = query::execute_write(&db.query_executor, &sql, config).await?;
116            render_result(&result);
117            Ok(())
118        },
119        DbCommands::Tables { filter } => {
120            schema::execute_tables(&db.admin_service, filter, config).await
121        },
122        DbCommands::Describe { table_name } => {
123            schema::execute_describe(&db.admin_service, &table_name, config).await
124        },
125        DbCommands::Info => schema::execute_info(&db.admin_service, config).await,
126        DbCommands::Migrate { .. }
127        | DbCommands::MigrateDown { .. }
128        | DbCommands::MigrateSquash { .. }
129        | DbCommands::MigrateRepair { .. } => unreachable!(),
130        DbCommands::Migrations { cmd } => admin::execute_migrations(&db.ctx, cmd, config).await,
131        DbCommands::MigratePlan { extension, json } => {
132            admin::execute_migrate_plan(&db.ctx, extension.as_deref(), json, config).await
133        },
134        DbCommands::MigrateStatus { extension, json } => {
135            admin::execute_migrate_status(&db.ctx, extension.as_deref(), json, config).await
136        },
137        DbCommands::AssignAdmin { user } => {
138            admin::execute_assign_admin(&db.ctx, &user, config).await
139        },
140        DbCommands::Status => admin::execute_status(&db.admin_service, config).await,
141        DbCommands::Validate => schema::execute_validate(&db.admin_service, config).await,
142        DbCommands::Count { table_name } => {
143            schema::execute_count(&db.admin_service, &table_name, config).await
144        },
145        DbCommands::Indexes { table } => {
146            introspect::execute_indexes(&db.admin_service, table, config).await
147        },
148        DbCommands::Size => introspect::execute_size(&db.admin_service, config).await,
149        DbCommands::Doctor => doctor::execute_doctor(db.ctx.db_pool(), config).await,
150    }
151}
152
153pub async fn execute_with_db(
154    cmd: DbCommands,
155    db_ctx: &DatabaseContext,
156    config: &CliConfig,
157) -> Result<()> {
158    let pool = db_ctx
159        .db_pool()
160        .write_pool_arc()
161        .context("Database must be PostgreSQL")?;
162    let admin_service = DatabaseAdminService::new(Arc::clone(&pool));
163    let query_executor = QueryExecutor::new(pool);
164
165    match cmd {
166        DbCommands::Query {
167            sql,
168            limit,
169            offset,
170            format: _,
171        } => {
172            let params = query::QueryParams {
173                sql: &sql,
174                limit,
175                offset,
176            };
177            let result = query::execute_query(&query_executor, &params, config).await?;
178            render_result(&result);
179            Ok(())
180        },
181        DbCommands::Execute { sql, format: _ } => {
182            let result = query::execute_write(&query_executor, &sql, config).await?;
183            render_result(&result);
184            Ok(())
185        },
186        DbCommands::Tables { filter } => {
187            schema::execute_tables(&admin_service, filter, config).await
188        },
189        DbCommands::Describe { table_name } => {
190            schema::execute_describe(&admin_service, &table_name, config).await
191        },
192        DbCommands::Info => schema::execute_info(&admin_service, config).await,
193        DbCommands::Migrate {
194            allow_checksum_drift,
195        } => admin::execute_migrate_standalone(db_ctx, config, allow_checksum_drift).await,
196        DbCommands::MigrateDown { extension, count } => {
197            admin::execute_migrate_down_standalone(db_ctx, config, &extension, count).await
198        },
199        DbCommands::MigrateSquash {
200            extension,
201            through,
202            apply,
203        } => {
204            admin_squash::execute_squash_standalone(
205                db_ctx,
206                config,
207                admin_squash::SquashArgs {
208                    extension: &extension,
209                    through,
210                    apply,
211                },
212            )
213            .await
214        },
215        DbCommands::Migrations { cmd } => {
216            admin::execute_migrations_standalone(db_ctx, cmd, config).await
217        },
218        DbCommands::MigratePlan { extension, json } => {
219            admin::execute_migrate_plan_standalone(db_ctx, extension.as_deref(), json, config).await
220        },
221        DbCommands::MigrateStatus { extension, json } => {
222            admin::execute_migrate_status_standalone(db_ctx, extension.as_deref(), json, config)
223                .await
224        },
225        DbCommands::MigrateRepair {
226            extension,
227            apply,
228            json,
229        } => {
230            admin::execute_migrate_repair_standalone(
231                db_ctx,
232                config,
233                admin::RepairArgs {
234                    extension: extension.as_deref(),
235                    apply,
236                    json,
237                },
238            )
239            .await
240        },
241        DbCommands::AssignAdmin { .. } => {
242            bail!("assign-admin requires full profile context")
243        },
244        DbCommands::Status => admin::execute_status(&admin_service, config).await,
245        DbCommands::Validate => schema::execute_validate(&admin_service, config).await,
246        DbCommands::Count { table_name } => {
247            schema::execute_count(&admin_service, &table_name, config).await
248        },
249        DbCommands::Indexes { table } => {
250            introspect::execute_indexes(&admin_service, table, config).await
251        },
252        DbCommands::Size => introspect::execute_size(&admin_service, config).await,
253        DbCommands::Doctor => doctor::execute_doctor(db_ctx.db_pool(), config).await,
254    }
255}