1mod dump;
2mod ops;
3
4use crate::{Format, Runnable};
5use anyhow::{Context, Result, bail};
6use btrfs_disk::stream::{StreamCommand, StreamReader};
7use clap::Parser;
8use ops::ReceiveContext;
9use std::{fs::File, io, path::PathBuf};
10
11#[derive(Parser, Debug)]
18pub struct ReceiveCommand {
19 mount: Option<PathBuf>,
21
22 #[clap(short = 'f')]
24 file: Option<PathBuf>,
25
26 #[clap(short = 'e')]
28 terminate_on_end: bool,
29
30 #[clap(short = 'C', long)]
32 chroot: bool,
33
34 #[clap(short = 'E', long)]
36 max_errors: Option<u64>,
37
38 #[clap(short = 'm', long = "root-mount")]
40 root_mount: Option<PathBuf>,
41
42 #[clap(long)]
44 force_decompress: bool,
45
46 #[clap(long)]
48 dump: bool,
49}
50
51impl Runnable for ReceiveCommand {
52 fn run(&self, _format: Format, _dry_run: bool) -> Result<()> {
53 let input: Box<dyn io::Read> = match &self.file {
54 Some(path) => Box::new(
55 File::open(path).with_context(|| format!("cannot open '{}'", path.display()))?,
56 ),
57 None => Box::new(io::stdin()),
58 };
59
60 if self.dump {
61 return dump::dump_stream(input);
62 }
63
64 let mount = self
65 .mount
66 .as_ref()
67 .ok_or_else(|| anyhow::anyhow!("mount point is required (unless --dump)"))?;
68
69 if !mount.is_dir() {
70 bail!("'{}' is not a directory", mount.display());
71 }
72
73 let mut reader = StreamReader::new(input)?;
76
77 let dest = if self.chroot {
78 let mount_cstr = std::ffi::CString::new(
81 mount
82 .to_str()
83 .ok_or_else(|| anyhow::anyhow!("mount path is not valid UTF-8"))?,
84 )
85 .context("mount path contains null byte")?;
86
87 if unsafe { nix::libc::chroot(mount_cstr.as_ptr()) } != 0 {
88 return Err(std::io::Error::last_os_error())
89 .context(format!("failed to chroot to '{}'", mount.display()));
90 }
91 if unsafe { nix::libc::chdir(c"/".as_ptr()) } != 0 {
92 return Err(std::io::Error::last_os_error())
93 .context("failed to chdir to / after chroot");
94 }
95 eprintln!("Chroot to {}", mount.display());
96 PathBuf::from("/")
97 } else {
98 mount.clone()
99 };
100
101 let mut ctx = ReceiveContext::new(&dest)?;
102 let max_errors = self.max_errors.unwrap_or(0);
103 let mut error_count = 0u64;
104 let mut received_subvol = false;
105
106 loop {
107 match reader.next_command() {
108 Err(e) => {
109 error_count += 1;
110 eprintln!("ERROR: {e:#}");
111 if max_errors > 0 && error_count >= max_errors {
112 bail!("too many errors ({error_count}), aborting");
113 }
114 continue;
115 }
116 Ok(None) => {
117 break;
119 }
120 Ok(Some(StreamCommand::End)) => {
121 ctx.close_write_fd();
122 ctx.finish_subvol()?;
123 received_subvol = false;
124
125 if self.terminate_on_end {
126 return Ok(());
127 }
128
129 let inner = reader.into_inner();
134 match StreamReader::new(inner) {
135 Ok(new_reader) => {
136 reader = new_reader;
137 }
138 Err(_) => {
139 return Ok(());
141 }
142 }
143 continue;
144 }
145 Ok(Some(cmd)) => {
146 if matches!(
147 &cmd,
148 StreamCommand::Subvol { .. } | StreamCommand::Snapshot { .. }
149 ) {
150 received_subvol = true;
151 }
152 if let Err(e) = ctx.process_command(&cmd) {
153 error_count += 1;
154 eprintln!("ERROR: {e:#}");
155 if max_errors > 0 && error_count >= max_errors {
156 bail!("too many errors ({error_count}), aborting");
157 }
158 }
159 }
160 }
161 }
162
163 if received_subvol {
165 ctx.finish_subvol()?;
166 }
167
168 Ok(())
169 }
170}