1use super::read_seek::AsyncReadWithSeek;
2use cyfs_base::*;
3
4use cyfs_sha2 as sha2;
5use futures::AsyncSeekExt;
6use sha2::Digest;
7use std::io::SeekFrom;
8use std::ops::Range;
9use std::pin::Pin;
10use std::task::{Context, Poll};
11
12pub struct ReaderWithLimit {
13 limit: u64,
14 range: Range<u64>,
15 reader: Box<dyn AsyncReadWithSeek + Unpin + Send + Sync>,
16}
17
18impl ReaderWithLimit {
19 pub async fn new(
20 limit: u64,
21 mut reader: Box<dyn AsyncReadWithSeek + Unpin + Send + Sync>,
22 ) -> BuckyResult<Self> {
23 let start = reader.stream_position().await?;
24 let range = Range {
25 start,
26 end: start + limit,
27 };
28
29 Ok(Self {
30 limit,
31 range,
32 reader,
33 })
34 }
35}
36
37impl async_std::io::Read for ReaderWithLimit {
38 fn poll_read(
39 mut self: Pin<&mut Self>,
40 cx: &mut Context<'_>,
41 buf: &mut [u8],
42 ) -> Poll<std::io::Result<usize>> {
43 if self.limit == 0 {
44 return Poll::Ready(Ok(0));
45 }
46
47 let max = std::cmp::min(buf.len() as u64, self.limit) as usize;
48 let ret = Pin::new(self.reader.as_mut()).poll_read(cx, &mut buf[..max]);
49 match ret {
50 Poll::Ready(Ok(n)) => {
51 self.limit -= n as u64;
52 Poll::Ready(Ok(n))
53 }
54 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
55 Poll::Pending => Poll::Pending,
56 }
57 }
58}
59
60impl async_std::io::Seek for ReaderWithLimit {
61 fn poll_seek(
62 mut self: Pin<&mut Self>,
63 cx: &mut Context<'_>,
64 pos: SeekFrom,
65 ) -> Poll<std::io::Result<u64>> {
66 let pos = match pos {
67 SeekFrom::Start(pos) => SeekFrom::Start(self.range.start + pos),
68 SeekFrom::End(offset) => SeekFrom::Start((self.range.end as i64 + offset) as u64),
69 SeekFrom::Current(offset) => SeekFrom::Current(offset),
70 };
71
72 match Pin::new(self.reader.as_mut()).poll_seek(cx, pos) {
74 Poll::Pending => Poll::Pending,
75 Poll::Ready(Ok(mut pos)) => {
76 if pos < self.range.start {
78 let msg = format!("seek beyond the begin: {} < {}", pos, self.range.start);
79 let err = BuckyError::new(BuckyErrorCode::InvalidInput, msg);
80 return Poll::Ready(Err(err.into()));
81 } else if pos > self.range.end {
82 pos = self.range.end;
83 }
84
85 self.limit = self.range.end - pos;
86 Poll::Ready(Ok(pos - self.range.start))
87 }
88 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
89 }
90 }
91}
92
93impl AsyncReadWithSeek for ReaderWithLimit {}
94
95#[async_trait::async_trait]
96pub trait ChunkHashErrorHandler: Send + Sync {
97 fn on_hash_error(&self, chunk_id: &ChunkId, path: &str);
98}
99
100pub struct ChunkReaderWithHash {
101 path: String,
102 chunk_id: ChunkId,
103 reader: Box<dyn AsyncReadWithSeek + Unpin + Send + Sync>,
104 hash: sha2::Sha256,
105 error_handler: Option<Box<dyn ChunkHashErrorHandler>>,
106 seeked: bool,
107 hashed_len: usize,
108}
109
110impl ChunkReaderWithHash {
111 pub fn new(
112 path: String,
113 chunk_id: ChunkId,
114 reader: Box<dyn AsyncReadWithSeek + Unpin + Send + Sync>,
115 error_handler: Option<Box<dyn ChunkHashErrorHandler>>,
116 ) -> Self {
117 Self {
118 path,
119 chunk_id,
120 reader,
121 hash: sha2::Sha256::new(),
122 error_handler,
123 seeked: false,
124 hashed_len: 0,
125 }
126 }
127}
128
129impl async_std::io::Read for ChunkReaderWithHash {
130 fn poll_read(
131 mut self: Pin<&mut Self>,
132 cx: &mut Context<'_>,
133 buf: &mut [u8],
134 ) -> Poll<std::io::Result<usize>> {
135 let ret = Pin::new(self.reader.as_mut()).poll_read(cx, buf);
136 match ret {
137 Poll::Ready(ret) => match ret {
138 Ok(size) => {
139 if size > 0 {
140 self.hashed_len += size;
141 self.hash.input(&buf[0..size]);
142 Poll::Ready(Ok(size))
143 } else {
144 if self.seeked {
145 warn!(
146 "read chunk with hash but seeked already! chunk={}",
147 self.chunk_id
148 );
149 Poll::Ready(Ok(0))
150 } else if self.hashed_len != self.chunk_id.len() {
151 error!("read chunk with hash but ended with unmatched length! chunk={}, len={}, read len={}",
152 self.chunk_id, self.chunk_id.len(), self.hashed_len,);
153 Poll::Ready(Ok(0))
155 } else {
156 let hash_value = self.hash.clone().result().into();
157 let actual_id = ChunkId::new(&hash_value, self.chunk_id.len() as u32);
158
159 if actual_id.eq(&self.chunk_id) {
160 debug!(
161 "read chunk from file complete! chunk={}, file={}",
162 self.chunk_id, self.path
163 );
164 Poll::Ready(Ok(0))
165 } else {
166 let msg = format!(
167 "content in file not match chunk id: chunk={}, file={}, expect hash={}, got={}",
168 self.chunk_id, self.path, self.chunk_id, actual_id
169 );
170 error!("{}", msg);
171
172 if let Some(error_handler) = self.error_handler.take() {
173 error_handler.on_hash_error(&self.chunk_id, &self.path);
174 }
175
176 let err = BuckyError::new(BuckyErrorCode::InvalidData, msg);
177 Poll::Ready(Err(err.into()))
178 }
179 }
180 }
181 }
182 Err(e) => Poll::Ready(Err(e)),
183 },
184 Poll::Pending => Poll::Pending,
185 }
186 }
187}
188
189impl async_std::io::Seek for ChunkReaderWithHash {
190 fn poll_seek(
191 mut self: Pin<&mut Self>,
192 cx: &mut Context<'_>,
193 pos: SeekFrom,
194 ) -> Poll<std::io::Result<u64>> {
195 self.seeked = true;
196 Pin::new(self.reader.as_mut()).poll_seek(cx, pos)
197 }
198}
199
200impl AsyncReadWithSeek for ChunkReaderWithHash {}
201
#[cfg(test)]
mod tests {
    use super::{ChunkReaderWithHash, ReaderWithLimit};
    use async_std::io::prelude::*;
    use cyfs_base::*;
    use std::io::SeekFrom;
    use std::str::FromStr;

