1mod admin;
2mod admin_migrate;
3mod admin_migrate_down;
4mod admin_migrate_mark_applied;
5mod admin_migrate_plan;
6mod admin_migrate_repair;
7mod admin_migrate_status;
8mod admin_migrations;
9mod admin_squash;
10mod commands;
11mod doctor;
12mod helpers;
13mod introspect;
14mod query;
15mod schema;
16mod types;
17
18use anyhow::{Context, Result, bail};
19use std::sync::Arc;
20use systemprompt_database::{DatabaseAdminService, QueryExecutor};
21use systemprompt_runtime::{AppContext, DatabaseContext};
22
23use crate::cli_settings::CliConfig;
24use crate::shared::render_result;
25
26pub use commands::{DbCommands, MigrationsCommands};
27pub use types::*;
28
29struct DatabaseTool {
30 ctx: AppContext,
31 admin_service: DatabaseAdminService,
32 query_executor: QueryExecutor,
33}
34
35impl DatabaseTool {
36 async fn new() -> Result<Self> {
37 let ctx = AppContext::new()
38 .await
39 .context("Failed to connect to database. Check your profile configuration.")?;
40 let pool = ctx.db_pool().write_pool_arc()?;
41 let admin_service = DatabaseAdminService::new(Arc::clone(&pool));
42 let query_executor = QueryExecutor::new(pool);
43 Ok(Self {
44 ctx,
45 admin_service,
46 query_executor,
47 })
48 }
49}
50
51pub async fn execute(cmd: DbCommands, config: &CliConfig) -> Result<()> {
52 if let DbCommands::Migrate {
53 allow_checksum_drift,
54 } = cmd
55 {
56 return admin::execute_migrate(config, allow_checksum_drift).await;
57 }
58
59 if let DbCommands::MigrateDown { extension, count } = cmd {
60 return admin::execute_migrate_down(config, &extension, count).await;
61 }
62
63 if let DbCommands::MigrateRepair {
64 extension,
65 apply,
66 json,
67 } = cmd
68 {
69 return admin::execute_migrate_repair(
70 config,
71 admin::RepairArgs {
72 extension: extension.as_deref(),
73 apply,
74 json,
75 },
76 )
77 .await;
78 }
79
80 if let DbCommands::MigrateMarkApplied {
81 extension,
82 version,
83 json,
84 } = cmd
85 {
86 return admin::execute_migrate_mark_applied(
87 config,
88 admin::MarkAppliedArgs {
89 extension: &extension,
90 version,
91 json,
92 },
93 )
94 .await;
95 }
96
97 if let DbCommands::MigrateSquash {
98 extension,
99 through,
100 apply,
101 } = cmd
102 {
103 return admin_squash::execute_squash(
104 config,
105 admin_squash::SquashArgs {
106 extension: &extension,
107 through,
108 apply,
109 },
110 )
111 .await;
112 }
113
114 let db = DatabaseTool::new().await?;
115
116 match cmd {
117 DbCommands::Query {
118 sql,
119 limit,
120 offset,
121 format: _,
122 } => {
123 let params = query::QueryParams {
124 sql: &sql,
125 limit,
126 offset,
127 };
128 let result = query::execute_query(&db.query_executor, ¶ms, config).await?;
129 render_result(&result);
130 Ok(())
131 },
132 DbCommands::Execute { sql, format: _ } => {
133 let result = query::execute_write(&db.query_executor, &sql, config).await?;
134 render_result(&result);
135 Ok(())
136 },
137 DbCommands::Tables { filter } => {
138 schema::execute_tables(&db.admin_service, filter, config).await
139 },
140 DbCommands::Describe { table_name } => {
141 schema::execute_describe(&db.admin_service, &table_name, config).await
142 },
143 DbCommands::Info => schema::execute_info(&db.admin_service, config).await,
144 DbCommands::Migrate { .. }
145 | DbCommands::MigrateDown { .. }
146 | DbCommands::MigrateSquash { .. }
147 | DbCommands::MigrateRepair { .. }
148 | DbCommands::MigrateMarkApplied { .. } => unreachable!(),
149 DbCommands::Migrations { cmd } => admin::execute_migrations(&db.ctx, cmd, config).await,
150 DbCommands::MigratePlan { extension, json } => {
151 admin::execute_migrate_plan(&db.ctx, extension.as_deref(), json, config).await
152 },
153 DbCommands::MigrateStatus { extension, json } => {
154 admin::execute_migrate_status(&db.ctx, extension.as_deref(), json, config).await
155 },
156 DbCommands::AssignAdmin { user } => {
157 admin::execute_assign_admin(&db.ctx, &user, config).await
158 },
159 DbCommands::Status => admin::execute_status(&db.admin_service, config).await,
160 DbCommands::Validate => schema::execute_validate(&db.admin_service, config).await,
161 DbCommands::Count { table_name } => {
162 schema::execute_count(&db.admin_service, &table_name, config).await
163 },
164 DbCommands::Indexes { table } => {
165 introspect::execute_indexes(&db.admin_service, table, config).await
166 },
167 DbCommands::Size => introspect::execute_size(&db.admin_service, config).await,
168 DbCommands::Doctor => doctor::execute_doctor(db.ctx.db_pool(), config).await,
169 }
170}
171
172pub async fn execute_with_db(
173 cmd: DbCommands,
174 db_ctx: &DatabaseContext,
175 config: &CliConfig,
176) -> Result<()> {
177 let pool = db_ctx
178 .db_pool()
179 .write_pool_arc()
180 .context("Database must be PostgreSQL")?;
181 let admin_service = DatabaseAdminService::new(Arc::clone(&pool));
182 let query_executor = QueryExecutor::new(pool);
183
184 match cmd {
185 DbCommands::Query {
186 sql,
187 limit,
188 offset,
189 format: _,
190 } => {
191 let params = query::QueryParams {
192 sql: &sql,
193 limit,
194 offset,
195 };
196 let result = query::execute_query(&query_executor, ¶ms, config).await?;
197 render_result(&result);
198 Ok(())
199 },
200 DbCommands::Execute { sql, format: _ } => {
201 let result = query::execute_write(&query_executor, &sql, config).await?;
202 render_result(&result);
203 Ok(())
204 },
205 DbCommands::Tables { filter } => {
206 schema::execute_tables(&admin_service, filter, config).await
207 },
208 DbCommands::Describe { table_name } => {
209 schema::execute_describe(&admin_service, &table_name, config).await
210 },
211 DbCommands::Info => schema::execute_info(&admin_service, config).await,
212 DbCommands::Migrate {
213 allow_checksum_drift,
214 } => admin::execute_migrate_standalone(db_ctx, config, allow_checksum_drift).await,
215 DbCommands::MigrateDown { extension, count } => {
216 admin::execute_migrate_down_standalone(db_ctx, config, &extension, count).await
217 },
218 DbCommands::MigrateSquash {
219 extension,
220 through,
221 apply,
222 } => {
223 admin_squash::execute_squash_standalone(
224 db_ctx,
225 config,
226 admin_squash::SquashArgs {
227 extension: &extension,
228 through,
229 apply,
230 },
231 )
232 .await
233 },
234 DbCommands::Migrations { cmd } => {
235 admin::execute_migrations_standalone(db_ctx, cmd, config).await
236 },
237 DbCommands::MigratePlan { extension, json } => {
238 admin::execute_migrate_plan_standalone(db_ctx, extension.as_deref(), json, config).await
239 },
240 DbCommands::MigrateStatus { extension, json } => {
241 admin::execute_migrate_status_standalone(db_ctx, extension.as_deref(), json, config)
242 .await
243 },
244 DbCommands::MigrateRepair {
245 extension,
246 apply,
247 json,
248 } => {
249 admin::execute_migrate_repair_standalone(
250 db_ctx,
251 config,
252 admin::RepairArgs {
253 extension: extension.as_deref(),
254 apply,
255 json,
256 },
257 )
258 .await
259 },
260 DbCommands::MigrateMarkApplied {
261 extension,
262 version,
263 json,
264 } => {
265 admin::execute_migrate_mark_applied_standalone(
266 db_ctx,
267 config,
268 admin::MarkAppliedArgs {
269 extension: &extension,
270 version,
271 json,
272 },
273 )
274 .await
275 },
276 DbCommands::AssignAdmin { .. } => {
277 bail!("assign-admin requires full profile context")
278 },
279 DbCommands::Status => admin::execute_status(&admin_service, config).await,
280 DbCommands::Validate => schema::execute_validate(&admin_service, config).await,
281 DbCommands::Count { table_name } => {
282 schema::execute_count(&admin_service, &table_name, config).await
283 },
284 DbCommands::Indexes { table } => {
285 introspect::execute_indexes(&admin_service, table, config).await
286 },
287 DbCommands::Size => introspect::execute_size(&admin_service, config).await,
288 DbCommands::Doctor => doctor::execute_doctor(db_ctx.db_pool(), config).await,
289 }
290}