systemprompt_cli/commands/infrastructure/db/
mod.rs1mod admin;
2mod admin_migrate;
3mod admin_migrate_down;
4mod admin_migrate_plan;
5mod admin_migrate_status;
6mod admin_migrations;
7mod admin_squash;
8mod commands;
9mod doctor;
10mod helpers;
11mod introspect;
12mod query;
13mod schema;
14mod types;
15
16use anyhow::{Context, Result, bail};
17use std::sync::Arc;
18use systemprompt_database::{DatabaseAdminService, QueryExecutor};
19use systemprompt_runtime::{AppContext, DatabaseContext};
20
21use crate::cli_settings::CliConfig;
22use crate::shared::render_result;
23
24pub use commands::{DbCommands, MigrationsCommands};
25pub use types::*;
26
27struct DatabaseTool {
28 ctx: AppContext,
29 admin_service: DatabaseAdminService,
30 query_executor: QueryExecutor,
31}
32
33impl DatabaseTool {
34 async fn new() -> Result<Self> {
35 let ctx = AppContext::new()
36 .await
37 .context("Failed to connect to database. Check your profile configuration.")?;
38 let pool = ctx.db_pool().write_pool_arc()?;
39 let admin_service = DatabaseAdminService::new(Arc::clone(&pool));
40 let query_executor = QueryExecutor::new(pool);
41 Ok(Self {
42 ctx,
43 admin_service,
44 query_executor,
45 })
46 }
47}
48
49pub async fn execute(cmd: DbCommands, config: &CliConfig) -> Result<()> {
50 if let DbCommands::Migrate {
51 allow_checksum_drift,
52 } = cmd
53 {
54 return admin::execute_migrate(config, allow_checksum_drift).await;
55 }
56
57 if let DbCommands::MigrateDown { extension, count } = cmd {
58 return admin::execute_migrate_down(config, &extension, count).await;
59 }
60
61 if let DbCommands::MigrateSquash {
62 extension,
63 through,
64 apply,
65 } = cmd
66 {
67 return admin_squash::execute_squash(
68 config,
69 admin_squash::SquashArgs {
70 extension: &extension,
71 through,
72 apply,
73 },
74 )
75 .await;
76 }
77
78 let db = DatabaseTool::new().await?;
79
80 match cmd {
81 DbCommands::Query {
82 sql,
83 limit,
84 offset,
85 format: _,
86 } => {
87 let params = query::QueryParams {
88 sql: &sql,
89 limit,
90 offset,
91 };
92 let result = query::execute_query(&db.query_executor, ¶ms, config).await?;
93 render_result(&result);
94 Ok(())
95 },
96 DbCommands::Execute { sql, format: _ } => {
97 let result = query::execute_write(&db.query_executor, &sql, config).await?;
98 render_result(&result);
99 Ok(())
100 },
101 DbCommands::Tables { filter } => {
102 schema::execute_tables(&db.admin_service, filter, config).await
103 },
104 DbCommands::Describe { table_name } => {
105 schema::execute_describe(&db.admin_service, &table_name, config).await
106 },
107 DbCommands::Info => schema::execute_info(&db.admin_service, config).await,
108 DbCommands::Migrate { .. }
109 | DbCommands::MigrateDown { .. }
110 | DbCommands::MigrateSquash { .. } => unreachable!(),
111 DbCommands::Migrations { cmd } => admin::execute_migrations(&db.ctx, cmd, config).await,
112 DbCommands::MigratePlan { extension, json } => {
113 admin::execute_migrate_plan(&db.ctx, extension.as_deref(), json, config).await
114 },
115 DbCommands::MigrateStatus { extension, json } => {
116 admin::execute_migrate_status(&db.ctx, extension.as_deref(), json, config).await
117 },
118 DbCommands::AssignAdmin { user } => {
119 admin::execute_assign_admin(&db.ctx, &user, config).await
120 },
121 DbCommands::Status => admin::execute_status(&db.admin_service, config).await,
122 DbCommands::Validate => schema::execute_validate(&db.admin_service, config).await,
123 DbCommands::Count { table_name } => {
124 schema::execute_count(&db.admin_service, &table_name, config).await
125 },
126 DbCommands::Indexes { table } => {
127 introspect::execute_indexes(&db.admin_service, table, config).await
128 },
129 DbCommands::Size => introspect::execute_size(&db.admin_service, config).await,
130 DbCommands::Doctor => doctor::execute_doctor(db.ctx.db_pool(), config).await,
131 }
132}
133
134pub async fn execute_with_db(
135 cmd: DbCommands,
136 db_ctx: &DatabaseContext,
137 config: &CliConfig,
138) -> Result<()> {
139 let pool = db_ctx
140 .db_pool()
141 .write_pool_arc()
142 .context("Database must be PostgreSQL")?;
143 let admin_service = DatabaseAdminService::new(Arc::clone(&pool));
144 let query_executor = QueryExecutor::new(pool);
145
146 match cmd {
147 DbCommands::Query {
148 sql,
149 limit,
150 offset,
151 format: _,
152 } => {
153 let params = query::QueryParams {
154 sql: &sql,
155 limit,
156 offset,
157 };
158 let result = query::execute_query(&query_executor, ¶ms, config).await?;
159 render_result(&result);
160 Ok(())
161 },
162 DbCommands::Execute { sql, format: _ } => {
163 let result = query::execute_write(&query_executor, &sql, config).await?;
164 render_result(&result);
165 Ok(())
166 },
167 DbCommands::Tables { filter } => {
168 schema::execute_tables(&admin_service, filter, config).await
169 },
170 DbCommands::Describe { table_name } => {
171 schema::execute_describe(&admin_service, &table_name, config).await
172 },
173 DbCommands::Info => schema::execute_info(&admin_service, config).await,
174 DbCommands::Migrate {
175 allow_checksum_drift,
176 } => admin::execute_migrate_standalone(db_ctx, config, allow_checksum_drift).await,
177 DbCommands::MigrateDown { extension, count } => {
178 admin::execute_migrate_down_standalone(db_ctx, config, &extension, count).await
179 },
180 DbCommands::MigrateSquash {
181 extension,
182 through,
183 apply,
184 } => {
185 admin_squash::execute_squash_standalone(
186 db_ctx,
187 config,
188 admin_squash::SquashArgs {
189 extension: &extension,
190 through,
191 apply,
192 },
193 )
194 .await
195 },
196 DbCommands::Migrations { cmd } => {
197 admin::execute_migrations_standalone(db_ctx, cmd, config).await
198 },
199 DbCommands::MigratePlan { extension, json } => {
200 admin::execute_migrate_plan_standalone(db_ctx, extension.as_deref(), json, config).await
201 },
202 DbCommands::MigrateStatus { extension, json } => {
203 admin::execute_migrate_status_standalone(db_ctx, extension.as_deref(), json, config)
204 .await
205 },
206 DbCommands::AssignAdmin { .. } => {
207 bail!("assign-admin requires full profile context")
208 },
209 DbCommands::Status => admin::execute_status(&admin_service, config).await,
210 DbCommands::Validate => schema::execute_validate(&admin_service, config).await,
211 DbCommands::Count { table_name } => {
212 schema::execute_count(&admin_service, &table_name, config).await
213 },
214 DbCommands::Indexes { table } => {
215 introspect::execute_indexes(&admin_service, table, config).await
216 },
217 DbCommands::Size => introspect::execute_size(&admin_service, config).await,
218 DbCommands::Doctor => doctor::execute_doctor(db_ctx.db_pool(), config).await,
219 }
220}