ipld_car/bounded_reader/sync/
chained_bounded_reader.rs1use crate::{
2 bounded_reader::{
3 error::BoundedReaderErr,
4 sync::BoundedReader,
5 traits::{Bounded, BoundedIndex, CloneAndRewind},
6 },
7 ensure,
8};
9
10use derivative::Derivative;
11use std::{
12 cmp::min,
13 io::{self, Read, Seek, SeekFrom},
14 ops::{Range, RangeFrom, RangeFull, RangeInclusive, RangeTo, RangeToInclusive},
15};
16
17#[derive(derive_more::Debug, Derivative)]
23#[derivative(Clone(bound = ""))]
24pub struct ChainedBoundedReader<T> {
25 #[debug(skip)]
26 readers: Vec<BoundedReader<T>>,
27 curr: u64,
29 total_len: u64,
31}
32
33impl<T> ChainedBoundedReader<T> {
34 pub fn new(readers: Vec<BoundedReader<T>>) -> Self {
36 let total_len = readers.iter().map(BoundedReader::bound_len).sum();
37 Self { readers, curr: 0, total_len }
38 }
39
40 pub const fn empty() -> Self {
42 Self { readers: vec![], curr: 0, total_len: 0 }
43 }
44}
45
46impl<T> ChainedBoundedReader<T> {
47 fn slice(readers: &[BoundedReader<T>], start: u64, end: u64) -> Result<Self, BoundedReaderErr> {
50 let mut result = Vec::new();
51 let mut cursor = 0u64;
52 for reader in readers {
53 let rlen = reader.bound_len();
54 let reader_vend = cursor + rlen;
55 if cursor >= end {
56 break;
57 }
58 if reader_vend > start {
59 let sub_start = start.saturating_sub(cursor);
60 let sub_end = end.saturating_sub(cursor).min(rlen);
61 result.push(reader.sub(sub_start..sub_end)?);
62 }
63 cursor += rlen;
64 }
65 Ok(Self::new(result))
66 }
67}
68
69impl<T> From<BoundedReader<T>> for ChainedBoundedReader<T> {
70 fn from(r: BoundedReader<T>) -> Self {
71 let total_len = r.bound_len();
72 Self { readers: vec![r], curr: 0, total_len }
73 }
74}
75
76impl<T> Bounded for ChainedBoundedReader<T> {
77 fn bounds(&self) -> std::ops::Range<u64> {
78 let (starts, ends): (Vec<u64>, Vec<u64>) = self
79 .readers
80 .iter()
81 .map(|r| {
82 let b = r.bounds();
83 (b.start, b.end)
84 })
85 .unzip();
86
87 let min_start = starts.into_iter().min().unwrap_or_default();
88 let max_end = ends.into_iter().max().unwrap_or_default();
89 min_start..max_end
90 }
91
92 fn bound_len(&self) -> u64 {
93 self.total_len
94 }
95
96 fn sub<R: BoundedIndex<Self>>(&self, range: R) -> Result<Self, BoundedReaderErr> {
98 range.get(self)
99 }
100
101 fn clamped_sub<R: BoundedIndex<Self>>(&self, range: R) -> Self {
102 range.clamped_get(self)
103 }
104}
105
106impl<T> CloneAndRewind for ChainedBoundedReader<T> {
107 fn clone_and_rewind(&self) -> Self {
108 Self { readers: self.readers.clone(), curr: 0, total_len: self.total_len }
109 }
110}
111
112impl<T: Read + Seek> Read for ChainedBoundedReader<T> {
113 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
114 if buf.is_empty() || self.curr >= self.total_len {
115 return Ok(0);
116 }
117
118 let mut offset = self.curr;
119 for reader in &mut self.readers {
120 let len = reader.bound_len();
121 if offset < len {
122 reader.seek(SeekFrom::Start(offset))?;
123 let max_read = min(buf.len(), (len - offset) as usize);
124 let n = reader.read(&mut buf[..max_read])?;
125 self.curr += n as u64;
126 return Ok(n);
127 }
128 offset -= len;
129 }
130
131 Ok(0)
132 }
133}
134
135impl<T: Seek> Seek for ChainedBoundedReader<T> {
136 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
137 let new_pos = match pos {
138 SeekFrom::Start(n) => n,
139 SeekFrom::End(n) =>
140 if n >= 0 {
141 self.total_len.saturating_add(n as u64)
142 } else {
143 self.total_len.saturating_sub(n.unsigned_abs())
144 },
145 SeekFrom::Current(n) =>
146 if n >= 0 {
147 self.curr.saturating_add(n as u64)
148 } else {
149 self.curr.saturating_sub(n.unsigned_abs())
150 },
151 };
152 self.curr = new_pos.min(self.total_len);
153 Ok(self.curr)
154 }
155}
156
157impl<T> BoundedIndex<ChainedBoundedReader<T>> for Range<u64> {
158 fn get(self, bounded: &ChainedBoundedReader<T>) -> Result<ChainedBoundedReader<T>, BoundedReaderErr> {
159 ensure!(self.start <= self.end, BoundedReaderErr::invalid_range(self.clone()));
160 ensure!(self.start <= bounded.total_len, BoundedReaderErr::sub_start_exceed(bounded, self.start));
161 ensure!(self.end <= bounded.total_len, BoundedReaderErr::sub_end_exceed(bounded, self.end));
162 ChainedBoundedReader::slice(&bounded.readers, self.start, self.end)
163 }
164
165 fn clamped_get(self, bounded: &ChainedBoundedReader<T>) -> ChainedBoundedReader<T> {
166 let start = self.start.min(bounded.total_len);
167 let end = self.end.min(bounded.total_len);
168 let start = start.min(end);
169 ChainedBoundedReader::slice(&bounded.readers, start, end).unwrap_or_else(|_| ChainedBoundedReader::empty())
170 }
171}
172
173impl<T> BoundedIndex<ChainedBoundedReader<T>> for RangeFrom<u64> {
174 fn get(self, bounded: &ChainedBoundedReader<T>) -> Result<ChainedBoundedReader<T>, BoundedReaderErr> {
175 ensure!(self.start <= bounded.total_len, BoundedReaderErr::sub_start_exceed(bounded, self.start));
176 ChainedBoundedReader::slice(&bounded.readers, self.start, bounded.total_len)
177 }
178
179 fn clamped_get(self, bounded: &ChainedBoundedReader<T>) -> ChainedBoundedReader<T> {
180 let start = self.start.min(bounded.total_len);
181 ChainedBoundedReader::slice(&bounded.readers, start, bounded.total_len)
182 .unwrap_or_else(|_| ChainedBoundedReader::empty())
183 }
184}
185
186impl<T> BoundedIndex<ChainedBoundedReader<T>> for RangeTo<u64> {
187 fn get(self, bounded: &ChainedBoundedReader<T>) -> Result<ChainedBoundedReader<T>, BoundedReaderErr> {
188 ensure!(self.end <= bounded.total_len, BoundedReaderErr::sub_end_exceed(bounded, self.end));
189 ChainedBoundedReader::slice(&bounded.readers, 0, self.end)
190 }
191
192 fn clamped_get(self, bounded: &ChainedBoundedReader<T>) -> ChainedBoundedReader<T> {
193 let end = self.end.min(bounded.total_len);
194 ChainedBoundedReader::slice(&bounded.readers, 0, end).unwrap_or_else(|_| ChainedBoundedReader::empty())
195 }
196}
197
198impl<T> BoundedIndex<ChainedBoundedReader<T>> for RangeInclusive<u64> {
199 fn get(self, bounded: &ChainedBoundedReader<T>) -> Result<ChainedBoundedReader<T>, BoundedReaderErr> {
200 let (start, end) = self.into_inner();
201 let inc_end = end.checked_add(1).ok_or_else(|| BoundedReaderErr::file_too_large(bounded, start, end))?;
202 (start..inc_end).get(bounded)
203 }
204
205 fn clamped_get(self, bounded: &ChainedBoundedReader<T>) -> ChainedBoundedReader<T> {
206 let (start, end) = self.into_inner();
207 (start..(end.saturating_add(1))).clamped_get(bounded)
208 }
209}
210
211impl<T> BoundedIndex<ChainedBoundedReader<T>> for RangeToInclusive<u64> {
212 fn get(self, bounded: &ChainedBoundedReader<T>) -> Result<ChainedBoundedReader<T>, BoundedReaderErr> {
213 let inc_end = self.end.checked_add(1).ok_or_else(|| BoundedReaderErr::file_too_large(bounded, 0, self.end))?;
214 (0..inc_end).get(bounded)
215 }
216
217 fn clamped_get(self, bounded: &ChainedBoundedReader<T>) -> ChainedBoundedReader<T> {
218 (0..(self.end.saturating_add(1))).clamped_get(bounded)
219 }
220}
221
222impl<T> BoundedIndex<ChainedBoundedReader<T>> for RangeFull {
223 fn get(self, bounded: &ChainedBoundedReader<T>) -> Result<ChainedBoundedReader<T>, BoundedReaderErr> {
224 Ok(self.clamped_get(bounded))
225 }
226
227 fn clamped_get(self, bounded: &ChainedBoundedReader<T>) -> ChainedBoundedReader<T> {
228 bounded.clone_and_rewind()
229 }
230}
231
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;
    use test_case::test_case;

    /// Chain type exercised by every test: in-memory segments.
    type CursorChain = ChainedBoundedReader<Cursor<Vec<u8>>>;

    /// Four segments of lengths 28, 29, 33 and 33, so the segment boundaries fall
    /// at chain offsets 28, 57 and 90 (total 123).
    const LOREM_IPSUM: &[&[u8]] = &[
        b"Lorem ipsum dolor sit amet, ",
        b"consectetur adipiscing elit, ",
        b"sed do eiusmod tempor incididunt ",
        b"ut labore et dolore magna aliqua.",
    ];

    /// The expected full stream: all segments concatenated.
    fn lorem_ipsum_joined() -> Vec<u8> {
        LOREM_IPSUM.concat()
    }

    /// Builds a chain with one `BoundedReader` per segment, in order.
    fn chain_from<D: AsRef<[u8]>>(segments: &[D]) -> CursorChain {
        let readers = segments
            .iter()
            .map(|seg| BoundedReader::from_reader(Cursor::new(seg.as_ref().to_vec())).unwrap())
            .collect::<Vec<_>>();
        ChainedBoundedReader::new(readers)
    }

    /// Reads `chain` from its current position to EOF.
    fn read_all(mut chain: CursorChain) -> Vec<u8> {
        let mut bytes = vec![];
        chain.read_to_end(&mut bytes).unwrap();
        bytes
    }

    #[test_case( LOREM_IPSUM, (..) => lorem_ipsum_joined(); "Full")]
    #[test_case( LOREM_IPSUM, 28..57 => b"consectetur adipiscing elit, ".to_vec(); "2nd slice")]
    #[test_case( LOREM_IPSUM, 57..90 => b"sed do eiusmod tempor incididunt ".to_vec(); "3rd slice")]
    #[test_case( LOREM_IPSUM, 40..63 => b"adipiscing elit, sed do".to_vec(); "partial 2nd & 3rd")]
    #[test_case( LOREM_IPSUM, 40..=62 => b"adipiscing elit, sed do".to_vec(); "partial 2nd & 3rd ToInclusive")]
    #[test_case( LOREM_IPSUM, ..11 => b"Lorem ipsum".to_vec(); "To")]
    #[test_case( LOREM_IPSUM, ..=10 => b"Lorem ipsum".to_vec(); "ToInclusive")]
    #[test_case( LOREM_IPSUM, (103..) => b"dolore magna aliqua.".to_vec(); "From")]
    #[allow(unused_parens)]
    fn bounded_index_as_chained<D, R>(segments: &[D], range: R) -> Vec<u8>
    where
        D: AsRef<[u8]>,
        R: BoundedIndex<CursorChain>,
    {
        read_all(chain_from(segments).sub(range).unwrap())
    }

    #[test_case( LOREM_IPSUM, &[(28..55), (12..22)] => b"adipiscing".to_vec(); "Nested (28..55)(12..22)")]
    #[test_case( LOREM_IPSUM, &[(28..55), (0..11)] => b"consectetur".to_vec(); "Nested (28..55)(0..11)")]
    #[test_case( LOREM_IPSUM, &[28..=55, 23..=26] => b"elit".to_vec(); "Nested 28..=55 23..=26")]
    #[test_case( LOREM_IPSUM, &[..=55, ..=4] => b"Lorem".to_vec(); "Nested ..=55 ..=4")]
    #[test_case( LOREM_IPSUM, &[(28..), (88..)] => b"aliqua.".to_vec(); "Nested (28..) (88..)")]
    #[test_case( LOREM_IPSUM, &[(..), (..)] => lorem_ipsum_joined(); "Nested (..) (..)")]
    fn nested_bounded_index_as_chained<D, R>(segments: &[D], nested_ranges: &[R]) -> Vec<u8>
    where
        D: AsRef<[u8]>,
        R: BoundedIndex<CursorChain> + Clone,
    {
        // Apply each range to the result of the previous one.
        let innermost = nested_ranges
            .iter()
            .fold(chain_from(segments), |chain, range| chain.sub(range.clone()).unwrap());
        read_all(innermost)
    }
}