Skip to main content

ipld_car/bounded_reader/sync/
chained_bounded_reader.rs

1use crate::{
2	bounded_reader::{
3		error::BoundedReaderErr,
4		sync::BoundedReader,
5		traits::{Bounded, BoundedIndex, CloneAndRewind},
6	},
7	ensure,
8};
9
10use derivative::Derivative;
11use std::{
12	cmp::min,
13	io::{self, Read, Seek, SeekFrom},
14	ops::{Range, RangeFrom, RangeFull, RangeInclusive, RangeTo, RangeToInclusive},
15};
16
17/// A sequential composition of multiple [`BoundedReader`]s that presents them as a single
18/// contiguous stream.
19///
20/// Implements [`Read`] and [`Seek`] as if all inner readers were concatenated. Reads advance
21/// through segments in order; seeks operate on the virtual flat offset space.
22#[derive(derive_more::Debug, Derivative)]
23#[derivative(Clone(bound = ""))]
24pub struct ChainedBoundedReader<T> {
25	#[debug(skip)]
26	readers: Vec<BoundedReader<T>>,
27	/// Current position in the composed virtual stream.
28	curr: u64,
29	/// Cached sum of all inner readers' lengths.
30	total_len: u64,
31}
32
33impl<T> ChainedBoundedReader<T> {
34	/// Creates a new composed reader from a list of bounded readers.
35	pub fn new(readers: Vec<BoundedReader<T>>) -> Self {
36		let total_len = readers.iter().map(BoundedReader::bound_len).sum();
37		Self { readers, curr: 0, total_len }
38	}
39
40	/// Creates an empty reader.
41	pub const fn empty() -> Self {
42		Self { readers: vec![], curr: 0, total_len: 0 }
43	}
44}
45
46impl<T> ChainedBoundedReader<T> {
47	/// Slices this reader to virtual offset range `[start, end)`, returning a new
48	/// `ChainedBoundedReader` covering only that portion.
49	fn slice(readers: &[BoundedReader<T>], start: u64, end: u64) -> Result<Self, BoundedReaderErr> {
50		let mut result = Vec::new();
51		let mut cursor = 0u64;
52		for reader in readers {
53			let rlen = reader.bound_len();
54			let reader_vend = cursor + rlen;
55			if cursor >= end {
56				break;
57			}
58			if reader_vend > start {
59				let sub_start = start.saturating_sub(cursor);
60				let sub_end = end.saturating_sub(cursor).min(rlen);
61				result.push(reader.sub(sub_start..sub_end)?);
62			}
63			cursor += rlen;
64		}
65		Ok(Self::new(result))
66	}
67}
68
69impl<T> From<BoundedReader<T>> for ChainedBoundedReader<T> {
70	fn from(r: BoundedReader<T>) -> Self {
71		let total_len = r.bound_len();
72		Self { readers: vec![r], curr: 0, total_len }
73	}
74}
75
76impl<T> Bounded for ChainedBoundedReader<T> {
77	fn bounds(&self) -> std::ops::Range<u64> {
78		let (starts, ends): (Vec<u64>, Vec<u64>) = self
79			.readers
80			.iter()
81			.map(|r| {
82				let b = r.bounds();
83				(b.start, b.end)
84			})
85			.unzip();
86
87		let min_start = starts.into_iter().min().unwrap_or_default();
88		let max_end = ends.into_iter().max().unwrap_or_default();
89		min_start..max_end
90	}
91
92	fn bound_len(&self) -> u64 {
93		self.total_len
94	}
95
96	/// Creates a new bounded reader that is a sub-range of this one.
97	fn sub<R: BoundedIndex<Self>>(&self, range: R) -> Result<Self, BoundedReaderErr> {
98		range.get(self)
99	}
100
101	fn clamped_sub<R: BoundedIndex<Self>>(&self, range: R) -> Self {
102		range.clamped_get(self)
103	}
104}
105
106impl<T> CloneAndRewind for ChainedBoundedReader<T> {
107	fn clone_and_rewind(&self) -> Self {
108		Self { readers: self.readers.clone(), curr: 0, total_len: self.total_len }
109	}
110}
111
112impl<T: Read + Seek> Read for ChainedBoundedReader<T> {
113	fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
114		if buf.is_empty() || self.curr >= self.total_len {
115			return Ok(0);
116		}
117
118		let mut offset = self.curr;
119		for reader in &mut self.readers {
120			let len = reader.bound_len();
121			if offset < len {
122				reader.seek(SeekFrom::Start(offset))?;
123				let max_read = min(buf.len(), (len - offset) as usize);
124				let n = reader.read(&mut buf[..max_read])?;
125				self.curr += n as u64;
126				return Ok(n);
127			}
128			offset -= len;
129		}
130
131		Ok(0)
132	}
133}
134
135impl<T: Seek> Seek for ChainedBoundedReader<T> {
136	fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
137		let new_pos = match pos {
138			SeekFrom::Start(n) => n,
139			SeekFrom::End(n) =>
140				if n >= 0 {
141					self.total_len.saturating_add(n as u64)
142				} else {
143					self.total_len.saturating_sub(n.unsigned_abs())
144				},
145			SeekFrom::Current(n) =>
146				if n >= 0 {
147					self.curr.saturating_add(n as u64)
148				} else {
149					self.curr.saturating_sub(n.unsigned_abs())
150				},
151		};
152		self.curr = new_pos.min(self.total_len);
153		Ok(self.curr)
154	}
155}
156
157impl<T> BoundedIndex<ChainedBoundedReader<T>> for Range<u64> {
158	fn get(self, bounded: &ChainedBoundedReader<T>) -> Result<ChainedBoundedReader<T>, BoundedReaderErr> {
159		ensure!(self.start <= self.end, BoundedReaderErr::invalid_range(self.clone()));
160		ensure!(self.start <= bounded.total_len, BoundedReaderErr::sub_start_exceed(bounded, self.start));
161		ensure!(self.end <= bounded.total_len, BoundedReaderErr::sub_end_exceed(bounded, self.end));
162		ChainedBoundedReader::slice(&bounded.readers, self.start, self.end)
163	}
164
165	fn clamped_get(self, bounded: &ChainedBoundedReader<T>) -> ChainedBoundedReader<T> {
166		let start = self.start.min(bounded.total_len);
167		let end = self.end.min(bounded.total_len);
168		let start = start.min(end);
169		ChainedBoundedReader::slice(&bounded.readers, start, end).unwrap_or_else(|_| ChainedBoundedReader::empty())
170	}
171}
172
173impl<T> BoundedIndex<ChainedBoundedReader<T>> for RangeFrom<u64> {
174	fn get(self, bounded: &ChainedBoundedReader<T>) -> Result<ChainedBoundedReader<T>, BoundedReaderErr> {
175		ensure!(self.start <= bounded.total_len, BoundedReaderErr::sub_start_exceed(bounded, self.start));
176		ChainedBoundedReader::slice(&bounded.readers, self.start, bounded.total_len)
177	}
178
179	fn clamped_get(self, bounded: &ChainedBoundedReader<T>) -> ChainedBoundedReader<T> {
180		let start = self.start.min(bounded.total_len);
181		ChainedBoundedReader::slice(&bounded.readers, start, bounded.total_len)
182			.unwrap_or_else(|_| ChainedBoundedReader::empty())
183	}
184}
185
186impl<T> BoundedIndex<ChainedBoundedReader<T>> for RangeTo<u64> {
187	fn get(self, bounded: &ChainedBoundedReader<T>) -> Result<ChainedBoundedReader<T>, BoundedReaderErr> {
188		ensure!(self.end <= bounded.total_len, BoundedReaderErr::sub_end_exceed(bounded, self.end));
189		ChainedBoundedReader::slice(&bounded.readers, 0, self.end)
190	}
191
192	fn clamped_get(self, bounded: &ChainedBoundedReader<T>) -> ChainedBoundedReader<T> {
193		let end = self.end.min(bounded.total_len);
194		ChainedBoundedReader::slice(&bounded.readers, 0, end).unwrap_or_else(|_| ChainedBoundedReader::empty())
195	}
196}
197
198impl<T> BoundedIndex<ChainedBoundedReader<T>> for RangeInclusive<u64> {
199	fn get(self, bounded: &ChainedBoundedReader<T>) -> Result<ChainedBoundedReader<T>, BoundedReaderErr> {
200		let (start, end) = self.into_inner();
201		let inc_end = end.checked_add(1).ok_or_else(|| BoundedReaderErr::file_too_large(bounded, start, end))?;
202		(start..inc_end).get(bounded)
203	}
204
205	fn clamped_get(self, bounded: &ChainedBoundedReader<T>) -> ChainedBoundedReader<T> {
206		let (start, end) = self.into_inner();
207		(start..(end.saturating_add(1))).clamped_get(bounded)
208	}
209}
210
211impl<T> BoundedIndex<ChainedBoundedReader<T>> for RangeToInclusive<u64> {
212	fn get(self, bounded: &ChainedBoundedReader<T>) -> Result<ChainedBoundedReader<T>, BoundedReaderErr> {
213		let inc_end = self.end.checked_add(1).ok_or_else(|| BoundedReaderErr::file_too_large(bounded, 0, self.end))?;
214		(0..inc_end).get(bounded)
215	}
216
217	fn clamped_get(self, bounded: &ChainedBoundedReader<T>) -> ChainedBoundedReader<T> {
218		(0..(self.end.saturating_add(1))).clamped_get(bounded)
219	}
220}
221
222impl<T> BoundedIndex<ChainedBoundedReader<T>> for RangeFull {
223	fn get(self, bounded: &ChainedBoundedReader<T>) -> Result<ChainedBoundedReader<T>, BoundedReaderErr> {
224		Ok(self.clamped_get(bounded))
225	}
226
227	fn clamped_get(self, bounded: &ChainedBoundedReader<T>) -> ChainedBoundedReader<T> {
228		bounded.clone_and_rewind()
229	}
230}
231
#[cfg(test)]
mod tests {
	use super::*;
	use std::io::Cursor;
	use test_case::test_case;

