1use futures::stream::{StreamExt, TryStreamExt};
2use subslice::SubsliceExt;
3use tokio::io::AsyncBufReadExt;
4
5#[derive(Debug,getset::CopyGetters)]
6#[non_exhaustive]
7pub struct Entry {
8 path: Vec<u8>,
9
10 #[getset(get_copy="pub")]
11 size: u64,
12
13 #[getset(get_copy="pub")]
14 last_modified: chrono::DateTime<chrono::Utc>,
15
16 pub kind: Kind,
17}
18
19impl Entry {
20 pub fn path(&self) -> &[u8] {
21 &self.path
22 }
23}
24
25#[derive(Debug)]
26#[non_exhaustive]
27pub enum Kind {
28 Dir(Dir),
29 File(File),
30 Link(Link)
31}
32
/// Data specific to directory entries.
///
/// Currently carries no fields; it is `#[non_exhaustive]` so fields can be
/// added later.
#[derive(Debug)]
#[non_exhaustive]
pub struct Dir {}
37
/// Data specific to regular-file entries.
///
/// Currently carries no fields; it is `#[non_exhaustive]` so fields can be
/// added later.
#[derive(Debug)]
#[non_exhaustive]
pub struct File {}
42
/// Data specific to symbolic-link entries.
#[derive(Debug)]
#[non_exhaustive]
pub struct Link {
    /// Raw bytes of the link target; read them through [`Link::target`].
    target: Vec<u8>,
}

impl Link {
    /// Returns the link's target as raw bytes.
    ///
    /// The target is kept as bytes rather than a string, so it is not
    /// required to be valid UTF-8.
    pub fn target(&self) -> &[u8] {
        self.target.as_slice()
    }
}
54
/// Settings for one rsync listing; see `Config::list` for how each field is
/// turned into command-line arguments.
#[derive(Debug)]
#[non_exhaustive]
pub struct Config<'a> {
    /// The location to list, passed to rsync as the final argument.
    pub source: &'a std::ffi::OsStr,

    /// Patterns passed to rsync, each as an `--exclude` argument.
    pub exclude: &'a [&'a std::ffi::OsStr],

    /// Patterns passed to rsync, each as an `--include` argument.
    pub include: &'a [&'a std::ffi::OsStr],

    /// The program to spawn.
    pub executable: &'a std::ffi::OsStr,

    /// When `true` rsync is run with `-l`, otherwise with `-L`.
    pub links: bool,

    /// When `true` rsync is run with `-r`.
    pub recursive: bool,
}
105
106impl<'a> Config<'a> {
107 pub fn new(source: &'a (impl AsRef<std::ffi::OsStr> + ?Sized)) -> Self {
108 Config {
109 source: source.as_ref(),
110
111 include: &[],
112 exclude: &[],
113 executable: "rsync".as_ref(),
114 links: true,
115 recursive: true,
116 }
117 }
118
119 pub fn list(&self) -> impl futures::Stream<Item=Result<Entry, failure::Error>> {
120 let mut cmd = tokio::process::Command::new(self.executable);
121 cmd.env("TZ", "UTC0");
122
123 cmd.arg("--list-only");
124 cmd.arg("--no-motd");
125
126 if self.recursive {
127 cmd.arg("-r");
128 }
129
130 cmd.arg(if self.links { "-l" } else { "-L" });
131
132 for exclude in self.exclude {
133 cmd.arg("--exclude");
134 cmd.arg(exclude);
135 }
136 for include in self.include {
137 cmd.arg("--include");
138 cmd.arg(include);
139 }
140
141 cmd.arg(self.source);
142
143 cmd.stdout(std::process::Stdio::piped());
144
145 let result = cmd.spawn()
146 .map_err(|e| -> failure::Error { e.into() });
147
148 futures::stream::iter(Some(result))
149 .map_ok(|mut child| {
150 let stdout = child.stdout.take().unwrap();
151 let split = tokio::io::BufReader::new(stdout)
152 .split(b'\n'); tokio_stream::wrappers::SplitStream::new(split)
155 .map_err(|e| -> failure::Error { e.into() })
156 .chain(futures::stream::iter(Some(child))
157 .then(|mut child| async move {
158 let exit_status = child.wait().await?;
159 if exit_status.success() {
160 Ok(())
161 } else {
162 Err(ExitError {
163 exit_status,
164 }.into())
165 }
166 })
167 .try_filter_map(|_| async { Ok(None) }))
168 })
169 .try_flatten()
170 .and_then(|line| async {
171 parse_entry(&line)
172 .map_err(|e| ParseError {
173 raw: line.into(),
174 kind: e,
175 }.into())
176 })
177
178 }
179}
180
181fn parse_entry(line: &[u8]) -> Result<Entry, ParseErrorKind> {
182 let mut cur = 0;
183 let mut word = || -> Result<&[u8], ParseErrorKind> {
184 let len = line[cur..].iter().position(|c| c == &b' ')
185 .ok_or(ParseErrorKind::InvalidSyntax{})?;
186 let w = &line[cur..][..len];
187 cur += len;
188 while line.get(cur) == Some(&b' ') {
189 cur += 1;
190 }
191 Ok(w)
192 };
193
194 let mode = word()?;
195
196 let size = word()?
197 .iter()
198 .filter(|c| **c != b',')
199 .fold(Ok(0), |total, c| {
200 if b'0' <= *c && *c <= b'9' {
201 Ok(total? * 10 + (c - b'0') as u64)
202 } else {
203 Err(ParseErrorKind::InvalidSyntax{})
204 }
205 })?;
206
207 let lm_len = word()?.len() + word()?.len();
208 let last_modified = &line[cur-lm_len-2..cur-1];
209 let last_modified = std::str::from_utf8(last_modified)
210 .map_err(|_| ParseErrorKind::InvalidSyntax{})?;
211 let last_modified = chrono::DateTime::<chrono::Utc>::from_utc(
212 chrono::NaiveDateTime::parse_from_str(
213 &last_modified,
214 "%Y/%m/%d %T")
215 .map_err(|_| ParseErrorKind::InvalidSyntax{})?,
216 chrono::Utc);
217
218 let mut path = &line[cur..];
219 let mut target: &[u8] = &[];
220
221 if mode[0] == b'l' {
222 let link_start = path.find(b" -> ")
223 .ok_or(ParseErrorKind::InvalidSyntax{})?;
224 target = &path[link_start + 4..];
225 if target.find(b" -> ").is_some() {
226 return Err(ParseErrorKind::AmbiguousLink {
227 name_and_target: path.into(),
228 size,
229 last_modified,
230 })
231 }
232 path = &path[..link_start];
233 }
234
235 let r = Entry {
236 path: path_decode(path)?,
237 size,
238 last_modified,
239 kind: match mode[0] {
240 b'd' => Kind::Dir(Dir{}),
241 b'-' => Kind::File(File{}),
242 b'l' => Kind::Link(Link{
243 target: path_decode(target)?,
244 }),
245 other => return Err(ParseErrorKind::UnknownType {
246 char: other,
247 })
248 },
249 };
250
251 Ok(r)
252}
253
254
255fn path_decode(path: &[u8]) -> Result<Vec<u8>, ParseErrorKind> {
256 let mut r = Vec::with_capacity(path.len());
257 let mut i = 0;
258 while i < path.len() {
259 if path[i] != b'\\' {
260 r.push(path[i]);
261 i += 1;
262 continue
263 }
264
265 match path[i+1] {
266 b'#' => {
267 let c: u8 =
268 std::str::from_utf8(&path[i+2..][..3])
269 .map_err(|_| ParseErrorKind::InvalidSyntax{})?
270 .parse()
271 .map_err(|_| ParseErrorKind::InvalidSyntax{})?;
272 r.push(c);
273 i += 5;
274 }
275 char => return Err(ParseErrorKind::UnknownEscape{char}),
276 }
277 }
278 Ok(r)
279}
280
281#[derive(Debug,failure::Fail)]
282#[fail(display="rsync exited with failure: {:?}", exit_status)]
283#[non_exhaustive]
284pub struct ExitError {
285 pub exit_status: std::process::ExitStatus,
286}
287
288#[derive(Debug,failure::Fail)]
289#[fail(display="failed to parse {:?}: {:?}", raw, kind)]
290#[non_exhaustive]
291pub struct ParseError {
292 pub raw: Vec<u8>,
293
294 #[fail(cause)]
295 pub kind: ParseErrorKind,
296}
297
298#[derive(Debug,failure::Fail)]
299#[non_exhaustive]
300pub enum ParseErrorKind {
301 #[fail(display="ambiguous symlink")]
302 #[non_exhaustive]
303 AmbiguousLink {
304 name_and_target: Vec<u8>,
305 size: u64,
306 last_modified: chrono::DateTime<chrono::Utc>,
307 },
308
309 #[fail(display = "unexpected end of entry")]
310 #[non_exhaustive]
311 InvalidSyntax {},
312
313 #[fail(display="invalid escape: \\{:?}", char)]
314 #[non_exhaustive]
315 UnknownEscape {
316 char: u8,
317 },
318
319 #[fail(display="unknown entry type: {:?}", char)]
320 #[non_exhaustive]
321 UnknownType {
322 char: u8,
323 },
324}
325
#[cfg(test)]
mod tests {
    use super::*;

