//! rsync-list 1.71.0
//!
//! List files using rsync.
use futures::stream::{StreamExt, TryStreamExt};
use subslice::SubsliceExt;
use tokio::io::AsyncBufReadExt;

/// One entry (directory, file or symlink) parsed from a line of rsync's
/// listing output.
#[derive(Debug,getset::CopyGetters)]
#[non_exhaustive]
pub struct Entry {
	/// Path of the entry as raw bytes, with rsync's escapes already decoded.
	/// Read it through [`Entry::path`].
	path: Vec<u8>,

	/// Size parsed from the size column of the listing.
	#[getset(get_copy="pub")]
	size: u64,

	/// Modification time parsed from the listing and interpreted as UTC.
	#[getset(get_copy="pub")]
	last_modified: chrono::DateTime<chrono::Utc>,

	/// What kind of entry this is, taken from the first character of the
	/// mode column.
	pub kind: Kind,
}

impl Entry {
	/// Returns the path of this entry as raw bytes.
	///
	/// The bytes are not guaranteed to be valid UTF-8.
	pub fn path(&self) -> &[u8] {
		self.path.as_slice()
	}
}

/// The type of an [`Entry`], together with any data specific to that type.
#[derive(Debug)]
#[non_exhaustive]
pub enum Kind {
	/// A directory (mode column starts with `d`).
	Dir(Dir),
	/// A regular file (mode column starts with `-`).
	File(File),
	/// A symbolic link (mode column starts with `l`).
	Link(Link)
}

/// Data specific to directory entries. Currently carries no fields.
#[derive(Debug)]
#[non_exhaustive]
pub struct Dir {
}

/// Data specific to regular file entries. Currently carries no fields.
#[derive(Debug)]
#[non_exhaustive]
pub struct File {
}

/// Data specific to symbolic link entries.
#[derive(Debug)]
#[non_exhaustive]
pub struct Link {
	/// Target of the link as raw bytes, with rsync's escapes already decoded.
	/// Read it through [`Link::target`].
	target: Vec<u8>,
}

impl Link {
	/// Returns the target of this link as raw bytes.
	///
	/// The bytes are not guaranteed to be valid UTF-8.
	pub fn target(&self) -> &[u8] {
		self.target.as_slice()
	}
}

/// Configuration for a list.
///
/// Many options reference the rsync command line options and syntax. See
/// <https://download.samba.org/pub/rsync/rsync.html> for the reference
/// documentation on those options.
#[derive(Debug)]
#[non_exhaustive]
pub struct Config<'a> {
	/// The source to list.
	///
	/// Example: `rsync://example.com/path`
	/// Example: `me@example.com:/path`
	///
	/// This is the same syntax as on the rsync command line.
	pub source: &'a std::ffi::OsStr,

	/// Patterns to exclude from the listing.
	///
	/// Example: `www/*.gz`
	///
	/// This corresponds to rsync's `--exclude` flag.
	pub exclude: &'a[&'a std::ffi::OsStr],

	/// Patterns that should not be excluded.
	///
	/// Example: `www/*.tar.gz`
	///
	/// This corresponds to rsync's `--include` flag.
	pub include: &'a[&'a std::ffi::OsStr],

	/// The rsync executable to use.
	///
	/// This will be looked for in `$PATH` if it does not contain any directory
	/// separators.
	///
	/// Default: `rsync`
	pub executable: &'a std::ffi::OsStr,

	/// Whether links should be reported. If `false`, links will be reported
	/// as the file that they point to.
	///
	/// Note: The last modified time will always be that of the link, not that
	/// of the target.
	///
	/// Default: `true`
	pub links: bool,

	/// Recursively list the source.
	///
	/// Default: `true`
	pub recursive: bool,
}

impl<'a> Config<'a> {
	/// Creates a configuration for listing `source` with default options:
	/// no include or exclude patterns, the `rsync` executable, links
	/// reported as links, and recursive listing.
	pub fn new(source: &'a (impl AsRef<std::ffi::OsStr> + ?Sized)) -> Self {
		Config {
			source: source.as_ref(),

			include: &[],
			exclude: &[],
			executable: "rsync".as_ref(),
			links: true,
			recursive: true,
		}
	}

