Skip to main content

btrfs_cli/
receive.rs

1mod dump;
2mod ops;
3
4use crate::{Format, Runnable};
5use anyhow::{Context, Result, bail};
6use btrfs_disk::stream::{StreamCommand, StreamReader};
7use clap::Parser;
8use ops::ReceiveContext;
9use std::{fs::File, io, path::PathBuf};
10
11/// Receive subvolumes from a stream.
12///
13/// Read a btrfs send stream and recreate subvolumes on the destination filesystem.
14/// Streams can be received incrementally based on a parent subvolume to only
15/// apply changes. Multiple streams can be received in sequence. The destination
16/// filesystem must be mounted and writable. Requires CAP_SYS_ADMIN.
17#[derive(Parser, Debug)]
18pub struct ReceiveCommand {
19    /// Mount point of the destination filesystem (not required with --dump)
20    mount: Option<PathBuf>,
21
22    /// Read the stream from FILE instead of stdin
23    #[clap(short = 'f')]
24    file: Option<PathBuf>,
25
26    /// Terminate after receiving an end-cmd marker
27    #[clap(short = 'e')]
28    terminate_on_end: bool,
29
30    /// Confine the process to directory using chroot
31    #[clap(short = 'C', long)]
32    chroot: bool,
33
34    /// Terminate after NERR errors (0 means unlimited)
35    #[clap(short = 'E', long)]
36    max_errors: Option<u64>,
37
38    /// The root mount point of the destination filesystem
39    #[clap(short = 'm', long = "root-mount")]
40    root_mount: Option<PathBuf>,
41
42    /// Always decompress instead of using encoded I/O
43    #[clap(long)]
44    force_decompress: bool,
45
46    /// Dump stream metadata without requiring the mount parameter
47    #[clap(long)]
48    dump: bool,
49}
50
51impl Runnable for ReceiveCommand {
52    fn run(&self, _format: Format, _dry_run: bool) -> Result<()> {
53        let input: Box<dyn io::Read> = match &self.file {
54            Some(path) => Box::new(
55                File::open(path).with_context(|| format!("cannot open '{}'", path.display()))?,
56            ),
57            None => Box::new(io::stdin()),
58        };
59
60        if self.dump {
61            return dump::dump_stream(input);
62        }
63
64        let mount = self
65            .mount
66            .as_ref()
67            .ok_or_else(|| anyhow::anyhow!("mount point is required (unless --dump)"))?;
68
69        if !mount.is_dir() {
70            bail!("'{}' is not a directory", mount.display());
71        }
72
73        // The input file must be opened before chroot (it may be outside
74        // the mount point). The stream reader consumes the input.
75        let mut reader = StreamReader::new(input)?;
76
77        let dest = if self.chroot {
78            // Confine the process to the mount point. After this, all paths
79            // in the stream are resolved relative to "/".
80            let mount_cstr = std::ffi::CString::new(
81                mount
82                    .to_str()
83                    .ok_or_else(|| anyhow::anyhow!("mount path is not valid UTF-8"))?,
84            )
85            .context("mount path contains null byte")?;
86
87            if unsafe { nix::libc::chroot(mount_cstr.as_ptr()) } != 0 {
88                return Err(std::io::Error::last_os_error())
89                    .context(format!("failed to chroot to '{}'", mount.display()));
90            }
91            if unsafe { nix::libc::chdir(c"/".as_ptr()) } != 0 {
92                return Err(std::io::Error::last_os_error())
93                    .context("failed to chdir to / after chroot");
94            }
95            eprintln!("Chroot to {}", mount.display());
96            PathBuf::from("/")
97        } else {
98            mount.clone()
99        };
100
101        let mut ctx = ReceiveContext::new(&dest)?;
102        let max_errors = self.max_errors.unwrap_or(0);
103        let mut error_count = 0u64;
104        let mut received_subvol = false;
105
106        loop {
107            match reader.next_command() {
108                Err(e) => {
109                    error_count += 1;
110                    eprintln!("ERROR: {e:#}");
111                    if max_errors > 0 && error_count >= max_errors {
112                        bail!("too many errors ({error_count}), aborting");
113                    }
114                    continue;
115                }
116                Ok(None) => {
117                    // EOF — finalize and exit.
118                    break;
119                }
120                Ok(Some(StreamCommand::End)) => {
121                    ctx.close_write_fd();
122                    ctx.finish_subvol()?;
123                    received_subvol = false;
124
125                    if self.terminate_on_end {
126                        return Ok(());
127                    }
128
129                    // Try to read the next stream header for multi-stream input.
130                    // If there's more data, the next call to next_command() on a
131                    // new reader will pick it up. We re-create the reader with the
132                    // remaining input.
133                    let inner = reader.into_inner();
134                    match StreamReader::new(inner) {
135                        Ok(new_reader) => {
136                            reader = new_reader;
137                        }
138                        Err(_) => {
139                            // No more streams.
140                            return Ok(());
141                        }
142                    }
143                    continue;
144                }
145                Ok(Some(cmd)) => {
146                    if matches!(
147                        &cmd,
148                        StreamCommand::Subvol { .. } | StreamCommand::Snapshot { .. }
149                    ) {
150                        received_subvol = true;
151                    }
152                    if let Err(e) = ctx.process_command(&cmd) {
153                        error_count += 1;
154                        eprintln!("ERROR: {e:#}");
155                        if max_errors > 0 && error_count >= max_errors {
156                            bail!("too many errors ({error_count}), aborting");
157                        }
158                    }
159                }
160            }
161        }
162
163        // Finalize the last subvolume if we received one.
164        if received_subvol {
165            ctx.finish_subvol()?;
166        }
167
168        Ok(())
169    }
170}