    /// Lists the fixture directory with the default configuration and checks
    /// the reported paths, sizes and link targets. Directory sizes are
    /// recorded as 0 rather than the value from the listing.
    #[test]
    fn test_list() {
        tokio::runtime::Runtime::new().unwrap().block_on(async {
            let mut names = Vec::new();
            let mut sizes = Vec::new();
            let mut targets = Vec::new();

            Config::new("./tests/fixtures/directory")
                .list()
                .for_each(|item| {
                    let entry = item.unwrap();

                    names.push(String::from_utf8_lossy(entry.path()).into_owned());

                    let size = match entry.kind {
                        Kind::Dir(_) => 0,
                        Kind::File(_) | Kind::Link(_) => entry.size(),
                    };
                    sizes.push(size);

                    let target = match entry.kind {
                        Kind::Link(link) => {
                            Some(String::from_utf8_lossy(link.target()).into_owned())
                        }
                        _ => None,
                    };
                    targets.push(target);

                    futures::future::ready(())
                })
                .await;

            assert_eq!(names, &[
                "directory",
                "directory/lin\x06ky",
                "directory/some-file.txt",
                "directory/subdirectory",
                "directory/subdirectory/other-file.txt",
                "directory/subdirectory/uplink",
            ]);
            assert_eq!(sizes, &[0, 27, 18, 0, 11, 16]);
            assert_eq!(targets, &[
                None,
                Some("subdirectory/other-file.txt".into()),
                None,
                None,
                None,
                Some("../some-file.txt".into()),
            ]);
        })
    }
}