pxs 0.6.3

pxs (Parallel X-Sync) - Integrity-first Rust sync/clone for large mutable datasets.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
use crate::built_info;
use clap::{
    Arg, ArgAction, ColorChoice, Command,
    builder::{
        ValueParser,
        styling::{AnsiColor, Effects, Styles},
    },
};
use std::path::PathBuf;

/// Extended description for the top-level `pxs` command, including usage examples.
///
/// Built with `concat!` instead of `\`-newline string continuations: a continuation
/// escape skips every leading whitespace character on the following source line, which
/// silently removed the indentation in front of each example command.
const LONG_ABOUT: &str = concat!(
    "pxs is an integrity-first sync and clone tool for large mutable datasets. ",
    "It keeps destination trees accurate across local paths, SSH, and raw TCP, ",
    "and it is designed to outperform rsync in target workloads such as ",
    "PostgreSQL PGDATA, VM images, and repeated large-data refreshes. ",
    "It is not a drop-in replacement for rsync.\n\n",
    "EXAMPLES:\n\n",
    "1. Local Sync:\n",
    "   pxs sync /var/lib/postgresql/data /srv/restore/pgdata\n\n",
    "2. SSH Sync:\n",
    "   pxs sync user@db2:/srv/export/pgdata /srv/restore/pgdata\n\n",
    "3. Raw TCP Receiver Setup:\n",
    "   pxs listen 0.0.0.0:8080 /srv\n\n",
    "4. Raw TCP Sync:\n",
    "   pxs sync /var/lib/postgresql/data 192.168.1.10:8080/incoming/pgdata\n\n",
    "5. Verify And Durably Commit:\n",
    "   pxs sync file.bin backup.bin --checksum --fsync\n"
);
/// Short description passed to `Command::about` for the top-level command.
const ABOUT: &str =
    "pxs (Parallel X-Sync) - Integrity-first sync/clone for large mutable datasets.";
/// Long help for `-t/--threshold`.
const THRESHOLD_LONG_HELP: &str = "Value between 0.1 and 1.0. If the destination file size is less than this fraction of the source, pxs rewrites the file instead of attempting block reuse.";
/// Long help for `-c/--checksum`.
const CHECKSUM_LONG_HELP: &str = "By default, pxs skips files if size and modification time match. Use this to force a block-by-block hash comparison. In network mode, pxs also performs end-to-end BLAKE3 verification after the transfer completes.";
/// Long help for `-f/--fsync`.
const FSYNC_LONG_HELP: &str =
    "Ensures that file data and metadata are flushed to disk before finishing. Slower but safer.";
/// Long help for `--large-file-parallel-threshold`.
const LARGE_FILE_PARALLEL_THRESHOLD_LONG_HELP: &str = "For outbound network sync, files at or above this size use chunk-parallel worker transfers. Smaller files can share the main control session when network file concurrency is above 1. Accepts raw bytes or binary suffixes such as KiB, MiB, GiB, and TiB. Use 0 to disable large-file worker mode.";
/// Long help for `--large-file-parallel-workers`.
const LARGE_FILE_PARALLEL_WORKERS_LONG_HELP: &str = "Number of parallel worker connections or sessions for eligible large outbound network transfers. If omitted, pxs chooses a conservative default from available CPU cores.";
/// Long help for `--network-file-concurrency`.
const NETWORK_FILE_CONCURRENCY_LONG_HELP: &str = "Maximum number of small outbound network file transfers to keep in flight on the main control session. If omitted, pxs chooses a conservative default from available CPU cores. Higher values can saturate bandwidth-bound links, so reduce this on slower or congested networks. Files at or above --large-file-parallel-threshold are transferred separately using chunk workers.";

/// Create a path validator that requires the path to exist.
///
/// The resulting parser turns the raw argument into a [`PathBuf`] when `Path::exists`
/// reports an entry at that location, and rejects it otherwise with a message that quotes
/// the value the user typed.
pub fn validator_path_exists() -> ValueParser {
    ValueParser::from(|raw: &str| -> std::result::Result<PathBuf, String> {
        let candidate = PathBuf::from(raw);
        if candidate.exists() {
            Ok(candidate)
        } else {
            Err(format!("Path does not exist: '{raw}'"))
        }
    })
}

