Skip to main content

systemprompt_cli/commands/cloud/
db.rs

1use anyhow::{anyhow, bail, Context, Result};
2use clap::{Subcommand, ValueEnum};
3use std::path::PathBuf;
4use std::process::Command;
5use systemprompt_cloud::{ProfilePath, ProjectContext};
6use systemprompt_logging::CliService;
7use systemprompt_runtime::DatabaseContext;
8
9use crate::cli_settings::CliConfig;
10use crate::commands::infrastructure::db;
11
12#[derive(Debug, Subcommand)]
13pub enum CloudDbCommands {
14    #[command(about = "Run migrations on cloud database")]
15    Migrate {
16        #[arg(long, help = "Profile name")]
17        profile: String,
18    },
19
20    #[command(about = "Execute SQL query (read-only) on cloud database")]
21    Query {
22        #[arg(long, help = "Profile name")]
23        profile: String,
24        sql: String,
25        #[arg(long)]
26        limit: Option<u32>,
27        #[arg(long)]
28        offset: Option<u32>,
29        #[arg(long)]
30        format: Option<String>,
31    },
32
33    #[command(about = "Execute write operation on cloud database")]
34    Execute {
35        #[arg(long, help = "Profile name")]
36        profile: String,
37        sql: String,
38        #[arg(long)]
39        format: Option<String>,
40    },
41
42    #[command(about = "Validate cloud database schema")]
43    Validate {
44        #[arg(long, help = "Profile name")]
45        profile: String,
46    },
47
48    #[command(about = "Show cloud database connection status")]
49    Status {
50        #[arg(long, help = "Profile name")]
51        profile: String,
52    },
53
54    #[command(about = "Show cloud database info")]
55    Info {
56        #[arg(long, help = "Profile name")]
57        profile: String,
58    },
59
60    #[command(about = "List all tables in cloud database")]
61    Tables {
62        #[arg(long, help = "Profile name")]
63        profile: String,
64        #[arg(long, help = "Filter tables by pattern")]
65        filter: Option<String>,
66    },
67
68    #[command(about = "Describe table schema in cloud database")]
69    Describe {
70        #[arg(long, help = "Profile name")]
71        profile: String,
72        table_name: String,
73    },
74
75    #[command(about = "Get row count for a table in cloud database")]
76    Count {
77        #[arg(long, help = "Profile name")]
78        profile: String,
79        table_name: String,
80    },
81
82    #[command(about = "List all indexes in cloud database")]
83    Indexes {
84        #[arg(long, help = "Profile name")]
85        profile: String,
86        #[arg(long, help = "Filter by table name")]
87        table: Option<String>,
88    },
89
90    #[command(about = "Show cloud database and table sizes")]
91    Size {
92        #[arg(long, help = "Profile name")]
93        profile: String,
94    },
95
96    #[command(about = "Backup cloud database using pg_dump")]
97    Backup {
98        #[arg(long, help = "Profile name")]
99        profile: String,
100
101        #[arg(
102            long,
103            default_value = "custom",
104            help = "Backup format: custom, sql, directory"
105        )]
106        format: BackupFormat,
107
108        #[arg(
109            long,
110            help = "Output file path (default: backups/<profile>-<timestamp>.<ext>)"
111        )]
112        output: Option<String>,
113    },
114
115    #[command(about = "Restore cloud database from a backup file")]
116    Restore {
117        #[arg(long, help = "Profile name")]
118        profile: String,
119
120        #[arg(help = "Path to backup file")]
121        file: String,
122
123        #[arg(short = 'y', long, help = "Skip confirmation prompt")]
124        yes: bool,
125    },
126}
127
128#[derive(Debug, Clone, Copy, ValueEnum)]
129pub enum BackupFormat {
130    #[value(help = "pg_dump custom format (-Fc), supports parallel restore")]
131    Custom,
132    #[value(help = "Plain SQL text format (-Fp), human-readable")]
133    Sql,
134    #[value(help = "Directory format (-Fd), supports parallel dump and restore")]
135    Directory,
136}
137
138impl CloudDbCommands {
139    fn profile_name(&self) -> &str {
140        match self {
141            Self::Migrate { profile }
142            | Self::Query { profile, .. }
143            | Self::Execute { profile, .. }
144            | Self::Validate { profile }
145            | Self::Status { profile }
146            | Self::Info { profile }
147            | Self::Tables { profile, .. }
148            | Self::Describe { profile, .. }
149            | Self::Count { profile, .. }
150            | Self::Indexes { profile, .. }
151            | Self::Size { profile }
152            | Self::Backup { profile, .. }
153            | Self::Restore { profile, .. } => profile,
154        }
155    }
156
157    fn into_db_command(self) -> Option<db::DbCommands> {
158        match self {
159            Self::Migrate { .. } => Some(db::DbCommands::Migrate),
160            Self::Query {
161                sql,
162                limit,
163                offset,
164                format,
165                ..
166            } => Some(db::DbCommands::Query {
167                sql,
168                limit,
169                offset,
170                format,
171            }),
172            Self::Execute { sql, format, .. } => Some(db::DbCommands::Execute { sql, format }),
173            Self::Validate { .. } => Some(db::DbCommands::Validate),
174            Self::Status { .. } => Some(db::DbCommands::Status),
175            Self::Info { .. } => Some(db::DbCommands::Info),
176            Self::Tables { filter, .. } => Some(db::DbCommands::Tables { filter }),
177            Self::Describe { table_name, .. } => Some(db::DbCommands::Describe { table_name }),
178            Self::Count { table_name, .. } => Some(db::DbCommands::Count { table_name }),
179            Self::Indexes { table, .. } => Some(db::DbCommands::Indexes { table }),
180            Self::Size { .. } => Some(db::DbCommands::Size),
181            Self::Backup { .. } | Self::Restore { .. } => None,
182        }
183    }
184}
185
186pub async fn execute(cmd: CloudDbCommands, config: &CliConfig) -> Result<()> {
187    let profile_name = cmd.profile_name().to_string();
188    let db_url = load_cloud_database_url(&profile_name)?;
189    execute_inner(cmd, &profile_name, &db_url, config).await
190}
191
192pub async fn execute_with_database_url(
193    cmd: CloudDbCommands,
194    database_url: &str,
195    config: &CliConfig,
196) -> Result<()> {
197    let profile_name = cmd.profile_name().to_string();
198    execute_inner(cmd, &profile_name, database_url, config).await
199}
200
201async fn execute_inner(
202    cmd: CloudDbCommands,
203    profile_name: &str,
204    db_url: &str,
205    config: &CliConfig,
206) -> Result<()> {
207    match &cmd {
208        CloudDbCommands::Backup { format, output, .. } => {
209            return execute_backup(profile_name, db_url, *format, output.as_deref());
210        },
211        CloudDbCommands::Restore { file, yes, .. } => {
212            return execute_restore(profile_name, db_url, file, *yes, config);
213        },
214        _ => {},
215    }
216
217    let db_ctx = DatabaseContext::from_url(db_url).await?;
218    let db_cmd = cmd
219        .into_db_command()
220        .ok_or_else(|| anyhow!("Unexpected command variant"))?;
221
222    db::execute_with_db(db_cmd, &db_ctx, config).await
223}
224
225fn load_cloud_database_url(profile_name: &str) -> Result<String> {
226    let ctx = ProjectContext::discover();
227    let profile_dir = ctx.profile_dir(profile_name);
228
229    if !profile_dir.exists() {
230        return Err(anyhow!("Profile '{}' not found", profile_name));
231    }
232
233    let secrets_path = ProfilePath::Secrets.resolve(&profile_dir);
234    let secrets = systemprompt_models::Secrets::load_from_path(&secrets_path)
235        .with_context(|| format!("Failed to load secrets for profile '{}'", profile_name))?;
236
237    Ok(secrets.effective_database_url(true).to_string())
238}
239
240fn ensure_pg_tool(tool: &str) -> Result<()> {
241    match Command::new(tool).arg("--version").output() {
242        Ok(output) if output.status.success() => Ok(()),
243        _ => bail!(
244            "'{}' not found. Install PostgreSQL client tools:\n  apt install postgresql-client",
245            tool
246        ),
247    }
248}
249
250const fn backup_extension(format: BackupFormat) -> &'static str {
251    match format {
252        BackupFormat::Custom => "dump",
253        BackupFormat::Sql => "sql",
254        BackupFormat::Directory => "dir",
255    }
256}
257
258const fn pg_dump_format_flag(format: BackupFormat) -> &'static str {
259    match format {
260        BackupFormat::Custom => "-Fc",
261        BackupFormat::Sql => "-Fp",
262        BackupFormat::Directory => "-Fd",
263    }
264}
265
266fn find_pg_dump() -> Result<PathBuf> {
267    for version in [17, 16, 15, 14] {
268        let path = PathBuf::from(format!("/usr/lib/postgresql/{}/bin/pg_dump", version));
269        if path.exists() {
270            return Ok(path);
271        }
272    }
273    ensure_pg_tool("pg_dump")?;
274    Ok(PathBuf::from("pg_dump"))
275}
276
/// Returns `database_url` with every `sslmode=require` rewritten to
/// `sslmode=prefer`; URLs without that parameter are returned unchanged.
fn adjust_ssl_mode(database_url: &str) -> String {
    const STRICT: &str = "sslmode=require";
    const RELAXED: &str = "sslmode=prefer";

    let relaxed_url = database_url.replace(STRICT, RELAXED);
    relaxed_url
}
280
281fn execute_backup(
282    profile_name: &str,
283    database_url: &str,
284    format: BackupFormat,
285    output: Option<&str>,
286) -> Result<()> {
287    let pg_dump = find_pg_dump()?;
288
289    let output_path = if let Some(p) = output {
290        PathBuf::from(p)
291    } else {
292        let timestamp = chrono::Utc::now().format("%Y-%m-%d-%H%M%S");
293        let ext = backup_extension(format);
294        let dir = PathBuf::from("backups");
295        std::fs::create_dir_all(&dir)
296            .with_context(|| format!("Failed to create {}", dir.display()))?;
297        dir.join(format!("{}-{}.{}", profile_name, timestamp, ext))
298    };
299
300    CliService::section("Cloud Database Backup");
301    CliService::key_value("Profile", profile_name);
302    CliService::key_value("Format", &format!("{:?}", format));
303    CliService::key_value("Output", &output_path.display().to_string());
304
305    let adjusted_url = adjust_ssl_mode(database_url);
306    let spinner = CliService::spinner("Running pg_dump...");
307
308    let result = Command::new(&pg_dump)
309        .arg(pg_dump_format_flag(format))
310        .arg("--no-owner")
311        .arg("--no-privileges")
312        .arg("-f")
313        .arg(&output_path)
314        .arg(&adjusted_url)
315        .output()
316        .context("Failed to execute pg_dump")?;
317
318    spinner.finish_and_clear();
319
320    if !result.status.success() {
321        let stderr = String::from_utf8_lossy(&result.stderr);
322        bail!("pg_dump failed:\n{}", stderr);
323    }
324
325    let size = if output_path.is_dir() {
326        dir_size(&output_path)
327    } else {
328        std::fs::metadata(&output_path)
329            .map(|m| m.len())
330            .unwrap_or(0)
331    };
332
333    CliService::success(&format!(
334        "Backup complete: {} ({})",
335        output_path.display(),
336        format_size(size)
337    ));
338
339    Ok(())
340}
341
342fn find_pg_restore() -> Result<PathBuf> {
343    for version in [17, 16, 15, 14] {
344        let path = PathBuf::from(format!("/usr/lib/postgresql/{}/bin/pg_restore", version));
345        if path.exists() {
346            return Ok(path);
347        }
348    }
349    ensure_pg_tool("pg_restore")?;
350    Ok(PathBuf::from("pg_restore"))
351}
352
353fn execute_restore(
354    profile_name: &str,
355    database_url: &str,
356    file: &str,
357    skip_confirm: bool,
358    config: &CliConfig,
359) -> Result<()> {
360    let file_path = PathBuf::from(file);
361    if !file_path.exists() {
362        bail!("Backup file not found: {}", file);
363    }
364
365    let is_custom_or_dir = std::path::Path::new(file)
366        .extension()
367        .is_some_and(|ext| ext.eq_ignore_ascii_case("dump"))
368        || file_path.is_dir();
369
370    if is_custom_or_dir {
371        find_pg_restore()?;
372    } else {
373        ensure_pg_tool("psql")?;
374    }
375
376    CliService::section("Cloud Database Restore");
377    CliService::key_value("Profile", profile_name);
378    CliService::key_value("File", file);
379    CliService::warning("This will overwrite data in the cloud database!");
380
381    if !skip_confirm {
382        if !config.is_interactive() {
383            bail!("Restore requires -y flag in non-interactive mode");
384        }
385
386        let confirm = dialoguer::Confirm::with_theme(&dialoguer::theme::ColorfulTheme::default())
387            .with_prompt(format!(
388                "Restore backup to cloud database for profile '{}'?",
389                profile_name
390            ))
391            .default(false)
392            .interact()?;
393
394        if !confirm {
395            CliService::info("Cancelled");
396            return Ok(());
397        }
398    }
399
400    let adjusted_url = adjust_ssl_mode(database_url);
401    let spinner = CliService::spinner("Restoring database...");
402
403    let result = if is_custom_or_dir {
404        let pg_restore = find_pg_restore()?;
405        Command::new(pg_restore)
406            .arg("--no-owner")
407            .arg("--no-privileges")
408            .arg("--clean")
409            .arg("--if-exists")
410            .arg("-d")
411            .arg(&adjusted_url)
412            .arg(file)
413            .output()
414            .context("Failed to execute pg_restore")?
415    } else {
416        Command::new("psql")
417            .arg(&adjusted_url)
418            .arg("-f")
419            .arg(file)
420            .output()
421            .context("Failed to execute psql")?
422    };
423
424    spinner.finish_and_clear();
425
426    if result.status.success() {
427        CliService::success("Database restored successfully");
428    } else {
429        let stderr = String::from_utf8_lossy(&result.stderr);
430        if stderr.contains("ERROR") {
431            bail!("Restore failed:\n{}", stderr);
432        }
433        CliService::warning("Restore completed with warnings:");
434        CliService::info(&stderr.chars().take(500).collect::<String>());
435    }
436
437    Ok(())
438}
439
/// Sums the sizes, in bytes, of the direct entries of directory `path`.
///
/// The walk is not recursive: a subdirectory contributes only the length
/// reported by its own metadata. Entries whose directory record or metadata
/// cannot be read are skipped, and `0` is returned if `path` itself cannot be
/// listed.
///
/// Takes `&Path` rather than `&PathBuf`; existing `&PathBuf` arguments still
/// work through deref coercion.
fn dir_size(path: &std::path::Path) -> u64 {
    let Ok(entries) = std::fs::read_dir(path) else {
        return 0;
    };

    entries
        .filter_map(std::result::Result::ok)
        .filter_map(|entry| entry.metadata().ok())
        .map(|meta| meta.len())
        .sum::<u64>()
}
451
/// Renders a byte count for display using 1024-based units.
///
/// Values of at least 1 KB are shown with one decimal place in the largest
/// fitting unit (`KB`, `MB`, `GB`); smaller values are shown as whole bytes
/// with a `B` suffix.
fn format_size(bytes: u64) -> String {
    const KB: u64 = 1024;
    const MB: u64 = 1_048_576;
    const GB: u64 = 1_073_741_824;

    let scaled = |unit: u64, label: &str| format!("{:.1} {}", bytes as f64 / unit as f64, label);

    match bytes {
        b if b >= GB => scaled(GB, "GB"),
        b if b >= MB => scaled(MB, "MB"),
        b if b >= KB => scaled(KB, "KB"),
        b => format!("{} B", b),
    }
}