    /// Reads a local fixture file through `ChunkReaderWithHash` and expects
    /// `read_to_end` to fail.
    ///
    /// The fixture lives at a hard-coded local path, so this only runs on
    /// demand through the ignored `local_file_fixture` test.
    async fn test_file() {
        let file = "C:\\cyfs\\data\\test\\2JtHrtiW4J";
        let chunk_id = ChunkId::from_str("7C8WXUGiYVyag6WXdsFz6B8JgpedMMgkng3MRM4XoPrX").unwrap();

        let reader = async_std::fs::File::open(file).await.unwrap();
        let mut reader =
            ChunkReaderWithHash::new("test1".to_owned(), chunk_id, Box::new(reader), None);

        let mut buf2 = vec![];
        reader.read_to_end(&mut buf2).await.unwrap_err();
    }

    /// Exercises `ReaderWithLimit` windows and seeks, and
    /// `ChunkReaderWithHash` verification on top of them, over random data.
    async fn test1() {
        let buf: Vec<u8> = (0..3000).map(|_| rand::random::<u8>()).collect();
        let chunk_id = ChunkId::calculate(&buf).await.unwrap();

        {
            let chunk_id = ChunkId::calculate(&buf[0..1000]).await.unwrap();
            let buf_reader = Box::new(async_std::io::Cursor::new(buf.clone()));

            let sub_reader = ReaderWithLimit::new(1000, buf_reader).await.unwrap();
            let mut reader = ChunkReaderWithHash::new(
                "test2".to_owned(),
                chunk_id.clone(),
                Box::new(sub_reader),
                None,
            );

            let mut buf2 = vec![];
            reader.read_to_end(&mut buf2).await.unwrap();
            assert_eq!(buf2.len(), 1000);
        }

        {
            let buf_reader = Box::new(async_std::io::Cursor::new(buf.clone()));
            let mut reader =
                ChunkReaderWithHash::new("test1".to_owned(), chunk_id.clone(), buf_reader, None);

            let mut buf2 = vec![];
            reader.read_to_end(&mut buf2).await.unwrap();
            assert_eq!(buf, buf2);
        }

        let sub = &buf[1000..2000];
        let sub_chunk_id = ChunkId::calculate(&sub).await.unwrap();

        {
            let mut buf_reader = Box::new(async_std::io::Cursor::new(buf.clone()));
            buf_reader.seek(SeekFrom::Start(1000)).await.unwrap();

            let mut sub_reader = ReaderWithLimit::new(1000, buf_reader).await.unwrap();
            sub_reader.seek(SeekFrom::End(500)).await.unwrap();
            sub_reader.seek(SeekFrom::End(0)).await.unwrap();
            sub_reader.seek(SeekFrom::Start(0)).await.unwrap();

            let mut reader = ChunkReaderWithHash::new(
                "test2".to_owned(),
                sub_chunk_id.clone(),
                Box::new(sub_reader),
                None,
            );

            let mut buf2 = vec![];
            reader.read_to_end(&mut buf2).await.unwrap();
            assert_eq!(sub, buf2);
        }

        {
            let buf_reader = Box::new(async_std::io::Cursor::new(buf.clone()));

            let mut sub_reader = ReaderWithLimit::new(2000, buf_reader).await.unwrap();
            sub_reader.seek(SeekFrom::End(500)).await.unwrap();
            sub_reader.seek(SeekFrom::End(0)).await.unwrap();
            sub_reader.seek(SeekFrom::Start(1000)).await.unwrap();

            let mut reader = ChunkReaderWithHash::new(
                "test2".to_owned(),
                sub_chunk_id.clone(),
                Box::new(sub_reader),
                None,
            );

            let mut buf2 = vec![];
            reader.read_to_end(&mut buf2).await.unwrap();
            assert_eq!(sub, buf2);
        }

        {
            let buf_reader = Box::new(async_std::io::Cursor::new(buf.clone()));

            let mut sub_reader = ReaderWithLimit::new(2000, buf_reader).await.unwrap();
            let pos = sub_reader.seek(SeekFrom::End(-500)).await.unwrap();
            assert_eq!(pos, 1500);

            let mut buf2 = vec![];
            sub_reader.read_to_end(&mut buf2).await.unwrap();

            let sub = &buf[1500..2000];
            assert_eq!(sub, buf2);
        }

        {
            let mut buf_reader = Box::new(async_std::io::Cursor::new(buf.clone()));
            buf_reader.seek(SeekFrom::Start(500)).await.unwrap();

            let mut sub_reader = ReaderWithLimit::new(2000, buf_reader).await.unwrap();

            let pos = sub_reader.seek(SeekFrom::Start(0)).await.unwrap();
            assert_eq!(pos, 0);
            let pos = sub_reader.seek(SeekFrom::Current(1000)).await.unwrap();
            assert_eq!(pos, 1000);
            let pos = sub_reader.seek(SeekFrom::Current(1000)).await.unwrap();
            assert_eq!(pos, 2000);

            let pos = sub_reader.seek(SeekFrom::End(-500)).await.unwrap();
            assert_eq!(pos, 1500);

            let mut buf2 = vec![];
            sub_reader.read_to_end(&mut buf2).await.unwrap();

            let sub = &buf[2000..2500];
            assert_eq!(sub, buf2);
        }
    }

    #[test]
    fn test() {
        async_std::task::block_on(async move {
            test1().await;
        });
    }

    /// Runs `test_file` on demand (`cargo test -- --ignored`); it depends on a
    /// fixture file that only exists on a developer machine.
    #[test]
    #[ignore]
    fn local_file_fixture() {
        async_std::task::block_on(test_file());
    }
}