syncfast/sync/
fs.rs

1//! Synchronization from and to local files.
2
3use cdchunking::{Chunker, ZPAQ};
4use futures::channel::mpsc::{Receiver, channel};
5use futures::sink::SinkExt;
6use futures::stream::StreamExt;
7use log::{log_enabled, debug, info};
8use log::Level::Debug;
9use std::cell::RefCell;
10use std::collections::VecDeque;
11use std::ffi::OsString;
12use std::fs::{File, OpenOptions};
13use std::future::Future;
14use std::io::{Seek, SeekFrom, Write};
15use std::ops::DerefMut;
16use std::path::{Path, PathBuf};
17use std::pin::Pin;
18use std::rc::Rc;
19use std::string::FromUtf8Error;
20
21use crate::{Error, HashDigest, temp_name, untemp_name};
22use crate::index::{MAX_BLOCK_SIZE, ZPAQ_BITS, Index};
23use crate::sync::{Destination, DestinationEvent, Source, SourceEvent};
24use crate::sync::utils::{Condition, ConditionFuture, move_file};
25
26fn read_block(path: &Path, offset: usize) -> Result<Vec<u8>, Error> {
27    let mut file = File::open(path)?;
28    file.seek(SeekFrom::Start(offset as u64))?;
29    let chunker = Chunker::new(
30        ZPAQ::new(ZPAQ_BITS),
31    ).max_size(MAX_BLOCK_SIZE);
32    let block = chunker.whole_chunks(file).next()
33        .unwrap_or(Err(
34            std::io::Error::new(
35                std::io::ErrorKind::InvalidData,
36                "No such chunk in file",
37            ),
38        ))?;
39    Ok(block)
40}
41
42fn write_block(
43    name: &Path,
44    offset: usize,
45    block: &[u8],
46) -> Result<(), Error> {
47    let mut file = OpenOptions::new().write(true).create(true).open(name)?;
48    file.seek(SeekFrom::Start(offset as u64))?;
49    file.write_all(block)?;
50    Ok(())
51}
52
53pub fn fs_source(root_dir: PathBuf) -> Result<Source, Error> {
54    info!("Indexing source into {:?}...", root_dir.join(".syncfast.idx"));
55    let mut index = Index::open(&root_dir.join(".syncfast.idx"))?;
56    index.index_path(&root_dir)?;
57    index.remove_missing_files(&root_dir)?;
58    index.commit()?;
59
60    // The source can't handle multiple input events, so we just implement
61    // a Stream, and use a channel for the Sink
62    debug!("FsSource: state=ListFiles");
63    let (sender, receiver) = channel(1);
64    Ok(Source {
65        // Stream generating events using FsSourceFrom::stream
66        stream: futures::stream::unfold(
67            Box::pin(FsSourceFrom {
68                index,
69                root_dir,
70                receiver,
71                state: FsSourceState::ListFiles(None),
72            }),
73            FsSourceFrom::stream,
74        ).boxed_local(),
75        // Simple Sink feeding the channel for the Stream to read
76        sink: Box::pin(futures::sink::unfold((), move |(), event: DestinationEvent| {
77            let mut sender = sender.clone();
78            async move {
79                sender.send(event).await.map_err(|_| Error::Io(std::io::Error::new(std::io::ErrorKind::BrokenPipe, "FsSource channel is closed")))
80            }
81        })),
82    })
83}
84
/// State of the source side, advanced by `FsSourceFrom::stream`.
enum FsSourceState {
    /// Sending the files list. Holds the entries still to send as
    /// (path bytes, size, blocks hash); `None` until the list has been
    /// loaded from the index.
    ListFiles(Option<VecDeque<(Vec<u8>, usize, HashDigest)>>),
    /// The files list was sent; waiting for requests from the destination.
    Respond,
    /// Sending the blocks of a requested file. Holds the (hash, size) pairs
    /// still to send.
    ListBlocks(VecDeque<(HashDigest, usize)>),
    /// `DestinationEvent::Complete` was received; the stream yields nothing more.
    Done,
}
91
/// Data carried between invocations of `FsSourceFrom::stream`.
struct FsSourceFrom {
    /// Index used to list files and look up files and blocks
    index: Index,
    /// Directory that block paths from the index are joined onto
    root_dir: PathBuf,
    /// Receiving end of the channel fed by the source's sink
    receiver: Receiver<DestinationEvent>,
    /// Current position in the protocol
    state: FsSourceState,
}
98
impl FsSourceFrom {
    /// Splits the pinned source into borrows of its individual fields.
    ///
    /// Returns, in order: the index, the root directory, the receiver
    /// (re-pinned) and the current state.
    fn project<'b>(self: &'b mut Pin<Box<Self>>) -> (&'b mut Index, &'b Path, Pin<&'b mut Receiver<DestinationEvent>>, &'b mut FsSourceState) {
        unsafe { // Required for pin projection
            // NOTE(review): only references to the fields are handed out and
            // the receiver is immediately re-pinned; soundness relies on no
            // caller moving the receiver out through these borrows — confirm.
            let s = self.as_mut().get_unchecked_mut();
            (
                &mut s.index,
                &mut s.root_dir,
                Pin::new_unchecked(&mut s.receiver),
                &mut s.state,
            )
        }
    }

    /// Produces the next event of the source stream (one step of the
    /// `futures::stream::unfold` used by `fs_source`).
    ///
    /// Resolves to `Some((event_or_error, self))` to continue the stream, or
    /// to `None` to end it: when the receiver is exhausted, when
    /// `DestinationEvent::Complete` is received, or once the state is `Done`.
    fn stream(mut stream: Pin<Box<FsSourceFrom>>) -> impl Future<Output=Option<(Result<SourceEvent, Error>, Pin<Box<FsSourceFrom>>)>> {
        async {
            let (index, root_dir, mut receiver, state) = stream.project();

            // Yields the given error as the next stream item, keeping the
            // stream state so the stream can be polled again
            macro_rules! err {
                ($e:expr) => {
                    Some((Err($e), stream))
                }
            }
            // FIXME: Replace by try_block when supported by Rust
            // Unwraps a Result, or returns the error as the next stream item
            macro_rules! try_ {
                ($v:expr) => {
                    match $v {
                        Ok(r) => r,
                        Err(e) => return err!(e),
                    }
                }
            }

            match *state {
                // Send files list
                FsSourceState::ListFiles(ref mut list) => {
                    // If we don't have data, fetch from database
                    if list.is_none() {
                        // FIXME: Don't get all files at once, iterate
                        let files = try_!(index.list_files());
                        let mut new_list = VecDeque::with_capacity(files.len());
                        for (_file_id, path, _modified, size, blocks_hash) in files {
                            // Paths are sent as bytes; a path that is not
                            // valid Unicode is reported as an error
                            let path = path
                                .into_os_string()
                                .into_string();
                            let path = try_!(path.map_err(|_: OsString| Error::BadFilenameEncoding));
                            let path = path.into_bytes();
                            new_list.push_back((path, size as usize, blocks_hash));
                        }
                        debug!("FsSource: preparing to send {} files", new_list.len());
                        *list = Some(new_list);
                    }
                    // The list was filled in above if it was missing
                    let list = list.as_mut().unwrap();
                    match list.pop_front() {
                        Some((path, size, blocks_hash)) => {
                            if log_enabled!(Debug) {
                                debug!("FsSource: send FileEntry({})", String::from_utf8_lossy(&path));
                            }
                            Some((Ok(SourceEvent::FileEntry(path, size, blocks_hash)), stream))
                        }
                        None => {
                            // All entries sent: switch to answering requests
                            debug!("FsSource: state=Respond");
                            *state = FsSourceState::Respond;
                            debug!("FsSource: send EndFiles");
                            Some((Ok(SourceEvent::EndFiles), stream))
                        }
                    }
                }
                // Files are sent, respond to requests
                FsSourceState::Respond => {
                    // Wait for the next request coming through the sink
                    let req = match receiver.as_mut().next().await {
                        None => {
                            debug!("FsSource: got end of input");
                            return None;
                        }
                        Some(e) => e,
                    };
                    debug!("FsSource: recv {:?}", req);
                    match req {
                        // Destination wants the list of blocks of a file
                        DestinationEvent::GetFile(path) => {
                            let path_str = try_!(
                                String::from_utf8(path)
                                    .map_err(|_: FromUtf8Error| Error::BadFilenameEncoding)
                            );
                            let (file_id, _modified, _blocks_hash) = match try_!(index.get_file(Path::new(&path_str))) {
                                Some(t) => t,
                                None => return err!(Error::Sync("Requested file is unknown".to_owned())),
                            };
                            debug!("FsSource: file_id={}", file_id);
                            // FIXME: Don't get all blocks at once, iterate
                            let blocks = try_!(index.list_file_blocks(file_id));
                            let mut new_blocks = VecDeque::with_capacity(blocks.len());
                            for (hash, _offset, size) in blocks {
                                new_blocks.push_back((hash, size));
                            }
                            debug!("FsSource: state=ListBlocks");
                            debug!("FsSource: preparing to send {} blocks", new_blocks.len());
                            *state = FsSourceState::ListBlocks(new_blocks);
                            debug!("FsSource: send FileStart");
                            Some((Ok(SourceEvent::FileStart(path_str.into_bytes())), stream))
                        }
                        // Destination wants the content of a block
                        DestinationEvent::GetBlock(hash) => {
                            let (path, offset, _size) = match try_!(index.get_block(&hash)) {
                                Some(t) => t,
                                None => return err!(Error::Sync("Requested block is unknown".to_owned())),
                            };
                            debug!("FsSource: found block in {:?} offset {}", path, offset);
                            let data = try_!(read_block(&root_dir.join(&path), offset));
                            debug!("FsSource: send BlockData");
                            Some((Ok(SourceEvent::BlockData(hash, data)), stream))
                        }
                        // Destination is done: end the stream
                        DestinationEvent::Complete => {
                            *state = FsSourceState::Done;
                            debug!("FsSource: state=Done");
                            None
                        }
                    }
                }
                // List blocks
                FsSourceState::ListBlocks(ref mut list) => {
                    match list.pop_front() {
                        Some((hash, size)) => {
                            debug!("FsSource: send FileBlock");
                            Some((Ok(SourceEvent::FileBlock(hash, size)), stream))
                        }
                        None => {
                            // All blocks listed: go back to answering requests
                            debug!("FsSource: out of blocks");
                            debug!("FsSource: state=Respond");
                            *state = FsSourceState::Respond;
                            debug!("FsSource: send FileEnd");
                            Some((Ok(SourceEvent::FileEnd), stream))
                        }
                    }
                }
                // Stream is done
                FsSourceState::Done => None,
            }
        }
    }
}
238
239pub fn fs_destination(root_dir: PathBuf) -> Result<Destination, Error> {
240    info!(
241        "Indexing destination into {:?}...",
242        root_dir.join(".syncfast.idx")
243    );
244    std::fs::create_dir_all(&root_dir)?;
245    let mut index = Index::open(&root_dir.join(".syncfast.idx"))?;
246    index.index_path(&root_dir)?;
247    index.remove_missing_files(&root_dir)?;
248    index.commit()?;
249
250    // The destination has to handle input while producing output (for
251    // example getting BlockData while sending GetBlock), so it has both a
252    // custom Stream and Sink implementations
253    // State changes are triggered by Sink
254    let destination = Rc::new(RefCell::new(FsDestinationInner {
255        index,
256        root_dir,
257        state: FsDestinationState::FilesList { cond: Default::default() },
258    }));
259    debug!("FsDestination: state=FilesList");
260    Ok(Destination {
261        // Stream generating events using FsDestination::stream
262        stream: futures::stream::unfold(
263            destination.clone(),
264            FsDestinationInner::stream,
265        ).boxed_local(),
266        // Sink handling events using FsDestination::sink
267        sink: Box::pin(futures::sink::unfold(
268            destination,
269            FsDestinationInner::sink,
270        )),
271    })
272}
273
/// State shared (through `Rc<RefCell<_>>`) by the destination's stream and sink.
struct FsDestinationInner {
    /// Index used to look up files and blocks and to record temp files
    index: Index,
    /// Directory that file paths are joined onto
    root_dir: PathBuf,
    /// Current position in the protocol; changed by the sink
    state: FsDestinationState,
}
279
/// Protocol state of the destination. Transitions are made by
/// `FsDestinationInner::sink`; `FsDestinationInner::stream` reads the state
/// to decide what to send next.
enum FsDestinationState {
    /// Receiving the files list from the source
    FilesList {
        /// Sink indicates state change (`SourceEvent::EndFiles`)
        cond: Condition,
    },
    /// Requesting and receiving the block lists of the files to update
    GetFiles {
        /// List of files to request the blocks of
        files_to_request: VecDeque<Vec<u8>>,
        /// Number of files to receive
        files_to_receive: usize,
        /// Sink indicates state change (got `SourceEvent::FileEnd` and no more files_to_request)
        cond: Condition,
        /// file_id and offset for the blocks we're receiving (from previous FileStart)
        file_blocks_id: Option<(u32, usize)>,
    },
    /// Requesting and receiving the data of the missing blocks
    GetBlocks {
        /// List of blocks to request, None if we've sent `DestinationEvent::Complete`
        blocks_to_request: Option<VecDeque<HashDigest>>,
        /// Number of blocks to receive
        blocks_to_receive: usize,
    },
}
302
303impl FsDestinationInner {
304    fn stream(inner: Rc<RefCell<FsDestinationInner>>) -> impl Future<Output=Option<(Result<DestinationEvent, Error>, Rc<RefCell<FsDestinationInner>>)>> {
305        async move {
306            loop {
307                // This works around borrow issue: have to do stuff after inner.borrow_mut() ends
308                enum WhatToDo {
309                    Wait(ConditionFuture),
310                    Return(DestinationEvent),
311                }
312                let what_to_do = match inner.borrow_mut().state {
313                    // Receive files list
314                    FsDestinationState::FilesList { ref mut cond } => {
315                        // Nothing to produce, wait for state change
316                        WhatToDo::Wait(cond.wait())
317                    }
318                    // Request blocks for files
319                    FsDestinationState::GetFiles { ref mut files_to_request, ref mut cond, .. } => {
320                        match files_to_request.pop_front() {
321                            Some(name) => {
322                                if log_enabled!(Debug) {
323                                    debug!("FsDestination::stream: send GetFile({:?})", String::from_utf8_lossy(&name));
324                                }
325                                WhatToDo::Return(DestinationEvent::GetFile(name))
326                            }
327                            None => {
328                                debug!("FsDestination::stream: no more files, waiting...");
329                                WhatToDo::Wait(cond.wait())
330                            }
331                        }
332                    }
333                    // Request block data
334                    FsDestinationState::GetBlocks { ref mut blocks_to_request, .. } => {
335                        match blocks_to_request {
336                            Some(ref mut l) => match l.pop_front() {
337                                Some(hash) => {
338                                    debug!("FsDestination::stream: send GetBlock({})", hash);
339                                    WhatToDo::Return(DestinationEvent::GetBlock(hash))
340                                }
341                                None => {
342                                    debug!("FsDestination::stream: no more blocks, send Complete");
343                                    *blocks_to_request = None;
344                                    WhatToDo::Return(DestinationEvent::Complete)
345                                }
346                            }
347                            None => {
348                                debug!("FsDestination::stream: done");
349                                return None;
350                            }
351                        }
352                    }
353                };
354                match what_to_do {
355                    WhatToDo::Wait(cond) => cond.await,
356                    WhatToDo::Return(r) => return Some((Ok(r), inner)),
357                }
358            }
359        }
360    }
361
362    fn sink(inner: Rc<RefCell<FsDestinationInner>>, event: SourceEvent) -> impl Future<Output=Result<Rc<RefCell<FsDestinationInner>>, Error>> {
363        async move {
364            {
365                let mut inner_: std::cell::RefMut<FsDestinationInner> = inner.borrow_mut();
366                let inner_: &mut FsDestinationInner = inner_.deref_mut();
367
368                // Can't mutably borrow more than once
369                let mut new_state: Option<FsDestinationState> = None;
370                let state = &mut inner_.state;
371                let index = &mut inner_.index;
372                let root_dir = &inner_.root_dir;
373
374                debug!("FsDestination::sink: recv {:?}", event);
375
376                match state {
377                    // Receive files list
378                    FsDestinationState::FilesList { ref mut cond } => {
379                        match event {
380                            SourceEvent::FileEntry(path, _size, blocks_hash) => {
381                                let path: PathBuf = String::from_utf8(path)
382                                    .map_err(|_: FromUtf8Error| Error::BadFilenameEncoding)?
383                                    .into();
384                                let file = inner_.index.get_file(&path)?;
385                                let add = match file {
386                                    Some((_file_id, _modified, recorded_blocks_hash)) => {
387                                        if blocks_hash == recorded_blocks_hash {
388                                            debug!("FsDestination::sink:  file's blocks_hash matches");
389                                            false // File is up to date, do nothing
390                                        } else {
391                                            debug!("FsDestination::sink: file exists but blocks_hash differs");
392                                            true
393                                        }
394                                    }
395                                    None => {
396                                        debug!("FsDestination::sink: file doesn't exist");
397                                        true
398                                    }
399                                };
400                                if add {
401                                    // Create temporary file
402                                    inner_.index.add_temp_file(&path)?;
403                                    let temp_path = inner_.root_dir.join(temp_name(&path)?);
404                                    debug!("FsDestination::sink: creating temp file {:?}", temp_path);
405                                    if let Some(parent) = temp_path.parent() {
406                                        std::fs::create_dir_all(parent)?;
407                                    }
408                                    OpenOptions::new()
409                                        .write(true)
410                                        .truncate(true)
411                                        .create(true)
412                                        .open(temp_path)?;
413                                }
414                            }
415                            SourceEvent::EndFiles => {
416                                // FIXME: Don't get all files at once, iterate
417                                let mut files_to_request = VecDeque::new();
418                                for name in index.list_temp_files()? {
419                                    let name = untemp_name(&name)?;
420                                    let name = name
421                                        .into_os_string()
422                                        .into_string()
423                                        .map_err(|_: OsString| Error::BadFilenameEncoding)?
424                                        .into_bytes();
425                                    files_to_request.push_back(name);
426                                }
427                                if !files_to_request.is_empty() {
428                                    let files_to_receive = files_to_request.len();
429                                    debug!("FsDestination::sink: state=GetFiles({} files)", files_to_receive);
430                                    new_state = Some(FsDestinationState::GetFiles {
431                                        files_to_request,
432                                        files_to_receive,
433                                        cond: Default::default(),
434                                        file_blocks_id: None,
435                                    });
436                                } else {
437                                    debug!("FsDestination::sink: state=GetBlocks(0 blocks)");
438                                    new_state = Some(FsDestinationState::GetBlocks {
439                                        blocks_to_request: Some(VecDeque::new()),
440                                        blocks_to_receive: 0,
441                                    });
442                                }
443                                cond.set();
444                            }
445                            _ => return Err(Error::Sync("Unexpected message from source".to_owned())),
446                        }
447                    }
448                    // Receive blocks for files
449                    FsDestinationState::GetFiles { ref mut cond, ref mut file_blocks_id, ref mut files_to_receive, .. } => {
450                        *file_blocks_id = match (*file_blocks_id, event) {
451                            (None, SourceEvent::FileStart(path)) => {
452                                let path: PathBuf = String::from_utf8(path)
453                                    .map_err(|_: FromUtf8Error| Error::BadFilenameEncoding)?
454                                    .into();
455                                let (file_id, _modified) = index.get_temp_file(&path)?
456                                    .ok_or(Error::Sync(format!("Unknown file {:?}", path)))?;
457                                Some((file_id, 0))
458                            }
459                            // FIXME: Don't need to capture all of them by ref,
460                            // but necessary for Rust 1.45
461                            (Some((file_id, offset)), SourceEvent::FileBlock(ref hash, ref size)) => {
462                                // See if we have this block, to copy it right now
463                                match index.get_block(&hash)? {
464                                    Some((from_path, from_offset, _from_size)) => {
465                                        let path = index.get_file_name(file_id)?;
466                                        let path = path.ok_or(std::io::Error::new(std::io::ErrorKind::NotFound, "File gone from index during sync"))?;
467                                        debug!("FsDestination::sink: Copying block from {:?} offset {:?}", from_path, from_offset);
468                                        let block = read_block(&root_dir.join(&from_path), from_offset)?;
469                                        write_block(&root_dir.join(&path), offset, &block)?;
470                                        index.add_block(&hash, file_id, offset, *size)?;
471                                    }
472                                    None => {
473                                        debug!("FsDestination::sink: Don't know that block");
474                                        index.add_missing_block(&hash, file_id, offset, *size)?;
475                                    }
476                                }
477                                Some((file_id, offset + size))
478                            }
479                            (Some((file_id, offset)), SourceEvent::FileEnd) => {
480                                index.set_file_size_and_compute_blocks_hash(file_id, offset)?;
481                                *files_to_receive -= 1;
482                                debug!("FsDestination::sink: {} files left to receive", *files_to_receive);
483                                if *files_to_receive == 0 {
484                                    // FIXME: Don't get all files at once, iterate
485                                    let mut blocks_to_request = VecDeque::new();
486                                    for hash in index.list_missing_blocks()? {
487                                        blocks_to_request.push_back(hash);
488                                    }
489                                    let blocks_to_receive = blocks_to_request.len();
490                                    debug!("FsDestination::sink: state=GetBlocks({} blocks)", blocks_to_receive);
491                                    new_state = Some(FsDestinationState::GetBlocks {
492                                        blocks_to_request: Some(blocks_to_request),
493                                        blocks_to_receive,
494                                    });
495                                    cond.set();
496                                }
497                                None
498                            }
499                            _ => return Err(Error::Sync("Unexpected message from source".to_owned())),
500                        }
501                    }
502                    // Receiving block data
503                    FsDestinationState::GetBlocks { ref mut blocks_to_receive, .. } => {
504                        match event {
505                            SourceEvent::BlockData(hash, data) => {
506                                for (file_id, name, offset, _size) in index.list_block_locations(&hash)? {
507                                    debug!("FsDestination::sink: writing block to {:?} offset {}", name, offset);
508                                    write_block(&root_dir.join(&name), offset, &data)?;
509                                    index.mark_block_present(file_id, &hash, offset)?;
510                                }
511                                *blocks_to_receive -= 1;
512                                debug!("FsDestination::sink: {} blocks left to receive", *blocks_to_receive);
513                                if *blocks_to_receive == 0 {
514                                    Self::finish(root_dir, index)?;
515                                }
516                            }
517                            _ => return Err(Error::Sync("Unexpected message from source".to_owned())),
518                        }
519                    }
520                }
521                if let Some(s) = new_state {
522                    *state = s;
523                }
524            }
525            Ok(inner)
526        }
527    }
528
529    fn finish(root_dir: &Path, index: &mut Index) -> Result<(), Error> {
530        for (file_id, name, missing_blocks) in index.check_temp_files()? {
531            if missing_blocks {
532                return Err(Error::Sync(
533                    format!("Missing blocks in file {:?}", name),
534                ));
535            }
536
537            let final_name = untemp_name(&name)?;
538            debug!("FsDestination: moving {:?} to {:?}", name, final_name);
539
540            // Rename temporary file into destination
541            move_file(&root_dir.join(name), &root_dir.join(&final_name))?;
542
543            // Update index
544            index.move_temp_file_into_place(file_id, &final_name)?;
545        }
546        index.commit()?;
547        Ok(())
548    }
549}