wnfs_unixfs_file/
chunker.rs

1use anyhow::{anyhow, Context};
2use bytes::Bytes;
3use futures::Stream;
4use std::{
5    fmt::{Debug, Display},
6    io,
7    pin::Pin,
8    str::FromStr,
9    task,
10};
11use tokio::io::AsyncRead;
12use wnfs_common::utils::{BoxStream, CondSend};
13
14mod fixed;
15mod rabin;
16
/// Default upper bound on the size of a single chunk: 1 MiB (2^20 bytes).
pub const DEFAULT_CHUNK_SIZE_LIMIT: usize = 1 << 20;
19
20pub use self::{
21    fixed::{Fixed, DEFAULT_CHUNKS_SIZE},
22    rabin::Rabin,
23};
24
25#[derive(Debug, PartialEq, Eq, Clone)]
26pub enum Chunker {
27    Fixed(Fixed),
28    Rabin(Box<Rabin>),
29}
30
31impl Display for Chunker {
32    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33        match self {
34            Self::Fixed(c) => write!(f, "Chunker::Fixed({})", c.chunk_size),
35            Self::Rabin(_) => write!(f, "Chunker::Rabin"),
36        }
37    }
38}
39
/// Lightweight, copyable description of which chunker to use.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ChunkerConfig {
    /// Fixed-size chunking; the payload is the chunk size.
    Fixed(usize),
    /// Rabin chunking.
    Rabin,
}
48
49impl Display for ChunkerConfig {
50    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51        match self {
52            Self::Fixed(chunk_size) => write!(f, "fixed-{chunk_size}"),
53            Self::Rabin => write!(f, "rabin"),
54        }
55    }
56}
57
58impl FromStr for ChunkerConfig {
59    type Err = anyhow::Error;
60
61    fn from_str(s: &str) -> Result<Self, Self::Err> {
62        if s == "rabin" {
63            return Ok(ChunkerConfig::Rabin);
64        }
65
66        if let Some(rest) = s.strip_prefix("fixed") {
67            if rest.is_empty() {
68                return Ok(ChunkerConfig::Fixed(DEFAULT_CHUNKS_SIZE));
69            }
70
71            if let Some(rest) = rest.strip_prefix('-') {
72                let chunk_size: usize = rest.parse().context("invalid chunk size")?;
73                if chunk_size > DEFAULT_CHUNK_SIZE_LIMIT {
74                    return Err(anyhow!("chunk size too large"));
75                }
76
77                return Ok(ChunkerConfig::Fixed(chunk_size));
78            }
79        }
80
81        Err(anyhow!("unknown chunker: {}", s))
82    }
83}
84
85impl From<ChunkerConfig> for Chunker {
86    fn from(cfg: ChunkerConfig) -> Self {
87        match cfg {
88            ChunkerConfig::Fixed(chunk_size) => Chunker::Fixed(Fixed::new(chunk_size)),
89            ChunkerConfig::Rabin => Chunker::Rabin(Box::default()),
90        }
91    }
92}
93
94pub enum ChunkerStream<'a> {
95    Fixed(BoxStream<'a, io::Result<Bytes>>),
96    Rabin(BoxStream<'a, io::Result<Bytes>>),
97}
98
99impl<'a> Debug for ChunkerStream<'a> {
100    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101        match self {
102            Self::Fixed(_) => write!(f, "Fixed(impl Stream<Item=Bytes>)"),
103            Self::Rabin(_) => write!(f, "Rabin(impl Stream<Item=Bytes>)"),
104        }
105    }
106}
107
108impl<'a> Stream for ChunkerStream<'a> {
109    type Item = io::Result<Bytes>;
110
111    fn poll_next(
112        mut self: Pin<&mut Self>,
113        cx: &mut task::Context<'_>,
114    ) -> task::Poll<Option<Self::Item>> {
115        match &mut *self {
116            Self::Fixed(ref mut stream) => Pin::new(stream).poll_next(cx),
117            Self::Rabin(ref mut stream) => Pin::new(stream).poll_next(cx),
118        }
119    }
120
121    fn size_hint(&self) -> (usize, Option<usize>) {
122        match self {
123            Self::Fixed(ref stream) => stream.size_hint(),
124            Self::Rabin(ref stream) => stream.size_hint(),
125        }
126    }
127}
128
129impl Chunker {
130    pub fn chunks<'a, R: AsyncRead + Unpin + CondSend + 'a>(self, source: R) -> ChunkerStream<'a> {
131        match self {
132            Self::Fixed(chunker) => ChunkerStream::Fixed(chunker.chunks(source)),
133            Self::Rabin(chunker) => ChunkerStream::Rabin(chunker.chunks(source)),
134        }
135    }
136}