Skip to main content

btrfs_cli/
receive.rs

1mod dump;
2
3use crate::{RunContext, Runnable};
4use anyhow::{Context, Result, bail};
5use btrfs_stream::{ReceiveContext, StreamCommand, StreamReader};
6use clap::Parser;
7use std::{fs::File, io, path::PathBuf};
8
// NOTE: the `///` comments on this struct and on its fields are picked up by
// clap's derive and shown as the command's help text, so they are user-facing
// strings. Maintainer notes below are plain `//` comments on purpose.
/// Receive subvolumes from a stream.
///
/// Read a btrfs send stream and recreate subvolumes on the destination filesystem.
/// Streams can be received incrementally based on a parent subvolume to only
/// apply changes. Multiple streams can be received in sequence. The destination
/// filesystem must be mounted and writable. Requires CAP_SYS_ADMIN.
#[derive(Parser, Debug)]
// `doc_markdown`: the help text above mentions bare identifiers such as
// CAP_SYS_ADMIN that clippy would want wrapped in backticks.
// `struct_excessive_bools`: the struct carries four bool fields, each one an
// independent command-line switch.
#[allow(clippy::doc_markdown, clippy::struct_excessive_bools)]
pub struct ReceiveCommand {
    // No `#[clap]` attribute, so this is a positional argument. It is optional
    // at the parser level; `run` rejects `None` itself unless `dump` is set.
    /// Mount point of the destination filesystem (not required with --dump)
    mount: Option<PathBuf>,

    // `None` makes `run` read the stream from stdin.
    /// Read the stream from FILE instead of stdin
    #[clap(short = 'f', long = "file")]
    file: Option<PathBuf>,

    // When set, `run` returns right after the first end command has been
    // handled instead of looking for a further stream in the same input.
    /// Terminate after receiving an end-cmd marker
    #[clap(short = 'e', long)]
    terminate_on_end: bool,

    // When set, `run` chroots into `mount` and then receives into "/".
    /// Confine the process to directory using chroot
    #[clap(short = 'C', long)]
    chroot: bool,

    // `run` treats an absent value as 0, i.e. no limit.
    /// Terminate after NERR errors (0 means unlimited)
    #[clap(short = 'E', long)]
    max_errors: Option<u64>,

    // NOTE(review): parsed but not read by `run` in this file — confirm
    // whether it is consumed elsewhere or still to be implemented.
    /// The root mount point of the destination filesystem
    #[clap(short = 'm', long = "root-mount")]
    root_mount: Option<PathBuf>,

    // NOTE(review): parsed but not read by `run` in this file — confirm
    // whether it is consumed elsewhere or still to be implemented.
    /// Always decompress instead of using encoded I/O
    #[clap(long)]
    force_decompress: bool,

    // When set, `run` passes the input to `dump::dump_stream` and returns
    // before `mount` is looked at.
    /// Dump stream metadata without requiring the mount parameter
    #[clap(long)]
    dump: bool,
}
49
50impl Runnable for ReceiveCommand {
51    fn run(&self, _ctx: &RunContext) -> Result<()> {
52        let input: Box<dyn io::Read> = match &self.file {
53            Some(path) => Box::new(File::open(path).with_context(|| {
54                format!("cannot open '{}'", path.display())
55            })?),
56            None => Box::new(io::stdin()),
57        };
58
59        if self.dump {
60            return dump::dump_stream(input);
61        }
62
63        let mount = self.mount.as_ref().ok_or_else(|| {
64            anyhow::anyhow!("mount point is required (unless --dump)")
65        })?;
66
67        if !mount.is_dir() {
68            bail!("'{}' is not a directory", mount.display());
69        }
70
71        // The input file must be opened before chroot (it may be outside
72        // the mount point). The stream reader consumes the input.
73        let mut reader = StreamReader::new(input)?;
74
75        let dest = if self.chroot {
76            // Confine the process to the mount point. After this, all paths
77            // in the stream are resolved relative to "/".
78            let mount_cstr =
79                std::ffi::CString::new(mount.to_str().ok_or_else(|| {
80                    anyhow::anyhow!("mount path is not valid UTF-8")
81                })?)
82                .context("mount path contains null byte")?;
83
84            if unsafe { nix::libc::chroot(mount_cstr.as_ptr()) } != 0 {
85                return Err(std::io::Error::last_os_error()).context(format!(
86                    "failed to chroot to '{}'",
87                    mount.display()
88                ));
89            }
90            if unsafe { nix::libc::chdir(c"/".as_ptr()) } != 0 {
91                return Err(std::io::Error::last_os_error())
92                    .context("failed to chdir to / after chroot");
93            }
94            eprintln!("Chroot to {}", mount.display());
95            PathBuf::from("/")
96        } else {
97            mount.clone()
98        };
99
100        let mut ctx = ReceiveContext::new(&dest)?;
101        let max_errors = self.max_errors.unwrap_or(0);
102        let mut error_count = 0u64;
103        let mut received_subvol = false;
104
105        loop {
106            match reader.next_command() {
107                Err(e) => {
108                    error_count += 1;
109                    eprintln!("ERROR: {e:#}");
110                    if max_errors > 0 && error_count >= max_errors {
111                        bail!("too many errors ({error_count}), aborting");
112                    }
113                }
114                Ok(None) => {
115                    // EOF — finalize and exit.
116                    break;
117                }
118                Ok(Some(StreamCommand::End)) => {
119                    ctx.close_write_fd();
120                    ctx.finish_subvol()?;
121                    received_subvol = false;
122
123                    if self.terminate_on_end {
124                        return Ok(());
125                    }
126
127                    // Try to read the next stream header for multi-stream input.
128                    // If there's more data, the next call to next_command() on a
129                    // new reader will pick it up. We re-create the reader with the
130                    // remaining input.
131                    let inner = reader.into_inner();
132                    match StreamReader::new(inner) {
133                        Ok(new_reader) => {
134                            reader = new_reader;
135                        }
136                        Err(_) => {
137                            // No more streams.
138                            return Ok(());
139                        }
140                    }
141                }
142                Ok(Some(cmd)) => {
143                    if matches!(
144                        &cmd,
145                        StreamCommand::Subvol { .. }
146                            | StreamCommand::Snapshot { .. }
147                    ) {
148                        received_subvol = true;
149                    }
150                    if let Err(e) = ctx.process_command(&cmd) {
151                        error_count += 1;
152                        eprintln!("ERROR: {e:#}");
153                        if max_errors > 0 && error_count >= max_errors {
154                            bail!("too many errors ({error_count}), aborting");
155                        }
156                    }
157                }
158            }
159        }
160
161        // Finalize the last subvolume if we received one.
162        if received_subvol {
163            ctx.finish_subvol()?;
164        }
165
166        Ok(())
167    }
168}