Skip to main content

cmdhub_cli/
updater.rs

1use crate::config::{get_cache_dir, Config, OFFICIAL_PUBLIC_KEY};
2use crate::db::resolve_db_path;
3use anyhow::{Context, Result};
4use cmdhub_shared::{CmdHubError, IncrementalSyncPayload, UpdateManifest};
5use ed25519_dalek::{Signature, Verifier, VerifyingKey};
6use fs2::FileExt;
7use reqwest::Client;
8use sha2::{Digest, Sha256};
9use std::fs;
10
11pub async fn update_database(config: &Config, force: bool) -> Result<()> {
12    // Use connect + read (stall) timeouts rather than one total timeout: the database
13    // payload is hundreds of MB, so a total timeout aborts a healthy but slow download.
14    // A read timeout still aborts a genuinely stalled connection.
15    let to = std::time::Duration::from_secs(config.timeout_seconds);
16    let client = Client::builder()
17        .connect_timeout(to)
18        .read_timeout(to)
19        .build()?;
20
21    let mut last_sync_time = 0i64;
22    let live_db_path = resolve_db_path();
23    if !force && live_db_path.exists() {
24        if let Ok(conn) = rusqlite::Connection::open(&live_db_path) {
25            if let Ok(val) = conn.query_row::<String, _, _>(
26                "SELECT value FROM sync_meta WHERE key = 'last_sync_time' LIMIT 1",
27                [],
28                |row| row.get(0),
29            ) {
30                if let Ok(t) = val.parse::<i64>() {
31                    last_sync_time = t;
32                }
33            }
34        }
35    }
36
37    let update_url = format!(
38        "{}/db/update?last_sync_time={}",
39        config.api_url, last_sync_time
40    );
41
42    eprintln!("Checking for updates at {}...", update_url);
43
44    // Fetch manifest
45    let manifest_resp = client.get(&update_url).send().await;
46    let manifest: UpdateManifest = match manifest_resp {
47        Ok(resp) => {
48            if resp.status().is_success() {
49                resp.json()
50                    .await
51                    .context("Failed to parse UpdateManifest JSON")?
52            } else {
53                return Err(anyhow::anyhow!(CmdHubError::UpdateFailed(format!(
54                    "Cloud returned status code: {}",
55                    resp.status()
56                ))));
57            }
58        }
59        Err(e) => {
60            return Err(anyhow::anyhow!(CmdHubError::UpdateFailed(format!(
61                "Failed to fetch database update manifest: {}",
62                e
63            ))));
64        }
65    };
66
67    let mode = manifest.mode.clone().unwrap_or_else(|| "full".to_string());
68    if mode == "noop" {
69        eprintln!("Database is already up-to-date.");
70        return Ok(());
71    }
72
73    let cache_dir = get_cache_dir();
74    let downloads_dir = cache_dir.join("downloads");
75    fs::create_dir_all(&downloads_dir).context("Failed to create downloads cache directory")?;
76
77    let db_zst_path = downloads_dir.join("latest.db.zst");
78    let sig_path = downloads_dir.join("latest.db.sig");
79
80    eprintln!(
81        "Downloading database update (version: {})...",
82        manifest.version
83    );
84
85    // Download payload .zst
86    let db_resp = client
87        .get(&manifest.db_url)
88        .send()
89        .await
90        .context("Failed to download database file")?;
91    let db_bytes = db_resp
92        .bytes()
93        .await
94        .context("Failed to read database bytes")?;
95    fs::write(&db_zst_path, &db_bytes).context("Failed to write downloaded database payload")?;
96
97    // Download signature
98    let sig_resp = client
99        .get(&manifest.sig_url)
100        .send()
101        .await
102        .context("Failed to download database signature file")?;
103    let sig_bytes = sig_resp
104        .bytes()
105        .await
106        .context("Failed to read database signature bytes")?;
107    fs::write(&sig_path, &sig_bytes).context("Failed to write downloaded signature payload")?;
108
109    // 1. Calculate SHA-256 Hash of downloaded .zst
110    eprintln!("Verifying database integrity and signature...");
111    let mut hasher = Sha256::new();
112    hasher.update(&db_bytes);
113    let hash_result: [u8; 32] = hasher.finalize().into();
114
115    // Verify SHA-256 match with manifest
116    let computed_hex = hash_result
117        .iter()
118        .map(|b| format!("{:02x}", b))
119        .collect::<String>();
120    // Always validate integrity, even with --force. --force is about bypassing the
121    // last_sync_time noop check (re-download an already-current version), NOT about
122    // skipping integrity: a stale CDN edge cache can serve an old .zst whose sha256 no
123    // longer matches the fresh manifest, and skipping this check used to surface that as
124    // a confusing "Ed25519 verification failed" instead of a clear SHA-256 mismatch.
125    if computed_hex != manifest.sha256 {
126        return Err(anyhow::anyhow!(CmdHubError::Validation(format!(
127            "SHA-256 mismatch: computed {}, manifest {} (stale CDN cache? try again or purge)",
128            computed_hex, manifest.sha256
129        ))));
130    }
131
132    // 2. Decode official public key
133    let pub_key_bytes = match hex_decode(&config.public_key) {
134        Ok(bytes) => {
135            let mut arr = [0u8; 32];
136            if bytes.len() == 32 {
137                arr.copy_from_slice(&bytes);
138                arr
139            } else {
140                OFFICIAL_PUBLIC_KEY
141            }
142        }
143        Err(_) => OFFICIAL_PUBLIC_KEY,
144    };
145
146    let verifying_key = VerifyingKey::from_bytes(&pub_key_bytes).map_err(|e| {
147        anyhow::anyhow!(CmdHubError::SignatureVerification(format!(
148            "Invalid public key: {}",
149            e
150        )))
151    })?;
152
153    let signature = Signature::from_slice(&sig_bytes).map_err(|e| {
154        anyhow::anyhow!(CmdHubError::SignatureVerification(format!(
155            "Invalid signature format: {}",
156            e
157        )))
158    })?;
159
160    verifying_key
161        .verify(&hash_result, &signature)
162        .map_err(|e| {
163            anyhow::anyhow!(CmdHubError::SignatureVerification(format!(
164                "Ed25519 signature verification failed: {}",
165                e
166            )))
167        })?;
168
169    // 3. Decompress .zst payload
170    eprintln!("Decompressing database...");
171    let decompressed =
172        zstd::decode_all(&db_bytes[..]).context("Failed to decompress zstd payload")?;
173
174    if mode == "incremental" {
175        eprintln!("Applying incremental database changes...");
176        let payload: IncrementalSyncPayload = serde_json::from_slice(&decompressed)
177            .context("Failed to parse IncrementalSyncPayload JSON")?;
178
179        let lock_path = cache_dir.join("update.lock");
180        let lock_file = fs::OpenOptions::new()
181            .read(true)
182            .write(true)
183            .create(true)
184            .truncate(true)
185            .open(&lock_path)
186            .context("Failed to open update.lock file")?;
187
188        lock_file
189            .lock_exclusive()
190            .context("Failed to acquire exclusive lock on update.lock")?;
191
192        let mut conn = rusqlite::Connection::open(&live_db_path)
193            .context("Failed to open live database for incremental update")?;
194        let _ = conn.execute("PRAGMA foreign_keys = ON;", []);
195
196        unsafe {
197            type SqliteVecInitFn = unsafe extern "C" fn();
198            let init_fn: SqliteVecInitFn = sqlite_vec::sqlite3_vec_init;
199            #[allow(clippy::missing_transmute_annotations)]
200            let _ = rusqlite::ffi::sqlite3_auto_extension(Some(std::mem::transmute(init_fn)));
201        }
202
203        let tx = conn
204            .transaction()
205            .context("Failed to start SQLite transaction")?;
206
207        crate::db::init_db(&tx)?;
208
209        // Helper helper to delete commands and associated index entries for a given app_id
210        let delete_app_commands = |tx_ref: &rusqlite::Transaction,
211                                   target_app_id: &str|
212         -> Result<()> {
213            let mut stmt = tx_ref.prepare("SELECT cmd_path FROM arguments WHERE app_id = ?1")?;
214            let mut rows = stmt.query(rusqlite::params![target_app_id])?;
215            while let Some(row) = rows.next()? {
216                let cmd_path: String = row.get(0)?;
217                let _ = tx_ref.execute(
218                    "DELETE FROM apps_fts WHERE cmd_path = ?1",
219                    rusqlite::params![cmd_path],
220                );
221                let _ = tx_ref.execute(
222                    "DELETE FROM commands_vec WHERE cmd_path = ?1",
223                    rusqlite::params![cmd_path],
224                );
225            }
226            tx_ref.execute(
227                "DELETE FROM arguments WHERE app_id = ?1",
228                rusqlite::params![target_app_id],
229            )?;
230            Ok(())
231        };
232
233        // 1. Process deleted/archived apps
234        for app_id in payload.deleted_apps {
235            delete_app_commands(&tx, &app_id)?;
236            tx.execute(
237                "DELETE FROM apps WHERE app_id = ?1",
238                rusqlite::params![app_id],
239            )?;
240        }
241
242        // 2. Process updated/inserted apps
243        for app in payload.apps {
244            delete_app_commands(&tx, &app.app_id)?;
245            tx.execute(
246                "INSERT OR REPLACE INTO apps (app_id, name, install_instructions) VALUES (?1, ?2, ?3)",
247                rusqlite::params![app.app_id, app.name, app.install_instructions],
248            )?;
249        }
250
251        for arg in payload.arguments {
252            tx.execute(
253                "INSERT OR REPLACE INTO arguments (cmd_path, app_id, node_name, node_type, description, risk_level, example_template, docker_image, script_url, source_url) \
254                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
255                rusqlite::params![
256                    arg.cmd_path,
257                    arg.app_id,
258                    arg.node_name,
259                    arg.node_type,
260                    arg.description,
261                    arg.risk_level,
262                    arg.example_template,
263                    arg.docker_image,
264                    arg.script_url,
265                    arg.source_url
266                ],
267            )?;
268
269            let _ = tx.execute(
270                "DELETE FROM apps_fts WHERE cmd_path = ?1",
271                rusqlite::params![arg.cmd_path],
272            );
273
274            let app_name: String = tx
275                .query_row(
276                    "SELECT name FROM apps WHERE app_id = ?1",
277                    rusqlite::params![arg.app_id],
278                    |row| row.get(0),
279                )
280                .unwrap_or_else(|_| "unknown".to_string());
281
282            tx.execute(
283                "INSERT INTO apps_fts (cmd_path, name, capabilities) VALUES (?1, ?2, ?3)",
284                rusqlite::params![arg.cmd_path, app_name, arg.description],
285            )?;
286        }
287
288        for vec in payload.command_vecs {
289            // Support both int8_384 (new) and float32_512 (legacy) embedding formats.
290            let is_int8 = vec.embedding.len() == 384;
291            if is_int8 || vec.embedding.len() == 512 {
292                let vec_bytes: Vec<u8> = if is_int8 {
293                    vec.embedding
294                        .iter()
295                        .map(|&v| (v * 127.0).round().clamp(-128.0, 127.0) as i8 as u8)
296                        .collect()
297                } else {
298                    let mut b = Vec::with_capacity(512 * 4);
299                    for &val in &vec.embedding {
300                        b.extend_from_slice(&val.to_ne_bytes());
301                    }
302                    b
303                };
304                let _ = tx.execute(
305                    "DELETE FROM commands_vec WHERE cmd_path = ?1",
306                    rusqlite::params![vec.cmd_path],
307                );
308                let _ = tx.execute(
309                    "INSERT INTO commands_vec (cmd_path, embedding) VALUES (?1, ?2)",
310                    rusqlite::params![vec.cmd_path, vec_bytes],
311                );
312            }
313        }
314
315        let new_time = manifest
316            .new_sync_time
317            .unwrap_or_else(|| chrono::Utc::now().timestamp());
318        tx.execute(
319            "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('last_sync_time', ?1)",
320            rusqlite::params![new_time.to_string()],
321        )?;
322
323        tx.commit()
324            .context("Failed to commit incremental SQLite transaction")?;
325        eprintln!(
326            "Database successfully incrementally updated (new sync time: {})!",
327            new_time
328        );
329    } else {
330        let tmp_dir = cache_dir.join("tmp");
331        fs::create_dir_all(&tmp_dir).context("Failed to create temporary staging directory")?;
332        let staging_path = tmp_dir.join("latest.db");
333        fs::write(&staging_path, &decompressed)
334            .context("Failed to write decompressed staging database")?;
335
336        eprintln!("Applying atomic database replacement...");
337        let lock_path = cache_dir.join("update.lock");
338        let lock_file = fs::OpenOptions::new()
339            .read(true)
340            .write(true)
341            .create(true)
342            .truncate(true)
343            .open(&lock_path)
344            .context("Failed to open update.lock file")?;
345
346        lock_file
347            .lock_exclusive()
348            .context("Failed to acquire exclusive lock on update.lock")?;
349
350        let live_db_path = resolve_db_path();
351        if let Some(parent) = live_db_path.parent() {
352            fs::create_dir_all(parent).context("Failed to create live database directory")?;
353        }
354
355        eprintln!("Safely applying database changes...");
356        let src_conn =
357            rusqlite::Connection::open(&staging_path).context("Failed to open staging database")?;
358        let mut dst_conn =
359            rusqlite::Connection::open(&live_db_path).context("Failed to open live database")?;
360
361        let _ = dst_conn.execute("PRAGMA journal_mode = WAL;", []);
362        let _ = dst_conn.execute("PRAGMA synchronous = NORMAL;", []);
363        let _ = dst_conn.execute("PRAGMA foreign_keys = ON;", []);
364
365        let backup = rusqlite::backup::Backup::new(&src_conn, &mut dst_conn)
366            .context("Failed to initialize SQLite backup")?;
367
368        backup
369            .run_to_completion(100, std::time::Duration::from_millis(10), None)
370            .context("SQLite backup to live database failed")?;
371
372        drop(backup);
373
374        let _ = fs::remove_file(&staging_path);
375
376        let new_time = manifest
377            .new_sync_time
378            .unwrap_or_else(|| chrono::Utc::now().timestamp());
379        let _ = dst_conn.execute(
380            "CREATE TABLE IF NOT EXISTS sync_meta (
381                key TEXT PRIMARY KEY,
382                value TEXT NOT NULL
383            );",
384            [],
385        );
386        let _ = dst_conn.execute(
387            "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('last_sync_time', ?1)",
388            rusqlite::params![new_time.to_string()],
389        );
390
391        eprintln!(
392            "Database successfully updated to version {} (sync time: {})!",
393            manifest.version, new_time
394        );
395    }
396    Ok(())
397}
398
399fn hex_decode(s: &str) -> Result<Vec<u8>> {
400    let mut bytes = Vec::new();
401    let mut chars = s.chars().peekable();
402    while let Some(c1) = chars.next() {
403        if let Some(c2) = chars.next() {
404            let hex = format!("{}{}", c1, c2);
405            let b = u8::from_str_radix(&hex, 16)?;
406            bytes.push(b);
407        }
408    }
409    Ok(bytes)
410}