1use cdchunking::{Chunker, ZPAQ};
4use futures::channel::mpsc::{Receiver, channel};
5use futures::sink::SinkExt;
6use futures::stream::StreamExt;
7use log::{log_enabled, debug, info};
8use log::Level::Debug;
9use std::cell::RefCell;
10use std::collections::VecDeque;
11use std::ffi::OsString;
12use std::fs::{File, OpenOptions};
13use std::future::Future;
14use std::io::{Seek, SeekFrom, Write};
15use std::ops::DerefMut;
16use std::path::{Path, PathBuf};
17use std::pin::Pin;
18use std::rc::Rc;
19use std::string::FromUtf8Error;
20
21use crate::{Error, HashDigest, temp_name, untemp_name};
22use crate::index::{MAX_BLOCK_SIZE, ZPAQ_BITS, Index};
23use crate::sync::{Destination, DestinationEvent, Source, SourceEvent};
24use crate::sync::utils::{Condition, ConditionFuture, move_file};
25
26fn read_block(path: &Path, offset: usize) -> Result<Vec<u8>, Error> {
27 let mut file = File::open(path)?;
28 file.seek(SeekFrom::Start(offset as u64))?;
29 let chunker = Chunker::new(
30 ZPAQ::new(ZPAQ_BITS),
31 ).max_size(MAX_BLOCK_SIZE);
32 let block = chunker.whole_chunks(file).next()
33 .unwrap_or(Err(
34 std::io::Error::new(
35 std::io::ErrorKind::InvalidData,
36 "No such chunk in file",
37 ),
38 ))?;
39 Ok(block)
40}
41
42fn write_block(
43 name: &Path,
44 offset: usize,
45 block: &[u8],
46) -> Result<(), Error> {
47 let mut file = OpenOptions::new().write(true).create(true).open(name)?;
48 file.seek(SeekFrom::Start(offset as u64))?;
49 file.write_all(block)?;
50 Ok(())
51}
52
53pub fn fs_source(root_dir: PathBuf) -> Result<Source, Error> {
54 info!("Indexing source into {:?}...", root_dir.join(".syncfast.idx"));
55 let mut index = Index::open(&root_dir.join(".syncfast.idx"))?;
56 index.index_path(&root_dir)?;
57 index.remove_missing_files(&root_dir)?;
58 index.commit()?;
59
60 debug!("FsSource: state=ListFiles");
63 let (sender, receiver) = channel(1);
64 Ok(Source {
65 stream: futures::stream::unfold(
67 Box::pin(FsSourceFrom {
68 index,
69 root_dir,
70 receiver,
71 state: FsSourceState::ListFiles(None),
72 }),
73 FsSourceFrom::stream,
74 ).boxed_local(),
75 sink: Box::pin(futures::sink::unfold((), move |(), event: DestinationEvent| {
77 let mut sender = sender.clone();
78 async move {
79 sender.send(event).await.map_err(|_| Error::Io(std::io::Error::new(std::io::ErrorKind::BrokenPipe, "FsSource channel is closed")))
80 }
81 })),
82 })
83}
84
/// Phases of the filesystem source state machine driven by
/// `FsSourceFrom::stream`.
enum FsSourceState {
    /// Sending the file list. The queue is `None` until it has been loaded
    /// from the index; each entry is `(path bytes, size, blocks hash)` and is
    /// sent as a `FileEntry`. When the queue is empty, `EndFiles` is sent.
    ListFiles(Option<VecDeque<(Vec<u8>, usize, HashDigest)>>),
    /// Waiting for the next request from the destination.
    Respond,
    /// Sending the `(hash, size)` of each block of a requested file as
    /// `FileBlock` events, followed by `FileEnd`.
    ListBlocks(VecDeque<(HashDigest, usize)>),
    /// `Complete` was received; the stream produces nothing more.
    Done,
}
91
/// State carried between steps of the filesystem source stream.
struct FsSourceFrom {
    /// Index of the files and blocks under `root_dir`.
    index: Index,
    /// Directory being served; block paths from the index are joined to it.
    root_dir: PathBuf,
    /// Receiving end of the channel fed by the source's sink.
    receiver: Receiver<DestinationEvent>,
    /// Current phase of the state machine.
    state: FsSourceState,
}
98
99impl FsSourceFrom {
100 fn project<'b>(self: &'b mut Pin<Box<Self>>) -> (&'b mut Index, &'b Path, Pin<&'b mut Receiver<DestinationEvent>>, &'b mut FsSourceState) {
101 unsafe { let s = self.as_mut().get_unchecked_mut();
103 (
104 &mut s.index,
105 &mut s.root_dir,
106 Pin::new_unchecked(&mut s.receiver),
107 &mut s.state,
108 )
109 }
110 }
111
112 fn stream(mut stream: Pin<Box<FsSourceFrom>>) -> impl Future<Output=Option<(Result<SourceEvent, Error>, Pin<Box<FsSourceFrom>>)>> {
113 async {
114 let (index, root_dir, mut receiver, state) = stream.project();
115
116 macro_rules! err {
117 ($e:expr) => {
118 Some((Err($e), stream))
119 }
120 }
121 macro_rules! try_ {
123 ($v:expr) => {
124 match $v {
125 Ok(r) => r,
126 Err(e) => return err!(e),
127 }
128 }
129 }
130
131 match *state {
132 FsSourceState::ListFiles(ref mut list) => {
134 if list.is_none() {
136 let files = try_!(index.list_files());
138 let mut new_list = VecDeque::with_capacity(files.len());
139 for (_file_id, path, _modified, size, blocks_hash) in files {
140 let path = path
141 .into_os_string()
142 .into_string();
143 let path = try_!(path.map_err(|_: OsString| Error::BadFilenameEncoding));
144 let path = path.into_bytes();
145 new_list.push_back((path, size as usize, blocks_hash));
146 }
147 debug!("FsSource: preparing to send {} files", new_list.len());
148 *list = Some(new_list);
149 }
150 let list = list.as_mut().unwrap();
151 match list.pop_front() {
152 Some((path, size, blocks_hash)) => {
153 if log_enabled!(Debug) {
154 debug!("FsSource: send FileEntry({})", String::from_utf8_lossy(&path));
155 }
156 Some((Ok(SourceEvent::FileEntry(path, size, blocks_hash)), stream))
157 }
158 None => {
159 debug!("FsSource: state=Respond");
160 *state = FsSourceState::Respond;
161 debug!("FsSource: send EndFiles");
162 Some((Ok(SourceEvent::EndFiles), stream))
163 }
164 }
165 }
166 FsSourceState::Respond => {
168 let req = match receiver.as_mut().next().await {
169 None => {
170 debug!("FsSource: got end of input");
171 return None;
172 }
173 Some(e) => e,
174 };
175 debug!("FsSource: recv {:?}", req);
176 match req {
177 DestinationEvent::GetFile(path) => {
178 let path_str = try_!(
179 String::from_utf8(path)
180 .map_err(|_: FromUtf8Error| Error::BadFilenameEncoding)
181 );
182 let (file_id, _modified, _blocks_hash) = match try_!(index.get_file(Path::new(&path_str))) {
183 Some(t) => t,
184 None => return err!(Error::Sync("Requested file is unknown".to_owned())),
185 };
186 debug!("FsSource: file_id={}", file_id);
187 let blocks = try_!(index.list_file_blocks(file_id));
189 let mut new_blocks = VecDeque::with_capacity(blocks.len());
190 for (hash, _offset, size) in blocks {
191 new_blocks.push_back((hash, size));
192 }
193 debug!("FsSource: state=ListBlocks");
194 debug!("FsSource: preparing to send {} blocks", new_blocks.len());
195 *state = FsSourceState::ListBlocks(new_blocks);
196 debug!("FsSource: send FileStart");
197 Some((Ok(SourceEvent::FileStart(path_str.into_bytes())), stream))
198 }
199 DestinationEvent::GetBlock(hash) => {
200 let (path, offset, _size) = match try_!(index.get_block(&hash)) {
201 Some(t) => t,
202 None => return err!(Error::Sync("Requested block is unknown".to_owned())),
203 };
204 debug!("FsSource: found block in {:?} offset {}", path, offset);
205 let data = try_!(read_block(&root_dir.join(&path), offset));
206 debug!("FsSource: send BlockData");
207 Some((Ok(SourceEvent::BlockData(hash, data)), stream))
208 }
209 DestinationEvent::Complete => {
210 *state = FsSourceState::Done;
211 debug!("FsSource: state=Done");
212 None
213 }
214 }
215 }
216 FsSourceState::ListBlocks(ref mut list) => {
218 match list.pop_front() {
219 Some((hash, size)) => {
220 debug!("FsSource: send FileBlock");
221 Some((Ok(SourceEvent::FileBlock(hash, size)), stream))
222 }
223 None => {
224 debug!("FsSource: out of blocks");
225 debug!("FsSource: state=Respond");
226 *state = FsSourceState::Respond;
227 debug!("FsSource: send FileEnd");
228 Some((Ok(SourceEvent::FileEnd), stream))
229 }
230 }
231 }
232 FsSourceState::Done => None,
234 }
235 }
236 }
237}
238
239pub fn fs_destination(root_dir: PathBuf) -> Result<Destination, Error> {
240 info!(
241 "Indexing destination into {:?}...",
242 root_dir.join(".syncfast.idx")
243 );
244 std::fs::create_dir_all(&root_dir)?;
245 let mut index = Index::open(&root_dir.join(".syncfast.idx"))?;
246 index.index_path(&root_dir)?;
247 index.remove_missing_files(&root_dir)?;
248 index.commit()?;
249
250 let destination = Rc::new(RefCell::new(FsDestinationInner {
255 index,
256 root_dir,
257 state: FsDestinationState::FilesList { cond: Default::default() },
258 }));
259 debug!("FsDestination: state=FilesList");
260 Ok(Destination {
261 stream: futures::stream::unfold(
263 destination.clone(),
264 FsDestinationInner::stream,
265 ).boxed_local(),
266 sink: Box::pin(futures::sink::unfold(
268 destination,
269 FsDestinationInner::sink,
270 )),
271 })
272}
273
/// State shared between the stream and the sink of the filesystem
/// destination.
struct FsDestinationInner {
    /// Index of the files and blocks under `root_dir`, including temp files.
    index: Index,
    /// Directory being written to; file names from the index are joined to it.
    root_dir: PathBuf,
    /// Current phase of the destination state machine.
    state: FsDestinationState,
}
279
/// Phases of the filesystem destination state machine.
enum FsDestinationState {
    /// Receiving the list of files from the source.
    FilesList {
        /// Awaited by the stream; set by the sink when `EndFiles` arrives.
        cond: Condition,
    },
    /// Requesting the files that need updating and receiving their block
    /// lists.
    GetFiles {
        /// Names still to be requested with `GetFile`, popped by the stream.
        files_to_request: VecDeque<Vec<u8>>,
        /// Number of files whose `FileEnd` has not been received yet.
        files_to_receive: usize,
        /// Awaited by the stream once there is nothing left to request; set
        /// by the sink when the last `FileEnd` arrives.
        cond: Condition,
        /// `(file_id, offset)` of the file whose blocks are currently being
        /// received, or `None` between files.
        file_blocks_id: Option<(u32, usize)>,
    },
    /// Requesting and receiving the data of blocks that are missing.
    GetBlocks {
        /// Hashes still to be requested with `GetBlock`; set to `None` once
        /// `Complete` has been emitted.
        blocks_to_request: Option<VecDeque<HashDigest>>,
        /// Number of `BlockData` events still expected.
        blocks_to_receive: usize,
    },
}
302
303impl FsDestinationInner {
304 fn stream(inner: Rc<RefCell<FsDestinationInner>>) -> impl Future<Output=Option<(Result<DestinationEvent, Error>, Rc<RefCell<FsDestinationInner>>)>> {
305 async move {
306 loop {
307 enum WhatToDo {
309 Wait(ConditionFuture),
310 Return(DestinationEvent),
311 }
312 let what_to_do = match inner.borrow_mut().state {
313 FsDestinationState::FilesList { ref mut cond } => {
315 WhatToDo::Wait(cond.wait())
317 }
318 FsDestinationState::GetFiles { ref mut files_to_request, ref mut cond, .. } => {
320 match files_to_request.pop_front() {
321 Some(name) => {
322 if log_enabled!(Debug) {
323 debug!("FsDestination::stream: send GetFile({:?})", String::from_utf8_lossy(&name));
324 }
325 WhatToDo::Return(DestinationEvent::GetFile(name))
326 }
327 None => {
328 debug!("FsDestination::stream: no more files, waiting...");
329 WhatToDo::Wait(cond.wait())
330 }
331 }
332 }
333 FsDestinationState::GetBlocks { ref mut blocks_to_request, .. } => {
335 match blocks_to_request {
336 Some(ref mut l) => match l.pop_front() {
337 Some(hash) => {
338 debug!("FsDestination::stream: send GetBlock({})", hash);
339 WhatToDo::Return(DestinationEvent::GetBlock(hash))
340 }
341 None => {
342 debug!("FsDestination::stream: no more blocks, send Complete");
343 *blocks_to_request = None;
344 WhatToDo::Return(DestinationEvent::Complete)
345 }
346 }
347 None => {
348 debug!("FsDestination::stream: done");
349 return None;
350 }
351 }
352 }
353 };
354 match what_to_do {
355 WhatToDo::Wait(cond) => cond.await,
356 WhatToDo::Return(r) => return Some((Ok(r), inner)),
357 }
358 }
359 }
360 }
361
362 fn sink(inner: Rc<RefCell<FsDestinationInner>>, event: SourceEvent) -> impl Future<Output=Result<Rc<RefCell<FsDestinationInner>>, Error>> {
363 async move {
364 {
365 let mut inner_: std::cell::RefMut<FsDestinationInner> = inner.borrow_mut();
366 let inner_: &mut FsDestinationInner = inner_.deref_mut();
367
368 let mut new_state: Option<FsDestinationState> = None;
370 let state = &mut inner_.state;
371 let index = &mut inner_.index;
372 let root_dir = &inner_.root_dir;
373
374 debug!("FsDestination::sink: recv {:?}", event);
375
376 match state {
377 FsDestinationState::FilesList { ref mut cond } => {
379 match event {
380 SourceEvent::FileEntry(path, _size, blocks_hash) => {
381 let path: PathBuf = String::from_utf8(path)
382 .map_err(|_: FromUtf8Error| Error::BadFilenameEncoding)?
383 .into();
384 let file = inner_.index.get_file(&path)?;
385 let add = match file {
386 Some((_file_id, _modified, recorded_blocks_hash)) => {
387 if blocks_hash == recorded_blocks_hash {
388 debug!("FsDestination::sink: file's blocks_hash matches");
389 false } else {
391 debug!("FsDestination::sink: file exists but blocks_hash differs");
392 true
393 }
394 }
395 None => {
396 debug!("FsDestination::sink: file doesn't exist");
397 true
398 }
399 };
400 if add {
401 inner_.index.add_temp_file(&path)?;
403 let temp_path = inner_.root_dir.join(temp_name(&path)?);
404 debug!("FsDestination::sink: creating temp file {:?}", temp_path);
405 if let Some(parent) = temp_path.parent() {
406 std::fs::create_dir_all(parent)?;
407 }
408 OpenOptions::new()
409 .write(true)
410 .truncate(true)
411 .create(true)
412 .open(temp_path)?;
413 }
414 }
415 SourceEvent::EndFiles => {
416 let mut files_to_request = VecDeque::new();
418 for name in index.list_temp_files()? {
419 let name = untemp_name(&name)?;
420 let name = name
421 .into_os_string()
422 .into_string()
423 .map_err(|_: OsString| Error::BadFilenameEncoding)?
424 .into_bytes();
425 files_to_request.push_back(name);
426 }
427 if !files_to_request.is_empty() {
428 let files_to_receive = files_to_request.len();
429 debug!("FsDestination::sink: state=GetFiles({} files)", files_to_receive);
430 new_state = Some(FsDestinationState::GetFiles {
431 files_to_request,
432 files_to_receive,
433 cond: Default::default(),
434 file_blocks_id: None,
435 });
436 } else {
437 debug!("FsDestination::sink: state=GetBlocks(0 blocks)");
438 new_state = Some(FsDestinationState::GetBlocks {
439 blocks_to_request: Some(VecDeque::new()),
440 blocks_to_receive: 0,
441 });
442 }
443 cond.set();
444 }
445 _ => return Err(Error::Sync("Unexpected message from source".to_owned())),
446 }
447 }
448 FsDestinationState::GetFiles { ref mut cond, ref mut file_blocks_id, ref mut files_to_receive, .. } => {
450 *file_blocks_id = match (*file_blocks_id, event) {
451 (None, SourceEvent::FileStart(path)) => {
452 let path: PathBuf = String::from_utf8(path)
453 .map_err(|_: FromUtf8Error| Error::BadFilenameEncoding)?
454 .into();
455 let (file_id, _modified) = index.get_temp_file(&path)?
456 .ok_or(Error::Sync(format!("Unknown file {:?}", path)))?;
457 Some((file_id, 0))
458 }
459 (Some((file_id, offset)), SourceEvent::FileBlock(ref hash, ref size)) => {
462 match index.get_block(&hash)? {
464 Some((from_path, from_offset, _from_size)) => {
465 let path = index.get_file_name(file_id)?;
466 let path = path.ok_or(std::io::Error::new(std::io::ErrorKind::NotFound, "File gone from index during sync"))?;
467 debug!("FsDestination::sink: Copying block from {:?} offset {:?}", from_path, from_offset);
468 let block = read_block(&root_dir.join(&from_path), from_offset)?;
469 write_block(&root_dir.join(&path), offset, &block)?;
470 index.add_block(&hash, file_id, offset, *size)?;
471 }
472 None => {
473 debug!("FsDestination::sink: Don't know that block");
474 index.add_missing_block(&hash, file_id, offset, *size)?;
475 }
476 }
477 Some((file_id, offset + size))
478 }
479 (Some((file_id, offset)), SourceEvent::FileEnd) => {
480 index.set_file_size_and_compute_blocks_hash(file_id, offset)?;
481 *files_to_receive -= 1;
482 debug!("FsDestination::sink: {} files left to receive", *files_to_receive);
483 if *files_to_receive == 0 {
484 let mut blocks_to_request = VecDeque::new();
486 for hash in index.list_missing_blocks()? {
487 blocks_to_request.push_back(hash);
488 }
489 let blocks_to_receive = blocks_to_request.len();
490 debug!("FsDestination::sink: state=GetBlocks({} blocks)", blocks_to_receive);
491 new_state = Some(FsDestinationState::GetBlocks {
492 blocks_to_request: Some(blocks_to_request),
493 blocks_to_receive,
494 });
495 cond.set();
496 }
497 None
498 }
499 _ => return Err(Error::Sync("Unexpected message from source".to_owned())),
500 }
501 }
502 FsDestinationState::GetBlocks { ref mut blocks_to_receive, .. } => {
504 match event {
505 SourceEvent::BlockData(hash, data) => {
506 for (file_id, name, offset, _size) in index.list_block_locations(&hash)? {
507 debug!("FsDestination::sink: writing block to {:?} offset {}", name, offset);
508 write_block(&root_dir.join(&name), offset, &data)?;
509 index.mark_block_present(file_id, &hash, offset)?;
510 }
511 *blocks_to_receive -= 1;
512 debug!("FsDestination::sink: {} blocks left to receive", *blocks_to_receive);
513 if *blocks_to_receive == 0 {
514 Self::finish(root_dir, index)?;
515 }
516 }
517 _ => return Err(Error::Sync("Unexpected message from source".to_owned())),
518 }
519 }
520 }
521 if let Some(s) = new_state {
522 *state = s;
523 }
524 }
525 Ok(inner)
526 }
527 }
528
529 fn finish(root_dir: &Path, index: &mut Index) -> Result<(), Error> {
530 for (file_id, name, missing_blocks) in index.check_temp_files()? {
531 if missing_blocks {
532 return Err(Error::Sync(
533 format!("Missing blocks in file {:?}", name),
534 ));
535 }
536
537 let final_name = untemp_name(&name)?;
538 debug!("FsDestination: moving {:?} to {:?}", name, final_name);
539
540 move_file(&root_dir.join(name), &root_dir.join(&final_name))?;
542
543 index.move_temp_file_into_place(file_id, &final_name)?;
545 }
546 index.commit()?;
547 Ok(())
548 }
549}