cmdhub-cli 0.1.2

cmdh — the CmdHub CLI client for offline command search and execution
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
use crate::config::{get_cache_dir, Config, OFFICIAL_PUBLIC_KEY};
use crate::db::resolve_db_path;
use anyhow::{Context, Result};
use cmdhub_shared::{CmdHubError, IncrementalSyncPayload, UpdateManifest};
use ed25519_dalek::{Signature, Verifier, VerifyingKey};
use fs2::FileExt;
use reqwest::Client;
use sha2::{Digest, Sha256};
use std::fs;

/// Downloads, verifies and applies the latest command database.
///
/// Steps, in order:
/// 1. Read `last_sync_time` from the live database's `sync_meta` table. When `force` is
///    set, the database does not exist, or the value cannot be read, `0` is sent instead.
/// 2. Fetch the [`UpdateManifest`] from `{api_url}/db/update`. A manifest whose mode is
///    `"noop"` ends the update early; a missing mode is treated as `"full"`.
/// 3. Download the zstd payload and its signature into the cache `downloads` directory.
/// 4. Compare the payload's SHA-256 with the manifest, then verify the Ed25519 signature
///    over the 32-byte digest. `config.public_key` is used when it decodes to exactly 32
///    bytes; otherwise `OFFICIAL_PUBLIC_KEY` is used.
/// 5. Decompress and apply. `"incremental"` mode merges an [`IncrementalSyncPayload`] into
///    the live database inside a single transaction; any other mode replaces the live
///    database through the SQLite backup API.
///
/// Both apply paths take an exclusive lock on `update.lock` in the cache directory.
///
/// # Errors
///
/// Returns an error when the manifest request fails or returns a non-success status, when
/// either download fails or returns an HTTP error status, on a SHA-256 mismatch, on an
/// invalid key or signature, on failed signature verification, on decompression or JSON
/// parse failure, and on filesystem or SQLite failures that are not explicitly handled as
/// best-effort below.
pub async fn update_database(config: &Config, force: bool) -> Result<()> {
    // Use connect + read (stall) timeouts rather than one total timeout: the database
    // payload is hundreds of MB, so a total timeout aborts a healthy but slow download.
    // A read timeout still aborts a genuinely stalled connection.
    let to = std::time::Duration::from_secs(config.timeout_seconds);
    let client = Client::builder()
        .connect_timeout(to)
        .read_timeout(to)
        .build()?;

    // Best-effort read of the previous sync time; any failure leaves it at 0.
    let mut last_sync_time = 0i64;
    let live_db_path = resolve_db_path();
    if !force && live_db_path.exists() {
        if let Ok(conn) = rusqlite::Connection::open(&live_db_path) {
            if let Ok(val) = conn.query_row::<String, _, _>(
                "SELECT value FROM sync_meta WHERE key = 'last_sync_time' LIMIT 1",
                [],
                |row| row.get(0),
            ) {
                if let Ok(t) = val.parse::<i64>() {
                    last_sync_time = t;
                }
            }
        }
    }

    let update_url = format!(
        "{}/db/update?last_sync_time={}",
        config.api_url, last_sync_time
    );

    eprintln!("Checking for updates at {}...", update_url);

    // Fetch manifest
    let manifest_resp = client.get(&update_url).send().await;
    let manifest: UpdateManifest = match manifest_resp {
        Ok(resp) => {
            if resp.status().is_success() {
                resp.json()
                    .await
                    .context("Failed to parse UpdateManifest JSON")?
            } else {
                return Err(anyhow::anyhow!(CmdHubError::UpdateFailed(format!(
                    "Cloud returned status code: {}",
                    resp.status()
                ))));
            }
        }
        Err(e) => {
            return Err(anyhow::anyhow!(CmdHubError::UpdateFailed(format!(
                "Failed to fetch database update manifest: {}",
                e
            ))));
        }
    };

    let mode = manifest.mode.clone().unwrap_or_else(|| "full".to_string());
    if mode == "noop" {
        eprintln!("Database is already up-to-date.");
        return Ok(());
    }

    let cache_dir = get_cache_dir();
    let downloads_dir = cache_dir.join("downloads");
    fs::create_dir_all(&downloads_dir).context("Failed to create downloads cache directory")?;

    let db_zst_path = downloads_dir.join("latest.db.zst");
    let sig_path = downloads_dir.join("latest.db.sig");

    eprintln!(
        "Downloading database update (version: {})...",
        manifest.version
    );

    // Download payload .zst. Reject HTTP error statuses up front so that an error page
    // is neither cached on disk nor misreported later as a SHA-256 mismatch.
    let db_resp = client
        .get(&manifest.db_url)
        .send()
        .await
        .context("Failed to download database file")?
        .error_for_status()
        .context("Database download returned an HTTP error status")?;
    let db_bytes = db_resp
        .bytes()
        .await
        .context("Failed to read database bytes")?;
    fs::write(&db_zst_path, &db_bytes).context("Failed to write downloaded database payload")?;

    // Download signature (same status check, otherwise an error body would surface as
    // "Invalid signature format").
    let sig_resp = client
        .get(&manifest.sig_url)
        .send()
        .await
        .context("Failed to download database signature file")?
        .error_for_status()
        .context("Signature download returned an HTTP error status")?;
    let sig_bytes = sig_resp
        .bytes()
        .await
        .context("Failed to read database signature bytes")?;
    fs::write(&sig_path, &sig_bytes).context("Failed to write downloaded signature payload")?;

    // 1. Calculate SHA-256 Hash of downloaded .zst
    eprintln!("Verifying database integrity and signature...");
    let mut hasher = Sha256::new();
    hasher.update(&db_bytes);
    let hash_result: [u8; 32] = hasher.finalize().into();

    // Verify SHA-256 match with manifest
    let computed_hex = hash_result
        .iter()
        .map(|b| format!("{:02x}", b))
        .collect::<String>();
    // Always validate integrity, even with --force. --force is about bypassing the
    // last_sync_time noop check (re-download an already-current version), NOT about
    // skipping integrity: a stale CDN edge cache can serve an old .zst whose sha256 no
    // longer matches the fresh manifest, and skipping this check used to surface that as
    // a confusing "Ed25519 verification failed" instead of a clear SHA-256 mismatch.
    if computed_hex != manifest.sha256 {
        return Err(anyhow::anyhow!(CmdHubError::Validation(format!(
            "SHA-256 mismatch: computed {}, manifest {} (stale CDN cache? try again or purge)",
            computed_hex, manifest.sha256
        ))));
    }

    // 2. Decode the configured public key; fall back to the official key when the
    //    configured value is not valid hex or is not exactly 32 bytes long.
    let pub_key_bytes = match hex_decode(&config.public_key) {
        Ok(bytes) => {
            let mut arr = [0u8; 32];
            if bytes.len() == 32 {
                arr.copy_from_slice(&bytes);
                arr
            } else {
                OFFICIAL_PUBLIC_KEY
            }
        }
        Err(_) => OFFICIAL_PUBLIC_KEY,
    };

    let verifying_key = VerifyingKey::from_bytes(&pub_key_bytes).map_err(|e| {
        anyhow::anyhow!(CmdHubError::SignatureVerification(format!(
            "Invalid public key: {}",
            e
        )))
    })?;

    let signature = Signature::from_slice(&sig_bytes).map_err(|e| {
        anyhow::anyhow!(CmdHubError::SignatureVerification(format!(
            "Invalid signature format: {}",
            e
        )))
    })?;

    // The signature covers the 32-byte SHA-256 digest, not the raw payload.
    verifying_key
        .verify(&hash_result, &signature)
        .map_err(|e| {
            anyhow::anyhow!(CmdHubError::SignatureVerification(format!(
                "Ed25519 signature verification failed: {}",
                e
            )))
        })?;

    // 3. Decompress .zst payload
    eprintln!("Decompressing database...");
    let decompressed =
        zstd::decode_all(&db_bytes[..]).context("Failed to decompress zstd payload")?;

    if mode == "incremental" {
        eprintln!("Applying incremental database changes...");
        let payload: IncrementalSyncPayload = serde_json::from_slice(&decompressed)
            .context("Failed to parse IncrementalSyncPayload JSON")?;

        let lock_path = cache_dir.join("update.lock");
        let lock_file = fs::OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(true)
            .open(&lock_path)
            .context("Failed to open update.lock file")?;

        // The lock is released when `lock_file` is dropped at the end of this branch.
        lock_file
            .lock_exclusive()
            .context("Failed to acquire exclusive lock on update.lock")?;

        // Register sqlite-vec BEFORE opening the connection: SQLite auto-extensions are
        // only invoked for connections created after registration, so registering after
        // `Connection::open` would leave this connection without the extension.
        //
        // SAFETY: `sqlite3_vec_init` is the extension entry point exported by the
        // `sqlite_vec` crate; the transmute only re-types that function pointer to the
        // entry-point signature `sqlite3_auto_extension` expects (assumed to match the
        // usage documented by sqlite-vec — confirm when upgrading either crate).
        unsafe {
            type SqliteVecInitFn = unsafe extern "C" fn();
            let init_fn: SqliteVecInitFn = sqlite_vec::sqlite3_vec_init;
            #[allow(clippy::missing_transmute_annotations)]
            let _ = rusqlite::ffi::sqlite3_auto_extension(Some(std::mem::transmute(init_fn)));
        }

        let mut conn = rusqlite::Connection::open(&live_db_path)
            .context("Failed to open live database for incremental update")?;
        let _ = conn.execute("PRAGMA foreign_keys = ON;", []);

        // Everything below is applied atomically; dropping `tx` without commit rolls back.
        let tx = conn
            .transaction()
            .context("Failed to start SQLite transaction")?;

        crate::db::init_db(&tx)?;

        // Helper to delete commands and associated index entries for a given app_id.
        // Index deletions (FTS / vector) are best-effort; the `arguments` delete is not.
        let delete_app_commands = |tx_ref: &rusqlite::Transaction,
                                   target_app_id: &str|
         -> Result<()> {
            let mut stmt = tx_ref.prepare("SELECT cmd_path FROM arguments WHERE app_id = ?1")?;
            let mut rows = stmt.query(rusqlite::params![target_app_id])?;
            while let Some(row) = rows.next()? {
                let cmd_path: String = row.get(0)?;
                let _ = tx_ref.execute(
                    "DELETE FROM apps_fts WHERE cmd_path = ?1",
                    rusqlite::params![cmd_path],
                );
                let _ = tx_ref.execute(
                    "DELETE FROM commands_vec WHERE cmd_path = ?1",
                    rusqlite::params![cmd_path],
                );
            }
            tx_ref.execute(
                "DELETE FROM arguments WHERE app_id = ?1",
                rusqlite::params![target_app_id],
            )?;
            Ok(())
        };

        // 1. Process deleted/archived apps
        for app_id in payload.deleted_apps {
            delete_app_commands(&tx, &app_id)?;
            tx.execute(
                "DELETE FROM apps WHERE app_id = ?1",
                rusqlite::params![app_id],
            )?;
        }

        // 2. Process updated/inserted apps. Existing commands of each app are removed
        //    first; the payload's `arguments` below re-insert the current set.
        for app in payload.apps {
            delete_app_commands(&tx, &app.app_id)?;
            tx.execute(
                "INSERT OR REPLACE INTO apps (app_id, name, install_instructions) VALUES (?1, ?2, ?3)",
                rusqlite::params![app.app_id, app.name, app.install_instructions],
            )?;
        }

        for arg in payload.arguments {
            tx.execute(
                "INSERT OR REPLACE INTO arguments (cmd_path, app_id, node_name, node_type, description, risk_level, example_template, docker_image, script_url, source_url) \
                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
                rusqlite::params![
                    arg.cmd_path,
                    arg.app_id,
                    arg.node_name,
                    arg.node_type,
                    arg.description,
                    arg.risk_level,
                    arg.example_template,
                    arg.docker_image,
                    arg.script_url,
                    arg.source_url
                ],
            )?;

            // Replace (not duplicate) the full-text row for this command.
            let _ = tx.execute(
                "DELETE FROM apps_fts WHERE cmd_path = ?1",
                rusqlite::params![arg.cmd_path],
            );

            // The FTS row carries the owning app's name; "unknown" if the app row is absent.
            let app_name: String = tx
                .query_row(
                    "SELECT name FROM apps WHERE app_id = ?1",
                    rusqlite::params![arg.app_id],
                    |row| row.get(0),
                )
                .unwrap_or_else(|_| "unknown".to_string());

            tx.execute(
                "INSERT INTO apps_fts (cmd_path, name, capabilities) VALUES (?1, ?2, ?3)",
                rusqlite::params![arg.cmd_path, app_name, arg.description],
            )?;
        }

        for vec in payload.command_vecs {
            // Support both int8_384 (new) and float32_512 (legacy) embedding formats.
            // Embeddings of any other length are skipped.
            let is_int8 = vec.embedding.len() == 384;
            if is_int8 || vec.embedding.len() == 512 {
                let vec_bytes: Vec<u8> = if is_int8 {
                    // Quantize each component to a signed byte (scaled by 127, clamped).
                    vec.embedding
                        .iter()
                        .map(|&v| (v * 127.0).round().clamp(-128.0, 127.0) as i8 as u8)
                        .collect()
                } else {
                    // Store the raw float bytes in native endianness.
                    let mut b = Vec::with_capacity(512 * 4);
                    for &val in &vec.embedding {
                        b.extend_from_slice(&val.to_ne_bytes());
                    }
                    b
                };
                // Vector index maintenance is best-effort: failures are ignored.
                let _ = tx.execute(
                    "DELETE FROM commands_vec WHERE cmd_path = ?1",
                    rusqlite::params![vec.cmd_path],
                );
                let _ = tx.execute(
                    "INSERT INTO commands_vec (cmd_path, embedding) VALUES (?1, ?2)",
                    rusqlite::params![vec.cmd_path, vec_bytes],
                );
            }
        }

        // Prefer the server-provided sync time; fall back to the local clock.
        let new_time = manifest
            .new_sync_time
            .unwrap_or_else(|| chrono::Utc::now().timestamp());
        tx.execute(
            "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('last_sync_time', ?1)",
            rusqlite::params![new_time.to_string()],
        )?;

        tx.commit()
            .context("Failed to commit incremental SQLite transaction")?;
        eprintln!(
            "Database successfully incrementally updated (new sync time: {})!",
            new_time
        );
    } else {
        // Full replacement: stage the decompressed database on disk, then copy it over
        // the live database with the SQLite backup API.
        let tmp_dir = cache_dir.join("tmp");
        fs::create_dir_all(&tmp_dir).context("Failed to create temporary staging directory")?;
        let staging_path = tmp_dir.join("latest.db");
        fs::write(&staging_path, &decompressed)
            .context("Failed to write decompressed staging database")?;

        eprintln!("Applying atomic database replacement...");
        let lock_path = cache_dir.join("update.lock");
        let lock_file = fs::OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(true)
            .open(&lock_path)
            .context("Failed to open update.lock file")?;

        // The lock is released when `lock_file` is dropped at the end of this branch.
        lock_file
            .lock_exclusive()
            .context("Failed to acquire exclusive lock on update.lock")?;

        let live_db_path = resolve_db_path();
        if let Some(parent) = live_db_path.parent() {
            fs::create_dir_all(parent).context("Failed to create live database directory")?;
        }

        eprintln!("Safely applying database changes...");
        let src_conn =
            rusqlite::Connection::open(&staging_path).context("Failed to open staging database")?;
        let mut dst_conn =
            rusqlite::Connection::open(&live_db_path).context("Failed to open live database")?;

        // Pragma results are deliberately ignored (best-effort tuning of the live db).
        let _ = dst_conn.execute("PRAGMA journal_mode = WAL;", []);
        let _ = dst_conn.execute("PRAGMA synchronous = NORMAL;", []);
        let _ = dst_conn.execute("PRAGMA foreign_keys = ON;", []);

        let backup = rusqlite::backup::Backup::new(&src_conn, &mut dst_conn)
            .context("Failed to initialize SQLite backup")?;

        // Copy 100 pages per step, pausing 10 ms between steps.
        backup
            .run_to_completion(100, std::time::Duration::from_millis(10), None)
            .context("SQLite backup to live database failed")?;

        // Release the mutable borrow of `dst_conn` so it can be used again below.
        drop(backup);

        let _ = fs::remove_file(&staging_path);

        // Prefer the server-provided sync time; fall back to the local clock.
        let new_time = manifest
            .new_sync_time
            .unwrap_or_else(|| chrono::Utc::now().timestamp());
        // Recording the sync time stays best-effort (the database itself is already
        // replaced at this point), but a failure is reported instead of being swallowed.
        if let Err(e) = dst_conn.execute(
            "CREATE TABLE IF NOT EXISTS sync_meta (
                key TEXT PRIMARY KEY,
                value TEXT NOT NULL
            );",
            [],
        ) {
            eprintln!("Warning: failed to ensure sync_meta table exists: {}", e);
        }
        if let Err(e) = dst_conn.execute(
            "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('last_sync_time', ?1)",
            rusqlite::params![new_time.to_string()],
        ) {
            eprintln!("Warning: failed to record last_sync_time: {}", e);
        }

        eprintln!(
            "Database successfully updated to version {} (sync time: {})!",
            manifest.version, new_time
        );
    }
    Ok(())
}

