arrsync/
client.rs

1use crate::alluse::*;
2use tokio_util::io::StreamReader;
3
4static NEXT_CLIENT_ID: AtomicUsize = AtomicUsize::new(0);
5
6#[derive(Clone)]
7/// The main client struct
8pub struct RsyncClient {
9	inner: Arc<RsyncClientInner>,
10}
11
12struct RsyncClientInner {
13	reqs: Requests,
14	write: AsyncMutex<OwnedWriteHalf>,
15	finish: SyncMutex<Option<JoinHandle<Result<Enveloped>>>>,
16	id: usize,
17}
18
19#[derive(Clone, Debug)]
20/// Connection statistics provided by server
21pub struct Stats {
22	/// Bytes sent by us
23	pub bytes_read: i64,
24	/// Bytes sent by the server/sender
25	pub bytes_written: i64,
26	/// Total size of files on the server.
27	/// (Since this client knows nothing of exclusion lists, you can also calculate this from the file list.)
28	pub file_size: i64,
29}
30
31impl RsyncClient {
32	/// Open a connection to an rsync server and read the initial file list.
33	/// The url must have scheme `rsync` and contain at least one path element (module listing is
34	/// not supported).
35	pub async fn connect(url: &url::Url) -> Result<(RsyncClient, Vec<File>)> {
36		let (path, base) = Self::parse_url(&url)?;
37		let (read, mut write) = Self::stream(&url).await?;
38		let mut read = BufReader::new(read);
39		Self::send_handshake(&mut write, path, base)
40			.await
41			.context("Handshake: send")?;
42		let origin = format!(
43			"{}{}",
44			url.host().expect("Connected to an url without host"),
45			url.port().map(|p| format!("{}", p)).unwrap_or("".to_string())
46		);
47		Self::read_handshake(&mut read, &origin)
48			.await
49			.context("Handshake: receive")?;
50		let mut read = EnvelopeRead::new(read); // Server multiplex start
51		let id = NEXT_CLIENT_ID.fetch_add(1, AtomicOrder::SeqCst);
52		let files = Self::read_file_list(&mut read, id)
53			.await
54			.context("File list: receive")?;
55		write.write_i32_le(-1).await?;
56		anyhow::ensure!(
57			read.read_i32_le().await? == -1,
58			"Phase switch receive-list -> receive-file"
59		);
60		let reqs: Requests = RequestsInner::new_requests();
61		let process = ReadFilesProcess::spawn(read, reqs.clone());
62		let inner = RsyncClientInner {
63			reqs: reqs.clone(),
64			write: AsyncMutex::new(write),
65			finish: SyncMutex::new(Some(process)),
66			id,
67		};
68		Ok((RsyncClient { inner: Arc::new(inner) }, files))
69	}
70	/// Requests the transfer of a [File](crate::File).
71	/// The referenced File must have been returned in the same call as `self`, or an error will be
72	/// returned.
73	///
74	/// There are some worrying remarks in the rsync protocol documentation in openrsync.
75	/// It is stated that requested files may be silently ommited from transfer,
76	/// and that files are not necessarily transmited in the order they were requested.
77	/// Effectively, this means that the only way to detect that a file wasn't sent is to invoke
78	/// [close](crate::RsyncClient::close), and then to wait for the sender to signal the end of the
79	/// connection. Without calling close, the returned [AsyncRead](crate::AsyncRead) may remain
80	/// pending forever.
81	pub async fn get(&self, file: &File) -> Result<impl AsyncRead> {
82		anyhow::ensure!(
83			self.inner.id == file.client_id,
84			"Requested file from client that was not listed in that client's connect call"
85		);
86		anyhow::ensure!(file.is_file(), "Can only request files, {} is not", file);
87		let (data_send, mut data_recv) = mpsc::channel(10);
88		{
89			let mut reqs = self.inner.reqs.lock().unwrap();
90			reqs.refresh_timeout();
91			reqs.requests
92				.as_mut()
93				.context("Client fully exited")?
94				.entry(file.idx)
95				.or_insert(vec![])
96				.push(data_send);
97		}
98		let mut write = self.inner.write.lock().await;
99		log::debug!("Requesting index {}: {}", file.idx, file);
100		write.write_i32_le(file.idx as i32).await?;
101		write.write_all(&[0u8; 16]).await?; // We want no blocks, blocklength, checksum, or terminal block. 4 x 0i32 = 16
102		let stream = async_stream::stream! { while let Some(item) = data_recv.recv().await { yield item; } };
103		Ok(StreamReader::new(stream))
104	}
105	/// Finalizes the connection and returns statistics about the transfer (See [Stats](crate::Stats)).
106	/// All files that have allready been requested will be transferred before the returned future completes.
107	/// Only one call to this function can succeed per [RsyncClient](crate::RsyncClient).
108	pub async fn close(&mut self) -> Result<Stats> {
109		let read = self.inner.finish.lock().unwrap().take();
110		if let Some(read) = read {
111			let mut write = self.inner.write.lock().await;
112			write.write_i32_le(-1).await?; // phase switch, read by ReadProcess
113			let mut read = read.await??;
114			let stats = Stats {
115				bytes_read: read.read_rsync_long().await?,
116				bytes_written: read.read_rsync_long().await?,
117				file_size: read.read_rsync_long().await?,
118			};
119			log::debug!("Rsync Stats: {:?}", stats);
120			write.write_i32_le(-1).await?; // EoS
121			write.shutdown().await.context("Failed to close connection")?;
122			Ok(stats)
123		} else {
124			anyhow::bail!("Only one can close");
125		}
126	}
127
128	fn parse_url(url: &url::Url) -> Result<(&Path, &str)> {
129		anyhow::ensure!(url.scheme() == "rsync", "Only rsync urls supported, not {}", url);
130		anyhow::ensure!(url.path() != "", "Path cannot be / - your url {} is cut short.", url);
131		let path = Path::new(url.path());
132		let path = if path.is_absolute() {
133			path.strip_prefix(Path::new("/"))
134				.expect(&format!("Stripping root from path {:?}", path))
135		// Does this work on windows?
136		} else {
137			path
138		};
139		let mut base = path;
140		let base = loop {
141			match base.parent() {
142				Some(p) if p == Path::new("") => break base,
143				None => panic!("Getting base element of path {:?}", base),
144				Some(p) => base = p,
145			};
146		};
147		let base = base.to_str().expect("TODO: handle paths properly");
148		Ok((path, base))
149	}
150	async fn stream(url: &url::Url) -> Result<(OwnedReadHalf, OwnedWriteHalf)> {
151		let mut addrs = url
152			.socket_addrs(|| Some(873))
153			.context(format!("Get socket addr from url {}", url))?;
154		let mut stream =
155			TcpStream::connect(addrs.pop().expect("Successful resolution returns at least one address")).await;
156		for addr in addrs.iter() {
157			stream = TcpStream::connect(addr).await;
158			if stream.is_ok() {
159				break;
160			}
161		}
162		Ok(stream
163			.context(format!("Connect to {} ({:?})", url, addrs))?
164			.into_split())
165	}
166	async fn send_handshake(write: &mut OwnedWriteHalf, path: &Path, base: &str) -> Result<()> {
167		// rsync seems to be ok with us sending it all at once
168		let initial: Vec<u8> = [
169			// TODO: The openrsync docs only exist for rsync 27, but below rsync 30, dates beyond 1970 + 2^31 can't be handled.
170			// Client hello
171			&b"@RSYNCD: 27.0\n"[..],
172			// Select root
173			base.as_bytes(),
174			&b"\n"[..],
175			// Remote command
176			&b"--server\n"[..],
177			&b"--sender\n"[..],
178			&b"-rl\n"[..],
179			&b".\n"[..],
180			path.to_str().unwrap().as_bytes(),
181			&b"/\n\n"[..],
182			// Exclusion list
183			&b"\0\0\0\0"[..],
184		]
185		.concat();
186		write.write_all(&initial).await?;
187		Ok(())
188	}
189	async fn read_handshake<T: AsyncBufRead + Unpin>(read: &mut T, origin: &str) -> Result<()> {
190		let hello = &mut String::new();
191		read.read_line(hello).await.context("Read server hello")?;
192		let rsyncd = "@RSYNCD: ";
193		anyhow::ensure!(
194			hello.starts_with(rsyncd),
195			"Not an rsync server? First message was {}",
196			hello
197		);
198		let ver = hello[rsyncd.len()..(hello.len() - 1)]
199			.split(' ')
200			.next()
201			.map(|version| version.split('.').map(str::parse).collect::<Result<Vec<u64>, _>>())
202			.ok_or(anyhow::anyhow!(format!("Version nor present")))
203			.context(format!("Parsing server version from {}", hello))??;
204		anyhow::ensure!(
205			ver >= vec![27, 0],
206			"Server version {} not supported - need 27.0 minimum.",
207			ver.iter().map(|i| format!("{}", i)).collect::<Vec<_>>().join(".")
208		);
209		let mut motd = String::new();
210		loop {
211			let select = &mut String::new();
212			read.read_line(select).await.context("Read server startup")?;
213			if select == &format!("{}OK\n", rsyncd) {
214				break;
215			} else {
216				motd += select;
217			}
218		}
219		if &motd != "" {
220			log::info!("MOTD from {}\n{}", origin, motd.strip_suffix("\n").unwrap_or(&motd));
221		}
222		let _random_seed = read.read_u32().await?; // Read random seed - we don't care
223		Ok(())
224	}
225	async fn read_file_list<T: AsyncRead + Unpin + Send>(read: &mut T, client_id: usize) -> Result<Vec<File>> {
226		let mut ret = vec![];
227		let mut filename_buf: Vec<u8> = vec![];
228		let mut mode_buf = None;
229		let mut mtime_buf = None;
230		loop {
231			let meta = FileEntryStatus(read.read_u8().await.context("Status")?);
232			if meta.is_end() {
233				break;
234			}
235			let inherited_filename_length = if meta.inherits_filename() {
236				read.read_u8().await? as usize
237			} else {
238				0
239			};
240			anyhow::ensure!(
241				inherited_filename_length <= filename_buf.len(),
242				"Protocol error: file list format inconsistency"
243			);
244			let filename_length = if meta.integer_filename_len() {
245				read.read_u32_le().await? as usize
246			} else {
247				read.read_u8().await? as usize
248			};
249			filename_buf.resize(filename_length + inherited_filename_length, 0);
250			read.read_exact(&mut filename_buf[inherited_filename_length..]).await?;
251			let show = String::from_utf8_lossy(&filename_buf);
252			let size = read.read_rsync_long().await?;
253			anyhow::ensure!(size >= 0, "Protocol error: negative file size for {}", show);
254			let size = size as u64;
255			mtime_buf = if meta.mtime_repeated() {
256				mtime_buf
257			} else {
258				let ts = read.read_i32_le().await?;
259				#[allow(deprecated)]
260				let naive = NaiveDateTime::from_timestamp(ts as i64, 0);
261				#[allow(deprecated)]
262				let datetime: DateTime<Utc> = DateTime::from_utc(naive, Utc);
263				Some(datetime)
264			};
265			let mode = if meta.file_mode_repeated() {
266				mode_buf.context(format!("Protocol error: first file {} without mode", show))?
267			} else {
268				read.read_u32_le().await?
269			};
270			mode_buf = Some(mode);
271			if !meta.uid_repeated() {
272				// Actually, I expect the uids to be never present. Oh well.
273				let _uid = read.read_i32_le().await?;
274			}
275			if !meta.gid_repeated() {
276				let _gid = read.read_i32_le().await?;
277			}
278			let symlink = if unix_mode::is_symlink(mode) {
279				let len = read.read_u32_le().await? as usize;
280				let mut link = vec![];
281				link.resize(len, 0);
282				read.read_exact(&mut link).await?;
283				Some(link)
284			} else {
285				None
286			};
287			let idx = usize::MAX;
288			let f = File {
289				path: filename_buf.clone(),
290				mtime: mtime_buf,
291				symlink,
292				mode,
293				size,
294				idx,
295				client_id,
296			};
297			ret.push(f);
298		}
299		if read.read_i32_le().await? != 0 {
300			log::warn!("IO errors while listing files.");
301		}
302		ret.sort_unstable_by(|a, b| a.path.cmp(&b.path));
303		// Hmm. rsyn also dedupes. I've never seen dupes, so I don't know why.
304		for (i, f) in ret.iter_mut().enumerate() {
305			f.idx = i;
306			log::debug!("{:>6} {}", f.idx, f);
307		}
308		Ok(ret)
309	}
310}
311
312/*
313struct Compat1<T> {
314	source: T,
315	internal: Bytes,
316}
317impl<T> Compat1<T>{
318	fn new(source: T) -> Compat1<T> { Compat1 { source, internal: Bytes::new() }}
319}
320impl<E> AsyncRead for Compat1<Receiver<Result<bytes::Bytes, E>>>
321	where E: Into<std::io::Error> {
322	fn poll_read(self: Pin<&mut Self>, cx: &mut std::task::Context, buf: &mut ReadBuf<'_>) -> Poll<Result<(), std::io::Error>> {
323		self.source.
324		while buf.capacity() > 0 {
325			if ! self.internal.is_empty()
326		}
327		Poll::Ready(OK(()))
328	}
329}*/