rustsync/
lib.rs

1//! An implementation of an rsync-like protocol (not compatible with
2//! rsync), in pure Rust.
3//!
4//! ```
5//! extern crate rand;
6//! extern crate rustsync;
7//! use rustsync::*;
8//! use rand::Rng;
9//! fn main() {
10//!   // Create 4 different random strings first.
11//!   let chunk_size = 1000;
12//!   let a = rand::thread_rng()
13//!           .gen_ascii_chars()
14//!           .take(chunk_size)
15//!           .collect::<String>();
16//!   let b = rand::thread_rng()
17//!           .gen_ascii_chars()
18//!           .take(50)
19//!           .collect::<String>();
20//!   let b_ = rand::thread_rng()
21//!           .gen_ascii_chars()
22//!           .take(100)
23//!           .collect::<String>();
24//!   let c = rand::thread_rng()
25//!           .gen_ascii_chars()
26//!           .take(chunk_size)
27//!           .collect::<String>();
28//!
29//!   // Now concatenate them in two different ways.
30//!
31//!   let mut source = a.clone() + &b + &c;
32//!   let mut modified = a + &b_ + &c;
33//!
34//!   // Suppose we want to download `modified`, and we already have
35//!   // `source`, which only differs by a few characters in the
36//!   // middle.
37//!
38//!   // We first have to choose a block size, which will be recorded
39//!   // in the signature below. Blocks should normally be much bigger
40//!   // than this in order to be efficient on large files.
41//!
42//!   let block = [0; 32];
43//!
44//!   // We then create a signature of `source`, to be uploaded to the
45//!   // remote machine. Signatures are typically much smaller than
46//!   // files, with just a few bytes per block.
47//!
48//!   let source_sig = signature(source.as_bytes(), block).unwrap();
49//!
50//!   // Then, we let the server compare our signature with their
51//!   // version.
52//!
53//!   let comp = compare(&source_sig, modified.as_bytes(), block).unwrap();
54//!
55//!   // We finally download the result of that comparison, and
56//!   // restore their file from that.
57//!
58//!   let mut restored = Vec::new();
59//!   restore_seek(&mut restored, std::io::Cursor::new(source.as_bytes()), vec![0; 1000], &comp).unwrap();
60//!   assert_eq!(&restored[..], modified.as_bytes())
61//! }
62//! ```
63
64extern crate adler32;
65extern crate blake2_rfc;
66extern crate futures;
67#[cfg(test)]
68extern crate rand;
69extern crate serde;
70#[macro_use]
71extern crate serde_derive;
72#[cfg(test)]
73extern crate tokio_core;
74extern crate tokio_io;
75
76use std::collections::HashMap;
77use std::io::{Read, Seek, SeekFrom, Write};
78use tokio_io::{AsyncRead, AsyncWrite};
79use tokio_io::io::{write_all, WriteAll};
80use futures::{Async, Future, Poll};
81
82const BLAKE2_SIZE: usize = 32;
83
84#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
85struct Blake2b([u8; BLAKE2_SIZE]);
86
87impl std::borrow::Borrow<[u8]> for Blake2b {
88    fn borrow(&self) -> &[u8] {
89        &self.0
90    }
91}
92
93#[derive(Debug, Serialize, Deserialize, PartialEq)]
94/// The "signature" of the file, which is essentially a
95/// content-indexed description of the blocks in the file.
96pub struct Signature {
97    pub window: usize,
98    chunks: HashMap<u32, HashMap<Blake2b, usize>>
99}
100
101/// Create the "signature" of a file, essentially a content-indexed
102/// map of blocks. The first step of the protocol is to run this
103/// function on the "source" (the remote file when downloading, the
104/// local file while uploading).
105pub fn signature<R: Read, B: AsRef<[u8]>+AsMut<[u8]>>(mut r: R, mut block: B) -> Result<Signature, std::io::Error> {
106    let mut chunks = HashMap::new();
107
108    let mut i = 0;
109    let block = block.as_mut();
110    let mut eof = false;
111    while !eof {
112        let mut j = 0;
113        while j < block.len() {
114            let r = r.read(&mut block[j..])?;
115            if r == 0 {
116                eof = true;
117                break;
118            }
119            j += r
120        }
121        let block = &block[..j];
122        let hash = adler32::RollingAdler32::from_buffer(block);
123        let mut blake2 = [0; BLAKE2_SIZE];
124        blake2.clone_from_slice(blake2_rfc::blake2b::blake2b(BLAKE2_SIZE, &[], &block).as_bytes());
125        println!("{:?} {:?}", block, blake2);
126        chunks
127            .entry(hash.hash())
128            .or_insert(HashMap::new())
129            .insert(Blake2b(blake2), i);
130
131        i += block.len()
132    }
133
134    Ok(Signature {
135        window: block.len(),
136        chunks
137    })
138}
139
140pub struct ReadBlock<R: AsyncRead, B: AsRef<[u8]>+AsMut<[u8]>> {
141    block: Option<(R, B)>,
142    first: usize,
143}
144
145impl<R: AsyncRead, B: AsRef<[u8]>+AsMut<[u8]>> ReadBlock<R, B> {
146    fn new(r: R, block: B) -> Self {
147        ReadBlock {
148            block: Some((r, block)),
149            first: 0,
150        }
151    }
152}
153
154impl<R: AsyncRead, B: AsRef<[u8]>+AsMut<[u8]>> Future for ReadBlock<R, B> {
155    type Item = (R, B, usize);
156    type Error = std::io::Error;
157    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
158        loop {
159            if let Some((mut r, mut block)) = self.block.take() {
160                let n = {
161                    let block = block.as_mut();
162                    if self.first == block.len() {
163                        0
164                    } else {
165                        match r.read(&mut block[self.first..]) {
166                            Ok(n) => n,
167                            Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
168                                return Ok(Async::NotReady)
169                            }
170                            Err(e) => return Err(e),
171                        }
172                    }
173                };
174                if n == 0 {
175                    return Ok(Async::Ready((r, block, self.first)));
176                } else {
177                    self.first += n;
178                    self.block = Some((r, block));
179                }
180            } else {
181                panic!("future polled after completion")
182            }
183        }
184    }
185}
186
187pub struct WriteBlock<W: AsyncWrite, B: AsRef<[u8]>> {
188    block: Option<(W, B)>,
189    first: usize,
190    len: usize,
191}
192
193impl<W: AsyncWrite, B: AsRef<[u8]>> WriteBlock<W, B> {
194    fn new(w: W, block: B, first: usize, len: usize) -> Self {
195        WriteBlock {
196            block: Some((w, block)),
197            first,
198            len,
199        }
200    }
201}
202
203impl<W: AsyncWrite, B: AsRef<[u8]>> Future for WriteBlock<W, B> {
204    type Item = (W, B);
205    type Error = std::io::Error;
206    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
207        loop {
208            if let Some((mut w, block)) = self.block.take() {
209                match w.write(&block.as_ref()[self.first..self.len]) {
210                    Ok(n) => {
211                        self.first += n;
212                        if self.first >= self.len {
213                            return Ok(Async::Ready((w, block)));
214                        } else {
215                            self.block = Some((w, block))
216                        }
217                    }
218                    Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
219                        return Ok(Async::NotReady)
220                    }
221                    Err(e) => return Err(e),
222                }
223            } else {
224                panic!("future polled after completion")
225            }
226        }
227    }
228}
229
230pub struct FutureSignature<R: AsyncRead, B:AsRef<[u8]>+AsMut<[u8]>> {
231    chunks: HashMap<u32, HashMap<Blake2b, usize>>,
232    i: usize,
233    eof: bool,
234    state: Option<ReadBlock<R, B>>,
235}
236
237impl<R: AsyncRead, B: AsRef<[u8]>+AsMut<[u8]>> Future for FutureSignature<R, B> {
238    type Item = (R, Signature);
239    type Error = std::io::Error;
240    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
241        loop {
242            if let Some(mut reading) = self.state.take() {
243                if let Async::Ready((r, mut block, len)) = reading.poll()? {
244                    {
245                        let block_ = block.as_ref();
246                        self.eof = len < block_.len();
247                        let b = &block_[..len];
248                        let hash = adler32::RollingAdler32::from_buffer(b);
249                        let mut blake2 = [0; BLAKE2_SIZE];
250                        blake2.clone_from_slice(
251                            blake2_rfc::blake2b::blake2b(BLAKE2_SIZE, &[], &b).as_bytes(),
252                        );
253                        self.chunks
254                            .entry(hash.hash())
255                            .or_insert(HashMap::new())
256                            .insert(Blake2b(blake2), self.i);
257                        self.i += block_.len();
258                    }
259                    if self.eof {
260                        return Ok(Async::Ready((
261                            r,
262                            Signature {
263                                chunks: std::mem::replace(&mut self.chunks, HashMap::new()),
264                                window: block.as_ref().len(),
265                            },
266                        )));
267                    } else {
268                        self.state = Some(ReadBlock::new(r, block))
269                    }
270                } else {
271                    self.state = Some(reading);
272                    return Ok(Async::NotReady);
273                }
274            }
275        }
276    }
277}
278
279/// This is the same as [`signature`](fn.signature.html), except that
280/// this function reads the input source asynchronously.
281pub fn signature_fut<R: AsyncRead, B:AsRef<[u8]>+AsMut<[u8]>>(r: R, b: B) -> FutureSignature<R, B> {
282    FutureSignature {
283        state: Some(ReadBlock::new(r, b)),
284        i: 0,
285        eof: false,
286        chunks: HashMap::new(),
287    }
288}
289
290#[derive(Debug, Serialize, Deserialize, PartialEq)]
291pub enum Block {
292    FromSource(u64),
293    Literal(Vec<u8>),
294}
295
296struct State {
297    result: Vec<Block>,
298    block_oldest: usize,
299    block_len: usize,
300    pending: Vec<u8>,
301}
302
303impl State {
304    fn new() -> Self {
305        State {
306            result: Vec::new(),
307            block_oldest: 0,
308            block_len: 1,
309            pending: Vec::new(),
310        }
311    }
312}
313
314#[derive(Default, Debug, PartialEq)]
315/// The result of comparing two files
316pub struct Delta {
317    /// Description of the new file in terms of blocks.
318    pub blocks: Vec<Block>,
319    /// Size of the window.
320    pub window: usize,
321}
322
323/// Compare a signature with an existing file. This is the second step
324/// of the protocol, `r` is the local file when downloading, and the
325/// remote file when uploading.
326///
327/// `block` must be a buffer the same size as `sig.window`.
328pub fn compare<R: Read, B:AsRef<[u8]>+AsMut<[u8]>>(sig: &Signature, mut r: R, mut block: B) -> Result<Delta, std::io::Error> {
329    let mut st = State::new();
330    let block = block.as_mut();
331    assert_eq!(block.len(), sig.window);
332    while st.block_len > 0 {
333        let mut hash = {
334            let mut j = 0;
335            let block = {
336                while j < sig.window {
337                    let r = r.read(&mut block[..])?;
338                    if r == 0 {
339                        break;
340                    }
341                    j += r
342                }
343                st.block_oldest = 0;
344                st.block_len = j;
345                &block[..j]
346            };
347            adler32::RollingAdler32::from_buffer(block)
348        };
349
350        // Starting from the current block (with hash `hash`), find
351        // the next block with a hash that appears in the signature.
352        loop {
353            if matches(&mut st, sig, &block, &hash) {
354                break;
355            }
356            // The blocks are not equal. Move the hash by one byte
357            // until finding an equal block.
358            let oldest = block[st.block_oldest];
359            hash.remove(st.block_len, oldest);
360            let r = r.read(&mut block[st.block_oldest..st.block_oldest + 1])?;
361            if r > 0 {
362                // If there are still bytes to read, update the hash.
363                hash.update(block[st.block_oldest]);
364            } else if st.block_len > 0 {
365                // Else, just shrink the window, so that the current
366                // block's blake2 hash can be compared with the
367                // signature.
368                st.block_len -= 1;
369            } else {
370                // We're done reading the file.
371                break;
372            }
373            st.pending.push(oldest);
374            st.block_oldest = (st.block_oldest + 1) % sig.window;
375        }
376        if !st.pending.is_empty() {
377            // We've reached the end of the file, and have never found
378            // a matching block again.
379            st.result.push(Block::Literal(std::mem::replace(
380                &mut st.pending,
381                Vec::new(),
382            )))
383        }
384    }
385    Ok(Delta {
386        blocks: st.result,
387        window: sig.window,
388    })
389}
390
391pub struct FutureCompare<R: AsyncRead, B: AsRef<[u8]>+AsMut<[u8]>> {
392    state: Option<CompareState<R, B>>,
393    st: State,
394}
395
396enum CompareState<R: AsyncRead, B: AsRef<[u8]>+AsMut<[u8]>> {
397    ReadBlock {
398        readblock: ReadBlock<R, B>,
399        sig: Signature,
400    },
401    FindNext {
402        hash: adler32::RollingAdler32,
403        r: R,
404        block: B,
405        sig: Signature,
406    },
407    FindNextRead {
408        hash: adler32::RollingAdler32,
409        reading: ReadBlock<R, [u8; 1]>,
410        block: B,
411        sig: Signature,
412    },
413    EndLiteralBlock {
414        r: R,
415        block: B,
416        sig: Signature,
417    },
418}
419
420impl<R: AsyncRead, B: AsRef<[u8]>+AsMut<[u8]>> Future for FutureCompare<R, B> {
421    type Item = (R, Signature, Delta);
422    type Error = std::io::Error;
423    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
424        loop {
425            match self.state.take() {
426                Some(CompareState::ReadBlock { mut readblock, sig }) => {
427                    if let Async::Ready((r, block, len)) = readblock.poll()? {
428                        self.st.block_oldest = 0;
429                        self.st.block_len = len;
430                        let mut hash = adler32::RollingAdler32::from_buffer(&block.as_ref()[..len]);
431                        self.state = Some(CompareState::FindNext {
432                            hash,
433                            block,
434                            r,
435                            sig,
436                        });
437                    } else {
438                        self.state = Some(CompareState::ReadBlock { readblock, sig });
439                        return Ok(Async::NotReady);
440                    }
441                }
442                Some(CompareState::FindNext {
443                    mut hash,
444                    r,
445                    block,
446                    sig,
447                }) => {
448                    if matches(&mut self.st, &sig, block.as_ref(), &hash) {
449                        self.state = Some(CompareState::EndLiteralBlock { r, block, sig })
450                    } else {
451                        let oldest = block.as_ref()[self.st.block_oldest];
452                        hash.remove(self.st.block_len, oldest);
453                        self.state = Some(CompareState::FindNextRead {
454                            hash,
455                            reading: ReadBlock::new(r, [0; 1]),
456                            block,
457                            sig,
458                        })
459                    }
460                }
461                Some(CompareState::FindNextRead {
462                    mut hash,
463                    mut reading,
464                    mut block,
465                    sig,
466                }) => {
467                    if let Async::Ready((r, b, len)) = reading.poll()? {
468                        if len > 0 || self.st.block_len > 0 {
469                            let oldest = block.as_ref()[self.st.block_oldest];
470                            if len > 0 {
471                                block.as_mut()[self.st.block_oldest] = b[0];
472                                hash.update(b[0])
473                            } else {
474                                self.st.block_len -= 1
475                            }
476
477                            self.st.pending.push(oldest);
478                            self.st.block_oldest = (self.st.block_oldest + 1) % sig.window;
479                            self.state = Some(CompareState::FindNext {
480                                hash,
481                                r,
482                                block,
483                                sig,
484                            })
485                        } else {
486                            self.state = Some(CompareState::EndLiteralBlock { r, block, sig })
487                        }
488                    } else {
489                        self.state = Some(CompareState::FindNextRead {
490                            hash,
491                            reading,
492                            block,
493                            sig,
494                        })
495                    }
496                }
497                Some(CompareState::EndLiteralBlock { r, block, sig }) => {
498                    if !self.st.pending.is_empty() {
499                        // We've reached the end of the file, and have never found
500                        // a matching block again.
501                        self.st.result.push(Block::Literal(std::mem::replace(
502                            &mut self.st.pending,
503                            Vec::new(),
504                        )))
505                    }
506                    if self.st.block_len > 0 {
507                        self.state = Some(CompareState::ReadBlock {
508                            readblock: ReadBlock::new(r, block),
509                            sig,
510                        })
511                    } else {
512                        let window = sig.window;
513                        return Ok(Async::Ready((
514                            r,
515                            sig,
516                            Delta {
517                                blocks: std::mem::replace(&mut self.st.result, Vec::new()),
518                                window,
519                            },
520                        )));
521                    }
522                }
523                None => panic!(""),
524            }
525        }
526    }
527}
528
529/// Same as [`compare`](fn.compare.html), except that this function
530/// reads the file asynchronously.
531pub fn compare_fut<R: AsyncRead, B: AsRef<[u8]>+AsMut<[u8]>>(sig: Signature, r: R, block: B) -> FutureCompare<R, B> {
532    assert_eq!(block.as_ref().len(), sig.window);
533    FutureCompare {
534        state: Some(CompareState::ReadBlock {
535            readblock: ReadBlock::new(r, block),
536            sig,
537        }),
538        st: State::new(),
539    }
540}
541
542fn matches(st: &mut State, sig: &Signature, block: &[u8], hash: &adler32::RollingAdler32) -> bool {
543    if let Some(h) = sig.chunks.get(&hash.hash()) {
544        let blake2 = {
545            let mut b = blake2_rfc::blake2b::Blake2b::new(BLAKE2_SIZE);
546            if st.block_oldest + st.block_len > sig.window {
547                b.update(&block[st.block_oldest..]);
548                b.update(&block[..(st.block_oldest + st.block_len) % sig.window]);
549            } else {
550                b.update(&block[st.block_oldest..st.block_oldest + st.block_len])
551            }
552            b.finalize()
553        };
554
555        if let Some(&index) = h.get(blake2.as_bytes()) {
556            // Matching hash found! If we have non-matching
557            // material before the match, add it.
558            if !st.pending.is_empty() {
559                st.result.push(Block::Literal(std::mem::replace(
560                    &mut st.pending,
561                    Vec::new(),
562                )));
563            }
564            st.result.push(Block::FromSource(index as u64));
565            return true;
566        }
567    }
568    false
569}
570
571/// Restore a file, using a "delta" (resulting from
572/// [`compare`](fn.compare.html))
573pub fn restore<W: Write>(mut w: W, s: &[u8], delta: &Delta) -> Result<(), std::io::Error> {
574    for d in delta.blocks.iter() {
575        match *d {
576            Block::FromSource(i) => {
577                let i = i as usize;
578                if i + delta.window <= s.len() {
579                    w.write(&s[i..i + delta.window])?
580                } else {
581                    w.write(&s[i..])?
582                }
583            }
584            Block::Literal(ref l) => w.write(l)?,
585        };
586    }
587    Ok(())
588}
589
590/// Same as [`restore`](fn.restore.html), except that this function
591/// uses a seekable, readable stream instead of the entire file in a
592/// slice.
593///
594/// `buf` must be a buffer the same size as `sig.window`.
595pub fn restore_seek<W: Write, R: Read + Seek, B: AsRef<[u8]>+AsMut<[u8]>>(
596    mut w: W,
597    mut s: R,
598    mut buf: B,
599    delta: &Delta,
600) -> Result<(), std::io::Error> {
601    let buf = buf.as_mut();
602
603    for d in delta.blocks.iter() {
604        match *d {
605            Block::FromSource(i) => {
606                s.seek(SeekFrom::Start(i as u64))?;
607                // fill the buffer from r.
608                let mut n = 0;
609                loop {
610                    let r = s.read(&mut buf[n..delta.window])?;
611                    if r == 0 {
612                        break;
613                    }
614                    n += r
615                }
616                // write the buffer to w.
617                let mut m = 0;
618                while m < n {
619                    m += w.write(&buf[m..n])?;
620                }
621            }
622            Block::Literal(ref l) => {
623                w.write(l)?;
624            }
625        }
626    }
627    Ok(())
628}
629
630pub struct FutureRestore<W: AsyncWrite, R: Read + Seek, B: AsMut<[u8]> + AsRef<[u8]>> {
631    state: Option<RestoreState<W, R, B>>,
632    delta: Delta,
633    delta_pos: usize,
634}
635
636enum RestoreState<W: AsyncWrite, R: Read + Seek, B: AsMut<[u8]> + AsRef<[u8]>> {
637    Delta {
638        w: W,
639        r: R,
640        buf: B,
641    },
642    WriteBuf {
643        write: WriteBlock<W, B>,
644        r: R,
645    },
646    WriteVec {
647        write: WriteAll<W, Vec<u8>>,
648        r: R,
649        buf: B,
650    },
651}
652
653/// Same as [`restore_seek`](fn.restore_seek.html), except that this
654/// function writes its output asynchronously.
655pub fn restore_seek_fut<W: AsyncWrite, R: Read + Seek, B: AsMut<[u8]> + AsRef<[u8]>>(
656    w: W,
657    r: R,
658    buf: B,
659    delta: Delta,
660) -> FutureRestore<W, R, B> {
661    FutureRestore {
662        state: Some(RestoreState::Delta { w, r, buf }),
663        delta,
664        delta_pos: 0,
665    }
666}
667
668impl<W: AsyncWrite, R: Read + Seek, B: AsMut<[u8]> + AsRef<[u8]>> Future
669    for FutureRestore<W, R, B> {
670    type Item = (W, R, Delta);
671    type Error = std::io::Error;
672    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
673        loop {
674            match self.state.take() {
675                Some(RestoreState::Delta { w, mut r, mut buf }) => {
676                    if self.delta_pos >= self.delta.blocks.len() {
677                        return Ok(Async::Ready((
678                            w,
679                            r,
680                            std::mem::replace(&mut self.delta, Delta::default()),
681                        )));
682                    }
683                    match self.delta.blocks[self.delta_pos] {
684                        Block::FromSource(i) => {
685                            r.seek(SeekFrom::Start(i as u64))?;
686                            // fill the buffer from r.
687                            let mut n = 0;
688                            {
689                                let buf_ = buf.as_mut();
690                                loop {
691                                    let k = r.read(&mut buf_[n..self.delta.window])?;
692                                    if k == 0 {
693                                        break;
694                                    }
695                                    n += k
696                                }
697                            }
698                            // write the buffer to w.
699                            self.state = Some(RestoreState::WriteBuf {
700                                write: WriteBlock::new(w, buf, 0, n),
701                                r,
702                            })
703                        }
704                        Block::Literal(ref mut l) => {
705                            let vec = std::mem::replace(l, Vec::new());
706                            self.state = Some(RestoreState::WriteVec {
707                                write: write_all(w, vec),
708                                r,
709                                buf,
710                            })
711                        }
712                    }
713                }
714                Some(RestoreState::WriteBuf { mut write, r }) => match write.poll()? {
715                    Async::Ready((w, buf)) => {
716                        self.delta_pos = self.delta_pos + 1;
717                        self.state = Some(RestoreState::Delta { w, r, buf })
718                    }
719                    Async::NotReady => {
720                        self.state = Some(RestoreState::WriteBuf { write, r });
721                        return Ok(Async::NotReady);
722                    }
723                },
724                Some(RestoreState::WriteVec { mut write, r, buf }) => match write.poll()? {
725                    Async::Ready((w, vec)) => {
726                        self.delta.blocks[self.delta_pos] = Block::Literal(vec);
727                        self.delta_pos += 1;
728                        self.state = Some(RestoreState::Delta { w, r, buf })
729                    }
730                    Async::NotReady => {
731                        self.state = Some(RestoreState::WriteVec { write, r, buf });
732                        return Ok(Async::NotReady);
733                    }
734                },
735                None => panic!(""),
736            }
737        }
738    }
739}
740
#[cfg(test)]
mod tests {
    use rand;
    use super::*;
    use rand::Rng;
    use tokio_core::reactor::Core;

