Skip to main content

btrfs_cli/
send.rs

1use crate::{Format, Runnable};
2use anyhow::{Context, Result, bail};
3use btrfs_uapi::{
4    send_receive::SendFlags,
5    subvolume::{SubvolumeFlags, subvolume_flags_get, subvolume_info},
6    sysfs::SysfsBtrfs,
7};
8use clap::Parser;
9use std::{
10    fs::File,
11    io::{self, Read, Write},
12    os::{
13        fd::{AsFd, AsRawFd, OwnedFd},
14        unix::io::FromRawFd,
15    },
16    path::{Path, PathBuf},
17    thread,
18};
19
// Section titles passed to clap's `help_heading` on the `SendCommand`
// options, so related flags are grouped together in the help output.
// Used by `--parent` and `--clone-src`.
const HEADING_INCREMENTAL: &str = "Incremental";
// Used by `--no-data`, `--proto` and `--compressed-data`.
const HEADING_PROTOCOL: &str = "Protocol";
22
// NOTE(review): this struct derives clap's `Parser`; per clap's derive
// conventions the `///` comments below become the user-visible help text,
// so treat them as runtime strings. Maintainer notes use plain `//`.
/// Send the subvolume(s) to stdout.
///
/// Generate a stream representation of one or more subvolumes that can be
/// transmitted over the network or stored for later restoration. Streams
/// are incremental and can be based on a parent subvolume to only send
/// changes. The stream output is in btrfs send format and can be received
/// with the receive command. Requires CAP_SYS_ADMIN.
#[derive(Parser, Debug)]
pub struct SendCommand {
    // Positional arguments; `run` sends them in the order given and checks
    // that each one is read-only before sending anything.
    /// Subvolume(s) to send
    #[clap(required = true)]
    subvolumes: Vec<PathBuf>,

    // When set, `run` adds OMIT_STREAM_HEADER to every subvolume but the
    // first and OMIT_END_CMD to every subvolume but the last.
    /// Omit end-cmd marker between subvolumes
    #[clap(short = 'e', long)]
    omit_end_cmd: bool,

    // `run` requires the parent to be read-only and also adds its root ID to
    // the clone sources handed to the send ioctl.
    /// Send an incremental stream from parent to the subvolume
    #[clap(short = 'p', long, help_heading = HEADING_INCREMENTAL)]
    parent: Option<PathBuf>,

    // Without `--parent`, these paths are also the candidates from which
    // `find_good_parent` picks the parent of each subvolume.
    /// Use this snapshot as a clone source (may be given multiple times)
    #[clap(short = 'c', long = "clone-src", help_heading = HEADING_INCREMENTAL)]
    clone_src: Vec<PathBuf>,

    // `run` truncates (or creates) this file once up front; the stream of
    // each subvolume is then appended to it. Without it, stdout is used and
    // `run` refuses to write to a terminal.
    /// Write output to a file instead of stdout
    #[clap(short = 'f', long)]
    outfile: Option<PathBuf>,

    // Maps to `SendFlags::NO_FILE_DATA`.
    /// Send in NO_FILE_DATA mode
    #[clap(long, help_heading = HEADING_PROTOCOL)]
    no_data: bool,

    // `None` is treated as version 1 by `run`; 0 is replaced by the version
    // reported through sysfs.
    /// Use send protocol version N (0 = highest supported by kernel)
    #[clap(long, help_heading = HEADING_PROTOCOL)]
    proto: Option<u32>,

