1extern crate adler32;
65extern crate blake2_rfc;
66extern crate futures;
67#[cfg(test)]
68extern crate rand;
69extern crate serde;
70#[macro_use]
71extern crate serde_derive;
72#[cfg(test)]
73extern crate tokio_core;
74extern crate tokio_io;
75
76use std::collections::HashMap;
77use std::io::{Read, Seek, SeekFrom, Write};
78use tokio_io::{AsyncRead, AsyncWrite};
79use tokio_io::io::{write_all, WriteAll};
80use futures::{Async, Future, Poll};
81
82const BLAKE2_SIZE: usize = 32;
83
84#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
85struct Blake2b([u8; BLAKE2_SIZE]);
86
87impl std::borrow::Borrow<[u8]> for Blake2b {
88 fn borrow(&self) -> &[u8] {
89 &self.0
90 }
91}
92
93#[derive(Debug, Serialize, Deserialize, PartialEq)]
94pub struct Signature {
97 pub window: usize,
98 chunks: HashMap<u32, HashMap<Blake2b, usize>>
99}
100
101pub fn signature<R: Read, B: AsRef<[u8]>+AsMut<[u8]>>(mut r: R, mut block: B) -> Result<Signature, std::io::Error> {
106 let mut chunks = HashMap::new();
107
108 let mut i = 0;
109 let block = block.as_mut();
110 let mut eof = false;
111 while !eof {
112 let mut j = 0;
113 while j < block.len() {
114 let r = r.read(&mut block[j..])?;
115 if r == 0 {
116 eof = true;
117 break;
118 }
119 j += r
120 }
121 let block = &block[..j];
122 let hash = adler32::RollingAdler32::from_buffer(block);
123 let mut blake2 = [0; BLAKE2_SIZE];
124 blake2.clone_from_slice(blake2_rfc::blake2b::blake2b(BLAKE2_SIZE, &[], &block).as_bytes());
125 println!("{:?} {:?}", block, blake2);
126 chunks
127 .entry(hash.hash())
128 .or_insert(HashMap::new())
129 .insert(Blake2b(blake2), i);
130
131 i += block.len()
132 }
133
134 Ok(Signature {
135 window: block.len(),
136 chunks
137 })
138}
139
140pub struct ReadBlock<R: AsyncRead, B: AsRef<[u8]>+AsMut<[u8]>> {
141 block: Option<(R, B)>,
142 first: usize,
143}
144
145impl<R: AsyncRead, B: AsRef<[u8]>+AsMut<[u8]>> ReadBlock<R, B> {
146 fn new(r: R, block: B) -> Self {
147 ReadBlock {
148 block: Some((r, block)),
149 first: 0,
150 }
151 }
152}
153
154impl<R: AsyncRead, B: AsRef<[u8]>+AsMut<[u8]>> Future for ReadBlock<R, B> {
155 type Item = (R, B, usize);
156 type Error = std::io::Error;
157 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
158 loop {
159 if let Some((mut r, mut block)) = self.block.take() {
160 let n = {
161 let block = block.as_mut();
162 if self.first == block.len() {
163 0
164 } else {
165 match r.read(&mut block[self.first..]) {
166 Ok(n) => n,
167 Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
168 return Ok(Async::NotReady)
169 }
170 Err(e) => return Err(e),
171 }
172 }
173 };
174 if n == 0 {
175 return Ok(Async::Ready((r, block, self.first)));
176 } else {
177 self.first += n;
178 self.block = Some((r, block));
179 }
180 } else {
181 panic!("future polled after completion")
182 }
183 }
184 }
185}
186
187pub struct WriteBlock<W: AsyncWrite, B: AsRef<[u8]>> {
188 block: Option<(W, B)>,
189 first: usize,
190 len: usize,
191}
192
193impl<W: AsyncWrite, B: AsRef<[u8]>> WriteBlock<W, B> {
194 fn new(w: W, block: B, first: usize, len: usize) -> Self {
195 WriteBlock {
196 block: Some((w, block)),
197 first,
198 len,
199 }
200 }
201}
202
203impl<W: AsyncWrite, B: AsRef<[u8]>> Future for WriteBlock<W, B> {
204 type Item = (W, B);
205 type Error = std::io::Error;
206 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
207 loop {
208 if let Some((mut w, block)) = self.block.take() {
209 match w.write(&block.as_ref()[self.first..self.len]) {
210 Ok(n) => {
211 self.first += n;
212 if self.first >= self.len {
213 return Ok(Async::Ready((w, block)));
214 } else {
215 self.block = Some((w, block))
216 }
217 }
218 Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
219 return Ok(Async::NotReady)
220 }
221 Err(e) => return Err(e),
222 }
223 } else {
224 panic!("future polled after completion")
225 }
226 }
227 }
228}
229
230pub struct FutureSignature<R: AsyncRead, B:AsRef<[u8]>+AsMut<[u8]>> {
231 chunks: HashMap<u32, HashMap<Blake2b, usize>>,
232 i: usize,
233 eof: bool,
234 state: Option<ReadBlock<R, B>>,
235}
236
237impl<R: AsyncRead, B: AsRef<[u8]>+AsMut<[u8]>> Future for FutureSignature<R, B> {
238 type Item = (R, Signature);
239 type Error = std::io::Error;
240 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
241 loop {
242 if let Some(mut reading) = self.state.take() {
243 if let Async::Ready((r, mut block, len)) = reading.poll()? {
244 {
245 let block_ = block.as_ref();
246 self.eof = len < block_.len();
247 let b = &block_[..len];
248 let hash = adler32::RollingAdler32::from_buffer(b);
249 let mut blake2 = [0; BLAKE2_SIZE];
250 blake2.clone_from_slice(
251 blake2_rfc::blake2b::blake2b(BLAKE2_SIZE, &[], &b).as_bytes(),
252 );
253 self.chunks
254 .entry(hash.hash())
255 .or_insert(HashMap::new())
256 .insert(Blake2b(blake2), self.i);
257 self.i += block_.len();
258 }
259 if self.eof {
260 return Ok(Async::Ready((
261 r,
262 Signature {
263 chunks: std::mem::replace(&mut self.chunks, HashMap::new()),
264 window: block.as_ref().len(),
265 },
266 )));
267 } else {
268 self.state = Some(ReadBlock::new(r, block))
269 }
270 } else {
271 self.state = Some(reading);
272 return Ok(Async::NotReady);
273 }
274 }
275 }
276 }
277}
278
279pub fn signature_fut<R: AsyncRead, B:AsRef<[u8]>+AsMut<[u8]>>(r: R, b: B) -> FutureSignature<R, B> {
282 FutureSignature {
283 state: Some(ReadBlock::new(r, b)),
284 i: 0,
285 eof: false,
286 chunks: HashMap::new(),
287 }
288}
289
290#[derive(Debug, Serialize, Deserialize, PartialEq)]
291pub enum Block {
292 FromSource(u64),
293 Literal(Vec<u8>),
294}
295
296struct State {
297 result: Vec<Block>,
298 block_oldest: usize,
299 block_len: usize,
300 pending: Vec<u8>,
301}
302
303impl State {
304 fn new() -> Self {
305 State {
306 result: Vec::new(),
307 block_oldest: 0,
308 block_len: 1,
309 pending: Vec::new(),
310 }
311 }
312}
313
314#[derive(Default, Debug, PartialEq)]
315pub struct Delta {
317 pub blocks: Vec<Block>,
319 pub window: usize,
321}
322
323pub fn compare<R: Read, B:AsRef<[u8]>+AsMut<[u8]>>(sig: &Signature, mut r: R, mut block: B) -> Result<Delta, std::io::Error> {
329 let mut st = State::new();
330 let block = block.as_mut();
331 assert_eq!(block.len(), sig.window);
332 while st.block_len > 0 {
333 let mut hash = {
334 let mut j = 0;
335 let block = {
336 while j < sig.window {
337 let r = r.read(&mut block[..])?;
338 if r == 0 {
339 break;
340 }
341 j += r
342 }
343 st.block_oldest = 0;
344 st.block_len = j;
345 &block[..j]
346 };
347 adler32::RollingAdler32::from_buffer(block)
348 };
349
350 loop {
353 if matches(&mut st, sig, &block, &hash) {
354 break;
355 }
356 let oldest = block[st.block_oldest];
359 hash.remove(st.block_len, oldest);
360 let r = r.read(&mut block[st.block_oldest..st.block_oldest + 1])?;
361 if r > 0 {
362 hash.update(block[st.block_oldest]);
364 } else if st.block_len > 0 {
365 st.block_len -= 1;
369 } else {
370 break;
372 }
373 st.pending.push(oldest);
374 st.block_oldest = (st.block_oldest + 1) % sig.window;
375 }
376 if !st.pending.is_empty() {
377 st.result.push(Block::Literal(std::mem::replace(
380 &mut st.pending,
381 Vec::new(),
382 )))
383 }
384 }
385 Ok(Delta {
386 blocks: st.result,
387 window: sig.window,
388 })
389}
390
391pub struct FutureCompare<R: AsyncRead, B: AsRef<[u8]>+AsMut<[u8]>> {
392 state: Option<CompareState<R, B>>,
393 st: State,
394}
395
396enum CompareState<R: AsyncRead, B: AsRef<[u8]>+AsMut<[u8]>> {
397 ReadBlock {
398 readblock: ReadBlock<R, B>,
399 sig: Signature,
400 },
401 FindNext {
402 hash: adler32::RollingAdler32,
403 r: R,
404 block: B,
405 sig: Signature,
406 },
407 FindNextRead {
408 hash: adler32::RollingAdler32,
409 reading: ReadBlock<R, [u8; 1]>,
410 block: B,
411 sig: Signature,
412 },
413 EndLiteralBlock {
414 r: R,
415 block: B,
416 sig: Signature,
417 },
418}
419
420impl<R: AsyncRead, B: AsRef<[u8]>+AsMut<[u8]>> Future for FutureCompare<R, B> {
421 type Item = (R, Signature, Delta);
422 type Error = std::io::Error;
423 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
424 loop {
425 match self.state.take() {
426 Some(CompareState::ReadBlock { mut readblock, sig }) => {
427 if let Async::Ready((r, block, len)) = readblock.poll()? {
428 self.st.block_oldest = 0;
429 self.st.block_len = len;
430 let mut hash = adler32::RollingAdler32::from_buffer(&block.as_ref()[..len]);
431 self.state = Some(CompareState::FindNext {
432 hash,
433 block,
434 r,
435 sig,
436 });
437 } else {
438 self.state = Some(CompareState::ReadBlock { readblock, sig });
439 return Ok(Async::NotReady);
440 }
441 }
442 Some(CompareState::FindNext {
443 mut hash,
444 r,
445 block,
446 sig,
447 }) => {
448 if matches(&mut self.st, &sig, block.as_ref(), &hash) {
449 self.state = Some(CompareState::EndLiteralBlock { r, block, sig })
450 } else {
451 let oldest = block.as_ref()[self.st.block_oldest];
452 hash.remove(self.st.block_len, oldest);
453 self.state = Some(CompareState::FindNextRead {
454 hash,
455 reading: ReadBlock::new(r, [0; 1]),
456 block,
457 sig,
458 })
459 }
460 }
461 Some(CompareState::FindNextRead {
462 mut hash,
463 mut reading,
464 mut block,
465 sig,
466 }) => {
467 if let Async::Ready((r, b, len)) = reading.poll()? {
468 if len > 0 || self.st.block_len > 0 {
469 let oldest = block.as_ref()[self.st.block_oldest];
470 if len > 0 {
471 block.as_mut()[self.st.block_oldest] = b[0];
472 hash.update(b[0])
473 } else {
474 self.st.block_len -= 1
475 }
476
477 self.st.pending.push(oldest);
478 self.st.block_oldest = (self.st.block_oldest + 1) % sig.window;
479 self.state = Some(CompareState::FindNext {
480 hash,
481 r,
482 block,
483 sig,
484 })
485 } else {
486 self.state = Some(CompareState::EndLiteralBlock { r, block, sig })
487 }
488 } else {
489 self.state = Some(CompareState::FindNextRead {
490 hash,
491 reading,
492 block,
493 sig,
494 })
495 }
496 }
497 Some(CompareState::EndLiteralBlock { r, block, sig }) => {
498 if !self.st.pending.is_empty() {
499 self.st.result.push(Block::Literal(std::mem::replace(
502 &mut self.st.pending,
503 Vec::new(),
504 )))
505 }
506 if self.st.block_len > 0 {
507 self.state = Some(CompareState::ReadBlock {
508 readblock: ReadBlock::new(r, block),
509 sig,
510 })
511 } else {
512 let window = sig.window;
513 return Ok(Async::Ready((
514 r,
515 sig,
516 Delta {
517 blocks: std::mem::replace(&mut self.st.result, Vec::new()),
518 window,
519 },
520 )));
521 }
522 }
523 None => panic!(""),
524 }
525 }
526 }
527}
528
529pub fn compare_fut<R: AsyncRead, B: AsRef<[u8]>+AsMut<[u8]>>(sig: Signature, r: R, block: B) -> FutureCompare<R, B> {
532 assert_eq!(block.as_ref().len(), sig.window);
533 FutureCompare {
534 state: Some(CompareState::ReadBlock {
535 readblock: ReadBlock::new(r, block),
536 sig,
537 }),
538 st: State::new(),
539 }
540}
541
542fn matches(st: &mut State, sig: &Signature, block: &[u8], hash: &adler32::RollingAdler32) -> bool {
543 if let Some(h) = sig.chunks.get(&hash.hash()) {
544 let blake2 = {
545 let mut b = blake2_rfc::blake2b::Blake2b::new(BLAKE2_SIZE);
546 if st.block_oldest + st.block_len > sig.window {
547 b.update(&block[st.block_oldest..]);
548 b.update(&block[..(st.block_oldest + st.block_len) % sig.window]);
549 } else {
550 b.update(&block[st.block_oldest..st.block_oldest + st.block_len])
551 }
552 b.finalize()
553 };
554
555 if let Some(&index) = h.get(blake2.as_bytes()) {
556 if !st.pending.is_empty() {
559 st.result.push(Block::Literal(std::mem::replace(
560 &mut st.pending,
561 Vec::new(),
562 )));
563 }
564 st.result.push(Block::FromSource(index as u64));
565 return true;
566 }
567 }
568 false
569}
570
571pub fn restore<W: Write>(mut w: W, s: &[u8], delta: &Delta) -> Result<(), std::io::Error> {
574 for d in delta.blocks.iter() {
575 match *d {
576 Block::FromSource(i) => {
577 let i = i as usize;
578 if i + delta.window <= s.len() {
579 w.write(&s[i..i + delta.window])?
580 } else {
581 w.write(&s[i..])?
582 }
583 }
584 Block::Literal(ref l) => w.write(l)?,
585 };
586 }
587 Ok(())
588}
589
590pub fn restore_seek<W: Write, R: Read + Seek, B: AsRef<[u8]>+AsMut<[u8]>>(
596 mut w: W,
597 mut s: R,
598 mut buf: B,
599 delta: &Delta,
600) -> Result<(), std::io::Error> {
601 let buf = buf.as_mut();
602
603 for d in delta.blocks.iter() {
604 match *d {
605 Block::FromSource(i) => {
606 s.seek(SeekFrom::Start(i as u64))?;
607 let mut n = 0;
609 loop {
610 let r = s.read(&mut buf[n..delta.window])?;
611 if r == 0 {
612 break;
613 }
614 n += r
615 }
616 let mut m = 0;
618 while m < n {
619 m += w.write(&buf[m..n])?;
620 }
621 }
622 Block::Literal(ref l) => {
623 w.write(l)?;
624 }
625 }
626 }
627 Ok(())
628}
629
630pub struct FutureRestore<W: AsyncWrite, R: Read + Seek, B: AsMut<[u8]> + AsRef<[u8]>> {
631 state: Option<RestoreState<W, R, B>>,
632 delta: Delta,
633 delta_pos: usize,
634}
635
636enum RestoreState<W: AsyncWrite, R: Read + Seek, B: AsMut<[u8]> + AsRef<[u8]>> {
637 Delta {
638 w: W,
639 r: R,
640 buf: B,
641 },
642 WriteBuf {
643 write: WriteBlock<W, B>,
644 r: R,
645 },
646 WriteVec {
647 write: WriteAll<W, Vec<u8>>,
648 r: R,
649 buf: B,
650 },
651}
652
653pub fn restore_seek_fut<W: AsyncWrite, R: Read + Seek, B: AsMut<[u8]> + AsRef<[u8]>>(
656 w: W,
657 r: R,
658 buf: B,
659 delta: Delta,
660) -> FutureRestore<W, R, B> {
661 FutureRestore {
662 state: Some(RestoreState::Delta { w, r, buf }),
663 delta,
664 delta_pos: 0,
665 }
666}
667
668impl<W: AsyncWrite, R: Read + Seek, B: AsMut<[u8]> + AsRef<[u8]>> Future
669 for FutureRestore<W, R, B> {
670 type Item = (W, R, Delta);
671 type Error = std::io::Error;
672 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
673 loop {
674 match self.state.take() {
675 Some(RestoreState::Delta { w, mut r, mut buf }) => {
676 if self.delta_pos >= self.delta.blocks.len() {
677 return Ok(Async::Ready((
678 w,
679 r,
680 std::mem::replace(&mut self.delta, Delta::default()),
681 )));
682 }
683 match self.delta.blocks[self.delta_pos] {
684 Block::FromSource(i) => {
685 r.seek(SeekFrom::Start(i as u64))?;
686 let mut n = 0;
688 {
689 let buf_ = buf.as_mut();
690 loop {
691 let k = r.read(&mut buf_[n..self.delta.window])?;
692 if k == 0 {
693 break;
694 }
695 n += k
696 }
697 }
698 self.state = Some(RestoreState::WriteBuf {
700 write: WriteBlock::new(w, buf, 0, n),
701 r,
702 })
703 }
704 Block::Literal(ref mut l) => {
705 let vec = std::mem::replace(l, Vec::new());
706 self.state = Some(RestoreState::WriteVec {
707 write: write_all(w, vec),
708 r,
709 buf,
710 })
711 }
712 }
713 }
714 Some(RestoreState::WriteBuf { mut write, r }) => match write.poll()? {
715 Async::Ready((w, buf)) => {
716 self.delta_pos = self.delta_pos + 1;
717 self.state = Some(RestoreState::Delta { w, r, buf })
718 }
719 Async::NotReady => {
720 self.state = Some(RestoreState::WriteBuf { write, r });
721 return Ok(Async::NotReady);
722 }
723 },
724 Some(RestoreState::WriteVec { mut write, r, buf }) => match write.poll()? {
725 Async::Ready((w, vec)) => {
726 self.delta.blocks[self.delta_pos] = Block::Literal(vec);
727 self.delta_pos += 1;
728 self.state = Some(RestoreState::Delta { w, r, buf })
729 }
730 Async::NotReady => {
731 self.state = Some(RestoreState::WriteVec { write, r, buf });
732 return Ok(Async::NotReady);
733 }
734 },
735 None => panic!(""),
736 }
737 }
738 }
739}
740
#[cfg(test)]
mod tests {
    use rand;
    use super::*;
    use rand::Rng;
    use tokio_core::reactor::Core;

