rsync_list/
lib.rs

1use futures::stream::{StreamExt, TryStreamExt};
2use subslice::SubsliceExt;
3use tokio::io::AsyncBufReadExt;
4
5#[derive(Debug,getset::CopyGetters)]
6#[non_exhaustive]
7pub struct Entry {
8	path: Vec<u8>,
9
10	#[getset(get_copy="pub")]
11	size: u64,
12
13	#[getset(get_copy="pub")]
14	last_modified: chrono::DateTime<chrono::Utc>,
15
16	pub kind: Kind,
17}
18
19impl Entry {
20	pub fn path(&self) -> &[u8] {
21		&self.path
22	}
23}
24
/// The type of a listed entry, carrying any type-specific details.
#[derive(Debug)]
#[non_exhaustive]
pub enum Kind {
	/// A directory (rsync mode column starting with `d`).
	Dir(Dir),
	/// A regular file (rsync mode column starting with `-`).
	File(File),
	/// A symbolic link (rsync mode column starting with `l`).
	Link(Link),
}

/// Details specific to directories (currently none).
#[derive(Debug)]
#[non_exhaustive]
pub struct Dir {}

/// Details specific to regular files (currently none).
#[derive(Debug)]
#[non_exhaustive]
pub struct File {}

/// Details specific to symbolic links.
#[derive(Debug)]
#[non_exhaustive]
pub struct Link {
	target: Vec<u8>,
}

