Skip to main content

systemprompt_cli/commands/infrastructure/db/
mod.rs

1mod admin;
2mod admin_migrate;
3mod admin_migrations;
4mod helpers;
5mod introspect;
6mod query;
7mod schema;
8mod types;
9
10use anyhow::{Context, Result, bail};
11use clap::Subcommand;
12use std::sync::Arc;
13use systemprompt_database::{DatabaseAdminService, QueryExecutor};
14use systemprompt_runtime::{AppContext, DatabaseContext};
15
16use crate::cli_settings::CliConfig;
17use crate::shared::render_result;
18
19pub use types::*;
20
21#[derive(Debug, Subcommand)]
22pub enum DbCommands {
23    #[command(about = "Execute SQL query (read-only)")]
24    Query {
25        sql: String,
26        #[arg(long)]
27        limit: Option<u32>,
28        #[arg(long)]
29        offset: Option<u32>,
30        #[arg(long)]
31        format: Option<String>,
32    },
33    #[command(about = "Execute write operation (INSERT, UPDATE, DELETE)")]
34    Execute {
35        sql: String,
36        #[arg(long)]
37        format: Option<String>,
38    },
39    #[command(about = "List all tables with row counts and sizes")]
40    Tables {
41        #[arg(long, help = "Filter tables by pattern")]
42        filter: Option<String>,
43    },
44    #[command(about = "Describe table schema with columns and indexes")]
45    Describe { table_name: String },
46    #[command(about = "Show database information")]
47    Info,
48    #[command(about = "Run database migrations")]
49    Migrate,
50    #[command(about = "Show migration status and history")]
51    Migrations {
52        #[command(subcommand)]
53        cmd: MigrationsCommands,
54    },
55    #[command(about = "Assign admin role to a user")]
56    AssignAdmin { user: String },
57    #[command(about = "Show database connection status")]
58    Status,
59    #[command(about = "Validate database schema against expected tables")]
60    Validate,
61    #[command(about = "Get row count for a table")]
62    Count { table_name: String },
63    #[command(about = "List all indexes")]
64    Indexes {
65        #[arg(long, help = "Filter by table name")]
66        table: Option<String>,
67    },
68    #[command(about = "Show database and table sizes")]
69    Size,
70}
71
72#[derive(Debug, Subcommand)]
73pub enum MigrationsCommands {
74    #[command(about = "Show migration status for all extensions")]
75    Status,
76    #[command(about = "Show migration history for an extension")]
77    History {
78        #[arg(help = "Extension ID")]
79        extension: String,
80    },
81}
82
83struct DatabaseTool {
84    ctx: AppContext,
85    admin_service: DatabaseAdminService,
86    query_executor: QueryExecutor,
87}
88
89impl DatabaseTool {
90    async fn new() -> Result<Self> {
91        let ctx = AppContext::new()
92            .await
93            .context("Failed to connect to database. Check your profile configuration.")?;
94        let pool = ctx.db_pool().write_pool_arc()?;
95        let admin_service = DatabaseAdminService::new(Arc::clone(&pool));
96        let query_executor = QueryExecutor::new(pool);
97        Ok(Self {
98            ctx,
99            admin_service,
100            query_executor,
101        })
102    }
103}
104
105pub async fn execute(cmd: DbCommands, config: &CliConfig) -> Result<()> {
106    if matches!(cmd, DbCommands::Migrate) {
107        return admin::execute_migrate(config).await;
108    }
109
110    let db = DatabaseTool::new().await?;
111
112    match cmd {
113        DbCommands::Query {
114            sql,
115            limit,
116            offset,
117            format: _,
118        } => {
119            let params = query::QueryParams {
120                sql: &sql,
121                limit,
122                offset,
123            };
124            let result = query::execute_query(&db.query_executor, &params, config).await?;
125            render_result(&result);
126            Ok(())
127        },
128        DbCommands::Execute { sql, format: _ } => {
129            let result = query::execute_write(&db.query_executor, &sql, config).await?;
130            render_result(&result);
131            Ok(())
132        },
133        DbCommands::Tables { filter } => {
134            schema::execute_tables(&db.admin_service, filter, config).await
135        },
136        DbCommands::Describe { table_name } => {
137            schema::execute_describe(&db.admin_service, &table_name, config).await
138        },
139        DbCommands::Info => schema::execute_info(&db.admin_service, config).await,
140        DbCommands::Migrate => unreachable!(),
141        DbCommands::Migrations { cmd } => admin::execute_migrations(&db.ctx, cmd, config).await,
142        DbCommands::AssignAdmin { user } => {
143            admin::execute_assign_admin(&db.ctx, &user, config).await
144        },
145        DbCommands::Status => admin::execute_status(&db.admin_service, config).await,
146        DbCommands::Validate => schema::execute_validate(&db.admin_service, config).await,
147        DbCommands::Count { table_name } => {
148            schema::execute_count(&db.admin_service, &table_name, config).await
149        },
150        DbCommands::Indexes { table } => {
151            introspect::execute_indexes(&db.admin_service, table, config).await
152        },
153        DbCommands::Size => introspect::execute_size(&db.admin_service, config).await,
154    }
155}
156
157pub async fn execute_with_db(
158    cmd: DbCommands,
159    db_ctx: &DatabaseContext,
160    config: &CliConfig,
161) -> Result<()> {
162    let pool = db_ctx
163        .db_pool()
164        .write_pool_arc()
165        .context("Database must be PostgreSQL")?;
166    let admin_service = DatabaseAdminService::new(Arc::clone(&pool));
167    let query_executor = QueryExecutor::new(pool);
168
169    match cmd {
170        DbCommands::Query {
171            sql,
172            limit,
173            offset,
174            format: _,
175        } => {
176            let params = query::QueryParams {
177                sql: &sql,
178                limit,
179                offset,
180            };
181            let result = query::execute_query(&query_executor, &params, config).await?;
182            render_result(&result);
183            Ok(())
184        },
185        DbCommands::Execute { sql, format: _ } => {
186            let result = query::execute_write(&query_executor, &sql, config).await?;
187            render_result(&result);
188            Ok(())
189        },
190        DbCommands::Tables { filter } => {
191            schema::execute_tables(&admin_service, filter, config).await
192        },
193        DbCommands::Describe { table_name } => {
194            schema::execute_describe(&admin_service, &table_name, config).await
195        },
196        DbCommands::Info => schema::execute_info(&admin_service, config).await,
197        DbCommands::Migrate => admin::execute_migrate_standalone(db_ctx, config).await,
198        DbCommands::Migrations { cmd } => {
199            admin::execute_migrations_standalone(db_ctx, cmd, config).await
200        },
201        DbCommands::AssignAdmin { .. } => {
202            bail!("assign-admin requires full profile context")
203        },
204        DbCommands::Status => admin::execute_status(&admin_service, config).await,
205        DbCommands::Validate => schema::execute_validate(&admin_service, config).await,
206        DbCommands::Count { table_name } => {
207            schema::execute_count(&admin_service, &table_name, config).await
208        },
209        DbCommands::Indexes { table } => {
210            introspect::execute_indexes(&admin_service, table, config).await
211        },
212        DbCommands::Size => introspect::execute_size(&admin_service, config).await,
213    }
214}