/// Decodes a hexadecimal string into raw bytes.
///
/// Upper- and lower-case ASCII hex digits are accepted; an empty string yields an empty
/// vector.
///
/// # Errors
///
/// Returns an error if the input has an odd length (a trailing unpaired digit is no longer
/// silently dropped) or contains anything other than ASCII hex digits (sign characters
/// such as `+`, which `u8::from_str_radix` tolerates, are rejected).
fn hex_decode(s: &str) -> Result<Vec<u8>> {
    // Errors are built from `std::io::Error` so that `?` / `.into()` convert them into
    // this function's error type.
    fn invalid(msg: String) -> std::io::Error {
        std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
    }

    // Maps one ASCII hex digit to its 4-bit value.
    fn nibble(b: u8) -> std::result::Result<u8, std::io::Error> {
        char::from(b)
            .to_digit(16)
            // `to_digit(16)` yields 0..=15, so the narrowing cast cannot truncate.
            .map(|d| d as u8)
            .ok_or_else(|| invalid(format!("invalid hex digit (byte 0x{:02x})", b)))
    }

    let raw = s.as_bytes();
    if raw.len() % 2 != 0 {
        return Err(invalid(format!("hex string has odd length {}", raw.len())).into());
    }

    let mut bytes = Vec::with_capacity(raw.len() / 2);
    for pair in raw.chunks_exact(2) {
        let hi = nibble(pair[0])?;
        let lo = nibble(pair[1])?;
        bytes.push((hi << 4) | lo);
    }
    Ok(bytes)
}