1use crate::alluse::*;
2use tokio_util::io::StreamReader;
3
/// Process-wide counter from which `RsyncClient::connect` draws one id per
/// connection (`fetch_add(1, ..)`). The id is stored in the client and passed
/// to the file-list reader as `client_id`; `RsyncClient::get` compares the two
/// to reject a `File` that was listed by a different connection.
static NEXT_CLIENT_ID: AtomicUsize = AtomicUsize::new(0);
5
/// Handle to one rsync connection.
///
/// The handle only holds an `Arc` to the shared connection state, so cloning
/// it yields another handle to the *same* connection rather than opening a
/// new one.
#[derive(Clone)]
pub struct RsyncClient {
    /// Connection state shared by every clone of this handle.
    inner: Arc<RsyncClientInner>,
}
11
/// State shared between all clones of an `RsyncClient`.
struct RsyncClientInner {
    /// Registry of outstanding file requests. `connect` hands a clone of it to
    /// `ReadFilesProcess::spawn`; `get` adds one sender per requested file
    /// index.
    reqs: Requests,
    /// Write half of the connection, behind an async mutex because it is held
    /// across `.await` points while a request or the shutdown sequence is
    /// written.
    write: AsyncMutex<OwnedWriteHalf>,
    /// Handle of the task started by `ReadFilesProcess::spawn`. `close` takes
    /// it out, so only the first `close` call finds it.
    finish: SyncMutex<Option<JoinHandle<Result<Enveloped>>>>,
    /// Id drawn from `NEXT_CLIENT_ID` in `connect`; compared against
    /// `File::client_id` in `get`.
    id: usize,
}
18
/// Transfer statistics read from the connection by `RsyncClient::close`.
///
/// The three values are read as consecutive rsync "long" integers, in field
/// order, after the reader task has finished.
// NOTE(review): the counters are presumably reported from the server's point
// of view (its reads / its writes) — confirm against the protocol description.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct Stats {
    /// First reported value: number of bytes read.
    pub bytes_read: i64,
    /// Second reported value: number of bytes written.
    pub bytes_written: i64,
    /// Third reported value: total file size.
    pub file_size: i64,
}
30
31impl RsyncClient {
32 pub async fn connect(url: &url::Url) -> Result<(RsyncClient, Vec<File>)> {
36 let (path, base) = Self::parse_url(&url)?;
37 let (read, mut write) = Self::stream(&url).await?;
38 let mut read = BufReader::new(read);
39 Self::send_handshake(&mut write, path, base)
40 .await
41 .context("Handshake: send")?;
42 let origin = format!(
43 "{}{}",
44 url.host().expect("Connected to an url without host"),
45 url.port().map(|p| format!("{}", p)).unwrap_or("".to_string())
46 );
47 Self::read_handshake(&mut read, &origin)
48 .await
49 .context("Handshake: receive")?;
50 let mut read = EnvelopeRead::new(read); let id = NEXT_CLIENT_ID.fetch_add(1, AtomicOrder::SeqCst);
52 let files = Self::read_file_list(&mut read, id)
53 .await
54 .context("File list: receive")?;
55 write.write_i32_le(-1).await?;
56 anyhow::ensure!(
57 read.read_i32_le().await? == -1,
58 "Phase switch receive-list -> receive-file"
59 );
60 let reqs: Requests = RequestsInner::new_requests();
61 let process = ReadFilesProcess::spawn(read, reqs.clone());
62 let inner = RsyncClientInner {
63 reqs: reqs.clone(),
64 write: AsyncMutex::new(write),
65 finish: SyncMutex::new(Some(process)),
66 id,
67 };
68 Ok((RsyncClient { inner: Arc::new(inner) }, files))
69 }
70 pub async fn get(&self, file: &File) -> Result<impl AsyncRead> {
82 anyhow::ensure!(
83 self.inner.id == file.client_id,
84 "Requested file from client that was not listed in that client's connect call"
85 );
86 anyhow::ensure!(file.is_file(), "Can only request files, {} is not", file);
87 let (data_send, mut data_recv) = mpsc::channel(10);
88 {
89 let mut reqs = self.inner.reqs.lock().unwrap();
90 reqs.refresh_timeout();
91 reqs.requests
92 .as_mut()
93 .context("Client fully exited")?
94 .entry(file.idx)
95 .or_insert(vec![])
96 .push(data_send);
97 }
98 let mut write = self.inner.write.lock().await;
99 log::debug!("Requesting index {}: {}", file.idx, file);
100 write.write_i32_le(file.idx as i32).await?;
101 write.write_all(&[0u8; 16]).await?; let stream = async_stream::stream! { while let Some(item) = data_recv.recv().await { yield item; } };
103 Ok(StreamReader::new(stream))
104 }
105 pub async fn close(&mut self) -> Result<Stats> {
109 let read = self.inner.finish.lock().unwrap().take();
110 if let Some(read) = read {
111 let mut write = self.inner.write.lock().await;
112 write.write_i32_le(-1).await?; let mut read = read.await??;
114 let stats = Stats {
115 bytes_read: read.read_rsync_long().await?,
116 bytes_written: read.read_rsync_long().await?,
117 file_size: read.read_rsync_long().await?,
118 };
119 log::debug!("Rsync Stats: {:?}", stats);
120 write.write_i32_le(-1).await?; write.shutdown().await.context("Failed to close connection")?;
122 Ok(stats)
123 } else {
124 anyhow::bail!("Only one can close");
125 }
126 }
127
128 fn parse_url(url: &url::Url) -> Result<(&Path, &str)> {
129 anyhow::ensure!(url.scheme() == "rsync", "Only rsync urls supported, not {}", url);
130 anyhow::ensure!(url.path() != "", "Path cannot be / - your url {} is cut short.", url);
131 let path = Path::new(url.path());
132 let path = if path.is_absolute() {
133 path.strip_prefix(Path::new("/"))
134 .expect(&format!("Stripping root from path {:?}", path))
135 } else {
137 path
138 };
139 let mut base = path;
140 let base = loop {
141 match base.parent() {
142 Some(p) if p == Path::new("") => break base,
143 None => panic!("Getting base element of path {:?}", base),
144 Some(p) => base = p,
145 };
146 };
147 let base = base.to_str().expect("TODO: handle paths properly");
148 Ok((path, base))
149 }
150 async fn stream(url: &url::Url) -> Result<(OwnedReadHalf, OwnedWriteHalf)> {
151 let mut addrs = url
152 .socket_addrs(|| Some(873))
153 .context(format!("Get socket addr from url {}", url))?;
154 let mut stream =
155 TcpStream::connect(addrs.pop().expect("Successful resolution returns at least one address")).await;
156 for addr in addrs.iter() {
157 stream = TcpStream::connect(addr).await;
158 if stream.is_ok() {
159 break;
160 }
161 }
162 Ok(stream
163 .context(format!("Connect to {} ({:?})", url, addrs))?
164 .into_split())
165 }
166 async fn send_handshake(write: &mut OwnedWriteHalf, path: &Path, base: &str) -> Result<()> {
167 let initial: Vec<u8> = [
169 &b"@RSYNCD: 27.0\n"[..],
172 base.as_bytes(),
174 &b"\n"[..],
175 &b"--server\n"[..],
177 &b"--sender\n"[..],
178 &b"-rl\n"[..],
179 &b".\n"[..],
180 path.to_str().unwrap().as_bytes(),
181 &b"/\n\n"[..],
182 &b"\0\0\0\0"[..],
184 ]
185 .concat();
186 write.write_all(&initial).await?;
187 Ok(())
188 }
189 async fn read_handshake<T: AsyncBufRead + Unpin>(read: &mut T, origin: &str) -> Result<()> {
190 let hello = &mut String::new();
191 read.read_line(hello).await.context("Read server hello")?;
192 let rsyncd = "@RSYNCD: ";
193 anyhow::ensure!(
194 hello.starts_with(rsyncd),
195 "Not an rsync server? First message was {}",
196 hello
197 );
198 let ver = hello[rsyncd.len()..(hello.len() - 1)]
199 .split(' ')
200 .next()
201 .map(|version| version.split('.').map(str::parse).collect::<Result<Vec<u64>, _>>())
202 .ok_or(anyhow::anyhow!(format!("Version nor present")))
203 .context(format!("Parsing server version from {}", hello))??;
204 anyhow::ensure!(
205 ver >= vec![27, 0],
206 "Server version {} not supported - need 27.0 minimum.",
207 ver.iter().map(|i| format!("{}", i)).collect::<Vec<_>>().join(".")
208 );
209 let mut motd = String::new();
210 loop {
211 let select = &mut String::new();
212 read.read_line(select).await.context("Read server startup")?;
213 if select == &format!("{}OK\n", rsyncd) {
214 break;
215 } else {
216 motd += select;
217 }
218 }
219 if &motd != "" {
220 log::info!("MOTD from {}\n{}", origin, motd.strip_suffix("\n").unwrap_or(&motd));
221 }
222 let _random_seed = read.read_u32().await?; Ok(())
224 }
225 async fn read_file_list<T: AsyncRead + Unpin + Send>(read: &mut T, client_id: usize) -> Result<Vec<File>> {
226 let mut ret = vec![];
227 let mut filename_buf: Vec<u8> = vec![];
228 let mut mode_buf = None;
229 let mut mtime_buf = None;
230 loop {
231 let meta = FileEntryStatus(read.read_u8().await.context("Status")?);
232 if meta.is_end() {
233 break;
234 }
235 let inherited_filename_length = if meta.inherits_filename() {
236 read.read_u8().await? as usize
237 } else {
238 0
239 };
240 anyhow::ensure!(
241 inherited_filename_length <= filename_buf.len(),
242 "Protocol error: file list format inconsistency"
243 );
244 let filename_length = if meta.integer_filename_len() {
245 read.read_u32_le().await? as usize
246 } else {
247 read.read_u8().await? as usize
248 };
249 filename_buf.resize(filename_length + inherited_filename_length, 0);
250 read.read_exact(&mut filename_buf[inherited_filename_length..]).await?;
251 let show = String::from_utf8_lossy(&filename_buf);
252 let size = read.read_rsync_long().await?;
253 anyhow::ensure!(size >= 0, "Protocol error: negative file size for {}", show);
254 let size = size as u64;
255 mtime_buf = if meta.mtime_repeated() {
256 mtime_buf
257 } else {
258 let ts = read.read_i32_le().await?;
259 #[allow(deprecated)]
260 let naive = NaiveDateTime::from_timestamp(ts as i64, 0);
261 #[allow(deprecated)]
262 let datetime: DateTime<Utc> = DateTime::from_utc(naive, Utc);
263 Some(datetime)
264 };
265 let mode = if meta.file_mode_repeated() {
266 mode_buf.context(format!("Protocol error: first file {} without mode", show))?
267 } else {
268 read.read_u32_le().await?
269 };
270 mode_buf = Some(mode);
271 if !meta.uid_repeated() {
272 let _uid = read.read_i32_le().await?;
274 }
275 if !meta.gid_repeated() {
276 let _gid = read.read_i32_le().await?;
277 }
278 let symlink = if unix_mode::is_symlink(mode) {
279 let len = read.read_u32_le().await? as usize;
280 let mut link = vec![];
281 link.resize(len, 0);
282 read.read_exact(&mut link).await?;
283 Some(link)
284 } else {
285 None
286 };
287 let idx = usize::MAX;
288 let f = File {
289 path: filename_buf.clone(),
290 mtime: mtime_buf,
291 symlink,
292 mode,
293 size,
294 idx,
295 client_id,
296 };
297 ret.push(f);
298 }
299 if read.read_i32_le().await? != 0 {
300 log::warn!("IO errors while listing files.");
301 }
302 ret.sort_unstable_by(|a, b| a.path.cmp(&b.path));
303 for (i, f) in ret.iter_mut().enumerate() {
305 f.idx = i;
306 log::debug!("{:>6} {}", f.idx, f);
307 }
308 Ok(ret)
309 }
310}
311
312