    const WINDOW: usize = 32;

    /// Random ASCII data spanning ten full windows plus a short last block.
    fn random_source() -> Vec<u8> {
        rand::thread_rng()
            .gen_ascii_chars()
            .take(WINDOW * 10 + 8)
            .map(|c| c as u8)
            .collect()
    }

    /// Returns a copy of `source` in which the byte at `index` is different.
    fn with_byte_changed(source: &[u8], index: usize) -> Vec<u8> {
        let mut modified = source.to_vec();
        modified[index] = modified[index].wrapping_add(1);
        modified
    }

    /// Changing one byte in any of the ten full blocks must still allow the
    /// modified data to be rebuilt from the source and the delta.
    #[test]
    fn basic() {
        for block_index in 0..10 {
            let source = random_source();
            let modified = with_byte_changed(&source, WINDOW * block_index + 3);

            let block = [0u8; WINDOW];
            let source_sig = signature(&source[..], block).unwrap();
            let comp = compare(&source_sig, &modified[..], block).unwrap();

            let mut restored: Vec<u8> = Vec::new();
            restore_seek(
                &mut restored,
                std::io::Cursor::new(&source[..]),
                [0u8; WINDOW],
                &comp,
            ).unwrap();
            assert_eq!(restored, modified);
        }
    }

    /// The futures must produce the same results as the blocking functions.
    #[test]
    fn futures() {
        let source = random_source();
        let modified = with_byte_changed(&source, WINDOW + 3);

        let mut l = Core::new().unwrap();
        let block = [0u8; WINDOW];
        let source_sig = signature(&source[..], block).unwrap();
        let (_, source_sig_) = l.run(signature_fut(&source[..], block)).unwrap();
        assert_eq!(source_sig, source_sig_);

        let comp = compare(&source_sig, &modified[..], block).unwrap();
        let (_, _, comp_) = l.run(compare_fut(source_sig, &modified[..], block))
            .unwrap();
        assert_eq!(comp, comp_);

        let (rest_, _, _) = l.run(restore_seek_fut(
            std::io::Cursor::new(Vec::<u8>::new()),
            std::io::Cursor::new(&source[..]),
            [0u8; WINDOW],
            comp,
        )).unwrap();
        assert_eq!(rest_.into_inner(), modified);
    }
}