Skip to main content

btrfs_cli/
send.rs

1use crate::{RunContext, Runnable};
2use anyhow::{Context, Result, bail};
3use btrfs_uapi::{
4    send_receive::SendFlags,
5    subvolume::{SubvolumeFlags, subvolume_flags_get, subvolume_info},
6    sysfs::SysfsBtrfs,
7};
8use clap::Parser;
9use std::{
10    fs::File,
11    io::{self, Read, Write},
12    os::{
13        fd::{AsFd, AsRawFd, OwnedFd},
14        unix::io::FromRawFd,
15    },
16    path::{Path, PathBuf},
17    thread,
18};
19
// Section titles passed to clap's `help_heading` on the options of
// `SendCommand`, so related flags are grouped together in the help output.
const HEADING_INCREMENTAL: &str = "Incremental";
const HEADING_PROTOCOL: &str = "Protocol";
22
// NOTE: clap's derive renders the `///` doc comments on this struct and on
// each field as help text, so treat them as user-facing strings. Maintainer
// notes belong in plain `//` comments like this one.
/// Send the subvolume(s) to stdout.
///
/// Generate a stream representation of one or more subvolumes that can be
/// transmitted over the network or stored for later restoration. Streams
/// are incremental and can be based on a parent subvolume to only send
/// changes. The stream output is in btrfs send format and can be received
/// with the receive command. Requires CAP_SYS_ADMIN.
#[derive(Parser, Debug)]
#[allow(clippy::doc_markdown)]
pub struct SendCommand {
    // Positional argument; at least one path is required. `run()` rejects
    // any subvolume that is not read-only.
    /// Subvolume(s) to send
    #[clap(required = true)]
    subvolumes: Vec<PathBuf>,

    // In `run()` this sets OMIT_STREAM_HEADER on every subvolume except the
    // first and OMIT_END_CMD on every subvolume except the last.
    /// Omit end-cmd marker between subvolumes
    #[clap(short = 'e', long)]
    omit_end_cmd: bool,

    // Must be read-only. `run()` also adds its root ID to the clone sources.
    /// Send an incremental stream from parent to the subvolume
    #[clap(short = 'p', long, help_heading = HEADING_INCREMENTAL)]
    parent: Option<PathBuf>,

    // Each must be read-only. When no parent is given, `run()` picks the
    // parent for each subvolume from these via `find_good_parent`.
    /// Use this snapshot as a clone source (may be given multiple times)
    #[clap(short = 'c', long = "clone-src", help_heading = HEADING_INCREMENTAL)]
    clone_src: Vec<PathBuf>,

    // The file is truncated (or created) once at the start of `run()`, then
    // reopened in append mode for each subvolume.
    /// Write output to a file instead of stdout
    #[clap(short = 'f', long)]
    outfile: Option<PathBuf>,

    // Maps to `SendFlags::NO_FILE_DATA`.
    /// Send in NO_FILE_DATA mode
    #[clap(long, help_heading = HEADING_PROTOCOL)]
    no_data: bool,

    // When absent, `run()` uses version 1; 0 is replaced by the version
    // reported through sysfs.
    /// Use send protocol version N (0 = highest supported by kernel)
    #[clap(long, help_heading = HEADING_PROTOCOL)]
    proto: Option<u32>,

