Skip to main content

btrfs_cli/
receive.rs

1mod dump;
2
3use crate::{Format, Runnable};
4use anyhow::{Context, Result, bail};
5use btrfs_stream::{ReceiveContext, StreamCommand, StreamReader};
6use clap::Parser;
7use std::{fs::File, io, path::PathBuf};
8
9/// Receive subvolumes from a stream.
10///
11/// Read a btrfs send stream and recreate subvolumes on the destination filesystem.
12/// Streams can be received incrementally based on a parent subvolume to only
13/// apply changes. Multiple streams can be received in sequence. The destination
14/// filesystem must be mounted and writable. Requires CAP_SYS_ADMIN.
15#[derive(Parser, Debug)]
16pub struct ReceiveCommand {
17    /// Mount point of the destination filesystem (not required with --dump)
18    mount: Option<PathBuf>,
19
20    /// Read the stream from FILE instead of stdin
21    #[clap(short = 'f', long = "file")]
22    file: Option<PathBuf>,
23
24    /// Terminate after receiving an end-cmd marker
25    #[clap(short = 'e', long)]
26    terminate_on_end: bool,
27
28    /// Confine the process to directory using chroot
29    #[clap(short = 'C', long)]
30    chroot: bool,
31
32    /// Terminate after NERR errors (0 means unlimited)
33    #[clap(short = 'E', long)]
34    max_errors: Option<u64>,
35
36    /// The root mount point of the destination filesystem
37    #[clap(short = 'm', long = "root-mount")]
38    root_mount: Option<PathBuf>,
39
40    /// Always decompress instead of using encoded I/O
41    #[clap(long)]
42    force_decompress: bool,
43
44    /// Dump stream metadata without requiring the mount parameter
45    #[clap(long)]
46    dump: bool,
47}
48
49impl Runnable for ReceiveCommand {
50    fn run(&self, _format: Format, _dry_run: bool) -> Result<()> {
51        let input: Box<dyn io::Read> = match &self.file {
52            Some(path) => Box::new(File::open(path).with_context(|| {
53                format!("cannot open '{}'", path.display())
54            })?),
55            None => Box::new(io::stdin()),
56        };
57
58        if self.dump {
59            return dump::dump_stream(input);
60        }
61
62        let mount = self.mount.as_ref().ok_or_else(|| {
63            anyhow::anyhow!("mount point is required (unless --dump)")
64        })?;
65
66        if !mount.is_dir() {
67            bail!("'{}' is not a directory", mount.display());
68        }
69
70        // The input file must be opened before chroot (it may be outside
71        // the mount point). The stream reader consumes the input.
72        let mut reader = StreamReader::new(input)?;
73
74        let dest = if self.chroot {
75            // Confine the process to the mount point. After this, all paths
76            // in the stream are resolved relative to "/".
77            let mount_cstr =
78                std::ffi::CString::new(mount.to_str().ok_or_else(|| {
79                    anyhow::anyhow!("mount path is not valid UTF-8")
80                })?)
81                .context("mount path contains null byte")?;
82
83            if unsafe { nix::libc::chroot(mount_cstr.as_ptr()) } != 0 {
84                return Err(std::io::Error::last_os_error()).context(format!(
85                    "failed to chroot to '{}'",
86                    mount.display()
87                ));
88            }
89            if unsafe { nix::libc::chdir(c"/".as_ptr()) } != 0 {
90                return Err(std::io::Error::last_os_error())
91                    .context("failed to chdir to / after chroot");
92            }
93            eprintln!("Chroot to {}", mount.display());
94            PathBuf::from("/")
95        } else {
96            mount.clone()
97        };
98
99        let mut ctx = ReceiveContext::new(&dest)?;
100        let max_errors = self.max_errors.unwrap_or(0);
101        let mut error_count = 0u64;
102        let mut received_subvol = false;
103
104        loop {
105            match reader.next_command() {
106                Err(e) => {
107                    error_count += 1;
108                    eprintln!("ERROR: {e:#}");
109                    if max_errors > 0 && error_count >= max_errors {
110                        bail!("too many errors ({error_count}), aborting");
111                    }
112                    continue;
113                }
114                Ok(None) => {
115                    // EOF — finalize and exit.
116                    break;
117                }
118                Ok(Some(StreamCommand::End)) => {
119                    ctx.close_write_fd();
120                    ctx.finish_subvol()?;
121                    received_subvol = false;
122
123                    if self.terminate_on_end {
124                        return Ok(());
125                    }
126
127                    // Try to read the next stream header for multi-stream input.
128                    // If there's more data, the next call to next_command() on a
129                    // new reader will pick it up. We re-create the reader with the
130                    // remaining input.
131                    let inner = reader.into_inner();
132                    match StreamReader::new(inner) {
133                        Ok(new_reader) => {
134                            reader = new_reader;
135                        }
136                        Err(_) => {
137                            // No more streams.
138                            return Ok(());
139                        }
140                    }
141                    continue;
142                }
143                Ok(Some(cmd)) => {
144                    if matches!(
145                        &cmd,
146                        StreamCommand::Subvol { .. }
147                            | StreamCommand::Snapshot { .. }
148                    ) {
149                        received_subvol = true;
150                    }
151                    if let Err(e) = ctx.process_command(&cmd) {
152                        error_count += 1;
153                        eprintln!("ERROR: {e:#}");
154                        if max_errors > 0 && error_count >= max_errors {
155                            bail!("too many errors ({error_count}), aborting");
156                        }
157                    }
158                }
159            }
160        }
161
162        // Finalize the last subvolume if we received one.
163        if received_subvol {
164            ctx.finish_subvol()?;
165        }
166
167        Ok(())
168    }
169}