wnfs_unixfs_file/
chunker.rs1use anyhow::{anyhow, Context};
2use bytes::Bytes;
3use futures::Stream;
4use std::{
5 fmt::{Debug, Display},
6 io,
7 pin::Pin,
8 str::FromStr,
9 task,
10};
11use tokio::io::AsyncRead;
12use wnfs_common::utils::{BoxStream, CondSend};
13
14mod fixed;
15mod rabin;
16
/// Largest fixed chunk size, in bytes (1 MiB), accepted when parsing a
/// `fixed-<n>` chunker specification.
pub const DEFAULT_CHUNK_SIZE_LIMIT: usize = 1 << 20;
19
20pub use self::{
21 fixed::{Fixed, DEFAULT_CHUNKS_SIZE},
22 rabin::Rabin,
23};
24
25#[derive(Debug, PartialEq, Eq, Clone)]
26pub enum Chunker {
27 Fixed(Fixed),
28 Rabin(Box<Rabin>),
29}
30
31impl Display for Chunker {
32 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33 match self {
34 Self::Fixed(c) => write!(f, "Chunker::Fixed({})", c.chunk_size),
35 Self::Rabin(_) => write!(f, "Chunker::Rabin"),
36 }
37 }
38}
39
/// Lightweight, copyable description of which chunker to use.
///
/// Its textual form is `"fixed-<n>"` or `"rabin"` (see the `Display` and
/// `FromStr` impls). Convert it into a [`Chunker`] with `Chunker::from`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ChunkerConfig {
    /// Fixed-size chunking with the given chunk size in bytes.
    Fixed(usize),
    /// Use the [`Rabin`] chunker with its default configuration.
    Rabin,
}
48
49impl Display for ChunkerConfig {
50 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51 match self {
52 Self::Fixed(chunk_size) => write!(f, "fixed-{chunk_size}"),
53 Self::Rabin => write!(f, "rabin"),
54 }
55 }
56}
57
58impl FromStr for ChunkerConfig {
59 type Err = anyhow::Error;
60
61 fn from_str(s: &str) -> Result<Self, Self::Err> {
62 if s == "rabin" {
63 return Ok(ChunkerConfig::Rabin);
64 }
65
66 if let Some(rest) = s.strip_prefix("fixed") {
67 if rest.is_empty() {
68 return Ok(ChunkerConfig::Fixed(DEFAULT_CHUNKS_SIZE));
69 }
70
71 if let Some(rest) = rest.strip_prefix('-') {
72 let chunk_size: usize = rest.parse().context("invalid chunk size")?;
73 if chunk_size > DEFAULT_CHUNK_SIZE_LIMIT {
74 return Err(anyhow!("chunk size too large"));
75 }
76
77 return Ok(ChunkerConfig::Fixed(chunk_size));
78 }
79 }
80
81 Err(anyhow!("unknown chunker: {}", s))
82 }
83}
84
85impl From<ChunkerConfig> for Chunker {
86 fn from(cfg: ChunkerConfig) -> Self {
87 match cfg {
88 ChunkerConfig::Fixed(chunk_size) => Chunker::Fixed(Fixed::new(chunk_size)),
89 ChunkerConfig::Rabin => Chunker::Rabin(Box::default()),
90 }
91 }
92}
93
94pub enum ChunkerStream<'a> {
95 Fixed(BoxStream<'a, io::Result<Bytes>>),
96 Rabin(BoxStream<'a, io::Result<Bytes>>),
97}
98
99impl<'a> Debug for ChunkerStream<'a> {
100 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101 match self {
102 Self::Fixed(_) => write!(f, "Fixed(impl Stream<Item=Bytes>)"),
103 Self::Rabin(_) => write!(f, "Rabin(impl Stream<Item=Bytes>)"),
104 }
105 }
106}
107
108impl<'a> Stream for ChunkerStream<'a> {
109 type Item = io::Result<Bytes>;
110
111 fn poll_next(
112 mut self: Pin<&mut Self>,
113 cx: &mut task::Context<'_>,
114 ) -> task::Poll<Option<Self::Item>> {
115 match &mut *self {
116 Self::Fixed(ref mut stream) => Pin::new(stream).poll_next(cx),
117 Self::Rabin(ref mut stream) => Pin::new(stream).poll_next(cx),
118 }
119 }
120
121 fn size_hint(&self) -> (usize, Option<usize>) {
122 match self {
123 Self::Fixed(ref stream) => stream.size_hint(),
124 Self::Rabin(ref stream) => stream.size_hint(),
125 }
126 }
127}
128
129impl Chunker {
130 pub fn chunks<'a, R: AsyncRead + Unpin + CondSend + 'a>(self, source: R) -> ChunkerStream<'a> {
131 match self {
132 Self::Fixed(chunker) => ChunkerStream::Fixed(chunker.chunks(source)),
133 Self::Rabin(chunker) => ChunkerStream::Rabin(chunker.chunks(source)),
134 }
135 }
136}