1pub mod metadata;
19
20use std::fs::File;
21use std::io::{BufReader, Read, Seek, SeekFrom};
22
23use bytes::{Buf, Bytes};
24
25#[allow(clippy::len_without_is_empty)]
27pub trait ChunkReader {
28 type T: Read;
29
30 fn len(&self) -> u64;
34
35 fn get_read(&self, offset_from_start: u64) -> std::io::Result<Self::T>;
37
38 fn get_bytes(&self, offset_from_start: u64, length: u64) -> std::io::Result<Bytes> {
40 let mut bytes = vec![0; length as usize];
41 self.get_read(offset_from_start)?
42 .take(length)
43 .read_exact(&mut bytes)?;
44 Ok(bytes.into())
45 }
46}
47
48impl ChunkReader for File {
49 type T = BufReader<File>;
50
51 fn len(&self) -> u64 {
52 self.metadata().map(|m| m.len()).unwrap_or(0u64)
53 }
54
55 fn get_read(&self, offset_from_start: u64) -> std::io::Result<Self::T> {
60 let mut reader = self.try_clone()?;
61 reader.seek(SeekFrom::Start(offset_from_start))?;
62 Ok(BufReader::new(self.try_clone()?))
63 }
64}
65
66impl ChunkReader for Bytes {
67 type T = bytes::buf::Reader<Bytes>;
68
69 fn len(&self) -> u64 {
70 self.len() as u64
71 }
72
73 fn get_read(&self, offset_from_start: u64) -> std::io::Result<Self::T> {
74 Ok(self.slice(offset_from_start as usize..).reader())
75 }
76}
77
#[cfg(feature = "async")]
mod async_chunk_reader {
    use super::*;

    use futures_util::future::BoxFuture;
    use futures_util::FutureExt;
    use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};

    /// An asynchronous source of bytes that can be read at arbitrary offsets.
    // `len` reports the byte length of the source; the lint's suggested
    // `is_empty` companion is intentionally not part of this trait.
    #[allow(clippy::len_without_is_empty)]
    pub trait AsyncChunkReader: Send {
        /// Resolves to the total number of bytes available from this source.
        fn len(&mut self) -> BoxFuture<'_, std::io::Result<u64>>;

        /// Resolves to exactly `length` bytes starting at `offset_from_start`.
        fn get_bytes(
            &mut self,
            offset_from_start: u64,
            length: u64,
        ) -> BoxFuture<'_, std::io::Result<Bytes>>;
    }

    /// Blanket implementation for any seekable asynchronous reader.
    impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncChunkReader for T {
        /// Determines the length by seeking to the end of the stream.
        ///
        /// This leaves the cursor at the end; `get_bytes` always seeks to an
        /// absolute position first, so it does not depend on where the cursor
        /// was left.
        fn len(&mut self) -> BoxFuture<'_, std::io::Result<u64>> {
            async move { self.seek(SeekFrom::End(0)).await }.boxed()
        }

        /// Seeks to `offset_from_start` and reads exactly `length` bytes.
        ///
        /// # Errors
        ///
        /// * [`std::io::ErrorKind::InvalidInput`] if `length` does not fit in
        ///   `usize` on the current target (checked before any seek happens).
        /// * Any error produced by the underlying seek or `read_exact`.
        fn get_bytes(
            &mut self,
            offset_from_start: u64,
            length: u64,
        ) -> BoxFuture<'_, std::io::Result<Bytes>> {
            async move {
                // A plain `as usize` cast would silently truncate on targets
                // where `usize` is narrower than `u64`.
                let size = usize::try_from(length)
                    .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
                self.seek(SeekFrom::Start(offset_from_start)).await?;
                let mut buffer = vec![0u8; size];
                self.read_exact(&mut buffer).await?;
                Ok(buffer.into())
            }
            .boxed()
        }
    }

    /// Lets a boxed trait object be used wherever an `AsyncChunkReader` is
    /// expected by forwarding every call to the boxed value.
    impl AsyncChunkReader for Box<dyn AsyncChunkReader> {
        fn len(&mut self) -> BoxFuture<'_, std::io::Result<u64>> {
            self.as_mut().len()
        }

        fn get_bytes(
            &mut self,
            offset_from_start: u64,
            length: u64,
        ) -> BoxFuture<'_, std::io::Result<Bytes>> {
            self.as_mut().get_bytes(offset_from_start, length)
        }
    }
}
132
133#[cfg(feature = "async")]
134pub use async_chunk_reader::AsyncChunkReader;
135
#[cfg(all(feature = "async", feature = "opendal"))]
mod async_opendal_reader {
    use crate::reader::AsyncChunkReader;
    use bytes::Bytes;
    use futures_util::future::BoxFuture;
    use opendal::Operator;
    use std::sync::Arc;

    /// An [`AsyncChunkReader`] that reads a single path through an OpenDAL
    /// [`Operator`].
    pub struct AsyncOpendalReader {
        /// Operator used to issue the `stat` and ranged read requests.
        op: Operator,
        /// Path of the object to read, shared so each request future can hold
        /// its own handle to it.
        path: Arc<String>,
    }

    impl AsyncOpendalReader {
        /// Creates a reader for `path`, accessed through `op`.
        ///
        /// No I/O is performed here; requests are only issued by the
        /// [`AsyncChunkReader`] methods.
        pub fn new(op: Operator, path: &str) -> Self {
            Self {
                op,
                path: Arc::new(path.to_string()),
            }
        }
    }

    impl AsyncChunkReader for AsyncOpendalReader {
        /// Resolves to the content length reported by `stat` for the path.
        ///
        /// # Errors
        ///
        /// Returns the `stat` failure converted into a [`std::io::Error`].
        fn len(&mut self) -> BoxFuture<'_, std::io::Result<u64>> {
            let path = Arc::clone(&self.path);
            Box::pin(async move {
                let meta = self.op.stat(&path).await?;
                Ok(meta.content_length())
            })
        }

        /// Resolves to the bytes in the half-open range
        /// `offset_from_start..offset_from_start + length` of the path.
        ///
        /// # Errors
        ///
        /// * [`std::io::ErrorKind::InvalidInput`] if the end of the range
        ///   would overflow `u64`.
        /// * The read failure converted into a [`std::io::Error`].
        fn get_bytes(
            &mut self,
            offset_from_start: u64,
            length: u64,
        ) -> BoxFuture<'_, std::io::Result<Bytes>> {
            let path = Arc::clone(&self.path);

            Box::pin(async move {
                // An unchecked `+` would panic in debug builds and wrap to an
                // inverted range in release builds.
                let end = offset_from_start.checked_add(length).ok_or_else(|| {
                    std::io::Error::new(
                        std::io::ErrorKind::InvalidInput,
                        "requested byte range overflows u64",
                    )
                })?;
                let buffer = self
                    .op
                    .read_with(&path)
                    .range(offset_from_start..end)
                    .await?;
                Ok(buffer.to_bytes())
            })
        }
    }
}
205
206#[cfg(all(feature = "async", feature = "opendal"))]
207pub use async_opendal_reader::AsyncOpendalReader;