	/// Fixture text split into four segments; the trailing comments give each segment's
	/// virtual offset range within the concatenation.
	const LOREM_IPSUM: &[&[u8]] = &[
		b"Lorem ipsum dolor sit amet, ",      // [0, 28)
		b"consectetur adipiscing elit, ",     // [28, 57)
		b"sed do eiusmod tempor incididunt ", // [57, 90)
		b"ut labore et dolore magna aliqua.", // [90, 123)
	];

	/// The fixture as one contiguous byte vector.
	fn lorem_ipsum_joined() -> Vec<u8> {
		LOREM_IPSUM.concat()
	}

	/// Builds a chained reader with one in-memory `Cursor` segment per entry of `segments`.
	fn chain_of<D: AsRef<[u8]>>(segments: &[D]) -> ChainedBoundedReader<Cursor<Vec<u8>>> {
		let readers = segments
			.iter()
			.map(|data| BoundedReader::from_reader(Cursor::new(data.as_ref().to_vec())).unwrap())
			.collect::<Vec<_>>();
		ChainedBoundedReader::new(readers)
	}

	/// Reads `reader` to the end and returns everything it produced.
	fn drain(mut reader: ChainedBoundedReader<Cursor<Vec<u8>>>) -> Vec<u8> {
		let mut content = vec![];
		let _ = reader.read_to_end(&mut content).unwrap();
		content
	}