    /// Block size used by all the tests.
    const WINDOW: usize = 32;

    /// A random string of `len` characters.
    fn random_text(len: usize) -> String {
        rand::thread_rng()
            .gen_ascii_chars()
            .take(len)
            .collect::<String>()
    }

    /// A copy of `source` in which the byte at `index` is incremented
    /// by one (wrapping at 255).
    fn with_bumped_byte(source: &str, index: usize) -> String {
        let mut modified = source.to_owned();
        // NOTE(review): this presumes that the incremented byte still
        // leaves the string valid UTF-8, which should hold for text
        // produced by `gen_ascii_chars` -- confirm.
        unsafe {
            modified.as_bytes_mut()[index] =
                ((source.as_bytes()[index] as usize + 1) & 255) as u8
        }
        modified
    }

    /// Changing one byte in each of the first ten blocks in turn, the
    /// modified text must be restored exactly from the source and the
    /// delta.
    #[test]
    fn basic() {
        for round in 0..10 {
            let source = random_text(WINDOW * 10 + 8);
            let modified = with_bumped_byte(&source, WINDOW * round + 3);

            let block = [0; WINDOW];
            let source_sig = signature(source.as_bytes(), block).unwrap();
            let comp = compare(&source_sig, modified.as_bytes(), block).unwrap();

            let mut restored = Vec::new();
            let reader = std::io::Cursor::new(source.as_bytes());
            restore_seek(&mut restored, reader, [0; WINDOW], &comp).unwrap();

            if &restored[..] == modified.as_bytes() {
                continue;
            }
            // Show both versions block by block, flagging the blocks
            // that differ, before failing.
            for i in 0..10 {
                let (from, to) = (i * WINDOW, (i + 1) * WINDOW);
                let a = &restored[from..to];
                let b = &modified.as_bytes()[from..to];
                println!("{:?}\n{:?}\n", a, b);
                if a != b {
                    println!(">>>>>>>>");
                }
            }
            panic!("different");
        }
    }

    /// The asynchronous functions must produce the same results as
    /// their synchronous counterparts.
    #[test]
    fn futures() {
        let source = random_text(WINDOW * 10 + 8);
        let modified = with_bumped_byte(&source, WINDOW + 3);

        let mut l = Core::new().unwrap();
        let block = [0; WINDOW];

        let source_sig = signature(source.as_bytes(), block).unwrap();
        println!("==================\n");
        let (_, source_sig_) = l.run(signature_fut(source.as_bytes(), block)).unwrap();
        assert_eq!(source_sig, source_sig_);
        println!("{:?} {:?}", source_sig, source_sig_);

        let comp = compare(&source_sig, modified.as_bytes(), block).unwrap();
        let (_, _, comp_) = l.run(compare_fut(source_sig, modified.as_bytes(), block)).unwrap();
        assert_eq!(comp, comp_);
        println!("{:?}", comp);

        let output = std::io::Cursor::new(Vec::new());
        let reader = std::io::Cursor::new(source.as_bytes());
        let (rest_, _, _) = l.run(restore_seek_fut(output, reader, [0; WINDOW], comp))
            .unwrap();
        assert_eq!(rest_.into_inner().as_slice(), modified.as_bytes());
    }
}