1use std::collections::{BTreeMap, HashMap};
2use std::fs::File;
3use std::io::{Read, Write};
4use std::path::{Path, PathBuf};
5use std::time::SystemTime;
6
7use anyhow::{anyhow, Context, Result};
8use clap::{Args, Subcommand, ValueEnum};
9use serde::{Deserialize, Serialize};
10use serde_json::json;
11use time::format_description::well_known::Rfc3339;
12use time::{OffsetDateTime, UtcOffset};
13use zip::ZipArchive;
14
15use crate::client::{RommClient, SaveUploadOptions};
16use crate::commands::OutputFormat;
17use crate::endpoints::device::{
18 DeviceSchema, GetDevice, ListDevices, RegisterDevice, SyncMode as EndpointSyncMode,
19};
20use crate::endpoints::sync::{
21 CompleteSyncSession, GetSyncSession, ListSyncSessions, NegotiateSync, SyncNegotiateResponse,
22 TriggerPushPull,
23};
24use crate::feature_compat::{save_sync_compatibility, SAVE_SYNC_UNSUPPORTED_MESSAGE};
25use crate::openapi::EndpointRegistry;
26
// Top-level arguments for the sync command group.
//
// Plain `//` comments are used here on purpose: clap turns `///` doc comments
// into help text, and this documentation must not change the CLI output.
#[derive(Args, Debug)]
pub struct SyncCommand {
    // Boolean `--json` flag; `global = true` makes it accepted on nested subcommands too.
    // NOTE(review): `handle` as visible here only reads `action`, never this
    // field — presumably the caller folds it into `OutputFormat`; confirm.
    #[arg(long, global = true)]
    pub json: bool,

