1use crate::{Format, Runnable};
2use anyhow::{Context, Result, bail};
3use btrfs_uapi::{
4 send_receive::SendFlags,
5 subvolume::{SubvolumeFlags, subvolume_flags_get, subvolume_info},
6 sysfs::SysfsBtrfs,
7};
8use clap::Parser;
9use std::{
10 fs::File,
11 io::{self, Read, Write},
12 os::{
13 fd::{AsFd, AsRawFd, OwnedFd},
14 unix::io::FromRawFd,
15 },
16 path::{Path, PathBuf},
17 thread,
18};
19
20const HEADING_INCREMENTAL: &str = "Incremental";
21const HEADING_PROTOCOL: &str = "Protocol";
22
23#[derive(Parser, Debug)]
31#[allow(clippy::doc_markdown)]
32pub struct SendCommand {
33 #[clap(required = true)]
35 subvolumes: Vec<PathBuf>,
36
37 #[clap(short = 'e', long)]
39 omit_end_cmd: bool,
40
41 #[clap(short = 'p', long, help_heading = HEADING_INCREMENTAL)]
43 parent: Option<PathBuf>,
44
45 #[clap(short = 'c', long = "clone-src", help_heading = HEADING_INCREMENTAL)]
47 clone_src: Vec<PathBuf>,
48
49 #[clap(short = 'f', long)]
51 outfile: Option<PathBuf>,
52
53 #[clap(long, help_heading = HEADING_PROTOCOL)]
55 no_data: bool,
56
57 #[clap(long, help_heading = HEADING_PROTOCOL)]
59 proto: Option<u32>,
60
61 #[clap(long, help_heading = HEADING_PROTOCOL)]
63 compressed_data: bool,
64}
65
66const SEND_BUF_SIZE_V1: usize = 64 * 1024;
68const SEND_BUF_SIZE_V2: usize = 16 * 1024 + 128 * 1024;
70
71fn open_subvol_ro(path: &Path) -> Result<File> {
72 File::open(path)
73 .with_context(|| format!("cannot open '{}'", path.display()))
74}
75
76fn check_subvol_readonly(file: &File, path: &Path) -> Result<()> {
77 let flags = subvolume_flags_get(file.as_fd()).with_context(|| {
78 format!("failed to get flags for '{}'", path.display())
79 })?;
80 if !flags.contains(SubvolumeFlags::RDONLY) {
81 bail!("subvolume '{}' is not read-only", path.display());
82 }
83 Ok(())
84}
85
86fn get_root_id(file: &File, path: &Path) -> Result<u64> {
87 let info = subvolume_info(file.as_fd()).with_context(|| {
88 format!("failed to get subvolume info for '{}'", path.display())
89 })?;
90 Ok(info.id)
91}
92
93fn find_good_parent(
98 subvol_info: &btrfs_uapi::subvolume::SubvolumeInfo,
99 clone_source_paths: &[PathBuf],
100) -> Result<Option<u64>> {
101 if subvol_info.parent_uuid.is_nil() {
102 return Ok(None);
103 }
104
105 let mut best_root_id = None;
106 let mut best_diff = u64::MAX;
107
108 for cs_path in clone_source_paths {
109 let cs_file = open_subvol_ro(cs_path)?;
110 let cs_info = subvolume_info(cs_file.as_fd()).with_context(|| {
111 format!(
112 "failed to get info for clone source '{}'",
113 cs_path.display()
114 )
115 })?;
116
117 if cs_info.parent_uuid != subvol_info.parent_uuid
119 && cs_info.uuid != subvol_info.parent_uuid
120 {
121 continue;
122 }
123
124 let diff = subvol_info.ctransid.abs_diff(cs_info.ctransid);
125 if diff < best_diff {
126 best_diff = diff;
127 best_root_id = Some(cs_info.id);
128 }
129 }
130
131 Ok(best_root_id)
132}
133
134fn make_pipe() -> Result<(OwnedFd, OwnedFd)> {
136 let mut fds = [0i32; 2];
137 let ret = unsafe { nix::libc::pipe(fds.as_mut_ptr()) };
138 if ret < 0 {
139 return Err(io::Error::last_os_error())
140 .context("failed to create pipe");
141 }
142 let read_end = unsafe { OwnedFd::from_raw_fd(fds[0]) };
144 let write_end = unsafe { OwnedFd::from_raw_fd(fds[1]) };
145 Ok((read_end, write_end))
146}
147
148fn spawn_reader_thread(
150 read_fd: OwnedFd,
151 mut out: Box<dyn Write + Send>,
152 buf_size: usize,
153) -> thread::JoinHandle<Result<()>> {
154 thread::spawn(move || {
155 let mut file = File::from(read_fd);
156 let mut buf = vec![0u8; buf_size];
157 loop {
158 let n = file
159 .read(&mut buf)
160 .context("failed to read send stream from kernel")?;
161 if n == 0 {
162 return Ok(());
163 }
164 out.write_all(&buf[..n])
165 .context("failed to write send stream to output")?;
166 }
167 })
168}
169
170fn open_output(outfile: Option<&PathBuf>) -> Result<Box<dyn Write + Send>> {
172 match outfile {
173 Some(path) => {
174 let file =
175 File::options().append(true).open(path).with_context(|| {
176 format!("cannot open '{}' for writing", path.display())
177 })?;
178 Ok(Box::new(file))
179 }
180 None => Ok(Box::new(io::stdout())),
181 }
182}
183
184impl Runnable for SendCommand {
185 #[allow(clippy::too_many_lines)]
186 fn run(&self, _format: Format, _dry_run: bool) -> Result<()> {
187 if let Some(path) = &self.outfile {
189 File::options()
192 .write(true)
193 .truncate(true)
194 .open(path)
195 .or_else(|_| {
196 File::options()
197 .write(true)
198 .truncate(true)
199 .create(true)
200 .open(path)
201 })
202 .with_context(|| {
203 format!("cannot create '{}'", path.display())
204 })?;
205 } else {
206 let stdout = io::stdout();
207 if unsafe { nix::libc::isatty(stdout.as_fd().as_raw_fd()) } == 1 {
208 bail!(
209 "not dumping send stream into a terminal, redirect it into a file"
210 );
211 }
212 }
213
214 for subvol_path in &self.subvolumes {
216 let file = open_subvol_ro(subvol_path)?;
217 check_subvol_readonly(&file, subvol_path)?;
218 }
219
220 let mut parent_root_id: u64 = 0;
222 if let Some(parent_path) = &self.parent {
223 let file = open_subvol_ro(parent_path)?;
224 check_subvol_readonly(&file, parent_path)?;
225 parent_root_id = get_root_id(&file, parent_path)?;
226 }
227
228 let mut clone_sources: Vec<u64> = Vec::new();
230 for cs_path in &self.clone_src {
231 let file = open_subvol_ro(cs_path)?;
232 check_subvol_readonly(&file, cs_path)?;
233 clone_sources.push(get_root_id(&file, cs_path)?);
234 }
235
236 if self.parent.is_some() && !clone_sources.contains(&parent_root_id) {
238 clone_sources.push(parent_root_id);
239 }
240
241 let full_send = self.parent.is_none() && self.clone_src.is_empty();
242
243 let first_file = open_subvol_ro(&self.subvolumes[0])?;
245 let fs = btrfs_uapi::filesystem::filesystem_info(first_file.as_fd())
246 .context("failed to get filesystem info")?;
247 let sysfs = SysfsBtrfs::new(&fs.uuid);
248 let proto_supported = sysfs.send_stream_version();
249
250 let mut proto = self.proto.unwrap_or(1);
251 if proto == 0 {
252 proto = proto_supported;
253 }
254
255 if proto > proto_supported && proto_supported == 1 {
256 bail!(
257 "requested protocol version {proto} but kernel supports only {proto_supported}"
258 );
259 }
260
261 let mut flags = SendFlags::empty();
263 if self.no_data {
264 flags |= SendFlags::NO_FILE_DATA;
265 }
266 if self.compressed_data {
267 if proto == 1 && self.proto.is_none() {
268 proto = 2;
269 }
270 if proto < 2 {
271 bail!(
272 "--compressed-data requires protocol version >= 2 (requested {proto})"
273 );
274 }
275 if proto_supported < 2 {
276 bail!("kernel does not support --compressed-data");
277 }
278 flags |= SendFlags::COMPRESSED;
279 }
280 if proto_supported > 1 {
281 flags |= SendFlags::VERSION;
282 }
283
284 let buf_size = if proto > 1 {
285 SEND_BUF_SIZE_V2
286 } else {
287 SEND_BUF_SIZE_V1
288 };
289
290 let count = self.subvolumes.len();
292 for (i, subvol_path) in self.subvolumes.iter().enumerate() {
293 let is_first = i == 0;
294 let is_last = i == count - 1;
295
296 eprintln!("At subvol {}", subvol_path.display());
297
298 let subvol_file = open_subvol_ro(subvol_path)?;
299
300 let mut this_parent = parent_root_id;
303 if !full_send && self.parent.is_none() {
304 let info =
305 subvolume_info(subvol_file.as_fd()).with_context(|| {
306 format!(
307 "failed to get info for '{}'",
308 subvol_path.display()
309 )
310 })?;
311 match find_good_parent(&info, &self.clone_src)? {
312 Some(id) => this_parent = id,
313 None => bail!(
314 "cannot find a suitable parent for '{}' among clone sources",
315 subvol_path.display()
316 ),
317 }
318 }
319
320 let mut subvol_flags = flags;
322 if self.omit_end_cmd {
323 if !is_first {
324 subvol_flags |= SendFlags::OMIT_STREAM_HEADER;
325 }
326 if !is_last {
327 subvol_flags |= SendFlags::OMIT_END_CMD;
328 }
329 }
330
331 let (pipe_read, pipe_write) = make_pipe()?;
333 let out = open_output(self.outfile.as_ref())?;
334 let reader = spawn_reader_thread(pipe_read, out, buf_size);
335
336 let send_result = btrfs_uapi::send_receive::send(
337 subvol_file.as_fd(),
338 pipe_write.as_raw_fd(),
339 this_parent,
340 &mut clone_sources,
341 subvol_flags,
342 proto,
343 );
344
345 drop(pipe_write);
347
348 if let Err(e) = send_result {
349 let _ = reader.join();
350 if e == nix::errno::Errno::EINVAL && self.omit_end_cmd {
351 bail!(
352 "send ioctl failed: {e}\n\
353 Try upgrading your kernel or don't use -e."
354 );
355 }
356 return Err(e).with_context(|| {
357 format!("send failed for '{}'", subvol_path.display())
358 });
359 }
360
361 match reader.join() {
362 Ok(Ok(())) => {}
363 Ok(Err(e)) => {
364 return Err(e).context("send stream reader failed");
365 }
366 Err(_) => bail!("send stream reader thread panicked"),
367 }
368
369 if !full_send && self.parent.is_none() {
371 let root_id = get_root_id(&subvol_file, subvol_path)?;
372 if !clone_sources.contains(&root_id) {
373 clone_sources.push(root_id);
374 }
375 }
376 }
377
378 Ok(())
379 }
380}