1mod admin;
2mod admin_migrate;
3mod admin_migrate_down;
4mod admin_migrate_plan;
5mod admin_migrate_repair;
6mod admin_migrate_status;
7mod admin_migrations;
8mod admin_squash;
9mod commands;
10mod doctor;
11mod helpers;
12mod introspect;
13mod query;
14mod schema;
15mod types;
16
17use anyhow::{Context, Result, bail};
18use std::sync::Arc;
19use systemprompt_database::{DatabaseAdminService, QueryExecutor};
20use systemprompt_runtime::{AppContext, DatabaseContext};
21
22use crate::cli_settings::CliConfig;
23use crate::shared::render_result;
24
25pub use commands::{DbCommands, MigrationsCommands};
26pub use types::*;
27
28struct DatabaseTool {
29 ctx: AppContext,
30 admin_service: DatabaseAdminService,
31 query_executor: QueryExecutor,
32}
33
34impl DatabaseTool {
35 async fn new() -> Result<Self> {
36 let ctx = AppContext::new()
37 .await
38 .context("Failed to connect to database. Check your profile configuration.")?;
39 let pool = ctx.db_pool().write_pool_arc()?;
40 let admin_service = DatabaseAdminService::new(Arc::clone(&pool));
41 let query_executor = QueryExecutor::new(pool);
42 Ok(Self {
43 ctx,
44 admin_service,
45 query_executor,
46 })
47 }
48}
49
50pub async fn execute(cmd: DbCommands, config: &CliConfig) -> Result<()> {
51 if let DbCommands::Migrate {
52 allow_checksum_drift,
53 } = cmd
54 {
55 return admin::execute_migrate(config, allow_checksum_drift).await;
56 }
57
58 if let DbCommands::MigrateDown { extension, count } = cmd {
59 return admin::execute_migrate_down(config, &extension, count).await;
60 }
61
62 if let DbCommands::MigrateRepair {
63 extension,
64 apply,
65 json,
66 } = cmd
67 {
68 return admin::execute_migrate_repair(
69 config,
70 admin::RepairArgs {
71 extension: extension.as_deref(),
72 apply,
73 json,
74 },
75 )
76 .await;
77 }
78
79 if let DbCommands::MigrateSquash {
80 extension,
81 through,
82 apply,
83 } = cmd
84 {
85 return admin_squash::execute_squash(
86 config,
87 admin_squash::SquashArgs {
88 extension: &extension,
89 through,
90 apply,
91 },
92 )
93 .await;
94 }
95
96 let db = DatabaseTool::new().await?;
97
98 match cmd {
99 DbCommands::Query {
100 sql,
101 limit,
102 offset,
103 format: _,
104 } => {
105 let params = query::QueryParams {
106 sql: &sql,
107 limit,
108 offset,
109 };
110 let result = query::execute_query(&db.query_executor, ¶ms, config).await?;
111 render_result(&result);
112 Ok(())
113 },
114 DbCommands::Execute { sql, format: _ } => {
115 let result = query::execute_write(&db.query_executor, &sql, config).await?;
116 render_result(&result);
117 Ok(())
118 },
119 DbCommands::Tables { filter } => {
120 schema::execute_tables(&db.admin_service, filter, config).await
121 },
122 DbCommands::Describe { table_name } => {
123 schema::execute_describe(&db.admin_service, &table_name, config).await
124 },
125 DbCommands::Info => schema::execute_info(&db.admin_service, config).await,
126 DbCommands::Migrate { .. }
127 | DbCommands::MigrateDown { .. }
128 | DbCommands::MigrateSquash { .. }
129 | DbCommands::MigrateRepair { .. } => unreachable!(),
130 DbCommands::Migrations { cmd } => admin::execute_migrations(&db.ctx, cmd, config).await,
131 DbCommands::MigratePlan { extension, json } => {
132 admin::execute_migrate_plan(&db.ctx, extension.as_deref(), json, config).await
133 },
134 DbCommands::MigrateStatus { extension, json } => {
135 admin::execute_migrate_status(&db.ctx, extension.as_deref(), json, config).await
136 },
137 DbCommands::AssignAdmin { user } => {
138 admin::execute_assign_admin(&db.ctx, &user, config).await
139 },
140 DbCommands::Status => admin::execute_status(&db.admin_service, config).await,
141 DbCommands::Validate => schema::execute_validate(&db.admin_service, config).await,
142 DbCommands::Count { table_name } => {
143 schema::execute_count(&db.admin_service, &table_name, config).await
144 },
145 DbCommands::Indexes { table } => {
146 introspect::execute_indexes(&db.admin_service, table, config).await
147 },
148 DbCommands::Size => introspect::execute_size(&db.admin_service, config).await,
149 DbCommands::Doctor => doctor::execute_doctor(db.ctx.db_pool(), config).await,
150 }
151}
152
153pub async fn execute_with_db(
154 cmd: DbCommands,
155 db_ctx: &DatabaseContext,
156 config: &CliConfig,
157) -> Result<()> {
158 let pool = db_ctx
159 .db_pool()
160 .write_pool_arc()
161 .context("Database must be PostgreSQL")?;
162 let admin_service = DatabaseAdminService::new(Arc::clone(&pool));
163 let query_executor = QueryExecutor::new(pool);
164
165 match cmd {
166 DbCommands::Query {
167 sql,
168 limit,
169 offset,
170 format: _,
171 } => {
172 let params = query::QueryParams {
173 sql: &sql,
174 limit,
175 offset,
176 };
177 let result = query::execute_query(&query_executor, ¶ms, config).await?;
178 render_result(&result);
179 Ok(())
180 },
181 DbCommands::Execute { sql, format: _ } => {
182 let result = query::execute_write(&query_executor, &sql, config).await?;
183 render_result(&result);
184 Ok(())
185 },
186 DbCommands::Tables { filter } => {
187 schema::execute_tables(&admin_service, filter, config).await
188 },
189 DbCommands::Describe { table_name } => {
190 schema::execute_describe(&admin_service, &table_name, config).await
191 },
192 DbCommands::Info => schema::execute_info(&admin_service, config).await,
193 DbCommands::Migrate {
194 allow_checksum_drift,
195 } => admin::execute_migrate_standalone(db_ctx, config, allow_checksum_drift).await,
196 DbCommands::MigrateDown { extension, count } => {
197 admin::execute_migrate_down_standalone(db_ctx, config, &extension, count).await
198 },
199 DbCommands::MigrateSquash {
200 extension,
201 through,
202 apply,
203 } => {
204 admin_squash::execute_squash_standalone(
205 db_ctx,
206 config,
207 admin_squash::SquashArgs {
208 extension: &extension,
209 through,
210 apply,
211 },
212 )
213 .await
214 },
215 DbCommands::Migrations { cmd } => {
216 admin::execute_migrations_standalone(db_ctx, cmd, config).await
217 },
218 DbCommands::MigratePlan { extension, json } => {
219 admin::execute_migrate_plan_standalone(db_ctx, extension.as_deref(), json, config).await
220 },
221 DbCommands::MigrateStatus { extension, json } => {
222 admin::execute_migrate_status_standalone(db_ctx, extension.as_deref(), json, config)
223 .await
224 },
225 DbCommands::MigrateRepair {
226 extension,
227 apply,
228 json,
229 } => {
230 admin::execute_migrate_repair_standalone(
231 db_ctx,
232 config,
233 admin::RepairArgs {
234 extension: extension.as_deref(),
235 apply,
236 json,
237 },
238 )
239 .await
240 },
241 DbCommands::AssignAdmin { .. } => {
242 bail!("assign-admin requires full profile context")
243 },
244 DbCommands::Status => admin::execute_status(&admin_service, config).await,
245 DbCommands::Validate => schema::execute_validate(&admin_service, config).await,
246 DbCommands::Count { table_name } => {
247 schema::execute_count(&admin_service, &table_name, config).await
248 },
249 DbCommands::Indexes { table } => {
250 introspect::execute_indexes(&admin_service, table, config).await
251 },
252 DbCommands::Size => introspect::execute_size(&admin_service, config).await,
253 DbCommands::Doctor => doctor::execute_doctor(db_ctx.db_pool(), config).await,
254 }
255}