    // Which sync subcommand to execute.
    #[command(subcommand)]
    pub action: SyncAction,
}
36
// Subcommands dispatched by `handle`.
// (`//` rather than `///`: clap would turn doc comments into help text.)
#[derive(Subcommand, Debug)]
pub enum SyncAction {
    // Device operations; see `SyncDeviceAction`.
    Device(SyncDeviceCommand),
    // Negotiate with the server and print the response without transferring files.
    Plan(SyncPlanArgs),
    // Negotiate, execute the returned operations, then complete the session.
    Run(SyncRunArgs),
    // Sync session queries; see `SyncSessionsAction`.
    Sessions(SyncSessionsCommand),
    // Issue the `TriggerPushPull` endpoint call for one device and print the response.
    PushPull {
        // Identifier of the target device (positional argument).
        device_id: String,
    },
}
53
// Wrapper that nests the device subcommands under `SyncAction::Device`.
// (`//` rather than `///`: clap would turn doc comments into help text.)
#[derive(Args, Debug)]
pub struct SyncDeviceCommand {
    // The device action to perform.
    #[command(subcommand)]
    pub action: SyncDeviceAction,
}
59
// Device-related actions handled by `handle_device`.
// (`//` rather than `///`: clap would turn doc comments into help text.)
#[derive(Subcommand, Debug)]
pub enum SyncDeviceAction {
    // Register a device. Every field below is forwarded into the JSON body of
    // the `RegisterDevice` request.
    Register {
        // Optional device name (sent as `name`).
        #[arg(long)]
        name: Option<String>,
        // Optional platform string (sent as `platform`).
        #[arg(long)]
        platform: Option<String>,
        // Client identifier (sent as `client`); defaults to "romm-cli".
        #[arg(long, default_value = "romm-cli")]
        client: String,
        // Optional client version string.
        #[arg(long)]
        client_version: Option<String>,
        // Optional hostname of the device.
        #[arg(long)]
        hostname: Option<String>,
        // Optional MAC address of the device.
        #[arg(long)]
        mac_address: Option<String>,
        // Optional IP address of the device.
        #[arg(long)]
        ip_address: Option<String>,
        // Sync mode; defaults to `CliSyncMode::Api` and is converted to the endpoint enum.
        #[arg(long, value_enum, default_value_t = CliSyncMode::Api)]
        sync_mode: CliSyncMode,
        // Optional raw JSON text; `handle_device` rejects it unless it parses to a JSON object.
        #[arg(long)]
        sync_config_json: Option<String>,
        // Boolean flag sent as `allow_duplicate`.
        #[arg(long)]
        allow_duplicate: bool,
        // Boolean flag sent as `reset_syncs`.
        #[arg(long)]
        reset_syncs: bool,
    },
    // List devices via the `ListDevices` endpoint.
    List,
    // Fetch a single device via the `GetDevice` endpoint.
    Get {
        // Identifier of the device to fetch (positional argument).
        device_id: String,
    },
}
96
// Arguments for the plan action (negotiate only, no transfers).
// (`//` rather than `///`: clap would turn doc comments into help text.)
#[derive(Args, Debug)]
pub struct SyncPlanArgs {
    // Device identifier sent to the server in the negotiate request.
    #[arg(long)]
    pub device_id: String,
    // Path to the JSON manifest listing local save files (see `SyncManifest`).
    #[arg(long)]
    pub manifest: PathBuf,
}
104
// Arguments for the run action (negotiate, transfer, complete).
// (`//` rather than `///`: clap would turn doc comments into help text.)
#[derive(Args, Debug)]
pub struct SyncRunArgs {
    // Device identifier used for negotiation and attached to uploads/downloads.
    #[arg(long)]
    pub device_id: String,
    // Path to the JSON manifest listing local save files (see `SyncManifest`).
    #[arg(long)]
    pub manifest: PathBuf,
    // Directory downloads are written into; when omitted, `handle_run` falls
    // back to the manifest's parent directory (or "." if it has none).
    #[arg(long)]
    pub download_dir: Option<PathBuf>,
    // How operations the server reports as "conflict" are treated; defaults to `Fail`.
    #[arg(long, value_enum, default_value_t = ConflictPolicy::Fail)]
    pub conflict: ConflictPolicy,
}
117
// Wrapper that nests the session subcommands under `SyncAction::Sessions`.
// (`//` rather than `///`: clap would turn doc comments into help text.)
#[derive(Args, Debug)]
pub struct SyncSessionsCommand {
    // The session action to perform.
    #[command(subcommand)]
    pub action: SyncSessionsAction,
}
123
// Session-related actions handled by `handle_sessions`.
// (`//` rather than `///`: clap would turn doc comments into help text.)
#[derive(Subcommand, Debug)]
pub enum SyncSessionsAction {
    // List sessions via the `ListSyncSessions` endpoint.
    List {
        // Optional device identifier forwarded to the endpoint.
        #[arg(long)]
        device_id: Option<String>,
        // Optional limit forwarded to the endpoint.
        #[arg(long)]
        limit: Option<u32>,
    },
    // Fetch one session by its numeric id (positional) via `GetSyncSession`.
    Get { session_id: u64 },
}
136
// Sync mode as selectable on the command line; converted one-to-one into
// `EndpointSyncMode` by the `From` impl in this file.
// (`//` rather than `///`: clap would turn doc comments into help text.)
#[derive(Debug, Clone, Copy, ValueEnum)]
pub enum CliSyncMode {
    // Maps to `EndpointSyncMode::Api`.
    Api,
    // Maps to `EndpointSyncMode::FileTransfer`.
    FileTransfer,
    // Maps to `EndpointSyncMode::PushPull`.
    PushPull,
}
143
144impl From<CliSyncMode> for EndpointSyncMode {
145 fn from(value: CliSyncMode) -> Self {
146 match value {
147 CliSyncMode::Api => EndpointSyncMode::Api,
148 CliSyncMode::FileTransfer => EndpointSyncMode::FileTransfer,
149 CliSyncMode::PushPull => EndpointSyncMode::PushPull,
150 }
151 }
152}
153
// What `handle_run` does with an operation whose action is "conflict".
// (`//` rather than `///`: clap would turn doc comments into help text.)
#[derive(Debug, Clone, Copy, ValueEnum, PartialEq, Eq)]
pub enum ConflictPolicy {
    // Count the conflict as a failed operation; the run then ends with an error.
    Fail,
    // Count the conflict under `conflicts_skipped` and carry on.
    Skip,
}
159
/// JSON manifest parsed by `load_manifest_and_prepare`.
#[derive(Debug, Clone, Deserialize)]
struct SyncManifest {
    /// Local save files to offer to the server during negotiation.
    saves: Vec<ManifestSave>,
}
164
/// One entry of the manifest's `saves` array.
#[derive(Debug, Clone, Deserialize)]
struct ManifestSave {
    /// ROM identifier the save belongs to.
    rom_id: u64,
    /// Path to the save file; relative paths are resolved against the manifest's directory.
    path: PathBuf,
    /// Optional name override; when absent or blank, the file name of `path` is used.
    file_name: Option<String>,
    /// Optional slot; dropped when blank.
    slot: Option<String>,
    /// Optional emulator; dropped when blank.
    emulator: Option<String>,
}
173
/// Description of one local save as serialized into the negotiate request's `saves` array.
#[derive(Debug, Clone, Serialize)]
struct ClientSaveState {
    /// ROM identifier the save belongs to.
    rom_id: u64,
    /// Name of the save; together with `rom_id` it keys the lookup in `prepared_by_key`.
    file_name: String,
    /// Optional slot; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    slot: Option<String>,
    /// Optional emulator; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    emulator: Option<String>,
    /// Lower-case hex MD5 produced by `compute_content_hash`.
    content_hash: String,
    /// File modification time formatted as an RFC 3339 timestamp in UTC.
    updated_at: String,
    /// File size in bytes, taken from the file's metadata.
    file_size_bytes: u64,
}
186
/// A manifest entry after resolution: where the file lives locally plus the
/// state that is reported to the server.
#[derive(Debug, Clone)]
struct PreparedSave {
    /// Resolved path of the local save file (used as the upload source).
    path: PathBuf,
    /// State sent to the server for this save.
    client: ClientSaveState,
}
192
/// Tallies accumulated by `handle_run` while executing the negotiated operations.
#[derive(Debug, Clone, Copy, Default)]
struct RunCounts {
    /// Uploads that succeeded.
    uploaded: u64,
    /// Downloads that were fetched and written successfully.
    downloaded: u64,
    /// Operations the server marked "no_op".
    no_op: u64,
    /// Conflicts ignored under `ConflictPolicy::Skip`.
    conflicts_skipped: u64,
    /// Successful transfers (uploads + downloads); reported as `operations_completed`.
    completed: u64,
    /// Operations that failed for any reason; reported as `operations_failed`.
    failed: u64,
}
202
203pub async fn handle(cmd: SyncCommand, client: &RommClient, format: OutputFormat) -> Result<()> {
204 preflight_save_sync_compatibility(client, format).await?;
205
206 match cmd.action {
207 SyncAction::Device(device_cmd) => handle_device(device_cmd, client, format).await,
208 SyncAction::Plan(args) => handle_plan(args, client, format).await,
209 SyncAction::Run(args) => handle_run(args, client, format).await,
210 SyncAction::Sessions(cmd) => handle_sessions(cmd, client, format).await,
211 SyncAction::PushPull { device_id } => {
212 let out = client.call(&TriggerPushPull { device_id }).await?;
213 print_output(format, &out)
214 }
215 }
216}
217
218async fn preflight_save_sync_compatibility(
219 client: &RommClient,
220 format: OutputFormat,
221) -> Result<()> {
222 let openapi = match client.fetch_openapi_json().await {
223 Ok(body) => body,
224 Err(e) => {
225 tracing::warn!("Skipping save-sync compatibility preflight: {e:#}");
226 return Ok(());
227 }
228 };
229 let registry = match EndpointRegistry::from_openapi_json(&openapi) {
230 Ok(registry) => registry,
231 Err(e) => {
232 tracing::warn!(
233 "Skipping save-sync compatibility preflight; OpenAPI parse failed: {e:#}"
234 );
235 return Ok(());
236 }
237 };
238 let compat = save_sync_compatibility(®istry);
239 if compat.supported {
240 return Ok(());
241 }
242
243 if matches!(format, OutputFormat::Json) {
244 println!(
245 "{}",
246 serde_json::to_string_pretty(&json!({
247 "error": "save_sync_unsupported",
248 "message": SAVE_SYNC_UNSUPPORTED_MESSAGE,
249 "missing_endpoints": compat
250 .missing
251 .iter()
252 .map(|ep| ep.label())
253 .collect::<Vec<_>>()
254 }))?
255 );
256 }
257
258 anyhow::bail!("{}", compat.unsupported_message())
259}
260
261async fn handle_device(
262 cmd: SyncDeviceCommand,
263 client: &RommClient,
264 format: OutputFormat,
265) -> Result<()> {
266 match cmd.action {
267 SyncDeviceAction::Register {
268 name,
269 platform,
270 client: client_name,
271 client_version,
272 hostname,
273 mac_address,
274 ip_address,
275 sync_mode,
276 sync_config_json,
277 allow_duplicate,
278 reset_syncs,
279 } => {
280 let sync_config = match sync_config_json {
281 Some(raw) => Some(
282 serde_json::from_str::<serde_json::Value>(&raw)
283 .with_context(|| "invalid --sync-config-json (must be a JSON object)")?,
284 ),
285 None => None,
286 };
287 if let Some(v) = &sync_config {
288 if !v.is_object() {
289 anyhow::bail!("--sync-config-json must decode to a JSON object");
290 }
291 }
292
293 let body = json!({
294 "name": name,
295 "platform": platform,
296 "client": client_name,
297 "client_version": client_version,
298 "hostname": hostname,
299 "mac_address": mac_address,
300 "ip_address": ip_address,
301 "sync_mode": EndpointSyncMode::from(sync_mode),
302 "sync_config": sync_config,
303 "allow_existing": true,
304 "allow_duplicate": allow_duplicate,
305 "reset_syncs": reset_syncs
306 });
307 let created = client.call(&RegisterDevice { body }).await?;
308 print_output(format, &created)
309 }
310 SyncDeviceAction::List => {
311 let rows: Vec<DeviceSchema> = client.call(&ListDevices).await?;
312 print_output(format, &rows)
313 }
314 SyncDeviceAction::Get { device_id } => {
315 let row = client.call(&GetDevice { device_id }).await?;
316 print_output(format, &row)
317 }
318 }
319}
320
321async fn handle_plan(args: SyncPlanArgs, client: &RommClient, format: OutputFormat) -> Result<()> {
322 let prepared = load_manifest_and_prepare(&args.manifest)?;
323 let negotiate = negotiate(client, &args.device_id, &prepared).await?;
324 print_output(format, &negotiate)
325}
326
327async fn handle_run(args: SyncRunArgs, client: &RommClient, format: OutputFormat) -> Result<()> {
328 let prepared = load_manifest_and_prepare(&args.manifest)?;
329 let prepared_by_key = prepared_by_key(&prepared)?;
330 let negotiate = negotiate(client, &args.device_id, &prepared).await?;
331
332 let download_base = match args.download_dir {
333 Some(dir) => dir,
334 None => args
335 .manifest
336 .parent()
337 .map(Path::to_path_buf)
338 .unwrap_or_else(|| PathBuf::from(".")),
339 };
340 std::fs::create_dir_all(&download_base).with_context(|| {
341 format!(
342 "failed to create download directory {}",
343 download_base.display()
344 )
345 })?;
346
347 let mut counts = RunCounts::default();
348 let mut hard_conflict = false;
349 for op in &negotiate.operations {
350 match op.action.as_str() {
351 "upload" => {
352 let key = (op.rom_id, op.file_name.clone());
353 let Some(local) = prepared_by_key.get(&key) else {
354 counts.failed += 1;
355 eprintln!(
356 "Missing local manifest entry for upload operation rom_id={} file_name={}",
357 op.rom_id, op.file_name
358 );
359 continue;
360 };
361 let options = SaveUploadOptions {
362 emulator: local.client.emulator.as_deref(),
363 slot: local.client.slot.as_deref(),
364 device_id: Some(args.device_id.as_str()),
365 session_id: Some(negotiate.session_id),
366 overwrite: false,
367 };
368 match client
369 .upload_save_file_with_options(local.client.rom_id, &local.path, &options)
370 .await
371 {
372 Ok(_) => {
373 counts.uploaded += 1;
374 counts.completed += 1;
375 }
376 Err(err) => {
377 counts.failed += 1;
378 eprintln!(
379 "Upload failed for rom_id={} file_name={}: {:#}",
380 local.client.rom_id, local.client.file_name, err
381 );
382 }
383 }
384 }
385 "download" => {
386 let Some(save_id) = op.save_id else {
387 counts.failed += 1;
388 eprintln!(
389 "Download operation missing save_id for rom_id={} file_name={}",
390 op.rom_id, op.file_name
391 );
392 continue;
393 };
394 match client
395 .download_save_content(
396 save_id,
397 Some(args.device_id.as_str()),
398 Some(negotiate.session_id),
399 )
400 .await
401 {
402 Ok(bytes) => {
403 let target = download_base.join(&op.file_name);
404 if let Some(parent) = target.parent() {
405 std::fs::create_dir_all(parent).with_context(|| {
406 format!("failed to create parent folder {}", parent.display())
407 })?;
408 }
409 let mut f = File::create(&target).with_context(|| {
410 format!("failed to create download file {}", target.display())
411 })?;
412 f.write_all(&bytes).with_context(|| {
413 format!("failed to write download file {}", target.display())
414 })?;
415 counts.downloaded += 1;
416 counts.completed += 1;
417 }
418 Err(err) => {
419 counts.failed += 1;
420 eprintln!("Download failed for save_id={save_id}: {err:#}");
421 }
422 }
423 }
424 "no_op" => {
425 counts.no_op += 1;
426 }
427 "conflict" => match args.conflict {
428 ConflictPolicy::Skip => {
429 counts.conflicts_skipped += 1;
430 }
431 ConflictPolicy::Fail => {
432 counts.failed += 1;
433 hard_conflict = true;
434 eprintln!(
435 "Conflict for rom_id={} file_name={}: {}",
436 op.rom_id, op.file_name, op.reason
437 );
438 }
439 },
440 other => {
441 counts.failed += 1;
442 eprintln!(
443 "Unknown sync operation '{}' for rom_id={} file_name={}",
444 other, op.rom_id, op.file_name
445 );
446 }
447 }
448 }
449
450 let completion = client
451 .call(&CompleteSyncSession {
452 session_id: negotiate.session_id,
453 body: json!({
454 "operations_completed": counts.completed,
455 "operations_failed": counts.failed
456 }),
457 })
458 .await?;
459
460 if matches!(format, OutputFormat::Json) {
461 let out = json!({
462 "negotiate": negotiate,
463 "counts": {
464 "uploaded": counts.uploaded,
465 "downloaded": counts.downloaded,
466 "no_op": counts.no_op,
467 "conflicts_skipped": counts.conflicts_skipped,
468 "completed": counts.completed,
469 "failed": counts.failed
470 },
471 "completion": completion
472 });
473 println!("{}", serde_json::to_string_pretty(&out)?);
474 } else {
475 println!(
476 "session={} uploaded={} downloaded={} no_op={} conflicts_skipped={} completed={} failed={}",
477 completion.session.id,
478 counts.uploaded,
479 counts.downloaded,
480 counts.no_op,
481 counts.conflicts_skipped,
482 counts.completed,
483 counts.failed
484 );
485 }
486
487 if hard_conflict || counts.failed > 0 {
488 anyhow::bail!(
489 "sync completed with {} failed operation(s); session {} marked complete",
490 counts.failed,
491 completion.session.id
492 );
493 }
494
495 Ok(())
496}
497
498async fn handle_sessions(
499 cmd: SyncSessionsCommand,
500 client: &RommClient,
501 format: OutputFormat,
502) -> Result<()> {
503 match cmd.action {
504 SyncSessionsAction::List { device_id, limit } => {
505 let out = client.call(&ListSyncSessions { device_id, limit }).await?;
506 print_output(format, &out)
507 }
508 SyncSessionsAction::Get { session_id } => {
509 let out = client.call(&GetSyncSession { session_id }).await?;
510 print_output(format, &out)
511 }
512 }
513}
514
515async fn negotiate(
516 client: &RommClient,
517 device_id: &str,
518 prepared: &[PreparedSave],
519) -> Result<SyncNegotiateResponse> {
520 let saves: Vec<ClientSaveState> = prepared.iter().map(|p| p.client.clone()).collect();
521 client
522 .call(&NegotiateSync {
523 body: json!({
524 "device_id": device_id,
525 "saves": saves
526 }),
527 })
528 .await
529}
530
531fn print_output<T: Serialize>(format: OutputFormat, value: &T) -> Result<()> {
532 match format {
533 OutputFormat::Json | OutputFormat::Text => {
534 println!("{}", serde_json::to_string_pretty(value)?);
535 }
536 }
537 Ok(())
538}
539
540fn prepared_by_key(prepared: &[PreparedSave]) -> Result<HashMap<(u64, String), PreparedSave>> {
541 let mut map = HashMap::new();
542 for item in prepared {
543 let key = (item.client.rom_id, item.client.file_name.clone());
544 if map.insert(key.clone(), item.clone()).is_some() {
545 anyhow::bail!(
546 "duplicate manifest mapping for rom_id={} file_name={}",
547 key.0,
548 key.1
549 );
550 }
551 }
552 Ok(map)
553}
554
555fn load_manifest_and_prepare(manifest_path: &Path) -> Result<Vec<PreparedSave>> {
556 let raw = std::fs::read_to_string(manifest_path)
557 .with_context(|| format!("read manifest {}", manifest_path.display()))?;
558 let manifest: SyncManifest = serde_json::from_str(&raw)
559 .with_context(|| format!("parse manifest {}", manifest_path.display()))?;
560 let base_dir = manifest_path
561 .parent()
562 .map(Path::to_path_buf)
563 .unwrap_or_else(|| PathBuf::from("."));
564
565 let mut out = Vec::new();
566 for row in manifest.saves {
567 let path = if row.path.is_absolute() {
568 row.path.clone()
569 } else {
570 base_dir.join(&row.path)
571 };
572 if !path.is_file() {
573 anyhow::bail!("manifest save path is not a file: {}", path.display());
574 }
575 let file_name = match row.file_name {
576 Some(name) if !name.trim().is_empty() => name.trim().to_string(),
577 _ => path
578 .file_name()
579 .and_then(|n| n.to_str())
580 .ok_or_else(|| {
581 anyhow!("save path must have a unicode filename: {}", path.display())
582 })?
583 .to_string(),
584 };
585 let meta = std::fs::metadata(&path)
586 .with_context(|| format!("read metadata for {}", path.display()))?;
587 let updated_at = format_system_time_utc_rfc3339(
588 meta.modified()
589 .with_context(|| format!("read modified timestamp for {}", path.display()))?,
590 )?;
591 let content_hash = compute_content_hash(&path)?;
592 out.push(PreparedSave {
593 path,
594 client: ClientSaveState {
595 rom_id: row.rom_id,
596 file_name,
597 slot: row.slot.filter(|s| !s.trim().is_empty()),
598 emulator: row.emulator.filter(|s| !s.trim().is_empty()),
599 content_hash,
600 updated_at,
601 file_size_bytes: meta.len(),
602 },
603 });
604 }
605
606 Ok(out)
607}
608
609fn format_system_time_utc_rfc3339(t: SystemTime) -> Result<String> {
610 let odt: OffsetDateTime = t.into();
611 let utc = odt.to_offset(UtcOffset::UTC);
612 utc.format(&Rfc3339)
613 .map_err(|e| anyhow!("format timestamp as RFC3339: {e}"))
614}
615
616fn compute_content_hash(path: &Path) -> Result<String> {
617 if let Ok(file) = File::open(path) {
618 if ZipArchive::new(file).is_ok() {
619 return compute_zip_hash(path);
620 }
621 }
622 compute_file_hash(path)
623}
624
625fn compute_file_hash(path: &Path) -> Result<String> {
626 let mut file =
627 File::open(path).with_context(|| format!("open file for hashing {}", path.display()))?;
628 let mut ctx = md5::Context::new();
629 let mut buf = [0u8; 8192];
630 loop {
631 let n = file
632 .read(&mut buf)
633 .with_context(|| format!("read file for hashing {}", path.display()))?;
634 if n == 0 {
635 break;
636 }
637 ctx.consume(&buf[..n]);
638 }
639 Ok(format!("{:x}", ctx.finalize()))
640}
641
642fn compute_zip_hash(path: &Path) -> Result<String> {
643 let file = File::open(path)
644 .with_context(|| format!("open zip file for hashing {}", path.display()))?;
645 let mut archive = ZipArchive::new(file)
646 .with_context(|| format!("read zip archive for hashing {}", path.display()))?;
647 let mut row_hashes = BTreeMap::new();
648 for i in 0..archive.len() {
649 let mut entry = archive
650 .by_index(i)
651 .with_context(|| format!("read zip entry {} in {}", i, path.display()))?;
652 if entry.is_dir() {
653 continue;
654 }
655 let name = entry.name().to_string();
656 let mut ctx = md5::Context::new();
657 let mut buf = [0u8; 8192];
658 loop {
659 let n = entry
660 .read(&mut buf)
661 .with_context(|| format!("hash zip entry {} in {}", name, path.display()))?;
662 if n == 0 {
663 break;
664 }
665 ctx.consume(&buf[..n]);
666 }
667 row_hashes.insert(name, format!("{:x}", ctx.finalize()));
668 }
669 let combined = row_hashes
670 .into_iter()
671 .map(|(name, hash)| format!("{name}:{hash}"))
672 .collect::<Vec<_>>()
673 .join("\n");
674 Ok(format!("{:x}", md5::compute(combined.as_bytes())))
675}
676
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::io::Write;

    use zip::write::SimpleFileOptions;
    use zip::ZipWriter;

    /// Returns a path in the system temp directory, made unique by the current
    /// time in nanoseconds plus the given suffix.
    fn temp_path(name: &str) -> PathBuf {
        let since_epoch = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("unix epoch");
        let unique = format!("romm-cli-sync-test-{}-{name}", since_epoch.as_nanos());
        std::env::temp_dir().join(unique)
    }

    /// Lower-case hex MD5 of `bytes`, used to build expected values.
    fn md5_hex(bytes: &[u8]) -> String {
        format!("{:x}", md5::compute(bytes))
    }

    /// Builds a prepared save for rom 1 named `same.sav`, varying only the
    /// hash and timestamp.
    fn same_key_save(path: &Path, content_hash: &str, updated_at: &str) -> PreparedSave {
        PreparedSave {
            path: path.to_path_buf(),
            client: ClientSaveState {
                rom_id: 1,
                file_name: "same.sav".into(),
                slot: None,
                emulator: None,
                content_hash: content_hash.into(),
                updated_at: updated_at.into(),
                file_size_bytes: 1,
            },
        }
    }

    #[test]
    fn file_hash_matches_md5_hex() {
        let path = temp_path("plain.sav");
        fs::write(&path, b"abc123").expect("write");

        let actual = compute_file_hash(&path).expect("hash");

        assert_eq!(actual, md5_hex(b"abc123"));
        let _ = fs::remove_file(path);
    }

    #[test]
    fn zip_hash_matches_sorted_entry_scheme() {
        let path = temp_path("archive.zip");
        {
            // Entries are written out of order on purpose: the hash must sort them.
            let archive_file = File::create(&path).expect("create");
            let mut zip_out = ZipWriter::new(archive_file);
            zip_out
                .start_file("b.sav", SimpleFileOptions::default())
                .expect("start file");
            zip_out.write_all(b"bbb").expect("write b");
            zip_out
                .start_file("a.sav", SimpleFileOptions::default())
                .expect("start file");
            zip_out.write_all(b"aaa").expect("write a");
            zip_out.finish().expect("finish");
        }

        let actual = compute_zip_hash(&path).expect("hash zip");

        let listing = format!("a.sav:{}\nb.sav:{}", md5_hex(b"aaa"), md5_hex(b"bbb"));
        assert_eq!(actual, md5_hex(listing.as_bytes()));
        let _ = fs::remove_file(path);
    }

    #[test]
    fn duplicate_manifest_keys_fail() {
        let path = temp_path("dup.sav");
        fs::write(&path, b"x").expect("write");
        let prepared = vec![
            same_key_save(&path, "h1", "2026-01-01T00:00:00Z"),
            same_key_save(&path, "h2", "2026-01-01T00:00:01Z"),
        ];

        let err = prepared_by_key(&prepared).expect_err("duplicate should fail");

        assert!(err.to_string().contains("duplicate manifest mapping"));
        let _ = fs::remove_file(path);
    }
}