1mod admin;
9mod admin_migrate;
10mod admin_migrate_down;
11mod admin_migrate_mark_applied;
12mod admin_migrate_plan;
13mod admin_migrate_repair;
14mod admin_migrate_status;
15mod admin_migrations;
16mod admin_squash;
17mod commands;
18mod doctor;
19mod helpers;
20mod introspect;
21mod query;
22mod schema;
23mod types;
24
25use anyhow::{Context, Result, bail};
26use std::sync::Arc;
27use systemprompt_database::{DatabaseAdminService, QueryExecutor};
28use systemprompt_runtime::{AppContext, DatabaseContext};
29
30use crate::cli_settings::CliConfig;
31use crate::shared::render_result;
32
33pub use commands::{DbCommands, MigrationsCommands};
34pub use types::*;
35
36struct DatabaseTool {
37 ctx: AppContext,
38 admin_service: DatabaseAdminService,
39 query_executor: QueryExecutor,
40}
41
42impl DatabaseTool {
43 async fn new() -> Result<Self> {
44 let ctx = AppContext::new()
45 .await
46 .context("Failed to connect to database. Check your profile configuration.")?;
47 let pool = ctx.db_pool().write_pool_arc()?;
48 let admin_service = DatabaseAdminService::new(Arc::clone(&pool));
49 let query_executor = QueryExecutor::new(pool);
50 Ok(Self {
51 ctx,
52 admin_service,
53 query_executor,
54 })
55 }
56}
57
58pub async fn execute(cmd: DbCommands, config: &CliConfig) -> Result<()> {
59 if let DbCommands::Migrate {
60 allow_checksum_drift,
61 } = cmd
62 {
63 return admin::execute_migrate(config, allow_checksum_drift).await;
64 }
65
66 if let DbCommands::MigrateDown { extension, count } = cmd {
67 return admin::execute_migrate_down(config, &extension, count).await;
68 }
69
70 if let DbCommands::MigrateRepair {
71 extension,
72 apply,
73 json,
74 } = cmd
75 {
76 return admin::execute_migrate_repair(
77 config,
78 admin::RepairArgs {
79 extension: extension.as_deref(),
80 apply,
81 json,
82 },
83 )
84 .await;
85 }
86
87 if let DbCommands::MigrateMarkApplied {
88 extension,
89 version,
90 json,
91 } = cmd
92 {
93 return admin::execute_migrate_mark_applied(
94 config,
95 admin::MarkAppliedArgs {
96 extension: &extension,
97 version,
98 json,
99 },
100 )
101 .await;
102 }
103
104 if let DbCommands::MigrateSquash {
105 extension,
106 through,
107 apply,
108 } = cmd
109 {
110 return admin_squash::execute_squash(
111 config,
112 admin_squash::SquashArgs {
113 extension: &extension,
114 through,
115 apply,
116 },
117 )
118 .await;
119 }
120
121 let db = DatabaseTool::new().await?;
122
123 match cmd {
124 DbCommands::Query {
125 sql,
126 limit,
127 offset,
128 format: _,
129 } => {
130 let params = query::QueryParams {
131 sql: &sql,
132 limit,
133 offset,
134 };
135 let result = query::execute_query(&db.query_executor, ¶ms, config).await?;
136 render_result(&result);
137 Ok(())
138 },
139 DbCommands::Execute { sql, format: _ } => {
140 let result = query::execute_write(&db.query_executor, &sql, config).await?;
141 render_result(&result);
142 Ok(())
143 },
144 DbCommands::Tables { filter } => {
145 schema::execute_tables(&db.admin_service, filter, config).await
146 },
147 DbCommands::Describe { table_name } => {
148 schema::execute_describe(&db.admin_service, &table_name, config).await
149 },
150 DbCommands::Info => schema::execute_info(&db.admin_service, config).await,
151 DbCommands::Migrate { .. }
152 | DbCommands::MigrateDown { .. }
153 | DbCommands::MigrateSquash { .. }
154 | DbCommands::MigrateRepair { .. }
155 | DbCommands::MigrateMarkApplied { .. } => unreachable!(),
156 DbCommands::Migrations { cmd } => admin::execute_migrations(&db.ctx, cmd, config).await,
157 DbCommands::MigratePlan { extension, json } => {
158 admin::execute_migrate_plan(&db.ctx, extension.as_deref(), json, config).await
159 },
160 DbCommands::MigrateStatus { extension, json } => {
161 admin::execute_migrate_status(&db.ctx, extension.as_deref(), json, config).await
162 },
163 DbCommands::AssignAdmin { user } => {
164 admin::execute_assign_admin(&db.ctx, &user, config).await
165 },
166 DbCommands::Status => admin::execute_status(&db.admin_service, config).await,
167 DbCommands::Validate => schema::execute_validate(&db.admin_service, config).await,
168 DbCommands::Count { table_name } => {
169 schema::execute_count(&db.admin_service, &table_name, config).await
170 },
171 DbCommands::Indexes { table } => {
172 introspect::execute_indexes(&db.admin_service, table, config).await
173 },
174 DbCommands::Size => introspect::execute_size(&db.admin_service, config).await,
175 DbCommands::Doctor => doctor::execute_doctor(db.ctx.db_pool(), config).await,
176 }
177}
178
179pub async fn execute_with_db(
180 cmd: DbCommands,
181 db_ctx: &DatabaseContext,
182 config: &CliConfig,
183) -> Result<()> {
184 let pool = db_ctx
185 .db_pool()
186 .write_pool_arc()
187 .context("Database must be PostgreSQL")?;
188 let admin_service = DatabaseAdminService::new(Arc::clone(&pool));
189 let query_executor = QueryExecutor::new(pool);
190
191 match cmd {
192 DbCommands::Query {
193 sql,
194 limit,
195 offset,
196 format: _,
197 } => {
198 let params = query::QueryParams {
199 sql: &sql,
200 limit,
201 offset,
202 };
203 let result = query::execute_query(&query_executor, ¶ms, config).await?;
204 render_result(&result);
205 Ok(())
206 },
207 DbCommands::Execute { sql, format: _ } => {
208 let result = query::execute_write(&query_executor, &sql, config).await?;
209 render_result(&result);
210 Ok(())
211 },
212 DbCommands::Tables { filter } => {
213 schema::execute_tables(&admin_service, filter, config).await
214 },
215 DbCommands::Describe { table_name } => {
216 schema::execute_describe(&admin_service, &table_name, config).await
217 },
218 DbCommands::Info => schema::execute_info(&admin_service, config).await,
219 DbCommands::Migrate {
220 allow_checksum_drift,
221 } => admin::execute_migrate_standalone(db_ctx, config, allow_checksum_drift).await,
222 DbCommands::MigrateDown { extension, count } => {
223 admin::execute_migrate_down_standalone(db_ctx, config, &extension, count).await
224 },
225 DbCommands::MigrateSquash {
226 extension,
227 through,
228 apply,
229 } => {
230 admin_squash::execute_squash_standalone(
231 db_ctx,
232 config,
233 admin_squash::SquashArgs {
234 extension: &extension,
235 through,
236 apply,
237 },
238 )
239 .await
240 },
241 DbCommands::Migrations { cmd } => {
242 admin::execute_migrations_standalone(db_ctx, cmd, config).await
243 },
244 DbCommands::MigratePlan { extension, json } => {
245 admin::execute_migrate_plan_standalone(db_ctx, extension.as_deref(), json, config).await
246 },
247 DbCommands::MigrateStatus { extension, json } => {
248 admin::execute_migrate_status_standalone(db_ctx, extension.as_deref(), json, config)
249 .await
250 },
251 DbCommands::MigrateRepair {
252 extension,
253 apply,
254 json,
255 } => {
256 admin::execute_migrate_repair_standalone(
257 db_ctx,
258 config,
259 admin::RepairArgs {
260 extension: extension.as_deref(),
261 apply,
262 json,
263 },
264 )
265 .await
266 },
267 DbCommands::MigrateMarkApplied {
268 extension,
269 version,
270 json,
271 } => {
272 admin::execute_migrate_mark_applied_standalone(
273 db_ctx,
274 config,
275 admin::MarkAppliedArgs {
276 extension: &extension,
277 version,
278 json,
279 },
280 )
281 .await
282 },
283 DbCommands::AssignAdmin { .. } => {
284 bail!("assign-admin requires full profile context")
285 },
286 DbCommands::Status => admin::execute_status(&admin_service, config).await,
287 DbCommands::Validate => schema::execute_validate(&admin_service, config).await,
288 DbCommands::Count { table_name } => {
289 schema::execute_count(&admin_service, &table_name, config).await
290 },
291 DbCommands::Indexes { table } => {
292 introspect::execute_indexes(&admin_service, table, config).await
293 },
294 DbCommands::Size => introspect::execute_size(&admin_service, config).await,
295 DbCommands::Doctor => doctor::execute_doctor(db_ctx.db_pool(), config).await,
296 }
297}