1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
use anyhow::{anyhow, Context};
use bytes::Bytes;
use futures::Stream;
use std::{
    fmt::{Debug, Display},
    io,
    pin::Pin,
    str::FromStr,
    task,
};
use tokio::io::AsyncRead;
use wnfs_common::utils::{BoxStream, CondSend};

mod fixed;
mod rabin;

/// Default upper bound on the size of a single chunk: 1 MiB (`2^20` bytes).
pub const DEFAULT_CHUNK_SIZE_LIMIT: usize = 1 << 20;

pub use self::{
    fixed::{Fixed, DEFAULT_CHUNKS_SIZE},
    rabin::Rabin,
};

/// A chunking strategy, wrapping either a [`Fixed`] or a [`Rabin`] chunker.
///
/// Built from a [`ChunkerConfig`] through the `From` implementation in this module, and
/// consumed by [`Chunker::chunks`] to produce a [`ChunkerStream`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum Chunker {
    /// Chunking delegated to a [`Fixed`] chunker.
    Fixed(Fixed),
    /// Chunking delegated to a [`Rabin`] chunker, stored behind a `Box`
    /// (presumably to keep this enum small — confirm against `Rabin`'s size).
    Rabin(Box<Rabin>),
}

impl Display for Chunker {
    /// Writes `Chunker::Fixed(<chunk size>)` for the fixed chunker and `Chunker::Rabin`
    /// for the Rabin chunker (none of the Rabin chunker's state is printed).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Rabin(_) => f.write_str("Chunker::Rabin"),
            Self::Fixed(fixed) => {
                let size = &fixed.chunk_size;
                write!(f, "Chunker::Fixed({size})")
            }
        }
    }
}

/// Chunker configuration.
///
/// A small, `Copy` description of which chunker to use. It has a textual form: `Display`
/// writes `fixed-<size>` or `rabin`, and `FromStr` accepts `fixed`, `fixed-<size>` and
/// `rabin`. It can be turned into a [`Chunker`] with `From`/`Into`.
#[derive(Debug, Clone, PartialEq, Eq, Copy)]
pub enum ChunkerConfig {
    /// Fixed sized chunker. The payload is the chunk size.
    Fixed(usize),
    /// Rabin chunker. Carries no parameters.
    Rabin,
}

impl Display for ChunkerConfig {
    /// Writes the textual name of the configuration: `fixed-<size>` for
    /// [`ChunkerConfig::Fixed`] and `rabin` for [`ChunkerConfig::Rabin`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            Self::Rabin => f.write_str("rabin"),
            Self::Fixed(size) => write!(f, "fixed-{}", size),
        }
    }
}

impl FromStr for ChunkerConfig {
    type Err = anyhow::Error;

    /// Parses a chunker configuration from its textual form.
    ///
    /// Accepted inputs:
    /// - `rabin` gives [`ChunkerConfig::Rabin`].
    /// - `fixed` gives [`ChunkerConfig::Fixed`] with [`DEFAULT_CHUNKS_SIZE`].
    /// - `fixed-<size>` gives [`ChunkerConfig::Fixed`] with the given size.
    ///
    /// # Errors
    ///
    /// Returns an error when:
    /// - the size after `fixed-` is not a valid `usize` ("invalid chunk size"),
    /// - the size is zero,
    /// - the size exceeds [`DEFAULT_CHUNK_SIZE_LIMIT`] ("chunk size too large"),
    /// - the input matches none of the forms above ("unknown chunker: ...").
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        if s == "rabin" {
            return Ok(ChunkerConfig::Rabin);
        }

        if let Some(rest) = s.strip_prefix("fixed") {
            // A bare "fixed" selects the default chunk size.
            if rest.is_empty() {
                return Ok(ChunkerConfig::Fixed(DEFAULT_CHUNKS_SIZE));
            }

            // Anything else after "fixed" must be "-<size>"; other suffixes fall
            // through to the "unknown chunker" error below.
            if let Some(size) = rest.strip_prefix('-') {
                let chunk_size: usize = size.parse().context("invalid chunk size")?;

                // A zero-sized chunk cannot hold any data, so reject it here instead of
                // passing it on to the chunker.
                // NOTE(review): `Fixed::new` is not visible from this module; confirm how
                // it treats a zero chunk size.
                if chunk_size == 0 {
                    return Err(anyhow!("chunk size must be greater than zero"));
                }
                if chunk_size > DEFAULT_CHUNK_SIZE_LIMIT {
                    return Err(anyhow!("chunk size too large"));
                }

                return Ok(ChunkerConfig::Fixed(chunk_size));
            }
        }

        Err(anyhow!("unknown chunker: {}", s))
    }
}

impl From<ChunkerConfig> for Chunker {
    fn from(cfg: ChunkerConfig) -> Self {
        match cfg {
            ChunkerConfig::Fixed(chunk_size) => Chunker::Fixed(Fixed::new(chunk_size)),
            ChunkerConfig::Rabin => Chunker::Rabin(Box::default()),
        }
    }
}

/// Stream of chunks returned by [`Chunker::chunks`].
///
/// Both variants wrap a boxed stream yielding `io::Result<Bytes>`; the variant records which
/// kind of chunker the stream came from.
pub enum ChunkerStream<'a> {
    /// Stream created from a [`Chunker::Fixed`] chunker.
    Fixed(BoxStream<'a, io::Result<Bytes>>),
    /// Stream created from a [`Chunker::Rabin`] chunker.
    Rabin(BoxStream<'a, io::Result<Bytes>>),
}

impl Debug for ChunkerStream<'_> {
    /// Writes the variant name followed by a fixed placeholder; the wrapped stream itself
    /// is not inspected or printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Fixed(_) => "Fixed(impl Stream<Item=Bytes>)",
            Self::Rabin(_) => "Rabin(impl Stream<Item=Bytes>)",
        };
        f.write_str(label)
    }
}

impl<'a> Stream for ChunkerStream<'a> {
    type Item = io::Result<Bytes>;

    /// Polls the wrapped stream, whichever variant is active, and forwards its result.
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> task::Poll<Option<Self::Item>> {
        // Both variants hold the same boxed stream type, so a single arm serves both.
        match self.get_mut() {
            Self::Fixed(inner) | Self::Rabin(inner) => Pin::new(inner).poll_next(cx),
        }
    }

    /// Forwards the size hint of the wrapped stream.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let (Self::Fixed(inner) | Self::Rabin(inner)) = self;
        inner.size_hint()
    }
}

impl Chunker {
    /// Consumes the chunker and builds the chunk stream for `source`.
    ///
    /// The call is forwarded to the `chunks` method of the wrapped chunker, and the stream it
    /// returns is wrapped in the [`ChunkerStream`] variant of the same name.
    pub fn chunks<'a, R: AsyncRead + Unpin + CondSend + 'a>(self, source: R) -> ChunkerStream<'a> {
        match self {
            Self::Rabin(rabin) => ChunkerStream::Rabin(rabin.chunks(source)),
            Self::Fixed(fixed) => ChunkerStream::Fixed(fixed.chunks(source)),
        }
    }
}