	/// A single `sub` over the chain yields exactly the bytes of the requested range.
	#[test_case( LOREM_IPSUM, (..) => lorem_ipsum_joined(); "Full")]
	#[test_case( LOREM_IPSUM, 28..57 => b"consectetur adipiscing elit, ".to_vec(); "2nd slice")]
	#[test_case( LOREM_IPSUM, 57..90 => b"sed do eiusmod tempor incididunt ".to_vec(); "3rd slice")]
	#[test_case( LOREM_IPSUM, 40..63 => b"adipiscing elit, sed do".to_vec(); "partial 2nd & 3rd")]
	#[test_case( LOREM_IPSUM, 40..=62 => b"adipiscing elit, sed do".to_vec(); "partial 2nd & 3rd ToInclusive")]
	#[test_case( LOREM_IPSUM, ..11 => b"Lorem ipsum".to_vec(); "To")]
	#[test_case( LOREM_IPSUM, ..=10 => b"Lorem ipsum".to_vec(); "ToInclusive")]
	#[test_case( LOREM_IPSUM, (103..) => b"dolore magna aliqua.".to_vec(); "From")]
	#[allow(unused_parens)]
	fn bounded_index_as_chained<D, R>(segments: &[D], range: R) -> Vec<u8>
	where
		D: AsRef<[u8]>,
		R: BoundedIndex<ChainedBoundedReader<Cursor<Vec<u8>>>>,
	{
		let sub = chain_of(segments).sub(range).unwrap();
		drain(sub)
	}

	/// Applying `sub` repeatedly narrows the chain relative to the previous sub-reader.
	#[test_case( LOREM_IPSUM, &[(28..55), (12..22)] => b"adipiscing".to_vec(); "Nested (28..55)(12..22)")]
	#[test_case( LOREM_IPSUM, &[(28..55), (0..11)] => b"consectetur".to_vec(); "Nested (28..55)(0..11)")]
	#[test_case( LOREM_IPSUM, &[28..=55, 23..=26] => b"elit".to_vec(); "Nested 28..=55 23..=26")]
	#[test_case( LOREM_IPSUM, &[..=55, ..=4] => b"Lorem".to_vec(); "Nested ..=55 ..=4")]
	#[test_case( LOREM_IPSUM, &[(28..), (88..)] => b"aliqua.".to_vec(); "Nested (28..) (88..)")]
	#[test_case( LOREM_IPSUM, &[(..), (..)] => lorem_ipsum_joined(); "Nested (..) (..)")]
	fn nested_bounded_index_as_chained<D, R>(segments: &[D], nested_ranges: &[R]) -> Vec<u8>
	where
		D: AsRef<[u8]>,
		R: BoundedIndex<ChainedBoundedReader<Cursor<Vec<u8>>>> + Clone,
	{
		let narrowed = nested_ranges
			.iter()
			.cloned()
			.fold(chain_of(segments), |current, range| current.sub(range).unwrap());
		drain(narrowed)
	}
}
299}