	/// Runs rsync in list-only mode and returns its output as a stream of
	/// parsed entries.
	///
	/// The rsync process is spawned when this method is called. Its stdout is
	/// read line by line and each line is parsed into an [`Entry`]. Once the
	/// output ends the process is waited for.
	///
	/// # Errors
	///
	/// The stream yields an error if the process cannot be spawned, if
	/// reading its output fails, if a line cannot be parsed ([`ParseError`])
	/// or if rsync exits unsuccessfully ([`ExitError`]).
	pub fn list(&self) -> impl futures::Stream<Item=Result<Entry, failure::Error>> {
		let mut cmd = tokio::process::Command::new(self.executable);
		// Timestamps in the listing are parsed as UTC, so make rsync print UTC.
		cmd.env("TZ", "UTC0");

		cmd.arg("--list-only");
		cmd.arg("--no-motd");

		if self.recursive {
			cmd.arg("-r");
		}

		cmd.arg(if self.links { "-l" } else { "-L" });

		// rsync acts on the first filter rule that matches a path. The
		// includes are exceptions to the excludes, so they have to come first
		// on the command line or they would never take effect.
		for include in self.include {
			cmd.arg("--include");
			cmd.arg(include);
		}
		for exclude in self.exclude {
			cmd.arg("--exclude");
			cmd.arg(exclude);
		}

		cmd.arg(self.source);

		cmd.stdout(std::process::Stdio::piped());

		let result = cmd.spawn()
			.map_err(|e| -> failure::Error { e.into() });

		futures::stream::iter(Some(result))
			.map_ok(|mut child| {
				let stdout = child.stdout.take()
					.expect("stdout is present because it was configured as piped");
				let split = tokio::io::BufReader::new(stdout)
					.split(b'\n'); // TODO: Windows?

				// After the last line, wait for the process. The appended
				// stream never yields an entry, only possibly an error.
				tokio_stream::wrappers::SplitStream::new(split)
					.map_err(|e| -> failure::Error { e.into() })
					.chain(futures::stream::iter(Some(child))
						.then(|mut child| async move {
							let exit_status = child.wait().await?;
							if exit_status.success() {
								Ok(())
							} else {
								Err(ExitError {
									exit_status,
								}.into())
							}
						})
						.try_filter_map(|_| async { Ok(None) }))
			})
			.try_flatten()
			.and_then(|line| async {
				parse_entry(&line)
					.map_err(|e| ParseError {
						raw: line.into(),
						kind: e,
					}.into())
			})

	}
}