/// Create a path validator that requires the destination or its parent to exist.
///
/// The argument is accepted when it is an existing directory, or when its parent
/// directory exists. A bare name such as `out.bin` has an empty parent and is checked
/// against the current directory `.`.
pub fn validator_parent_exist() -> ValueParser {
    ValueParser::from(|raw: &str| -> std::result::Result<PathBuf, String> {
        let target = PathBuf::from(raw);
        // An existing directory is always a valid destination (`is_dir` implies it exists).
        if target.is_dir() {
            return Ok(target);
        }

        // Empty or missing parents fall back to the current working directory.
        let parent_dir = match target.parent() {
            Some(p) if !p.as_os_str().is_empty() => p.to_path_buf(),
            _ => PathBuf::from("."),
        };

        if parent_dir.is_dir() {
            Ok(target)
        } else {
            Err(format!(
                "Invalid destination path or parent directory does not exist: '{raw}'"
            ))
        }
    })
}

/// Help-output color scheme.
///
/// Headers are bold yellow, usage is bold green, literals are bold blue, and placeholders
/// are plain green.
fn cli_styles() -> Styles {
    let bold = Effects::BOLD;
    let header = AnsiColor::Yellow.on_default() | bold;
    let usage = AnsiColor::Green.on_default() | bold;
    let literal = AnsiColor::Blue.on_default() | bold;
    let placeholder = AnsiColor::Green.on_default();

    Styles::styled()
        .header(header)
        .usage(usage)
        .literal(literal)
        .placeholder(placeholder)
}

/// Value parser for `--threshold`.
///
/// Parses the argument as an `f32` and accepts it only inside the inclusive range
/// `0.1..=1.0`. NaN fails the range check and is therefore rejected.
fn threshold_parser() -> ValueParser {
    ValueParser::from(|raw: &str| -> std::result::Result<f32, String> {
        let Ok(threshold) = raw.parse::<f32>() else {
            return Err(String::from("Invalid float"));
        };
        if !(0.1..=1.0).contains(&threshold) {
            return Err(String::from("Threshold must be between 0.1 and 1.0"));
        }
        Ok(threshold)
    })
}

/// Parse a human-readable byte size such as `4096`, `64KiB`, or `1 GiB` into bytes.
///
/// Surrounding whitespace is ignored. The size is a non-negative integer count followed
/// by an optional case-insensitive unit: `b`, `k`/`kb`/`kib`, `m`/`mb`/`mib`,
/// `g`/`gb`/`gib`, or `t`/`tb`/`tib`. All of these units are 1024-based, including the
/// `kb`/`mb`/... spellings. Whitespace between the number and the unit is allowed.
///
/// # Errors
///
/// Returns a message quoting the original input when:
/// - it is empty,
/// - it has no leading digits,
/// - it uses an unknown unit, or
/// - the resulting byte count does not fit in a `u64`.
fn parse_size_bytes(value: &str) -> std::result::Result<u64, String> {
    let trimmed = value.trim();
    if trimmed.is_empty() {
        return Err(String::from("Size must not be empty"));
    }

    // Everything before the first non-digit is the count; the remainder is the unit.
    let split_index = trimmed
        .find(|ch: char| !ch.is_ascii_digit())
        .unwrap_or(trimmed.len());
    let (digits, suffix) = trimmed.split_at(split_index);
    // `digits` holds only ASCII digits, so parsing fails either because there is no
    // number at all or because it overflows `u64`. Report overflow the same way as an
    // overflowing multiplication below, rather than calling the input invalid.
    let base = digits.parse::<u64>().map_err(|err| match err.kind() {
        std::num::IntErrorKind::PosOverflow => format!("Size is too large: '{value}'"),
        _ => format!("Invalid size value: '{value}'"),
    })?;
    let multiplier = match suffix.trim().to_ascii_lowercase().as_str() {
        "" | "b" => 1_u64,
        "k" | "kb" | "kib" => 1024_u64,
        "m" | "mb" | "mib" => 1024_u64.pow(2),
        "g" | "gb" | "gib" => 1024_u64.pow(3),
        "t" | "tb" | "tib" => 1024_u64.pow(4),
        _ => return Err(format!("Unsupported size suffix in '{value}'")),
    };
    base.checked_mul(multiplier)
        .ok_or_else(|| format!("Size is too large: '{value}'"))
}

fn size_bytes_parser() -> ValueParser {
    ValueParser::from(|s: &str| parse_size_bytes(s))
}

/// Value parser that accepts only strictly positive `usize` values.
///
/// Zero has its own error message, distinct from the one for unparsable input.
fn positive_usize_parser() -> ValueParser {
    ValueParser::from(|raw: &str| -> std::result::Result<usize, String> {
        match raw.parse::<usize>() {
            Ok(0) => Err(String::from("Value must be greater than 0")),
            Ok(count) => Ok(count),
            Err(_) => Err(String::from("Invalid positive integer")),
        }
    })
}

