Skip to main content

btrfs_cli/
send.rs

use crate::{Format, Runnable};
use anyhow::{Context, Result, bail};
use btrfs_uapi::{
    send::SendFlags,
    subvolume::{SubvolumeFlags, subvolume_flags_get, subvolume_info},
    sysfs::SysfsBtrfs,
};
use clap::Parser;
use std::{
    fs::File,
    io::{self, Read, Write},
    os::{
        fd::{AsFd, AsRawFd, OwnedFd},
        unix::io::FromRawFd,
    },
    path::{Path, PathBuf},
    thread,
};
19
20/// Send the subvolume(s) to stdout.
21///
22/// Generate a stream representation of one or more subvolumes that can be
23/// transmitted over the network or stored for later restoration. Streams
24/// are incremental and can be based on a parent subvolume to only send
25/// changes. The stream output is in btrfs send format and can be received
26/// with the receive command. Requires CAP_SYS_ADMIN.
27#[derive(Parser, Debug)]
28pub struct SendCommand {
29    /// Subvolume(s) to send
30    #[clap(required = true)]
31    subvolumes: Vec<PathBuf>,
32
33    /// Omit end-cmd marker between subvolumes
34    #[clap(short = 'e')]
35    omit_end_cmd: bool,
36
37    /// Send an incremental stream from parent to the subvolume
38    #[clap(short = 'p', long)]
39    parent: Option<PathBuf>,
40
41    /// Use this snapshot as a clone source (may be given multiple times)
42    #[clap(short = 'c', long = "clone-src")]
43    clone_src: Vec<PathBuf>,
44
45    /// Write output to a file instead of stdout
46    #[clap(short = 'f', long)]
47    outfile: Option<PathBuf>,
48
49    /// Send in NO_FILE_DATA mode
50    #[clap(long)]
51    no_data: bool,
52
53    /// Use send protocol version N (0 = highest supported by kernel)
54    #[clap(long)]
55    proto: Option<u32>,
56
57    /// Send compressed data directly without decompressing
58    #[clap(long)]
59    compressed_data: bool,
60}
61
/// Number of bytes in one KiB; unit for the buffer sizes below.
const KIB: usize = 1024;
/// Pipe read buffer for protocol v1: 64 KiB (same value as
/// BTRFS_SEND_BUF_SIZE_V1).
const SEND_BUF_SIZE_V1: usize = 64 * KIB;
/// Pipe read buffer for protocol v2 and later: 16 KiB plus 128 KiB of
/// compressed data, 144 KiB in total.
const SEND_BUF_SIZE_V2: usize = (16 + 128) * KIB;
66
67fn open_subvol_ro(path: &PathBuf) -> Result<File> {
68    File::open(path).with_context(|| format!("cannot open '{}'", path.display()))
69}
70
71fn check_subvol_readonly(file: &File, path: &PathBuf) -> Result<()> {
72    let flags = subvolume_flags_get(file.as_fd())
73        .with_context(|| format!("failed to get flags for '{}'", path.display()))?;
74    if !flags.contains(SubvolumeFlags::RDONLY) {
75        bail!("subvolume '{}' is not read-only", path.display());
76    }
77    Ok(())
78}
79
80fn get_root_id(file: &File, path: &PathBuf) -> Result<u64> {
81    let info = subvolume_info(file.as_fd())
82        .with_context(|| format!("failed to get subvolume info for '{}'", path.display()))?;
83    Ok(info.id)
84}
85
86/// Find the best parent among clone sources for incremental send.
87///
88/// Looks for a clone source that shares the same parent UUID as the target
89/// subvolume and picks the one with the closest ctransid.
90fn find_good_parent(
91    subvol_info: &btrfs_uapi::subvolume::SubvolumeInfo,
92    clone_source_paths: &[PathBuf],
93) -> Result<Option<u64>> {
94    if subvol_info.parent_uuid.is_nil() {
95        return Ok(None);
96    }
97
98    let mut best_root_id = None;
99    let mut best_diff = u64::MAX;
100
101    for cs_path in clone_source_paths {
102        let cs_file = open_subvol_ro(cs_path)?;
103        let cs_info = subvolume_info(cs_file.as_fd()).with_context(|| {
104            format!(
105                "failed to get info for clone source '{}'",
106                cs_path.display()
107            )
108        })?;
109
110        // Check if this clone source shares the same parent or IS the parent.
111        if cs_info.parent_uuid != subvol_info.parent_uuid && cs_info.uuid != subvol_info.parent_uuid
112        {
113            continue;
114        }
115
116        let diff = subvol_info.ctransid.abs_diff(cs_info.ctransid);
117        if diff < best_diff {
118            best_diff = diff;
119            best_root_id = Some(cs_info.id);
120        }
121    }
122
123    Ok(best_root_id)
124}
125
126/// Create a pipe and return (read_end, write_end) as OwnedFds.
127fn make_pipe() -> Result<(OwnedFd, OwnedFd)> {
128    let mut fds = [0i32; 2];
129    let ret = unsafe { nix::libc::pipe(fds.as_mut_ptr()) };
130    if ret < 0 {
131        return Err(io::Error::last_os_error()).context("failed to create pipe");
132    }
133    // SAFETY: pipe() just returned two valid fds.
134    let read_end = unsafe { OwnedFd::from_raw_fd(fds[0]) };
135    let write_end = unsafe { OwnedFd::from_raw_fd(fds[1]) };
136    Ok((read_end, write_end))
137}
138
139/// Spawn a thread that reads from `read_fd` and writes everything to `out`.
140fn spawn_reader_thread(
141    read_fd: OwnedFd,
142    mut out: Box<dyn Write + Send>,
143    buf_size: usize,
144) -> thread::JoinHandle<Result<()>> {
145    thread::spawn(move || {
146        let mut file = File::from(read_fd);
147        let mut buf = vec![0u8; buf_size];
148        loop {
149            let n = file
150                .read(&mut buf)
151                .context("failed to read send stream from kernel")?;
152            if n == 0 {
153                return Ok(());
154            }
155            out.write_all(&buf[..n])
156                .context("failed to write send stream to output")?;
157        }
158    })
159}
160
161/// Open or create the output writer for the reader thread.
162fn open_output(outfile: &Option<PathBuf>) -> Result<Box<dyn Write + Send>> {
163    match outfile {
164        Some(path) => {
165            let file = File::options()
166                .write(true)
167                .append(true)
168                .open(path)
169                .with_context(|| format!("cannot open '{}' for writing", path.display()))?;
170            Ok(Box::new(file))
171        }
172        None => Ok(Box::new(io::stdout())),
173    }
174}
175
176impl Runnable for SendCommand {
177    fn run(&self, _format: Format, _dry_run: bool) -> Result<()> {
178        // Validate output destination.
179        if let Some(path) = &self.outfile {
180            // Try opening existing file first, then create. Truncate since
181            // this is the start of a new send.
182            File::options()
183                .write(true)
184                .truncate(true)
185                .open(path)
186                .or_else(|_| {
187                    File::options()
188                        .write(true)
189                        .truncate(true)
190                        .create(true)
191                        .open(path)
192                })
193                .with_context(|| format!("cannot create '{}'", path.display()))?;
194        } else {
195            let stdout = io::stdout();
196            if unsafe { nix::libc::isatty(stdout.as_fd().as_raw_fd()) } == 1 {
197                bail!("not dumping send stream into a terminal, redirect it into a file");
198            }
199        }
200
201        // Validate all subvolumes are read-only.
202        for subvol_path in &self.subvolumes {
203            let file = open_subvol_ro(subvol_path)?;
204            check_subvol_readonly(&file, subvol_path)?;
205        }
206
207        // Validate parent is read-only and get its root ID.
208        let mut parent_root_id: u64 = 0;
209        if let Some(parent_path) = &self.parent {
210            let file = open_subvol_ro(parent_path)?;
211            check_subvol_readonly(&file, parent_path)?;
212            parent_root_id = get_root_id(&file, parent_path)?;
213        }
214
215        // Collect clone source root IDs and validate they are read-only.
216        let mut clone_sources: Vec<u64> = Vec::new();
217        for cs_path in &self.clone_src {
218            let file = open_subvol_ro(cs_path)?;
219            check_subvol_readonly(&file, cs_path)?;
220            clone_sources.push(get_root_id(&file, cs_path)?);
221        }
222
223        // If a parent was given, add it to clone sources (matches C behavior).
224        if self.parent.is_some() && !clone_sources.contains(&parent_root_id) {
225            clone_sources.push(parent_root_id);
226        }
227
228        let full_send = self.parent.is_none() && self.clone_src.is_empty();
229
230        // Determine protocol version.
231        let first_file = open_subvol_ro(&self.subvolumes[0])?;
232        let fs = btrfs_uapi::filesystem::fs_info(first_file.as_fd())
233            .context("failed to get filesystem info")?;
234        let sysfs = SysfsBtrfs::new(&fs.uuid);
235        let proto_supported = sysfs.send_stream_version();
236
237        let mut proto = self.proto.unwrap_or(1);
238        if proto == 0 {
239            proto = proto_supported;
240        }
241
242        if proto > proto_supported && proto_supported == 1 {
243            bail!(
244                "requested protocol version {} but kernel supports only {}",
245                proto,
246                proto_supported
247            );
248        }
249
250        // Build send flags.
251        let mut flags = SendFlags::empty();
252        if self.no_data {
253            flags |= SendFlags::NO_FILE_DATA;
254        }
255        if self.compressed_data {
256            if proto == 1 && self.proto.is_none() {
257                proto = 2;
258            }
259            if proto < 2 {
260                bail!("--compressed-data requires protocol version >= 2 (requested {proto})");
261            }
262            if proto_supported < 2 {
263                bail!("kernel does not support --compressed-data");
264            }
265            flags |= SendFlags::COMPRESSED;
266        }
267        if proto_supported > 1 {
268            flags |= SendFlags::VERSION;
269        }
270
271        let buf_size = if proto > 1 {
272            SEND_BUF_SIZE_V2
273        } else {
274            SEND_BUF_SIZE_V1
275        };
276
277        // Send each subvolume.
278        let count = self.subvolumes.len();
279        for (i, subvol_path) in self.subvolumes.iter().enumerate() {
280            let is_first = i == 0;
281            let is_last = i == count - 1;
282
283            eprintln!("At subvol {}", subvol_path.display());
284
285            let subvol_file = open_subvol_ro(subvol_path)?;
286
287            // For incremental send without an explicit parent, find the best
288            // parent among clone sources.
289            let mut this_parent = parent_root_id;
290            if !full_send && self.parent.is_none() {
291                let info = subvolume_info(subvol_file.as_fd()).with_context(|| {
292                    format!("failed to get info for '{}'", subvol_path.display())
293                })?;
294                match find_good_parent(&info, &self.clone_src)? {
295                    Some(id) => this_parent = id,
296                    None => bail!(
297                        "cannot find a suitable parent for '{}' among clone sources",
298                        subvol_path.display()
299                    ),
300                }
301            }
302
303            // Build per-subvolume flags.
304            let mut subvol_flags = flags;
305            if self.omit_end_cmd {
306                if !is_first {
307                    subvol_flags |= SendFlags::OMIT_STREAM_HEADER;
308                }
309                if !is_last {
310                    subvol_flags |= SendFlags::OMIT_END_CMD;
311                }
312            }
313
314            // Create pipe and spawn reader thread.
315            let (pipe_read, pipe_write) = make_pipe()?;
316            let out = open_output(&self.outfile)?;
317            let reader = spawn_reader_thread(pipe_read, out, buf_size);
318
319            let send_result = btrfs_uapi::send::send(
320                subvol_file.as_fd(),
321                pipe_write.as_raw_fd(),
322                this_parent,
323                &mut clone_sources,
324                subvol_flags,
325                proto,
326            );
327
328            // Close write end so the reader thread sees EOF.
329            drop(pipe_write);
330
331            if let Err(e) = send_result {
332                let _ = reader.join();
333                if e == nix::errno::Errno::EINVAL && self.omit_end_cmd {
334                    bail!(
335                        "send ioctl failed: {e}\n\
336                         Try upgrading your kernel or don't use -e."
337                    );
338                }
339                return Err(e)
340                    .with_context(|| format!("send failed for '{}'", subvol_path.display()));
341            }
342
343            match reader.join() {
344                Ok(Ok(())) => {}
345                Ok(Err(e)) => return Err(e).context("send stream reader failed"),
346                Err(_) => bail!("send stream reader thread panicked"),
347            }
348
349            // After sending, add to clone sources for subsequent subvolumes.
350            if !full_send && self.parent.is_none() {
351                let root_id = get_root_id(&subvol_file, subvol_path)?;
352                if !clone_sources.contains(&root_id) {
353                    clone_sources.push(root_id);
354                }
355            }
356        }
357
358        Ok(())
359    }
360}