/// Parses one line of rsync's `--list-only` output into an [`Entry`].
///
/// A line consists of the mode, the size (optionally with `,` as digit
/// separators), the date (`YYYY/MM/DD`), the time (`HH:MM:SS`) and the name.
/// Mode and size may be followed by padding spaces; date, time and name are
/// separated by exactly one space, so a name may itself start with spaces.
/// For symlinks the name is followed by ` -> ` and the link target.
///
/// `line` must not include the trailing newline.
///
/// # Errors
///
/// * `InvalidSyntax` if a column is missing or malformed, or the size does
///   not fit in a `u64`.
/// * `AmbiguousLink` if a symlink line contains ` -> ` more than once, so
///   name and target cannot be told apart.
/// * `UnknownType` if the first mode character is not `d`, `-` or `l`.
/// * Any error from `path_decode` for the name or the link target.
fn parse_entry(line: &[u8]) -> Result<Entry, ParseErrorKind> {
	// Splits `rest` at its first space into the word before it and everything
	// after that single space.
	fn split_word(rest: &[u8]) -> Result<(&[u8], &[u8]), ParseErrorKind> {
		let len = rest.iter().position(|c| *c == b' ')
			.ok_or(ParseErrorKind::InvalidSyntax{})?;
		Ok((&rest[..len], &rest[len + 1..]))
	}

	// Drops the padding spaces at the start of `rest`.
	fn skip_spaces(rest: &[u8]) -> &[u8] {
		let padding = rest.iter().take_while(|c| **c == b' ').count();
		&rest[padding..]
	}

	let (mode, rest) = split_word(line)?;
	// An empty mode column (line starting with a space) is malformed.
	let kind_char = *mode.first().ok_or(ParseErrorKind::InvalidSyntax{})?;

	let (size, rest) = split_word(skip_spaces(rest))?;
	let size = size.iter()
		.filter(|c| **c != b',')
		.try_fold(0u64, |total, c| {
			if !c.is_ascii_digit() {
				return Err(ParseErrorKind::InvalidSyntax{});
			}
			// Checked arithmetic: an absurdly long size column is reported as
			// a syntax error rather than overflowing.
			total.checked_mul(10)
				.and_then(|total| total.checked_add(u64::from(c - b'0')))
				.ok_or(ParseErrorKind::InvalidSyntax{})
		})?;

	// The timestamp is two words ("date time") joined by a single space.
	let stamp_start = skip_spaces(rest);
	let (date, rest) = split_word(stamp_start)?;
	let (time, rest) = split_word(rest)?;
	let last_modified = &stamp_start[..date.len() + 1 + time.len()];
	let last_modified = std::str::from_utf8(last_modified)
		.map_err(|_| ParseErrorKind::InvalidSyntax{})?;
	let last_modified = chrono::DateTime::<chrono::Utc>::from_utc(
		chrono::NaiveDateTime::parse_from_str(
			last_modified,
			"%Y/%m/%d %T")
			.map_err(|_| ParseErrorKind::InvalidSyntax{})?,
		chrono::Utc);

	// Everything after the single space following the time is the name; it is
	// not trimmed because names may begin with spaces.
	let mut path = rest;
	let mut target: &[u8] = &[];

	if kind_char == b'l' {
		let link_start = path.find(b" -> ")
			.ok_or(ParseErrorKind::InvalidSyntax{})?;
		target = &path[link_start + 4..];
		if target.find(b" -> ").is_some() {
			return Err(ParseErrorKind::AmbiguousLink {
				name_and_target: path.into(),
				size,
				last_modified,
			})
		}
		path = &path[..link_start];
	}

	let path = path_decode(path)?;
	let kind = match kind_char {
		b'd' => Kind::Dir(Dir{}),
		b'-' => Kind::File(File{}),
		b'l' => Kind::Link(Link{
			target: path_decode(target)?,
		}),
		other => return Err(ParseErrorKind::UnknownType {
			char: other,
		})
	};

	Ok(Entry {
		path,
		size,
		last_modified,
		kind,
	})
}


/// Undoes rsync's escaping of a path taken from its output.
///
/// rsync writes bytes it does not print verbatim as `\#ooo`: a backslash, a
/// hash and exactly three octal digits (a newline is `\#012`). A backslash
/// that is not followed by a hash and three digits is a literal backslash and
/// is copied unchanged, as is a truncated sequence at the end of the path.
///
/// # Errors
///
/// Returns `InvalidSyntax` if a `\#ddd` sequence contains a digit other than
/// `0`-`7` or encodes a value above 255 (`\#377`).
fn path_decode(path: &[u8]) -> Result<Vec<u8>, ParseErrorKind> {
	let mut r = Vec::with_capacity(path.len());
	let mut i = 0;
	while i < path.len() {
		// The three digits of an escape starting at `i`, if there is one.
		// `get` keeps a trailing or short sequence from indexing out of bounds.
		let escape = if path[i] == b'\\' && path.get(i + 1) == Some(&b'#') {
			path.get(i + 2..i + 5)
				.filter(|digits| digits.iter().all(u8::is_ascii_digit))
		} else {
			None
		};

		match escape {
			Some(digits) => {
				let mut value: u32 = 0;
				for digit in digits {
					if *digit > b'7' {
						return Err(ParseErrorKind::InvalidSyntax{});
					}
					value = value * 8 + u32::from(*digit - b'0');
				}
				if value > 0xff {
					return Err(ParseErrorKind::InvalidSyntax{});
				}
				r.push(value as u8);
				i += 5;
			}
			None => {
				r.push(path[i]);
				i += 1;
			}
		}
	}
	Ok(r)
}