impl Link {
	/// Where the link points, as text stored in the link itself.
	///
	/// Relative targets are returned as-is, not resolved against the link's
	/// location.
	pub fn target(&self) -> &[u8] {
		self.target.as_slice()
	}
}
54
/// Configuration for a list.
///
/// Many options reference the rsync command line options and syntax. See
/// <https://download.samba.org/pub/rsync/rsync.html> for the reference
/// documentation on those options.
///
/// Every field is a borrow or a `bool`, so the configuration is cheap to copy.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub struct Config<'a> {
	/// The source to list.
	///
	/// Example: rsync://example.com/path
	/// Example: me@example.com:/path
	///
	/// These use the same syntax as the rsync command line.
	pub source: &'a std::ffi::OsStr,

	/// Patterns to exclude from the listing.
	///
	/// Example: www/*.gz
	///
	/// This corresponds to rsync's --exclude flag.
	pub exclude: &'a [&'a std::ffi::OsStr],

	/// Patterns not to exclude.
	///
	/// Example: www/*.tar.gz
	///
	/// This corresponds to rsync's --include flag.
	pub include: &'a [&'a std::ffi::OsStr],

	/// The rsync executable to use.
	///
	/// This will be looked for in `$PATH` if it does not contain any directory
	/// separators.
	///
	/// Default: rsync
	pub executable: &'a std::ffi::OsStr,

	/// If links should be reported. If `false` links will be reported as the
	/// file that they point to.
	///
	/// Note: The last modified time will always be that of the link, not that
	/// of the target.
	///
	/// Default: true
	pub links: bool,

	/// Recursively list the source.
	///
	/// Default: true
	pub recursive: bool,
}
105
106impl<'a> Config<'a> {
107	pub fn new(source: &'a (impl AsRef<std::ffi::OsStr> + ?Sized)) -> Self {
108		Config {
109			source: source.as_ref(),
110
111			include: &[],
112			exclude: &[],
113			executable: "rsync".as_ref(),
114			links: true,
115			recursive: true,
116		}
117	}
118
119	pub fn list(&self) -> impl futures::Stream<Item=Result<Entry, failure::Error>> {
120		let mut cmd = tokio::process::Command::new(self.executable);
121		cmd.env("TZ", "UTC0");
122
123		cmd.arg("--list-only");
124		cmd.arg("--no-motd");
125
126		if self.recursive {
127			cmd.arg("-r");
128		}
129
130		cmd.arg(if self.links { "-l" } else { "-L" });
131
132		for exclude in self.exclude {
133			cmd.arg("--exclude");
134			cmd.arg(exclude);
135		}
136		for include in self.include {
137			cmd.arg("--include");
138			cmd.arg(include);
139		}
140
141		cmd.arg(self.source);
142
143		cmd.stdout(std::process::Stdio::piped());
144
145		let result = cmd.spawn()
146			.map_err(|e| -> failure::Error { e.into() });
147
148		futures::stream::iter(Some(result))
149			.map_ok(|mut child| {
150				let stdout = child.stdout.take().unwrap();
151				let split = tokio::io::BufReader::new(stdout)
152					.split(b'\n'); // TODO: Windows?
153
154				tokio_stream::wrappers::SplitStream::new(split)
155					.map_err(|e| -> failure::Error { e.into() })
156					.chain(futures::stream::iter(Some(child))
157						.then(|mut child| async move {
158							let exit_status = child.wait().await?;
159							if exit_status.success() {
160								Ok(())
161							} else {
162								Err(ExitError {
163									exit_status,
164								}.into())
165							}
166						})
167						.try_filter_map(|_| async { Ok(None) }))
168			})
169			.try_flatten()
170			.and_then(|line| async {
171				parse_entry(&line)
172					.map_err(|e| ParseError {
173						raw: line.into(),
174						kind: e,
175					}.into())
176			})
177
178	}
179}
180
181fn parse_entry(line: &[u8]) -> Result<Entry, ParseErrorKind> {
182	let mut cur = 0;
183	let mut word = || -> Result<&[u8], ParseErrorKind> {
184		let len = line[cur..].iter().position(|c| c == &b' ')
185			.ok_or(ParseErrorKind::InvalidSyntax{})?;
186		let w = &line[cur..][..len];
187		cur += len;
188		while line.get(cur) == Some(&b' ') {
189			cur += 1;
190		}
191		Ok(w)
192	};
193
194	let mode = word()?;
195
196	let size = word()?
197		.iter()
198		.filter(|c| **c != b',')
199		.fold(Ok(0), |total, c| {
200			if b'0' <= *c && *c <= b'9' {
201				Ok(total? * 10 + (c - b'0') as u64)
202			} else {
203				Err(ParseErrorKind::InvalidSyntax{})
204			}
205		})?;
206
207	let lm_len = word()?.len() + word()?.len();
208	let last_modified = &line[cur-lm_len-2..cur-1];
209	let last_modified = std::str::from_utf8(last_modified)
210		.map_err(|_| ParseErrorKind::InvalidSyntax{})?;
211	let last_modified = chrono::DateTime::<chrono::Utc>::from_utc(
212		chrono::NaiveDateTime::parse_from_str(
213			&last_modified,
214			"%Y/%m/%d %T")
215			.map_err(|_| ParseErrorKind::InvalidSyntax{})?,
216		chrono::Utc);
217
218	let mut path = &line[cur..];
219	let mut target: &[u8] = &[];
220
221	if mode[0] == b'l' {
222		let link_start = path.find(b" -> ")
223			.ok_or(ParseErrorKind::InvalidSyntax{})?;
224		target = &path[link_start + 4..];
225		if target.find(b" -> ").is_some() {
226			return Err(ParseErrorKind::AmbiguousLink {
227				name_and_target: path.into(),
228				size,
229				last_modified,
230			})
231		}
232		path = &path[..link_start];
233	}
234
235	let r = Entry {
236		path: path_decode(path)?,
237		size,
238		last_modified,
239		kind: match mode[0] {
240			b'd' => Kind::Dir(Dir{}),
241			b'-' => Kind::File(File{}),
242			b'l' => Kind::Link(Link{
243				target: path_decode(target)?,
244			}),
245			other => return Err(ParseErrorKind::UnknownType {
246				char: other,
247			})
248		},
249	};
250
251	Ok(r)
252}
253
254
255fn path_decode(path: &[u8]) -> Result<Vec<u8>, ParseErrorKind> {
256	let mut r = Vec::with_capacity(path.len());
257	let mut i = 0;
258	while i < path.len() {
259		if path[i] != b'\\' {
260			r.push(path[i]);
261			i += 1;
262			continue
263		}
264
265		match path[i+1] {
266			b'#' => {
267				let c: u8 =
268					std::str::from_utf8(&path[i+2..][..3])
269					.map_err(|_| ParseErrorKind::InvalidSyntax{})?
270					.parse()
271					.map_err(|_| ParseErrorKind::InvalidSyntax{})?;
272				r.push(c);
273				i += 5;
274			}
275			char => return Err(ParseErrorKind::UnknownEscape{char}),
276		}
277	}
278	Ok(r)
279}
280
281#[derive(Debug,failure::Fail)]
282#[fail(display="rsync exited with failure: {:?}", exit_status)]
283#[non_exhaustive]
284pub struct ExitError {
285	pub exit_status: std::process::ExitStatus,
286}
287
288#[derive(Debug,failure::Fail)]
289#[fail(display="failed to parse {:?}: {:?}", raw, kind)]
290#[non_exhaustive]
291pub struct ParseError {
292	pub raw: Vec<u8>,
293
294	#[fail(cause)]
295	pub kind: ParseErrorKind,
296}
297
298#[derive(Debug,failure::Fail)]
299#[non_exhaustive]
300pub enum ParseErrorKind {
301    #[fail(display="ambiguous symlink")]
302	#[non_exhaustive]
303	AmbiguousLink {
304		name_and_target: Vec<u8>,
305		size: u64,
306		last_modified: chrono::DateTime<chrono::Utc>,
307	},
308
309    #[fail(display = "unexpected end of entry")]
310	#[non_exhaustive]
311	InvalidSyntax {},
312
313    #[fail(display="invalid escape: \\{:?}", char)]
314	#[non_exhaustive]
315	UnknownEscape {
316		char: u8,
317	},
318
319    #[fail(display="unknown entry type: {:?}", char)]
320	#[non_exhaustive]
321	UnknownType {
322		char: u8,
323	},
324}
325
#[cfg(test)]
mod tests {
	use super::*;