/// Version string for `--version`.
///
/// Returns the crate version, suffixed with ` - <git hash>` when the build recorded a
/// commit hash.
///
/// The string is computed once and cached in a `static`. The previous
/// `Box::leak(format!(..))` allocated and leaked a fresh copy on every call, and this runs
/// each time the public `new()` builds the command.
fn long_version() -> &'static str {
    static LONG_VERSION: std::sync::OnceLock<String> = std::sync::OnceLock::new();
    LONG_VERSION.get_or_init(|| match built_info::GIT_COMMIT_HASH {
        Some(git_hash) => format!("{} - {}", env!("CARGO_PKG_VERSION"), git_hash),
        None => String::from(env!("CARGO_PKG_VERSION")),
    })
}

/// Global `-v/--verbose` flag.
///
/// The flag counts repetitions; the help text documents `-vv` as debug level.
fn verbose_arg() -> Arg {
    Arg::new("verbose")
        .long("verbose")
        .short('v')
        .action(ArgAction::Count)
        .global(true)
        .help("Increase verbosity, -vv for debug")
}

/// Global `-q/--quiet` boolean flag that disables the progress bar.
fn quiet_arg() -> Arg {
    Arg::new("quiet")
        .long("quiet")
        .short('q')
        .action(ArgAction::SetTrue)
        .global(true)
        .help("Do not show progress bar")
}

/// `-t/--threshold` option.
///
/// Defaults to `0.1` and is validated by `threshold_parser`. Pass `hidden = true` to keep
/// it out of help output, as the internal stdio argument set does.
fn threshold_arg(hidden: bool) -> Arg {
    Arg::new("threshold")
        .long("threshold")
        .short('t')
        .value_name("THRESHOLD")
        .value_parser(threshold_parser())
        .default_value("0.1")
        .help("Threshold to determine if a file should be copied")
        .long_help(THRESHOLD_LONG_HELP)
        .hide(hidden)
}

/// `-c/--checksum` boolean flag.
///
/// Switches change detection from mod-time/size to checksums. Optionally hidden from help.
fn checksum_arg(hidden: bool) -> Arg {
    Arg::new("checksum")
        .long("checksum")
        .short('c')
        .action(ArgAction::SetTrue)
        .help("Skip based on checksum, not mod-time & size")
        .long_help(CHECKSUM_LONG_HELP)
        .hide(hidden)
}

/// `-f/--fsync` boolean flag requesting durable commits. Optionally hidden from help.
fn fsync_arg(hidden: bool) -> Arg {
    Arg::new("fsync")
        .long("fsync")
        .short('f')
        .action(ArgAction::SetTrue)
        .help("Force durable sync of committed files, directories, and symlinks")
        .long_help(FSYNC_LONG_HELP)
        .hide(hidden)
}

/// `--large-file-parallel-threshold SIZE` option.
///
/// Parsed by `size_bytes_parser` and defaulting to `1GiB`.
fn large_file_parallel_threshold_arg() -> Arg {
    Arg::new("large_file_parallel_threshold")
        .long("large-file-parallel-threshold")
        .value_name("SIZE")
        .value_parser(size_bytes_parser())
        .default_value("1GiB")
        .help("Use chunk-parallel workers for outbound network files at or above SIZE")
        .long_help(LARGE_FILE_PARALLEL_THRESHOLD_LONG_HELP)
}

/// `--large-file-parallel-workers N` option.
///
/// Takes a positive integer and has no default value at the CLI level.
fn large_file_parallel_workers_arg() -> Arg {
    Arg::new("large_file_parallel_workers")
        .long("large-file-parallel-workers")
        .value_name("N")
        .value_parser(positive_usize_parser())
        .help("Set the number of worker sessions/connections for large outbound network transfers")
        .long_help(LARGE_FILE_PARALLEL_WORKERS_LONG_HELP)
}

/// `--network-file-concurrency N` option.
///
/// Takes a positive integer and has no default value at the CLI level.
fn network_file_concurrency_arg() -> Arg {
    Arg::new("network_file_concurrency")
        .long("network-file-concurrency")
        .value_name("N")
        .value_parser(positive_usize_parser())
        .help("Set the number of small outbound network files to keep in flight")
        .long_help(NETWORK_FILE_CONCURRENCY_LONG_HELP)
}

/// `-n/--dry-run` boolean flag.
fn dry_run_arg() -> Arg {
    Arg::new("dry_run")
        .long("dry-run")
        .short('n')
        .action(ArgAction::SetTrue)
        .help("Show what would have been transferred")
}

