Skip to main content

systemprompt_cli/commands/infrastructure/db/
mod.rs

1mod admin;
2mod admin_migrate;
3mod admin_migrations;
4mod doctor;
5mod helpers;
6mod introspect;
7mod query;
8mod schema;
9mod types;
10
11use anyhow::{Context, Result, bail};
12use clap::Subcommand;
13use std::sync::Arc;
14use systemprompt_database::{DatabaseAdminService, QueryExecutor};
15use systemprompt_runtime::{AppContext, DatabaseContext};
16
17use crate::cli_settings::CliConfig;
18use crate::shared::render_result;
19
20pub use types::*;
21
22#[derive(Debug, Subcommand)]
23pub enum DbCommands {
24    #[command(about = "Execute SQL query (read-only)")]
25    Query {
26        sql: String,
27        #[arg(long)]
28        limit: Option<u32>,
29        #[arg(long)]
30        offset: Option<u32>,
31        #[arg(long)]
32        format: Option<String>,
33    },
34    #[command(about = "Execute write operation (INSERT, UPDATE, DELETE)")]
35    Execute {
36        sql: String,
37        #[arg(long)]
38        format: Option<String>,
39    },
40    #[command(about = "List all tables with row counts and sizes")]
41    Tables {
42        #[arg(long, help = "Filter tables by pattern")]
43        filter: Option<String>,
44    },
45    #[command(about = "Describe table schema with columns and indexes")]
46    Describe { table_name: String },
47    #[command(about = "Show database information")]
48    Info,
49    #[command(about = "Run database migrations")]
50    Migrate {
51        #[arg(
52            long,
53            help = "Continue past migration checksum mismatches with a warning instead of \
54                    erroring (use with caution)"
55        )]
56        allow_checksum_drift: bool,
57    },
58    #[command(about = "Show migration status and history")]
59    Migrations {
60        #[command(subcommand)]
61        cmd: MigrationsCommands,
62    },
63    #[command(about = "Assign admin role to a user")]
64    AssignAdmin { user: String },
65    #[command(about = "Show database connection status")]
66    Status,
67    #[command(about = "Validate database schema against expected tables")]
68    Validate,
69    #[command(about = "Get row count for a table")]
70    Count { table_name: String },
71    #[command(about = "List all indexes")]
72    Indexes {
73        #[arg(long, help = "Filter by table name")]
74        table: Option<String>,
75    },
76    #[command(about = "Show database and table sizes")]
77    Size,
78    #[command(about = "Diff live schema against extension declarations")]
79    Doctor,
80}
81
82#[derive(Debug, Subcommand)]
83pub enum MigrationsCommands {
84    #[command(about = "Show migration status for all extensions")]
85    Status,
86    #[command(about = "Show migration history for an extension")]
87    History {
88        #[arg(help = "Extension ID")]
89        extension: String,
90    },
91}
92
93struct DatabaseTool {
94    ctx: AppContext,
95    admin_service: DatabaseAdminService,
96    query_executor: QueryExecutor,
97}
98
99impl DatabaseTool {
100    async fn new() -> Result<Self> {
101        let ctx = AppContext::new()
102            .await
103            .context("Failed to connect to database. Check your profile configuration.")?;
104        let pool = ctx.db_pool().write_pool_arc()?;
105        let admin_service = DatabaseAdminService::new(Arc::clone(&pool));
106        let query_executor = QueryExecutor::new(pool);
107        Ok(Self {
108            ctx,
109            admin_service,
110            query_executor,
111        })
112    }
113}
114
115pub async fn execute(cmd: DbCommands, config: &CliConfig) -> Result<()> {
116    if let DbCommands::Migrate {
117        allow_checksum_drift,
118    } = cmd
119    {
120        return admin::execute_migrate(config, allow_checksum_drift).await;
121    }
122
123    let db = DatabaseTool::new().await?;
124
125    match cmd {
126        DbCommands::Query {
127            sql,
128            limit,
129            offset,
130            format: _,
131        } => {
132            let params = query::QueryParams {
133                sql: &sql,
134                limit,
135                offset,
136            };
137            let result = query::execute_query(&db.query_executor, &params, config).await?;
138            render_result(&result);
139            Ok(())
140        },
141        DbCommands::Execute { sql, format: _ } => {
142            let result = query::execute_write(&db.query_executor, &sql, config).await?;
143            render_result(&result);
144            Ok(())
145        },
146        DbCommands::Tables { filter } => {
147            schema::execute_tables(&db.admin_service, filter, config).await
148        },
149        DbCommands::Describe { table_name } => {
150            schema::execute_describe(&db.admin_service, &table_name, config).await
151        },
152        DbCommands::Info => schema::execute_info(&db.admin_service, config).await,
153        DbCommands::Migrate { .. } => unreachable!(),
154        DbCommands::Migrations { cmd } => admin::execute_migrations(&db.ctx, cmd, config).await,
155        DbCommands::AssignAdmin { user } => {
156            admin::execute_assign_admin(&db.ctx, &user, config).await
157        },
158        DbCommands::Status => admin::execute_status(&db.admin_service, config).await,
159        DbCommands::Validate => schema::execute_validate(&db.admin_service, config).await,
160        DbCommands::Count { table_name } => {
161            schema::execute_count(&db.admin_service, &table_name, config).await
162        },
163        DbCommands::Indexes { table } => {
164            introspect::execute_indexes(&db.admin_service, table, config).await
165        },
166        DbCommands::Size => introspect::execute_size(&db.admin_service, config).await,
167        DbCommands::Doctor => doctor::execute_doctor(db.ctx.db_pool(), config).await,
168    }
169}
170
171pub async fn execute_with_db(
172    cmd: DbCommands,
173    db_ctx: &DatabaseContext,
174    config: &CliConfig,
175) -> Result<()> {
176    let pool = db_ctx
177        .db_pool()
178        .write_pool_arc()
179        .context("Database must be PostgreSQL")?;
180    let admin_service = DatabaseAdminService::new(Arc::clone(&pool));
181    let query_executor = QueryExecutor::new(pool);
182
183    match cmd {
184        DbCommands::Query {
185            sql,
186            limit,
187            offset,
188            format: _,
189        } => {
190            let params = query::QueryParams {
191                sql: &sql,
192                limit,
193                offset,
194            };
195            let result = query::execute_query(&query_executor, &params, config).await?;
196            render_result(&result);
197            Ok(())
198        },
199        DbCommands::Execute { sql, format: _ } => {
200            let result = query::execute_write(&query_executor, &sql, config).await?;
201            render_result(&result);
202            Ok(())
203        },
204        DbCommands::Tables { filter } => {
205            schema::execute_tables(&admin_service, filter, config).await
206        },
207        DbCommands::Describe { table_name } => {
208            schema::execute_describe(&admin_service, &table_name, config).await
209        },
210        DbCommands::Info => schema::execute_info(&admin_service, config).await,
211        DbCommands::Migrate {
212            allow_checksum_drift,
213        } => admin::execute_migrate_standalone(db_ctx, config, allow_checksum_drift).await,
214        DbCommands::Migrations { cmd } => {
215            admin::execute_migrations_standalone(db_ctx, cmd, config).await
216        },
217        DbCommands::AssignAdmin { .. } => {
218            bail!("assign-admin requires full profile context")
219        },
220        DbCommands::Status => admin::execute_status(&admin_service, config).await,
221        DbCommands::Validate => schema::execute_validate(&admin_service, config).await,
222        DbCommands::Count { table_name } => {
223            schema::execute_count(&admin_service, &table_name, config).await
224        },
225        DbCommands::Indexes { table } => {
226            introspect::execute_indexes(&admin_service, table, config).await
227        },
228        DbCommands::Size => introspect::execute_size(&admin_service, config).await,
229        DbCommands::Doctor => doctor::execute_doctor(db_ctx.db_pool(), config).await,
230    }
231}