	/// Lists `tests/fixtures/directory` with the default configuration and
	/// checks each entry's name, size (0 for directories) and link target.
	#[test]
	fn test_list() {
		let runtime = tokio::runtime::Runtime::new().unwrap();
		let entries: Vec<Entry> = runtime.block_on(
			Config::new("./tests/fixtures/directory")
				.list()
				.map(|entry| entry.unwrap())
				.collect(),
		);

		let names: Vec<String> = entries
			.iter()
			.map(|entry| String::from_utf8_lossy(entry.path()).into_owned())
			.collect();
		let sizes: Vec<u64> = entries
			.iter()
			.map(|entry| match entry.kind {
				Kind::Dir(_) => 0,
				Kind::File(_) | Kind::Link(_) => entry.size(),
			})
			.collect();
		let targets: Vec<Option<String>> = entries
			.iter()
			.map(|entry| match &entry.kind {
				Kind::Link(link) => Some(String::from_utf8_lossy(link.target()).into_owned()),
				_ => None,
			})
			.collect();

		assert_eq!(names, &[
			"directory",
			"directory/lin\x06ky",
			"directory/some-file.txt",
			"directory/subdirectory",
			"directory/subdirectory/other-file.txt",
			"directory/subdirectory/uplink",
		]);
		assert_eq!(sizes, &[
			0,
			27,
			18,
			0,
			11,
			16,
		]);
		assert_eq!(targets, &[
			None,
			Some("subdirectory/other-file.txt".into()),
			None,
			None,
			None,
			Some("../some-file.txt".into()),
		]);
	}
}