Skip to main content

btrfs_cli/
receive.rs

1mod dump;
2
3use crate::{Format, Runnable};
4use anyhow::{Context, Result, bail};
5use btrfs_stream::{ReceiveContext, StreamCommand, StreamReader};
6use clap::Parser;
7use std::{fs::File, io, path::PathBuf};
8
9/// Receive subvolumes from a stream.
10///
11/// Read a btrfs send stream and recreate subvolumes on the destination filesystem.
12/// Streams can be received incrementally based on a parent subvolume to only
13/// apply changes. Multiple streams can be received in sequence. The destination
14/// filesystem must be mounted and writable. Requires CAP_SYS_ADMIN.
15#[derive(Parser, Debug)]
16pub struct ReceiveCommand {
17    /// Mount point of the destination filesystem (not required with --dump)
18    mount: Option<PathBuf>,
19
20    /// Read the stream from FILE instead of stdin
21    #[clap(short = 'f', long = "file")]
22    file: Option<PathBuf>,
23
24    /// Terminate after receiving an end-cmd marker
25    #[clap(short = 'e', long)]
26    terminate_on_end: bool,
27
28    /// Confine the process to directory using chroot
29    #[clap(short = 'C', long)]
30    chroot: bool,
31
32    /// Terminate after NERR errors (0 means unlimited)
33    #[clap(short = 'E', long)]
34    max_errors: Option<u64>,
35
36    /// The root mount point of the destination filesystem
37    #[clap(short = 'm', long = "root-mount")]
38    root_mount: Option<PathBuf>,
39
40    /// Always decompress instead of using encoded I/O
41    #[clap(long)]
42    force_decompress: bool,
43
44    /// Dump stream metadata without requiring the mount parameter
45    #[clap(long)]
46    dump: bool,
47}
48
49impl Runnable for ReceiveCommand {
50    fn run(&self, _format: Format, _dry_run: bool) -> Result<()> {
51        let input: Box<dyn io::Read> = match &self.file {
52            Some(path) => Box::new(File::open(path).with_context(|| {
53                format!("cannot open '{}'", path.display())
54            })?),
55            None => Box::new(io::stdin()),
56        };
57
58        if self.dump {
59            return dump::dump_stream(input);
60        }
61
62        let mount = self.mount.as_ref().ok_or_else(|| {
63            anyhow::anyhow!("mount point is required (unless --dump)")
64        })?;
65
66        if !mount.is_dir() {
67            bail!("'{}' is not a directory", mount.display());
68        }
69
70        // The input file must be opened before chroot (it may be outside
71        // the mount point). The stream reader consumes the input.
72        let mut reader = StreamReader::new(input)?;
73
74        let dest = if self.chroot {
75            // Confine the process to the mount point. After this, all paths
76            // in the stream are resolved relative to "/".
77            let mount_cstr =
78                std::ffi::CString::new(mount.to_str().ok_or_else(|| {
79                    anyhow::anyhow!("mount path is not valid UTF-8")
80                })?)
81                .context("mount path contains null byte")?;
82
83            if unsafe { nix::libc::chroot(mount_cstr.as_ptr()) } != 0 {
84                return Err(std::io::Error::last_os_error()).context(format!(
85                    "failed to chroot to '{}'",
86                    mount.display()
87                ));
88            }
89            if unsafe { nix::libc::chdir(c"/".as_ptr()) } != 0 {
90                return Err(std::io::Error::last_os_error())
91                    .context("failed to chdir to / after chroot");
92            }
93            eprintln!("Chroot to {}", mount.display());
94            PathBuf::from("/")
95        } else {
96            mount.clone()
97        };
98
99        let mut ctx = ReceiveContext::new(&dest)?;
100        let max_errors = self.max_errors.unwrap_or(0);
101        let mut error_count = 0u64;
102        let mut received_subvol = false;
103
104        loop {
105            match reader.next_command() {
106                Err(e) => {
107                    error_count += 1;
108                    eprintln!("ERROR: {e:#}");
109                    if max_errors > 0 && error_count >= max_errors {
110                        bail!("too many errors ({error_count}), aborting");
111                    }
112                }
113                Ok(None) => {
114                    // EOF — finalize and exit.
115                    break;
116                }
117                Ok(Some(StreamCommand::End)) => {
118                    ctx.close_write_fd();
119                    ctx.finish_subvol()?;
120                    received_subvol = false;
121
122                    if self.terminate_on_end {
123                        return Ok(());
124                    }
125
126                    // Try to read the next stream header for multi-stream input.
127                    // If there's more data, the next call to next_command() on a
128                    // new reader will pick it up. We re-create the reader with the
129                    // remaining input.
130                    let inner = reader.into_inner();
131                    match StreamReader::new(inner) {
132                        Ok(new_reader) => {
133                            reader = new_reader;
134                        }
135                        Err(_) => {
136                            // No more streams.
137                            return Ok(());
138                        }
139                    }
140                }
141                Ok(Some(cmd)) => {
142                    if matches!(
143                        &cmd,
144                        StreamCommand::Subvol { .. }
145                            | StreamCommand::Snapshot { .. }
146                    ) {
147                        received_subvol = true;
148                    }
149                    if let Err(e) = ctx.process_command(&cmd) {
150                        error_count += 1;
151                        eprintln!("ERROR: {e:#}");
152                        if max_errors > 0 && error_count >= max_errors {
153                            bail!("too many errors ({error_count}), aborting");
154                        }
155                    }
156                }
157            }
158        }
159
160        // Finalize the last subvolume if we received one.
161        if received_subvol {
162            ctx.finish_subvol()?;
163        }
164
165        Ok(())
166    }
167}