/// Error reported when the rsync process finishes with an unsuccessful exit
/// status.
#[derive(Debug,failure::Fail)]
#[fail(display="rsync exited with failure: {:?}", exit_status)]
#[non_exhaustive]
pub struct ExitError {
	/// The exit status the rsync process finished with.
	pub exit_status: std::process::ExitStatus,
}

/// Error reported when a line of rsync's output could not be parsed into an
/// [`Entry`].
#[derive(Debug,failure::Fail)]
#[fail(display="failed to parse {:?}: {:?}", raw, kind)]
#[non_exhaustive]
pub struct ParseError {
	/// The raw bytes of the offending line, without the trailing newline.
	pub raw: Vec<u8>,

	/// What was wrong with the line.
	#[fail(cause)]
	pub kind: ParseErrorKind,
}

/// The specific reason a line of rsync's output could not be parsed.
#[derive(Debug,failure::Fail)]
#[non_exhaustive]
pub enum ParseErrorKind {
	/// A symlink line contains ` -> ` more than once, so the boundary between
	/// the link name and its target cannot be determined. The fields that
	/// were parsed successfully are provided.
    #[fail(display="ambiguous symlink")]
	#[non_exhaustive]
	AmbiguousLink {
		/// The still-escaped bytes holding both the name and the target.
		name_and_target: Vec<u8>,
		/// Size parsed from the line.
		size: u64,
		/// Modification time parsed from the line.
		last_modified: chrono::DateTime<chrono::Utc>,
	},

	/// The line does not have the expected shape. Besides a line that ends
	/// too early, this is also used for other malformed columns (for example
	/// a non-numeric size or an unparsable timestamp).
    #[fail(display = "unexpected end of entry")]
	#[non_exhaustive]
	InvalidSyntax {},

	/// A path contains a backslash escape that the decoder does not
	/// recognise.
    #[fail(display="invalid escape: \\{:?}", char)]
	#[non_exhaustive]
	UnknownEscape {
		/// The byte that followed the backslash.
		char: u8,
	},

	/// The first character of the mode column is not a known entry type.
    #[fail(display="unknown entry type: {:?}", char)]
	#[non_exhaustive]
	UnknownType {
		/// The unrecognised type character.
		char: u8,
	},
}

#[cfg(test)]
mod tests {
	use super::*;

	/// Lists the fixture directory with a real `rsync` and checks the
	/// reported paths, sizes and symlink targets.
	#[test]
	fn test_list() {
		let runtime = tokio::runtime::Runtime::new().unwrap();
		runtime.block_on(async {
			let entries: Vec<Entry> = Config::new("./tests/fixtures/directory")
				.list()
				.map(|entry| entry.unwrap())
				.collect()
				.await;

			let names: Vec<String> = entries.iter()
				.map(|entry| String::from_utf8_lossy(entry.path()).to_string())
				.collect();

			// Directory sizes are ignored by recording them as 0.
			let sizes: Vec<u64> = entries.iter()
				.map(|entry| match &entry.kind {
					Kind::File(_) | Kind::Link(_) => entry.size(),
					Kind::Dir(_) => 0,
				})
				.collect();

			let targets: Vec<Option<String>> = entries.iter()
				.map(|entry| match &entry.kind {
					Kind::Link(link) => {
						Some(String::from_utf8_lossy(link.target()).to_string())
					}
					_ => None,
				})
				.collect();

			assert_eq!(names, &[
				"directory",
				"directory/lin\x06ky",
				"directory/some-file.txt",
				"directory/subdirectory",
				"directory/subdirectory/other-file.txt",
				"directory/subdirectory/uplink",
			]);
			assert_eq!(sizes, &[
				0,
				27,
				18,
				0,
				11,
				16,
			]);
			assert_eq!(targets, &[
				None,
				Some("subdirectory/other-file.txt".into()),
				None,
				None,
				None,
				Some("../some-file.txt".into()),
			]);
		})
	}
}