1use crate::config::{get_cache_dir, Config, OFFICIAL_PUBLIC_KEY};
2use crate::db::resolve_db_path;
3use anyhow::{Context, Result};
4use cmdhub_shared::{CmdHubError, IncrementalSyncPayload, UpdateManifest};
5use ed25519_dalek::{Signature, Verifier, VerifyingKey};
6use fs2::FileExt;
7use reqwest::Client;
8use sha2::{Digest, Sha256};
9use std::fs;
10
11pub async fn update_database(config: &Config, force: bool) -> Result<()> {
12 let to = std::time::Duration::from_secs(config.timeout_seconds);
16 let client = Client::builder()
17 .connect_timeout(to)
18 .read_timeout(to)
19 .build()?;
20
21 let mut last_sync_time = 0i64;
22 let live_db_path = resolve_db_path();
23 if !force && live_db_path.exists() {
24 if let Ok(conn) = rusqlite::Connection::open(&live_db_path) {
25 if let Ok(val) = conn.query_row::<String, _, _>(
26 "SELECT value FROM sync_meta WHERE key = 'last_sync_time' LIMIT 1",
27 [],
28 |row| row.get(0),
29 ) {
30 if let Ok(t) = val.parse::<i64>() {
31 last_sync_time = t;
32 }
33 }
34 }
35 }
36
37 let update_url = format!(
38 "{}/db/update?last_sync_time={}",
39 config.api_url, last_sync_time
40 );
41
42 eprintln!("Checking for updates at {}...", update_url);
43
44 let manifest_resp = client.get(&update_url).send().await;
46 let manifest: UpdateManifest = match manifest_resp {
47 Ok(resp) => {
48 if resp.status().is_success() {
49 resp.json()
50 .await
51 .context("Failed to parse UpdateManifest JSON")?
52 } else {
53 return Err(anyhow::anyhow!(CmdHubError::UpdateFailed(format!(
54 "Cloud returned status code: {}",
55 resp.status()
56 ))));
57 }
58 }
59 Err(e) => {
60 return Err(anyhow::anyhow!(CmdHubError::UpdateFailed(format!(
61 "Failed to fetch database update manifest: {}",
62 e
63 ))));
64 }
65 };
66
67 let mode = manifest.mode.clone().unwrap_or_else(|| "full".to_string());
68 if mode == "noop" {
69 eprintln!("Database is already up-to-date.");
70 return Ok(());
71 }
72
73 let cache_dir = get_cache_dir();
74 let downloads_dir = cache_dir.join("downloads");
75 fs::create_dir_all(&downloads_dir).context("Failed to create downloads cache directory")?;
76
77 let db_zst_path = downloads_dir.join("latest.db.zst");
78 let sig_path = downloads_dir.join("latest.db.sig");
79
80 eprintln!(
81 "Downloading database update (version: {})...",
82 manifest.version
83 );
84
85 let db_resp = client
87 .get(&manifest.db_url)
88 .send()
89 .await
90 .context("Failed to download database file")?;
91 let db_bytes = db_resp
92 .bytes()
93 .await
94 .context("Failed to read database bytes")?;
95 fs::write(&db_zst_path, &db_bytes).context("Failed to write downloaded database payload")?;
96
97 let sig_resp = client
99 .get(&manifest.sig_url)
100 .send()
101 .await
102 .context("Failed to download database signature file")?;
103 let sig_bytes = sig_resp
104 .bytes()
105 .await
106 .context("Failed to read database signature bytes")?;
107 fs::write(&sig_path, &sig_bytes).context("Failed to write downloaded signature payload")?;
108
109 eprintln!("Verifying database integrity and signature...");
111 let mut hasher = Sha256::new();
112 hasher.update(&db_bytes);
113 let hash_result: [u8; 32] = hasher.finalize().into();
114
115 let computed_hex = hash_result
117 .iter()
118 .map(|b| format!("{:02x}", b))
119 .collect::<String>();
120 if computed_hex != manifest.sha256 {
126 return Err(anyhow::anyhow!(CmdHubError::Validation(format!(
127 "SHA-256 mismatch: computed {}, manifest {} (stale CDN cache? try again or purge)",
128 computed_hex, manifest.sha256
129 ))));
130 }
131
132 let pub_key_bytes = match hex_decode(&config.public_key) {
134 Ok(bytes) => {
135 let mut arr = [0u8; 32];
136 if bytes.len() == 32 {
137 arr.copy_from_slice(&bytes);
138 arr
139 } else {
140 OFFICIAL_PUBLIC_KEY
141 }
142 }
143 Err(_) => OFFICIAL_PUBLIC_KEY,
144 };
145
146 let verifying_key = VerifyingKey::from_bytes(&pub_key_bytes).map_err(|e| {
147 anyhow::anyhow!(CmdHubError::SignatureVerification(format!(
148 "Invalid public key: {}",
149 e
150 )))
151 })?;
152
153 let signature = Signature::from_slice(&sig_bytes).map_err(|e| {
154 anyhow::anyhow!(CmdHubError::SignatureVerification(format!(
155 "Invalid signature format: {}",
156 e
157 )))
158 })?;
159
160 verifying_key
161 .verify(&hash_result, &signature)
162 .map_err(|e| {
163 anyhow::anyhow!(CmdHubError::SignatureVerification(format!(
164 "Ed25519 signature verification failed: {}",
165 e
166 )))
167 })?;
168
169 eprintln!("Decompressing database...");
171 let decompressed =
172 zstd::decode_all(&db_bytes[..]).context("Failed to decompress zstd payload")?;
173
174 if mode == "incremental" {
175 eprintln!("Applying incremental database changes...");
176 let payload: IncrementalSyncPayload = serde_json::from_slice(&decompressed)
177 .context("Failed to parse IncrementalSyncPayload JSON")?;
178
179 let lock_path = cache_dir.join("update.lock");
180 let lock_file = fs::OpenOptions::new()
181 .read(true)
182 .write(true)
183 .create(true)
184 .truncate(true)
185 .open(&lock_path)
186 .context("Failed to open update.lock file")?;
187
188 lock_file
189 .lock_exclusive()
190 .context("Failed to acquire exclusive lock on update.lock")?;
191
192 let mut conn = rusqlite::Connection::open(&live_db_path)
193 .context("Failed to open live database for incremental update")?;
194 let _ = conn.execute("PRAGMA foreign_keys = ON;", []);
195
196 unsafe {
197 type SqliteVecInitFn = unsafe extern "C" fn();
198 let init_fn: SqliteVecInitFn = sqlite_vec::sqlite3_vec_init;
199 #[allow(clippy::missing_transmute_annotations)]
200 let _ = rusqlite::ffi::sqlite3_auto_extension(Some(std::mem::transmute(init_fn)));
201 }
202
203 let tx = conn
204 .transaction()
205 .context("Failed to start SQLite transaction")?;
206
207 crate::db::init_db(&tx)?;
208
209 let delete_app_commands = |tx_ref: &rusqlite::Transaction,
211 target_app_id: &str|
212 -> Result<()> {
213 let mut stmt = tx_ref.prepare("SELECT cmd_path FROM arguments WHERE app_id = ?1")?;
214 let mut rows = stmt.query(rusqlite::params![target_app_id])?;
215 while let Some(row) = rows.next()? {
216 let cmd_path: String = row.get(0)?;
217 let _ = tx_ref.execute(
218 "DELETE FROM apps_fts WHERE cmd_path = ?1",
219 rusqlite::params![cmd_path],
220 );
221 let _ = tx_ref.execute(
222 "DELETE FROM commands_vec WHERE cmd_path = ?1",
223 rusqlite::params![cmd_path],
224 );
225 }
226 tx_ref.execute(
227 "DELETE FROM arguments WHERE app_id = ?1",
228 rusqlite::params![target_app_id],
229 )?;
230 Ok(())
231 };
232
233 for app_id in payload.deleted_apps {
235 delete_app_commands(&tx, &app_id)?;
236 tx.execute(
237 "DELETE FROM apps WHERE app_id = ?1",
238 rusqlite::params![app_id],
239 )?;
240 }
241
242 for app in payload.apps {
244 delete_app_commands(&tx, &app.app_id)?;
245 tx.execute(
246 "INSERT OR REPLACE INTO apps (app_id, name, install_instructions) VALUES (?1, ?2, ?3)",
247 rusqlite::params![app.app_id, app.name, app.install_instructions],
248 )?;
249 }
250
251 for arg in payload.arguments {
252 tx.execute(
253 "INSERT OR REPLACE INTO arguments (cmd_path, app_id, node_name, node_type, description, risk_level, example_template, docker_image, script_url, source_url) \
254 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
255 rusqlite::params![
256 arg.cmd_path,
257 arg.app_id,
258 arg.node_name,
259 arg.node_type,
260 arg.description,
261 arg.risk_level,
262 arg.example_template,
263 arg.docker_image,
264 arg.script_url,
265 arg.source_url
266 ],
267 )?;
268
269 let _ = tx.execute(
270 "DELETE FROM apps_fts WHERE cmd_path = ?1",
271 rusqlite::params![arg.cmd_path],
272 );
273
274 let app_name: String = tx
275 .query_row(
276 "SELECT name FROM apps WHERE app_id = ?1",
277 rusqlite::params![arg.app_id],
278 |row| row.get(0),
279 )
280 .unwrap_or_else(|_| "unknown".to_string());
281
282 tx.execute(
283 "INSERT INTO apps_fts (cmd_path, name, capabilities) VALUES (?1, ?2, ?3)",
284 rusqlite::params![arg.cmd_path, app_name, arg.description],
285 )?;
286 }
287
288 for vec in payload.command_vecs {
289 let is_int8 = vec.embedding.len() == 384;
291 if is_int8 || vec.embedding.len() == 512 {
292 let vec_bytes: Vec<u8> = if is_int8 {
293 vec.embedding
294 .iter()
295 .map(|&v| (v * 127.0).round().clamp(-128.0, 127.0) as i8 as u8)
296 .collect()
297 } else {
298 let mut b = Vec::with_capacity(512 * 4);
299 for &val in &vec.embedding {
300 b.extend_from_slice(&val.to_ne_bytes());
301 }
302 b
303 };
304 let _ = tx.execute(
305 "DELETE FROM commands_vec WHERE cmd_path = ?1",
306 rusqlite::params![vec.cmd_path],
307 );
308 let _ = tx.execute(
309 "INSERT INTO commands_vec (cmd_path, embedding) VALUES (?1, ?2)",
310 rusqlite::params![vec.cmd_path, vec_bytes],
311 );
312 }
313 }
314
315 let new_time = manifest
316 .new_sync_time
317 .unwrap_or_else(|| chrono::Utc::now().timestamp());
318 tx.execute(
319 "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('last_sync_time', ?1)",
320 rusqlite::params![new_time.to_string()],
321 )?;
322
323 tx.commit()
324 .context("Failed to commit incremental SQLite transaction")?;
325 eprintln!(
326 "Database successfully incrementally updated (new sync time: {})!",
327 new_time
328 );
329 } else {
330 let tmp_dir = cache_dir.join("tmp");
331 fs::create_dir_all(&tmp_dir).context("Failed to create temporary staging directory")?;
332 let staging_path = tmp_dir.join("latest.db");
333 fs::write(&staging_path, &decompressed)
334 .context("Failed to write decompressed staging database")?;
335
336 eprintln!("Applying atomic database replacement...");
337 let lock_path = cache_dir.join("update.lock");
338 let lock_file = fs::OpenOptions::new()
339 .read(true)
340 .write(true)
341 .create(true)
342 .truncate(true)
343 .open(&lock_path)
344 .context("Failed to open update.lock file")?;
345
346 lock_file
347 .lock_exclusive()
348 .context("Failed to acquire exclusive lock on update.lock")?;
349
350 let live_db_path = resolve_db_path();
351 if let Some(parent) = live_db_path.parent() {
352 fs::create_dir_all(parent).context("Failed to create live database directory")?;
353 }
354
355 eprintln!("Safely applying database changes...");
356 let src_conn =
357 rusqlite::Connection::open(&staging_path).context("Failed to open staging database")?;
358 let mut dst_conn =
359 rusqlite::Connection::open(&live_db_path).context("Failed to open live database")?;
360
361 let _ = dst_conn.execute("PRAGMA journal_mode = WAL;", []);
362 let _ = dst_conn.execute("PRAGMA synchronous = NORMAL;", []);
363 let _ = dst_conn.execute("PRAGMA foreign_keys = ON;", []);
364
365 let backup = rusqlite::backup::Backup::new(&src_conn, &mut dst_conn)
366 .context("Failed to initialize SQLite backup")?;
367
368 backup
369 .run_to_completion(100, std::time::Duration::from_millis(10), None)
370 .context("SQLite backup to live database failed")?;
371
372 drop(backup);
373
374 let _ = fs::remove_file(&staging_path);
375
376 let new_time = manifest
377 .new_sync_time
378 .unwrap_or_else(|| chrono::Utc::now().timestamp());
379 let _ = dst_conn.execute(
380 "CREATE TABLE IF NOT EXISTS sync_meta (
381 key TEXT PRIMARY KEY,
382 value TEXT NOT NULL
383 );",
384 [],
385 );
386 let _ = dst_conn.execute(
387 "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('last_sync_time', ?1)",
388 rusqlite::params![new_time.to_string()],
389 );
390
391 eprintln!(
392 "Database successfully updated to version {} (sync time: {})!",
393 manifest.version, new_time
394 );
395 }
396 Ok(())
397}
398
399fn hex_decode(s: &str) -> Result<Vec<u8>> {
400 let mut bytes = Vec::new();
401 let mut chars = s.chars().peekable();
402 while let Some(c1) = chars.next() {
403 if let Some(c2) = chars.next() {
404 let hex = format!("{}{}", c1, c2);
405 let b = u8::from_str_radix(&hex, 16)?;
406 bytes.push(b);
407 }
408 }
409 Ok(bytes)
410}