use crate::{Format, Runnable};
use anyhow::{Context, Result, bail};
use btrfs_uapi::{
    send_receive::SendFlags,
    subvolume::{SubvolumeFlags, subvolume_flags_get, subvolume_info},
    sysfs::SysfsBtrfs,
};
use clap::Parser;
use std::{
    fs::File,
    io::{self, Read, Write},
    os::{
        fd::{AsFd, AsRawFd, OwnedFd},
        unix::io::FromRawFd,
    },
    path::{Path, PathBuf},
    thread,
};
19
// Section titles passed as `help_heading` on the `SendCommand` arguments, so
// that related options are grouped together in the generated `--help` output.
const HEADING_INCREMENTAL: &str = "Incremental";
const HEADING_PROTOCOL: &str = "Protocol";
22
23#[derive(Parser, Debug)]
31pub struct SendCommand {
32 #[clap(required = true)]
34 subvolumes: Vec<PathBuf>,
35
36 #[clap(short = 'e', long)]
38 omit_end_cmd: bool,
39
40 #[clap(short = 'p', long, help_heading = HEADING_INCREMENTAL)]
42 parent: Option<PathBuf>,
43
44 #[clap(short = 'c', long = "clone-src", help_heading = HEADING_INCREMENTAL)]
46 clone_src: Vec<PathBuf>,
47
48 #[clap(short = 'f', long)]
50 outfile: Option<PathBuf>,
51
52 #[clap(long, help_heading = HEADING_PROTOCOL)]
54 no_data: bool,
55
56 #[clap(long, help_heading = HEADING_PROTOCOL)]
58 proto: Option<u32>,
59
60 #[clap(long, help_heading = HEADING_PROTOCOL)]
62 compressed_data: bool,
63}
64
// Size in bytes of the buffer the reader thread uses to drain the kernel pipe
// when the stream uses protocol version 1 (64 KiB).
const SEND_BUF_SIZE_V1: usize = 64 << 10;
// Same, for protocol version 2 and newer (16 KiB + 128 KiB).
// NOTE(review): presumably mirrors the btrfs-progs v2 buffer size (header
// room plus the largest data payload) — confirm against upstream.
const SEND_BUF_SIZE_V2: usize = (16 << 10) + (128 << 10);
69
70fn open_subvol_ro(path: &PathBuf) -> Result<File> {
71 File::open(path)
72 .with_context(|| format!("cannot open '{}'", path.display()))
73}
74
75fn check_subvol_readonly(file: &File, path: &PathBuf) -> Result<()> {
76 let flags = subvolume_flags_get(file.as_fd()).with_context(|| {
77 format!("failed to get flags for '{}'", path.display())
78 })?;
79 if !flags.contains(SubvolumeFlags::RDONLY) {
80 bail!("subvolume '{}' is not read-only", path.display());
81 }
82 Ok(())
83}
84
85fn get_root_id(file: &File, path: &PathBuf) -> Result<u64> {
86 let info = subvolume_info(file.as_fd()).with_context(|| {
87 format!("failed to get subvolume info for '{}'", path.display())
88 })?;
89 Ok(info.id)
90}
91
92fn find_good_parent(
97 subvol_info: &btrfs_uapi::subvolume::SubvolumeInfo,
98 clone_source_paths: &[PathBuf],
99) -> Result<Option<u64>> {
100 if subvol_info.parent_uuid.is_nil() {
101 return Ok(None);
102 }
103
104 let mut best_root_id = None;
105 let mut best_diff = u64::MAX;
106
107 for cs_path in clone_source_paths {
108 let cs_file = open_subvol_ro(cs_path)?;
109 let cs_info = subvolume_info(cs_file.as_fd()).with_context(|| {
110 format!(
111 "failed to get info for clone source '{}'",
112 cs_path.display()
113 )
114 })?;
115
116 if cs_info.parent_uuid != subvol_info.parent_uuid
118 && cs_info.uuid != subvol_info.parent_uuid
119 {
120 continue;
121 }
122
123 let diff = subvol_info.ctransid.abs_diff(cs_info.ctransid);
124 if diff < best_diff {
125 best_diff = diff;
126 best_root_id = Some(cs_info.id);
127 }
128 }
129
130 Ok(best_root_id)
131}
132
133fn make_pipe() -> Result<(OwnedFd, OwnedFd)> {
135 let mut fds = [0i32; 2];
136 let ret = unsafe { nix::libc::pipe(fds.as_mut_ptr()) };
137 if ret < 0 {
138 return Err(io::Error::last_os_error())
139 .context("failed to create pipe");
140 }
141 let read_end = unsafe { OwnedFd::from_raw_fd(fds[0]) };
143 let write_end = unsafe { OwnedFd::from_raw_fd(fds[1]) };
144 Ok((read_end, write_end))
145}
146
147fn spawn_reader_thread(
149 read_fd: OwnedFd,
150 mut out: Box<dyn Write + Send>,
151 buf_size: usize,
152) -> thread::JoinHandle<Result<()>> {
153 thread::spawn(move || {
154 let mut file = File::from(read_fd);
155 let mut buf = vec![0u8; buf_size];
156 loop {
157 let n = file
158 .read(&mut buf)
159 .context("failed to read send stream from kernel")?;
160 if n == 0 {
161 return Ok(());
162 }
163 out.write_all(&buf[..n])
164 .context("failed to write send stream to output")?;
165 }
166 })
167}
168
169fn open_output(outfile: &Option<PathBuf>) -> Result<Box<dyn Write + Send>> {
171 match outfile {
172 Some(path) => {
173 let file = File::options()
174 .write(true)
175 .append(true)
176 .open(path)
177 .with_context(|| {
178 format!("cannot open '{}' for writing", path.display())
179 })?;
180 Ok(Box::new(file))
181 }
182 None => Ok(Box::new(io::stdout())),
183 }
184}
185
186impl Runnable for SendCommand {
187 fn run(&self, _format: Format, _dry_run: bool) -> Result<()> {
188 if let Some(path) = &self.outfile {
190 File::options()
193 .write(true)
194 .truncate(true)
195 .open(path)
196 .or_else(|_| {
197 File::options()
198 .write(true)
199 .truncate(true)
200 .create(true)
201 .open(path)
202 })
203 .with_context(|| {
204 format!("cannot create '{}'", path.display())
205 })?;
206 } else {
207 let stdout = io::stdout();
208 if unsafe { nix::libc::isatty(stdout.as_fd().as_raw_fd()) } == 1 {
209 bail!(
210 "not dumping send stream into a terminal, redirect it into a file"
211 );
212 }
213 }
214
215 for subvol_path in &self.subvolumes {
217 let file = open_subvol_ro(subvol_path)?;
218 check_subvol_readonly(&file, subvol_path)?;
219 }
220
221 let mut parent_root_id: u64 = 0;
223 if let Some(parent_path) = &self.parent {
224 let file = open_subvol_ro(parent_path)?;
225 check_subvol_readonly(&file, parent_path)?;
226 parent_root_id = get_root_id(&file, parent_path)?;
227 }
228
229 let mut clone_sources: Vec<u64> = Vec::new();
231 for cs_path in &self.clone_src {
232 let file = open_subvol_ro(cs_path)?;
233 check_subvol_readonly(&file, cs_path)?;
234 clone_sources.push(get_root_id(&file, cs_path)?);
235 }
236
237 if self.parent.is_some() && !clone_sources.contains(&parent_root_id) {
239 clone_sources.push(parent_root_id);
240 }
241
242 let full_send = self.parent.is_none() && self.clone_src.is_empty();
243
244 let first_file = open_subvol_ro(&self.subvolumes[0])?;
246 let fs = btrfs_uapi::filesystem::filesystem_info(first_file.as_fd())
247 .context("failed to get filesystem info")?;
248 let sysfs = SysfsBtrfs::new(&fs.uuid);
249 let proto_supported = sysfs.send_stream_version();
250
251 let mut proto = self.proto.unwrap_or(1);
252 if proto == 0 {
253 proto = proto_supported;
254 }
255
256 if proto > proto_supported && proto_supported == 1 {
257 bail!(
258 "requested protocol version {} but kernel supports only {}",
259 proto,
260 proto_supported
261 );
262 }
263
264 let mut flags = SendFlags::empty();
266 if self.no_data {
267 flags |= SendFlags::NO_FILE_DATA;
268 }
269 if self.compressed_data {
270 if proto == 1 && self.proto.is_none() {
271 proto = 2;
272 }
273 if proto < 2 {
274 bail!(
275 "--compressed-data requires protocol version >= 2 (requested {proto})"
276 );
277 }
278 if proto_supported < 2 {
279 bail!("kernel does not support --compressed-data");
280 }
281 flags |= SendFlags::COMPRESSED;
282 }
283 if proto_supported > 1 {
284 flags |= SendFlags::VERSION;
285 }
286
287 let buf_size = if proto > 1 {
288 SEND_BUF_SIZE_V2
289 } else {
290 SEND_BUF_SIZE_V1
291 };
292
293 let count = self.subvolumes.len();
295 for (i, subvol_path) in self.subvolumes.iter().enumerate() {
296 let is_first = i == 0;
297 let is_last = i == count - 1;
298
299 eprintln!("At subvol {}", subvol_path.display());
300
301 let subvol_file = open_subvol_ro(subvol_path)?;
302
303 let mut this_parent = parent_root_id;
306 if !full_send && self.parent.is_none() {
307 let info =
308 subvolume_info(subvol_file.as_fd()).with_context(|| {
309 format!(
310 "failed to get info for '{}'",
311 subvol_path.display()
312 )
313 })?;
314 match find_good_parent(&info, &self.clone_src)? {
315 Some(id) => this_parent = id,
316 None => bail!(
317 "cannot find a suitable parent for '{}' among clone sources",
318 subvol_path.display()
319 ),
320 }
321 }
322
323 let mut subvol_flags = flags;
325 if self.omit_end_cmd {
326 if !is_first {
327 subvol_flags |= SendFlags::OMIT_STREAM_HEADER;
328 }
329 if !is_last {
330 subvol_flags |= SendFlags::OMIT_END_CMD;
331 }
332 }
333
334 let (pipe_read, pipe_write) = make_pipe()?;
336 let out = open_output(&self.outfile)?;
337 let reader = spawn_reader_thread(pipe_read, out, buf_size);
338
339 let send_result = btrfs_uapi::send_receive::send(
340 subvol_file.as_fd(),
341 pipe_write.as_raw_fd(),
342 this_parent,
343 &mut clone_sources,
344 subvol_flags,
345 proto,
346 );
347
348 drop(pipe_write);
350
351 if let Err(e) = send_result {
352 let _ = reader.join();
353 if e == nix::errno::Errno::EINVAL && self.omit_end_cmd {
354 bail!(
355 "send ioctl failed: {e}\n\
356 Try upgrading your kernel or don't use -e."
357 );
358 }
359 return Err(e).with_context(|| {
360 format!("send failed for '{}'", subvol_path.display())
361 });
362 }
363
364 match reader.join() {
365 Ok(Ok(())) => {}
366 Ok(Err(e)) => {
367 return Err(e).context("send stream reader failed");
368 }
369 Err(_) => bail!("send stream reader thread panicked"),
370 }
371
372 if !full_send && self.parent.is_none() {
374 let root_id = get_root_id(&subvol_file, subvol_path)?;
375 if !clone_sources.contains(&root_id) {
376 clone_sources.push(root_id);
377 }
378 }
379 }
380
381 Ok(())
382 }
383}