    // Maps to `SendFlags::COMPRESSED`; when `--proto` is not given this
    // bumps the protocol version from 1 to 2 in `run`.
    /// Send compressed data directly without decompressing
    #[clap(long, help_heading = HEADING_PROTOCOL)]
    compressed_data: bool,
}
64
/// Buffer size for protocol v1 (matches BTRFS_SEND_BUF_SIZE_V1 = 64 KiB).
///
/// `run` hands this size to the reader thread when the selected protocol
/// version is 1.
const SEND_BUF_SIZE_V1: usize = 64 * 1024;
/// Buffer size for protocol v2+ (16 KiB + 128 KiB compressed = 144 KiB).
///
/// `run` hands this size to the reader thread when the selected protocol
/// version is greater than 1.
const SEND_BUF_SIZE_V2: usize = 16 * 1024 + 128 * 1024;
69
70fn open_subvol_ro(path: &Path) -> Result<File> {
71    File::open(path)
72        .with_context(|| format!("cannot open '{}'", path.display()))
73}
74
75fn check_subvol_readonly(file: &File, path: &Path) -> Result<()> {
76    let flags = subvolume_flags_get(file.as_fd()).with_context(|| {
77        format!("failed to get flags for '{}'", path.display())
78    })?;
79    if !flags.contains(SubvolumeFlags::RDONLY) {
80        bail!("subvolume '{}' is not read-only", path.display());
81    }
82    Ok(())
83}
84
85fn get_root_id(file: &File, path: &Path) -> Result<u64> {
86    let info = subvolume_info(file.as_fd()).with_context(|| {
87        format!("failed to get subvolume info for '{}'", path.display())
88    })?;
89    Ok(info.id)
90}
91
92/// Find the best parent among clone sources for incremental send.
93///
94/// Looks for a clone source that shares the same parent UUID as the target
95/// subvolume and picks the one with the closest ctransid.
96fn find_good_parent(
97    subvol_info: &btrfs_uapi::subvolume::SubvolumeInfo,
98    clone_source_paths: &[PathBuf],
99) -> Result<Option<u64>> {
100    if subvol_info.parent_uuid.is_nil() {
101        return Ok(None);
102    }
103
104    let mut best_root_id = None;
105    let mut best_diff = u64::MAX;
106
107    for cs_path in clone_source_paths {
108        let cs_file = open_subvol_ro(cs_path)?;
109        let cs_info = subvolume_info(cs_file.as_fd()).with_context(|| {
110            format!(
111                "failed to get info for clone source '{}'",
112                cs_path.display()
113            )
114        })?;
115
116        // Check if this clone source shares the same parent or IS the parent.
117        if cs_info.parent_uuid != subvol_info.parent_uuid
118            && cs_info.uuid != subvol_info.parent_uuid
119        {
120            continue;
121        }
122
123        let diff = subvol_info.ctransid.abs_diff(cs_info.ctransid);
124        if diff < best_diff {
125            best_diff = diff;
126            best_root_id = Some(cs_info.id);
127        }
128    }
129
130    Ok(best_root_id)
131}
132
133/// Create a pipe and return (read_end, write_end) as OwnedFds.
134fn make_pipe() -> Result<(OwnedFd, OwnedFd)> {
135    let mut fds = [0i32; 2];
136    let ret = unsafe { nix::libc::pipe(fds.as_mut_ptr()) };
137    if ret < 0 {
138        return Err(io::Error::last_os_error())
139            .context("failed to create pipe");
140    }
141    // SAFETY: pipe() just returned two valid fds.
142    let read_end = unsafe { OwnedFd::from_raw_fd(fds[0]) };
143    let write_end = unsafe { OwnedFd::from_raw_fd(fds[1]) };
144    Ok((read_end, write_end))
145}
146
147/// Spawn a thread that reads from `read_fd` and writes everything to `out`.
148fn spawn_reader_thread(
149    read_fd: OwnedFd,
150    mut out: Box<dyn Write + Send>,
151    buf_size: usize,
152) -> thread::JoinHandle<Result<()>> {
153    thread::spawn(move || {
154        let mut file = File::from(read_fd);
155        let mut buf = vec![0u8; buf_size];
156        loop {
157            let n = file
158                .read(&mut buf)
159                .context("failed to read send stream from kernel")?;
160            if n == 0 {
161                return Ok(());
162            }
163            out.write_all(&buf[..n])
164                .context("failed to write send stream to output")?;
165        }
166    })
167}
168
169/// Open or create the output writer for the reader thread.
170fn open_output(outfile: &Option<PathBuf>) -> Result<Box<dyn Write + Send>> {
171    match outfile {
172        Some(path) => {
173            let file =
174                File::options().append(true).open(path).with_context(|| {
175                    format!("cannot open '{}' for writing", path.display())
176                })?;
177            Ok(Box::new(file))
178        }
179        None => Ok(Box::new(io::stdout())),
180    }
181}
182
183impl Runnable for SendCommand {
184    fn run(&self, _format: Format, _dry_run: bool) -> Result<()> {
185        // Validate output destination.
186        if let Some(path) = &self.outfile {
187            // Try opening existing file first, then create. Truncate since
188            // this is the start of a new send.
189            File::options()
190                .write(true)
191                .truncate(true)
192                .open(path)
193                .or_else(|_| {
194                    File::options()
195                        .write(true)
196                        .truncate(true)
197                        .create(true)
198                        .open(path)
199                })
200                .with_context(|| {
201                    format!("cannot create '{}'", path.display())
202                })?;
203        } else {
204            let stdout = io::stdout();
205            if unsafe { nix::libc::isatty(stdout.as_fd().as_raw_fd()) } == 1 {
206                bail!(
207                    "not dumping send stream into a terminal, redirect it into a file"
208                );
209            }
210        }
211
212        // Validate all subvolumes are read-only.
213        for subvol_path in &self.subvolumes {
214            let file = open_subvol_ro(subvol_path)?;
215            check_subvol_readonly(&file, subvol_path)?;
216        }
217
218        // Validate parent is read-only and get its root ID.
219        let mut parent_root_id: u64 = 0;
220        if let Some(parent_path) = &self.parent {
221            let file = open_subvol_ro(parent_path)?;
222            check_subvol_readonly(&file, parent_path)?;
223            parent_root_id = get_root_id(&file, parent_path)?;
224        }
225
226        // Collect clone source root IDs and validate they are read-only.
227        let mut clone_sources: Vec<u64> = Vec::new();
228        for cs_path in &self.clone_src {
229            let file = open_subvol_ro(cs_path)?;
230            check_subvol_readonly(&file, cs_path)?;
231            clone_sources.push(get_root_id(&file, cs_path)?);
232        }
233
234        // If a parent was given, add it to clone sources (matches C behavior).
235        if self.parent.is_some() && !clone_sources.contains(&parent_root_id) {
236            clone_sources.push(parent_root_id);
237        }
238
239        let full_send = self.parent.is_none() && self.clone_src.is_empty();
240
241        // Determine protocol version.
242        let first_file = open_subvol_ro(&self.subvolumes[0])?;
243        let fs = btrfs_uapi::filesystem::filesystem_info(first_file.as_fd())
244            .context("failed to get filesystem info")?;
245        let sysfs = SysfsBtrfs::new(&fs.uuid);
246        let proto_supported = sysfs.send_stream_version();
247
248        let mut proto = self.proto.unwrap_or(1);
249        if proto == 0 {
250            proto = proto_supported;
251        }
252
253        if proto > proto_supported && proto_supported == 1 {
254            bail!(
255                "requested protocol version {proto} but kernel supports only {proto_supported}"
256            );
257        }
258
259        // Build send flags.
260        let mut flags = SendFlags::empty();
261        if self.no_data {
262            flags |= SendFlags::NO_FILE_DATA;
263        }
264        if self.compressed_data {
265            if proto == 1 && self.proto.is_none() {
266                proto = 2;
267            }
268            if proto < 2 {
269                bail!(
270                    "--compressed-data requires protocol version >= 2 (requested {proto})"
271                );
272            }
273            if proto_supported < 2 {
274                bail!("kernel does not support --compressed-data");
275            }
276            flags |= SendFlags::COMPRESSED;
277        }
278        if proto_supported > 1 {
279            flags |= SendFlags::VERSION;
280        }
281
282        let buf_size = if proto > 1 {
283            SEND_BUF_SIZE_V2
284        } else {
285            SEND_BUF_SIZE_V1
286        };
287
288        // Send each subvolume.
289        let count = self.subvolumes.len();
290        for (i, subvol_path) in self.subvolumes.iter().enumerate() {
291            let is_first = i == 0;
292            let is_last = i == count - 1;
293
294            eprintln!("At subvol {}", subvol_path.display());
295
296            let subvol_file = open_subvol_ro(subvol_path)?;
297
298            // For incremental send without an explicit parent, find the best
299            // parent among clone sources.
300            let mut this_parent = parent_root_id;
301            if !full_send && self.parent.is_none() {
302                let info =
303                    subvolume_info(subvol_file.as_fd()).with_context(|| {
304                        format!(
305                            "failed to get info for '{}'",
306                            subvol_path.display()
307                        )
308                    })?;
309                match find_good_parent(&info, &self.clone_src)? {
310                    Some(id) => this_parent = id,
311                    None => bail!(
312                        "cannot find a suitable parent for '{}' among clone sources",
313                        subvol_path.display()
314                    ),
315                }
316            }
317
318            // Build per-subvolume flags.
319            let mut subvol_flags = flags;
320            if self.omit_end_cmd {
321                if !is_first {
322                    subvol_flags |= SendFlags::OMIT_STREAM_HEADER;
323                }
324                if !is_last {
325                    subvol_flags |= SendFlags::OMIT_END_CMD;
326                }
327            }
328
329            // Create pipe and spawn reader thread.
330            let (pipe_read, pipe_write) = make_pipe()?;
331            let out = open_output(&self.outfile)?;
332            let reader = spawn_reader_thread(pipe_read, out, buf_size);
333
334            let send_result = btrfs_uapi::send_receive::send(
335                subvol_file.as_fd(),
336                pipe_write.as_raw_fd(),
337                this_parent,
338                &mut clone_sources,
339                subvol_flags,
340                proto,
341            );
342
343            // Close write end so the reader thread sees EOF.
344            drop(pipe_write);
345
346            if let Err(e) = send_result {
347                let _ = reader.join();
348                if e == nix::errno::Errno::EINVAL && self.omit_end_cmd {
349                    bail!(
350                        "send ioctl failed: {e}\n\
351                         Try upgrading your kernel or don't use -e."
352                    );
353                }
354                return Err(e).with_context(|| {
355                    format!("send failed for '{}'", subvol_path.display())
356                });
357            }
358
359            match reader.join() {
360                Ok(Ok(())) => {}
361                Ok(Err(e)) => {
362                    return Err(e).context("send stream reader failed");
363                }
364                Err(_) => bail!("send stream reader thread panicked"),
365            }
366
367            // After sending, add to clone sources for subsequent subvolumes.
368            if !full_send && self.parent.is_none() {
369                let root_id = get_root_id(&subvol_file, subvol_path)?;
370                if !clone_sources.contains(&root_id) {
371                    clone_sources.push(root_id);
372                }
373            }
374        }
375
376        Ok(())
377    }
378}