/// `--delete` boolean flag (long form only).
///
/// Always visible in help; callers that need it hidden must apply `.hide(true)`.
fn delete_arg() -> Arg {
    Arg::new("delete")
        .action(ArgAction::SetTrue)
        .long("delete")
        .help("Delete extraneous files from destination directories")
}

/// Repeatable `-i/--ignore PATTERN` option.
///
/// Each occurrence appends another glob. Optionally hidden from help.
fn ignore_arg(hidden: bool) -> Arg {
    Arg::new("ignore")
        .long("ignore")
        .short('i')
        .value_name("PATTERN")
        .action(ArgAction::Append)
        .help("Ignore files/directories matching this pattern (glob)")
        .hide(hidden)
}

/// `-E/--exclude-from FILE` option.
///
/// The file must already exist (checked by `validator_path_exists`). Optionally hidden
/// from help.
fn exclude_from_arg(hidden: bool) -> Arg {
    Arg::new("exclude_from")
        .long("exclude-from")
        .short('E')
        .value_name("FILE")
        .value_parser(validator_path_exists())
        .help("Read exclude patterns from FILE")
        .hide(hidden)
}

/// Required positional `SRC` operand for commands that serve a local source.
///
/// The path must exist.
fn src_arg() -> Arg {
    Arg::new("src")
        .required(true)
        .value_name("SRC")
        .value_parser(validator_path_exists())
        .help("Path to the source file or directory")
}

/// Required positional operand for `sync`.
///
/// No value parser is attached, because the operand may be a local path or a remote
/// endpoint.
fn sync_operand_arg(id: &'static str, help: &'static str, value_name: &'static str) -> Arg {
    Arg::new(id).required(true).value_name(value_name).help(help)
}

/// Required positional `DST` operand.
///
/// Validated by `validator_parent_exist`: the path must be an existing directory or have
/// an existing parent.
fn dst_arg() -> Arg {
    Arg::new("dst")
        .required(true)
        .value_name("DST")
        .value_parser(validator_parent_exist())
        .help("Path to the destination file or directory")
}

/// Required positional `ADDR` operand.
///
/// The value is not validated here.
fn addr_arg() -> Arg {
    Arg::new("addr")
        .required(true)
        .value_name("ADDR")
        .help("Listen address such as 0.0.0.0:8080")
}

/// Hidden top-level arguments for the internal SSH modes (`--stdio`, `--sender`,
/// `--chunk-writer`, ...).
///
/// Every entry is hidden from help output. `delete_arg()` takes no `hidden` parameter,
/// so it is hidden explicitly here. Otherwise `--delete` would show up in the top-level
/// `pxs --help`, unlike every other argument in this set.
fn internal_stdio_args() -> [Arg; 12] {
    [
        Arg::new("stdio")
            .long("stdio")
            .help("Use stdin/stdout for communication (internal use for SSH)")
            .hide(true)
            .action(ArgAction::SetTrue),
        Arg::new("sender")
            .long("sender")
            .help("Run in sender mode (internal use for SSH)")
            .hide(true)
            .action(ArgAction::SetTrue),
        Arg::new("source")
            .long("source")
            .help("Path to the source file or directory")
            .hide(true)
            .value_parser(validator_path_exists())
            .value_name("SRC"),
        Arg::new("destination")
            .long("destination")
            .help("Path to the destination file or directory")
            .hide(true)
            .value_parser(validator_parent_exist())
            .value_name("DST"),
        Arg::new("chunk_writer")
            .long("chunk-writer")
            .help("Run in chunk-writer mode (internal use for SSH large files)")
            .hide(true)
            .action(ArgAction::SetTrue),
        Arg::new("transfer_id")
            .long("transfer-id")
            .help("Receiver-issued transfer id for chunk-writer mode")
            .hide(true)
            .value_name("ID"),
        threshold_arg(true),
        checksum_arg(true),
        delete_arg().hide(true),
        fsync_arg(true),
        ignore_arg(true),
        exclude_from_arg(true),
    ]
}

/// The `sync` subcommand.
///
/// Takes two free-form operands (local path or remote endpoint) followed by the transfer
/// tuning, filtering, and safety options.
fn sync_command() -> Command {
    let operands = [
        sync_operand_arg("src", "Source path or remote endpoint", "SRC"),
        sync_operand_arg("dst", "Destination path or remote endpoint", "DST"),
    ];
    let options = [
        threshold_arg(false),
        checksum_arg(false),
        fsync_arg(false),
        dry_run_arg(),
        delete_arg(),
        large_file_parallel_threshold_arg(),
        large_file_parallel_workers_arg(),
        network_file_concurrency_arg(),
        ignore_arg(false),
        exclude_from_arg(false),
    ];

    Command::new("sync")
        .about("Synchronize from SRC into DST across local paths, SSH endpoints, or raw TCP")
        .args(operands)
        .args(options)
}