    // Maps to `SendFlags::COMPRESSED`. If `--proto` was not given, `run()`
    // raises the version to 2; it fails when the effective or supported
    // version is below 2.
    /// Send compressed data directly without decompressing
    #[clap(long, help_heading = HEADING_PROTOCOL)]
    compressed_data: bool,
}
65
/// Read-buffer size in bytes for send protocol v1: 64 KiB, the same value as
/// `BTRFS_SEND_BUF_SIZE_V1`.
const SEND_BUF_SIZE_V1: usize = 64 << 10;
/// Read-buffer size in bytes for send protocol v2 and later: 16 KiB plus
/// 128 KiB for compressed data, 144 KiB in total.
const SEND_BUF_SIZE_V2: usize = (16 + 128) << 10;
70
71fn open_subvol_ro(path: &Path) -> Result<File> {
72    File::open(path)
73        .with_context(|| format!("cannot open '{}'", path.display()))
74}
75
76fn check_subvol_readonly(file: &File, path: &Path) -> Result<()> {
77    let flags = subvolume_flags_get(file.as_fd()).with_context(|| {
78        format!("failed to get flags for '{}'", path.display())
79    })?;
80    if !flags.contains(SubvolumeFlags::RDONLY) {
81        bail!("subvolume '{}' is not read-only", path.display());
82    }
83    Ok(())
84}
85
86fn get_root_id(file: &File, path: &Path) -> Result<u64> {
87    let info = subvolume_info(file.as_fd()).with_context(|| {
88        format!("failed to get subvolume info for '{}'", path.display())
89    })?;
90    Ok(info.id)
91}
92
93/// Find the best parent among clone sources for incremental send.
94///
95/// Looks for a clone source that shares the same parent UUID as the target
96/// subvolume and picks the one with the closest ctransid.
97fn find_good_parent(
98    subvol_info: &btrfs_uapi::subvolume::SubvolumeInfo,
99    clone_source_paths: &[PathBuf],
100) -> Result<Option<u64>> {
101    if subvol_info.parent_uuid.is_nil() {
102        return Ok(None);
103    }
104
105    let mut best_root_id = None;
106    let mut best_diff = u64::MAX;
107
108    for cs_path in clone_source_paths {
109        let cs_file = open_subvol_ro(cs_path)?;
110        let cs_info = subvolume_info(cs_file.as_fd()).with_context(|| {
111            format!(
112                "failed to get info for clone source '{}'",
113                cs_path.display()
114            )
115        })?;
116
117        // Check if this clone source shares the same parent or IS the parent.
118        if cs_info.parent_uuid != subvol_info.parent_uuid
119            && cs_info.uuid != subvol_info.parent_uuid
120        {
121            continue;
122        }
123
124        let diff = subvol_info.ctransid.abs_diff(cs_info.ctransid);
125        if diff < best_diff {
126            best_diff = diff;
127            best_root_id = Some(cs_info.id);
128        }
129    }
130
131    Ok(best_root_id)
132}
133
134/// Create a pipe and return (`read_end`, `write_end`) as `OwnedFd`s.
135fn make_pipe() -> Result<(OwnedFd, OwnedFd)> {
136    let mut fds = [0i32; 2];
137    let ret = unsafe { nix::libc::pipe(fds.as_mut_ptr()) };
138    if ret < 0 {
139        return Err(io::Error::last_os_error())
140            .context("failed to create pipe");
141    }
142    // SAFETY: pipe() just returned two valid fds.
143    let read_end = unsafe { OwnedFd::from_raw_fd(fds[0]) };
144    let write_end = unsafe { OwnedFd::from_raw_fd(fds[1]) };
145    Ok((read_end, write_end))
146}
147
148/// Spawn a thread that reads from `read_fd` and writes everything to `out`.
149fn spawn_reader_thread(
150    read_fd: OwnedFd,
151    mut out: Box<dyn Write + Send>,
152    buf_size: usize,
153) -> thread::JoinHandle<Result<()>> {
154    thread::spawn(move || {
155        let mut file = File::from(read_fd);
156        let mut buf = vec![0u8; buf_size];
157        loop {
158            let n = file
159                .read(&mut buf)
160                .context("failed to read send stream from kernel")?;
161            if n == 0 {
162                return Ok(());
163            }
164            out.write_all(&buf[..n])
165                .context("failed to write send stream to output")?;
166        }
167    })
168}
169
170/// Open or create the output writer for the reader thread.
171fn open_output(outfile: Option<&PathBuf>) -> Result<Box<dyn Write + Send>> {
172    match outfile {
173        Some(path) => {
174            let file =
175                File::options().append(true).open(path).with_context(|| {
176                    format!("cannot open '{}' for writing", path.display())
177                })?;
178            Ok(Box::new(file))
179        }
180        None => Ok(Box::new(io::stdout())),
181    }
182}
183
184impl Runnable for SendCommand {
185    #[allow(clippy::too_many_lines)]
186    fn run(&self, _ctx: &RunContext) -> Result<()> {
187        // Validate output destination.
188        if let Some(path) = &self.outfile {
189            // Try opening existing file first, then create. Truncate since
190            // this is the start of a new send.
191            File::options()
192                .write(true)
193                .truncate(true)
194                .open(path)
195                .or_else(|_| {
196                    File::options()
197                        .write(true)
198                        .truncate(true)
199                        .create(true)
200                        .open(path)
201                })
202                .with_context(|| {
203                    format!("cannot create '{}'", path.display())
204                })?;
205        } else {
206            let stdout = io::stdout();
207            if unsafe { nix::libc::isatty(stdout.as_fd().as_raw_fd()) } == 1 {
208                bail!(
209                    "not dumping send stream into a terminal, redirect it into a file"
210                );
211            }
212        }
213
214        // Validate all subvolumes are read-only.
215        for subvol_path in &self.subvolumes {
216            let file = open_subvol_ro(subvol_path)?;
217            check_subvol_readonly(&file, subvol_path)?;
218        }
219
220        // Validate parent is read-only and get its root ID.
221        let mut parent_root_id: u64 = 0;
222        if let Some(parent_path) = &self.parent {
223            let file = open_subvol_ro(parent_path)?;
224            check_subvol_readonly(&file, parent_path)?;
225            parent_root_id = get_root_id(&file, parent_path)?;
226        }
227
228        // Collect clone source root IDs and validate they are read-only.
229        let mut clone_sources: Vec<u64> = Vec::new();
230        for cs_path in &self.clone_src {
231            let file = open_subvol_ro(cs_path)?;
232            check_subvol_readonly(&file, cs_path)?;
233            clone_sources.push(get_root_id(&file, cs_path)?);
234        }
235
236        // If a parent was given, add it to clone sources (matches C behavior).
237        if self.parent.is_some() && !clone_sources.contains(&parent_root_id) {
238            clone_sources.push(parent_root_id);
239        }
240
241        let full_send = self.parent.is_none() && self.clone_src.is_empty();
242
243        // Determine protocol version.
244        let first_file = open_subvol_ro(&self.subvolumes[0])?;
245        let fs = btrfs_uapi::filesystem::filesystem_info(first_file.as_fd())
246            .context("failed to get filesystem info")?;
247        let sysfs = SysfsBtrfs::new(&fs.uuid);
248        let proto_supported = sysfs.send_stream_version();
249
250        let mut proto = self.proto.unwrap_or(1);
251        if proto == 0 {
252            proto = proto_supported;
253        }
254
255        if proto > proto_supported && proto_supported == 1 {
256            bail!(
257                "requested protocol version {proto} but kernel supports only {proto_supported}"
258            );
259        }
260
261        // Build send flags.
262        let mut flags = SendFlags::empty();
263        if self.no_data {
264            flags |= SendFlags::NO_FILE_DATA;
265        }
266        if self.compressed_data {
267            if proto == 1 && self.proto.is_none() {
268                proto = 2;
269            }
270            if proto < 2 {
271                bail!(
272                    "--compressed-data requires protocol version >= 2 (requested {proto})"
273                );
274            }
275            if proto_supported < 2 {
276                bail!("kernel does not support --compressed-data");
277            }
278            flags |= SendFlags::COMPRESSED;
279        }
280        if proto_supported > 1 {
281            flags |= SendFlags::VERSION;
282        }
283
284        let buf_size = if proto > 1 {
285            SEND_BUF_SIZE_V2
286        } else {
287            SEND_BUF_SIZE_V1
288        };
289
290        // Send each subvolume.
291        let count = self.subvolumes.len();
292        for (i, subvol_path) in self.subvolumes.iter().enumerate() {
293            let is_first = i == 0;
294            let is_last = i == count - 1;
295
296            eprintln!("At subvol {}", subvol_path.display());
297
298            let subvol_file = open_subvol_ro(subvol_path)?;
299
300            // For incremental send without an explicit parent, find the best
301            // parent among clone sources.
302            let mut this_parent = parent_root_id;
303            if !full_send && self.parent.is_none() {
304                let info =
305                    subvolume_info(subvol_file.as_fd()).with_context(|| {
306                        format!(
307                            "failed to get info for '{}'",
308                            subvol_path.display()
309                        )
310                    })?;
311                match find_good_parent(&info, &self.clone_src)? {
312                    Some(id) => this_parent = id,
313                    None => bail!(
314                        "cannot find a suitable parent for '{}' among clone sources",
315                        subvol_path.display()
316                    ),
317                }
318            }
319
320            // Build per-subvolume flags.
321            let mut subvol_flags = flags;
322            if self.omit_end_cmd {
323                if !is_first {
324                    subvol_flags |= SendFlags::OMIT_STREAM_HEADER;
325                }
326                if !is_last {
327                    subvol_flags |= SendFlags::OMIT_END_CMD;
328                }
329            }
330
331            // Create pipe and spawn reader thread.
332            let (pipe_read, pipe_write) = make_pipe()?;
333            let out = open_output(self.outfile.as_ref())?;
334            let reader = spawn_reader_thread(pipe_read, out, buf_size);
335
336            let send_result = btrfs_uapi::send_receive::send(
337                subvol_file.as_fd(),
338                pipe_write.as_raw_fd(),
339                this_parent,
340                &mut clone_sources,
341                subvol_flags,
342                proto,
343            );
344
345            // Close write end so the reader thread sees EOF.
346            drop(pipe_write);
347
348            if let Err(e) = send_result {
349                let _ = reader.join();
350                if e == nix::errno::Errno::EINVAL && self.omit_end_cmd {
351                    bail!(
352                        "send ioctl failed: {e}\n\
353                         Try upgrading your kernel or don't use -e."
354                    );
355                }
356                return Err(e).with_context(|| {
357                    format!("send failed for '{}'", subvol_path.display())
358                });
359            }
360
361            match reader.join() {
362                Ok(Ok(())) => {}
363                Ok(Err(e)) => {
364                    return Err(e).context("send stream reader failed");
365                }
366                Err(_) => bail!("send stream reader thread panicked"),
367            }
368
369            // After sending, add to clone sources for subsequent subvolumes.
370            if !full_send && self.parent.is_none() {
371                let root_id = get_root_id(&subvol_file, subvol_path)?;
372                if !clone_sources.contains(&root_id) {
373                    clone_sources.push(root_id);
374                }
375            }
376        }
377
378        Ok(())
379    }
380}