Skip to main content

romm_cli/commands/
sync.rs

1use std::collections::{BTreeMap, HashMap};
2use std::fs::File;
3use std::io::{Read, Write};
4use std::path::{Path, PathBuf};
5use std::time::SystemTime;
6
7use anyhow::{anyhow, Context, Result};
8use clap::{Args, Subcommand, ValueEnum};
9use serde::{Deserialize, Serialize};
10use serde_json::json;
11use time::format_description::well_known::Rfc3339;
12use time::{OffsetDateTime, UtcOffset};
13use zip::ZipArchive;
14
15use crate::client::{RommClient, SaveUploadOptions};
16use crate::commands::OutputFormat;
17use crate::endpoints::device::{
18    DeviceSchema, GetDevice, ListDevices, RegisterDevice, SyncMode as EndpointSyncMode,
19};
20use crate::endpoints::sync::{
21    CompleteSyncSession, GetSyncSession, ListSyncSessions, NegotiateSync, SyncNegotiateResponse,
22    TriggerPushPull,
23};
24use crate::feature_compat::{save_sync_compatibility, SAVE_SYNC_UNSUPPORTED_MESSAGE};
25use crate::openapi::EndpointRegistry;
26
27#[derive(Args, Debug)]
28pub struct SyncCommand {
29    /// Output as JSON (overrides global --json when set).
30    #[arg(long, global = true)]
31    pub json: bool,
32
33    #[command(subcommand)]
34    pub action: SyncAction,
35}
36
37#[derive(Subcommand, Debug)]
38pub enum SyncAction {
39    /// Manage sync devices.
40    Device(SyncDeviceCommand),
41    /// Negotiate sync operations without modifying files.
42    Plan(SyncPlanArgs),
43    /// Execute one-shot sync operations.
44    Run(SyncRunArgs),
45    /// Inspect sync sessions.
46    Sessions(SyncSessionsCommand),
47    /// Trigger push-pull mode on a registered device.
48    PushPull {
49        /// Device ID
50        device_id: String,
51    },
52}
53
54#[derive(Args, Debug)]
55pub struct SyncDeviceCommand {
56    #[command(subcommand)]
57    pub action: SyncDeviceAction,
58}
59
60#[derive(Subcommand, Debug)]
61pub enum SyncDeviceAction {
62    /// Register a device (`POST /api/devices`).
63    Register {
64        #[arg(long)]
65        name: Option<String>,
66        #[arg(long)]
67        platform: Option<String>,
68        #[arg(long, default_value = "romm-cli")]
69        client: String,
70        #[arg(long)]
71        client_version: Option<String>,
72        #[arg(long)]
73        hostname: Option<String>,
74        #[arg(long)]
75        mac_address: Option<String>,
76        #[arg(long)]
77        ip_address: Option<String>,
78        #[arg(long, value_enum, default_value_t = CliSyncMode::Api)]
79        sync_mode: CliSyncMode,
80        /// Optional JSON object string for `sync_config`.
81        #[arg(long)]
82        sync_config_json: Option<String>,
83        #[arg(long)]
84        allow_duplicate: bool,
85        #[arg(long)]
86        reset_syncs: bool,
87    },
88    /// List devices (`GET /api/devices`).
89    List,
90    /// Get one device (`GET /api/devices/{id}`).
91    Get {
92        /// Device ID
93        device_id: String,
94    },
95}
96
97#[derive(Args, Debug)]
98pub struct SyncPlanArgs {
99    #[arg(long)]
100    pub device_id: String,
101    #[arg(long)]
102    pub manifest: PathBuf,
103}
104
105#[derive(Args, Debug)]
106pub struct SyncRunArgs {
107    #[arg(long)]
108    pub device_id: String,
109    #[arg(long)]
110    pub manifest: PathBuf,
111    /// Folder for downloaded saves (defaults to the manifest directory).
112    #[arg(long)]
113    pub download_dir: Option<PathBuf>,
114    #[arg(long, value_enum, default_value_t = ConflictPolicy::Fail)]
115    pub conflict: ConflictPolicy,
116}
117
118#[derive(Args, Debug)]
119pub struct SyncSessionsCommand {
120    #[command(subcommand)]
121    pub action: SyncSessionsAction,
122}
123
124#[derive(Subcommand, Debug)]
125pub enum SyncSessionsAction {
126    /// List sessions (`GET /api/sync/sessions`).
127    List {
128        #[arg(long)]
129        device_id: Option<String>,
130        #[arg(long)]
131        limit: Option<u32>,
132    },
133    /// Get one session (`GET /api/sync/sessions/{id}`).
134    Get { session_id: u64 },
135}
136
137#[derive(Debug, Clone, Copy, ValueEnum)]
138pub enum CliSyncMode {
139    Api,
140    FileTransfer,
141    PushPull,
142}
143
144impl From<CliSyncMode> for EndpointSyncMode {
145    fn from(value: CliSyncMode) -> Self {
146        match value {
147            CliSyncMode::Api => EndpointSyncMode::Api,
148            CliSyncMode::FileTransfer => EndpointSyncMode::FileTransfer,
149            CliSyncMode::PushPull => EndpointSyncMode::PushPull,
150        }
151    }
152}
153
154#[derive(Debug, Clone, Copy, ValueEnum, PartialEq, Eq)]
155pub enum ConflictPolicy {
156    Fail,
157    Skip,
158}
159
160#[derive(Debug, Clone, Deserialize)]
161struct SyncManifest {
162    saves: Vec<ManifestSave>,
163}
164
165#[derive(Debug, Clone, Deserialize)]
166struct ManifestSave {
167    rom_id: u64,
168    path: PathBuf,
169    file_name: Option<String>,
170    slot: Option<String>,
171    emulator: Option<String>,
172}
173
174#[derive(Debug, Clone, Serialize)]
175struct ClientSaveState {
176    rom_id: u64,
177    file_name: String,
178    #[serde(skip_serializing_if = "Option::is_none")]
179    slot: Option<String>,
180    #[serde(skip_serializing_if = "Option::is_none")]
181    emulator: Option<String>,
182    content_hash: String,
183    updated_at: String,
184    file_size_bytes: u64,
185}
186
187#[derive(Debug, Clone)]
188struct PreparedSave {
189    path: PathBuf,
190    client: ClientSaveState,
191}
192
193#[derive(Debug, Clone, Copy, Default)]
194struct RunCounts {
195    uploaded: u64,
196    downloaded: u64,
197    no_op: u64,
198    conflicts_skipped: u64,
199    completed: u64,
200    failed: u64,
201}
202
203pub async fn handle(cmd: SyncCommand, client: &RommClient, format: OutputFormat) -> Result<()> {
204    preflight_save_sync_compatibility(client, format).await?;
205
206    match cmd.action {
207        SyncAction::Device(device_cmd) => handle_device(device_cmd, client, format).await,
208        SyncAction::Plan(args) => handle_plan(args, client, format).await,
209        SyncAction::Run(args) => handle_run(args, client, format).await,
210        SyncAction::Sessions(cmd) => handle_sessions(cmd, client, format).await,
211        SyncAction::PushPull { device_id } => {
212            let out = client.call(&TriggerPushPull { device_id }).await?;
213            print_output(format, &out)
214        }
215    }
216}
217
218async fn preflight_save_sync_compatibility(
219    client: &RommClient,
220    format: OutputFormat,
221) -> Result<()> {
222    let openapi = match client.fetch_openapi_json().await {
223        Ok(body) => body,
224        Err(e) => {
225            tracing::warn!("Skipping save-sync compatibility preflight: {e:#}");
226            return Ok(());
227        }
228    };
229    let registry = match EndpointRegistry::from_openapi_json(&openapi) {
230        Ok(registry) => registry,
231        Err(e) => {
232            tracing::warn!(
233                "Skipping save-sync compatibility preflight; OpenAPI parse failed: {e:#}"
234            );
235            return Ok(());
236        }
237    };
238    let compat = save_sync_compatibility(&registry);
239    if compat.supported {
240        return Ok(());
241    }
242
243    if matches!(format, OutputFormat::Json) {
244        println!(
245            "{}",
246            serde_json::to_string_pretty(&json!({
247                "error": "save_sync_unsupported",
248                "message": SAVE_SYNC_UNSUPPORTED_MESSAGE,
249                "missing_endpoints": compat
250                    .missing
251                    .iter()
252                    .map(|ep| ep.label())
253                    .collect::<Vec<_>>()
254            }))?
255        );
256    }
257
258    anyhow::bail!("{}", compat.unsupported_message())
259}
260
261async fn handle_device(
262    cmd: SyncDeviceCommand,
263    client: &RommClient,
264    format: OutputFormat,
265) -> Result<()> {
266    match cmd.action {
267        SyncDeviceAction::Register {
268            name,
269            platform,
270            client: client_name,
271            client_version,
272            hostname,
273            mac_address,
274            ip_address,
275            sync_mode,
276            sync_config_json,
277            allow_duplicate,
278            reset_syncs,
279        } => {
280            let sync_config = match sync_config_json {
281                Some(raw) => Some(
282                    serde_json::from_str::<serde_json::Value>(&raw)
283                        .with_context(|| "invalid --sync-config-json (must be a JSON object)")?,
284                ),
285                None => None,
286            };
287            if let Some(v) = &sync_config {
288                if !v.is_object() {
289                    anyhow::bail!("--sync-config-json must decode to a JSON object");
290                }
291            }
292
293            let body = json!({
294                "name": name,
295                "platform": platform,
296                "client": client_name,
297                "client_version": client_version,
298                "hostname": hostname,
299                "mac_address": mac_address,
300                "ip_address": ip_address,
301                "sync_mode": EndpointSyncMode::from(sync_mode),
302                "sync_config": sync_config,
303                "allow_existing": true,
304                "allow_duplicate": allow_duplicate,
305                "reset_syncs": reset_syncs
306            });
307            let created = client.call(&RegisterDevice { body }).await?;
308            print_output(format, &created)
309        }
310        SyncDeviceAction::List => {
311            let rows: Vec<DeviceSchema> = client.call(&ListDevices).await?;
312            print_output(format, &rows)
313        }
314        SyncDeviceAction::Get { device_id } => {
315            let row = client.call(&GetDevice { device_id }).await?;
316            print_output(format, &row)
317        }
318    }
319}
320
321async fn handle_plan(args: SyncPlanArgs, client: &RommClient, format: OutputFormat) -> Result<()> {
322    let prepared = load_manifest_and_prepare(&args.manifest)?;
323    let negotiate = negotiate(client, &args.device_id, &prepared).await?;
324    print_output(format, &negotiate)
325}
326
327async fn handle_run(args: SyncRunArgs, client: &RommClient, format: OutputFormat) -> Result<()> {
328    let prepared = load_manifest_and_prepare(&args.manifest)?;
329    let prepared_by_key = prepared_by_key(&prepared)?;
330    let negotiate = negotiate(client, &args.device_id, &prepared).await?;
331
332    let download_base = match args.download_dir {
333        Some(dir) => dir,
334        None => args
335            .manifest
336            .parent()
337            .map(Path::to_path_buf)
338            .unwrap_or_else(|| PathBuf::from(".")),
339    };
340    std::fs::create_dir_all(&download_base).with_context(|| {
341        format!(
342            "failed to create download directory {}",
343            download_base.display()
344        )
345    })?;
346
347    let mut counts = RunCounts::default();
348    let mut hard_conflict = false;
349    for op in &negotiate.operations {
350        match op.action.as_str() {
351            "upload" => {
352                let key = (op.rom_id, op.file_name.clone());
353                let Some(local) = prepared_by_key.get(&key) else {
354                    counts.failed += 1;
355                    eprintln!(
356                        "Missing local manifest entry for upload operation rom_id={} file_name={}",
357                        op.rom_id, op.file_name
358                    );
359                    continue;
360                };
361                let options = SaveUploadOptions {
362                    emulator: local.client.emulator.as_deref(),
363                    slot: local.client.slot.as_deref(),
364                    device_id: Some(args.device_id.as_str()),
365                    session_id: Some(negotiate.session_id),
366                    overwrite: false,
367                };
368                match client
369                    .upload_save_file_with_options(local.client.rom_id, &local.path, &options)
370                    .await
371                {
372                    Ok(_) => {
373                        counts.uploaded += 1;
374                        counts.completed += 1;
375                    }
376                    Err(err) => {
377                        counts.failed += 1;
378                        eprintln!(
379                            "Upload failed for rom_id={} file_name={}: {:#}",
380                            local.client.rom_id, local.client.file_name, err
381                        );
382                    }
383                }
384            }
385            "download" => {
386                let Some(save_id) = op.save_id else {
387                    counts.failed += 1;
388                    eprintln!(
389                        "Download operation missing save_id for rom_id={} file_name={}",
390                        op.rom_id, op.file_name
391                    );
392                    continue;
393                };
394                match client
395                    .download_save_content(
396                        save_id,
397                        Some(args.device_id.as_str()),
398                        Some(negotiate.session_id),
399                    )
400                    .await
401                {
402                    Ok(bytes) => {
403                        let target = download_base.join(&op.file_name);
404                        if let Some(parent) = target.parent() {
405                            std::fs::create_dir_all(parent).with_context(|| {
406                                format!("failed to create parent folder {}", parent.display())
407                            })?;
408                        }
409                        let mut f = File::create(&target).with_context(|| {
410                            format!("failed to create download file {}", target.display())
411                        })?;
412                        f.write_all(&bytes).with_context(|| {
413                            format!("failed to write download file {}", target.display())
414                        })?;
415                        counts.downloaded += 1;
416                        counts.completed += 1;
417                    }
418                    Err(err) => {
419                        counts.failed += 1;
420                        eprintln!("Download failed for save_id={save_id}: {err:#}");
421                    }
422                }
423            }
424            "no_op" => {
425                counts.no_op += 1;
426            }
427            "conflict" => match args.conflict {
428                ConflictPolicy::Skip => {
429                    counts.conflicts_skipped += 1;
430                }
431                ConflictPolicy::Fail => {
432                    counts.failed += 1;
433                    hard_conflict = true;
434                    eprintln!(
435                        "Conflict for rom_id={} file_name={}: {}",
436                        op.rom_id, op.file_name, op.reason
437                    );
438                }
439            },
440            other => {
441                counts.failed += 1;
442                eprintln!(
443                    "Unknown sync operation '{}' for rom_id={} file_name={}",
444                    other, op.rom_id, op.file_name
445                );
446            }
447        }
448    }
449
450    let completion = client
451        .call(&CompleteSyncSession {
452            session_id: negotiate.session_id,
453            body: json!({
454                "operations_completed": counts.completed,
455                "operations_failed": counts.failed
456            }),
457        })
458        .await?;
459
460    if matches!(format, OutputFormat::Json) {
461        let out = json!({
462            "negotiate": negotiate,
463            "counts": {
464                "uploaded": counts.uploaded,
465                "downloaded": counts.downloaded,
466                "no_op": counts.no_op,
467                "conflicts_skipped": counts.conflicts_skipped,
468                "completed": counts.completed,
469                "failed": counts.failed
470            },
471            "completion": completion
472        });
473        println!("{}", serde_json::to_string_pretty(&out)?);
474    } else {
475        println!(
476            "session={} uploaded={} downloaded={} no_op={} conflicts_skipped={} completed={} failed={}",
477            completion.session.id,
478            counts.uploaded,
479            counts.downloaded,
480            counts.no_op,
481            counts.conflicts_skipped,
482            counts.completed,
483            counts.failed
484        );
485    }
486
487    if hard_conflict || counts.failed > 0 {
488        anyhow::bail!(
489            "sync completed with {} failed operation(s); session {} marked complete",
490            counts.failed,
491            completion.session.id
492        );
493    }
494
495    Ok(())
496}
497
498async fn handle_sessions(
499    cmd: SyncSessionsCommand,
500    client: &RommClient,
501    format: OutputFormat,
502) -> Result<()> {
503    match cmd.action {
504        SyncSessionsAction::List { device_id, limit } => {
505            let out = client.call(&ListSyncSessions { device_id, limit }).await?;
506            print_output(format, &out)
507        }
508        SyncSessionsAction::Get { session_id } => {
509            let out = client.call(&GetSyncSession { session_id }).await?;
510            print_output(format, &out)
511        }
512    }
513}
514
515async fn negotiate(
516    client: &RommClient,
517    device_id: &str,
518    prepared: &[PreparedSave],
519) -> Result<SyncNegotiateResponse> {
520    let saves: Vec<ClientSaveState> = prepared.iter().map(|p| p.client.clone()).collect();
521    client
522        .call(&NegotiateSync {
523            body: json!({
524                "device_id": device_id,
525                "saves": saves
526            }),
527        })
528        .await
529        .map_err(Into::into)
530}
531
532fn print_output<T: Serialize>(format: OutputFormat, value: &T) -> Result<()> {
533    match format {
534        OutputFormat::Json | OutputFormat::Text => {
535            println!("{}", serde_json::to_string_pretty(value)?);
536        }
537    }
538    Ok(())
539}
540
541fn prepared_by_key(prepared: &[PreparedSave]) -> Result<HashMap<(u64, String), PreparedSave>> {
542    let mut map = HashMap::new();
543    for item in prepared {
544        let key = (item.client.rom_id, item.client.file_name.clone());
545        if map.insert(key.clone(), item.clone()).is_some() {
546            anyhow::bail!(
547                "duplicate manifest mapping for rom_id={} file_name={}",
548                key.0,
549                key.1
550            );
551        }
552    }
553    Ok(map)
554}
555
556fn load_manifest_and_prepare(manifest_path: &Path) -> Result<Vec<PreparedSave>> {
557    let raw = std::fs::read_to_string(manifest_path)
558        .with_context(|| format!("read manifest {}", manifest_path.display()))?;
559    let manifest: SyncManifest = serde_json::from_str(&raw)
560        .with_context(|| format!("parse manifest {}", manifest_path.display()))?;
561    let base_dir = manifest_path
562        .parent()
563        .map(Path::to_path_buf)
564        .unwrap_or_else(|| PathBuf::from("."));
565
566    let mut out = Vec::new();
567    for row in manifest.saves {
568        let path = if row.path.is_absolute() {
569            row.path.clone()
570        } else {
571            base_dir.join(&row.path)
572        };
573        if !path.is_file() {
574            anyhow::bail!("manifest save path is not a file: {}", path.display());
575        }
576        let file_name = match row.file_name {
577            Some(name) if !name.trim().is_empty() => name.trim().to_string(),
578            _ => path
579                .file_name()
580                .and_then(|n| n.to_str())
581                .ok_or_else(|| {
582                    anyhow!("save path must have a unicode filename: {}", path.display())
583                })?
584                .to_string(),
585        };
586        let meta = std::fs::metadata(&path)
587            .with_context(|| format!("read metadata for {}", path.display()))?;
588        let updated_at = format_system_time_utc_rfc3339(
589            meta.modified()
590                .with_context(|| format!("read modified timestamp for {}", path.display()))?,
591        )?;
592        let content_hash = compute_content_hash(&path)?;
593        out.push(PreparedSave {
594            path,
595            client: ClientSaveState {
596                rom_id: row.rom_id,
597                file_name,
598                slot: row.slot.filter(|s| !s.trim().is_empty()),
599                emulator: row.emulator.filter(|s| !s.trim().is_empty()),
600                content_hash,
601                updated_at,
602                file_size_bytes: meta.len(),
603            },
604        });
605    }
606
607    Ok(out)
608}
609
610fn format_system_time_utc_rfc3339(t: SystemTime) -> Result<String> {
611    let odt: OffsetDateTime = t.into();
612    let utc = odt.to_offset(UtcOffset::UTC);
613    utc.format(&Rfc3339)
614        .map_err(|e| anyhow!("format timestamp as RFC3339: {e}"))
615}
616
617fn compute_content_hash(path: &Path) -> Result<String> {
618    if let Ok(file) = File::open(path) {
619        if ZipArchive::new(file).is_ok() {
620            return compute_zip_hash(path);
621        }
622    }
623    compute_file_hash(path)
624}
625
626fn compute_file_hash(path: &Path) -> Result<String> {
627    let mut file =
628        File::open(path).with_context(|| format!("open file for hashing {}", path.display()))?;
629    let mut ctx = md5::Context::new();
630    let mut buf = [0u8; 8192];
631    loop {
632        let n = file
633            .read(&mut buf)
634            .with_context(|| format!("read file for hashing {}", path.display()))?;
635        if n == 0 {
636            break;
637        }
638        ctx.consume(&buf[..n]);
639    }
640    Ok(format!("{:x}", ctx.finalize()))
641}
642
643fn compute_zip_hash(path: &Path) -> Result<String> {
644    let file = File::open(path)
645        .with_context(|| format!("open zip file for hashing {}", path.display()))?;
646    let mut archive = ZipArchive::new(file)
647        .with_context(|| format!("read zip archive for hashing {}", path.display()))?;
648    let mut row_hashes = BTreeMap::new();
649    for i in 0..archive.len() {
650        let mut entry = archive
651            .by_index(i)
652            .with_context(|| format!("read zip entry {} in {}", i, path.display()))?;
653        if entry.is_dir() {
654            continue;
655        }
656        let name = entry.name().to_string();
657        let mut ctx = md5::Context::new();
658        let mut buf = [0u8; 8192];
659        loop {
660            let n = entry
661                .read(&mut buf)
662                .with_context(|| format!("hash zip entry {} in {}", name, path.display()))?;
663            if n == 0 {
664                break;
665            }
666            ctx.consume(&buf[..n]);
667        }
668        row_hashes.insert(name, format!("{:x}", ctx.finalize()));
669    }
670    let combined = row_hashes
671        .into_iter()
672        .map(|(name, hash)| format!("{name}:{hash}"))
673        .collect::<Vec<_>>()
674        .join("\n");
675    Ok(format!("{:x}", md5::compute(combined.as_bytes())))
676}
677
678#[cfg(test)]
679mod tests {
680    use super::*;
681    use std::fs;
682    use std::io::Write;
683
684    use zip::write::SimpleFileOptions;
685    use zip::ZipWriter;
686
687    fn temp_path(name: &str) -> PathBuf {
688        let nanos = SystemTime::now()
689            .duration_since(SystemTime::UNIX_EPOCH)
690            .expect("unix epoch")
691            .as_nanos();
692        std::env::temp_dir().join(format!("romm-cli-sync-test-{nanos}-{name}"))
693    }
694
695    #[test]
696    fn file_hash_matches_md5_hex() {
697        let path = temp_path("plain.sav");
698        fs::write(&path, b"abc123").expect("write");
699        let got = compute_file_hash(&path).expect("hash");
700        let expected = format!("{:x}", md5::compute(b"abc123"));
701        assert_eq!(got, expected);
702        let _ = fs::remove_file(path);
703    }
704
705    #[test]
706    fn zip_hash_matches_sorted_entry_scheme() {
707        let path = temp_path("archive.zip");
708        {
709            let f = File::create(&path).expect("create");
710            let mut writer = ZipWriter::new(f);
711            writer
712                .start_file("b.sav", SimpleFileOptions::default())
713                .expect("start file");
714            writer.write_all(b"bbb").expect("write b");
715            writer
716                .start_file("a.sav", SimpleFileOptions::default())
717                .expect("start file");
718            writer.write_all(b"aaa").expect("write a");
719            writer.finish().expect("finish");
720        }
721
722        let hash = compute_zip_hash(&path).expect("hash zip");
723        let a = format!("{:x}", md5::compute(b"aaa"));
724        let b = format!("{:x}", md5::compute(b"bbb"));
725        let combined = format!("a.sav:{a}\nb.sav:{b}");
726        let expected = format!("{:x}", md5::compute(combined.as_bytes()));
727        assert_eq!(hash, expected);
728        let _ = fs::remove_file(path);
729    }
730
731    #[test]
732    fn duplicate_manifest_keys_fail() {
733        let p = temp_path("dup.sav");
734        fs::write(&p, b"x").expect("write");
735        let prepared = vec![
736            PreparedSave {
737                path: p.clone(),
738                client: ClientSaveState {
739                    rom_id: 1,
740                    file_name: "same.sav".into(),
741                    slot: None,
742                    emulator: None,
743                    content_hash: "h1".into(),
744                    updated_at: "2026-01-01T00:00:00Z".into(),
745                    file_size_bytes: 1,
746                },
747            },
748            PreparedSave {
749                path: p.clone(),
750                client: ClientSaveState {
751                    rom_id: 1,
752                    file_name: "same.sav".into(),
753                    slot: None,
754                    emulator: None,
755                    content_hash: "h2".into(),
756                    updated_at: "2026-01-01T00:00:01Z".into(),
757                    file_size_bytes: 1,
758                },
759            },
760        ];
761        let err = prepared_by_key(&prepared).expect_err("duplicate should fail");
762        assert!(err.to_string().contains("duplicate manifest mapping"));
763        let _ = fs::remove_file(p);
764    }
765}