/// The `listen` subcommand: `ADDR`, a validated destination root `DST`, and `--fsync`.
fn listen_command() -> Command {
    let listen_args = [addr_arg(), dst_arg(), fsync_arg(false)];
    Command::new("listen")
        .about("Listen for incoming sync operations and write them to a destination root")
        .args(listen_args)
}

/// The `serve` subcommand.
///
/// Takes `ADDR`, an existing source root `SRC`, and the comparison/filtering options.
fn serve_command() -> Command {
    let positionals = [addr_arg(), src_arg()];
    let options = [
        threshold_arg(false),
        checksum_arg(false),
        ignore_arg(false),
        exclude_from_arg(false),
    ];

    Command::new("serve")
        .about("Serve a local source root for remote sync clients")
        .args(positionals)
        .args(options)
}

/// Assemble the top-level `pxs` command.
///
/// This covers:
/// - metadata and version strings,
/// - help styling,
/// - the global verbosity/quiet flags,
/// - the hidden internal stdio arguments,
/// - the `sync`, `listen`, and `serve` subcommands.
fn base_command() -> Command {
    let described = Command::new("pxs")
        .about(ABOUT)
        .long_about(LONG_ABOUT)
        .version(env!("CARGO_PKG_VERSION"))
        .long_version(long_version());

    let presented = described
        .color(ColorChoice::Auto)
        .styles(cli_styles())
        .arg_required_else_help(true)
        .disable_help_subcommand(true);

    presented
        .args([verbose_arg(), quiet_arg()])
        .args(internal_stdio_args())
        .subcommands([sync_command(), listen_command(), serve_command()])
}

/// Build the CLI command definition.
///
/// Each call returns a freshly constructed [`Command`].
#[must_use]
pub fn new() -> Command {
    base_command()
}

#[cfg(test)]
mod tests {
    use super::new;
    use anyhow::Result;
    use tempfile::{TempDir, tempdir};

    /// Creates a scratch directory containing an existing `src.txt`.
    ///
    /// Returns the directory guard together with the `src.txt` path and a sibling,
    /// not-yet-created `dst.txt` path, both as strings. The guard keeps the directory
    /// alive for as long as the caller holds it.
    fn sync_fixture() -> Result<(TempDir, String, String)> {
        let dir = tempdir()?;
        let src = dir.path().join("src.txt");
        let dst = dir.path().join("dst.txt");
        std::fs::write(&src, "content")?;
        let src = src.to_string_lossy().into_owned();
        let dst = dst.to_string_lossy().into_owned();
        Ok((dir, src, dst))
    }

    #[test]
    fn test_verbose_flag_counts_occurrences() -> Result<()> {
        let (_dir, src, dst) = sync_fixture()?;
        let matches = new().try_get_matches_from(["pxs", "sync", &src, &dst, "-vvv"])?;
        assert_eq!(matches.get_count("verbose"), 3);
        Ok(())
    }

    #[test]
    fn test_threshold_rejects_out_of_range_values() -> Result<()> {
        let (_dir, src, dst) = sync_fixture()?;
        for out_of_range in ["0.01", "1.5"] {
            let parsed = new().try_get_matches_from([
                "pxs",
                "sync",
                &src,
                &dst,
                "--threshold",
                out_of_range,
            ]);
            assert!(parsed.is_err(), "threshold {out_of_range} should be rejected");
        }
        Ok(())
    }

    #[test]
    fn test_help_hides_internal_stdio_flags() -> Result<()> {
        let mut rendered = Vec::new();
        new().write_long_help(&mut rendered)?;
        let help = String::from_utf8(rendered)?;
        for hidden in ["--stdio", "--source", "\npush", "\npull"] {
            assert!(!help.contains(hidden), "help unexpectedly mentions {hidden:?}");
        }
        assert!(help.contains("sync"));
        Ok(())
    }

    #[test]
    fn test_internal_stdio_mode_still_parses() -> Result<()> {
        let argv = ["pxs", "--stdio", "--destination", ".", "--fsync"];
        let matches = new().try_get_matches_from(argv)?;
        assert!(matches.get_flag("stdio"));
        assert!(matches.get_flag("fsync"));
        Ok(())
    }
}