cyfs_util/util/
read_helper.rs

1use super::read_seek::AsyncReadWithSeek;
2use cyfs_base::*;
3
4use cyfs_sha2 as sha2;
5use futures::AsyncSeekExt;
6use sha2::Digest;
7use std::io::SeekFrom;
8use std::ops::Range;
9use std::pin::Pin;
10use std::task::{Context, Poll};
11
/// A seekable reader that exposes only a fixed-size window of an inner reader.
///
/// The window begins at the position the inner reader had when the wrapper
/// was created and covers the number of bytes given as the limit.
pub struct ReaderWithLimit {
    // Number of bytes that may still be read before the window end is reached.
    limit: u64,
    // Absolute bounds of the window, in the inner reader's coordinates.
    range: Range<u64>,
    // The wrapped reader all reads and seeks are forwarded to.
    reader: Box<dyn AsyncReadWithSeek + Unpin + Send + Sync>,
}
17
18impl ReaderWithLimit {
19    pub async fn new(
20        limit: u64,
21        mut reader: Box<dyn AsyncReadWithSeek + Unpin + Send + Sync>,
22    ) -> BuckyResult<Self> {
23        let start = reader.stream_position().await?;
24        let range = Range {
25            start,
26            end: start + limit,
27        };
28
29        Ok(Self {
30            limit,
31            range,
32            reader,
33        })
34    }
35}
36
37impl async_std::io::Read for ReaderWithLimit {
38    fn poll_read(
39        mut self: Pin<&mut Self>,
40        cx: &mut Context<'_>,
41        buf: &mut [u8],
42    ) -> Poll<std::io::Result<usize>> {
43        if self.limit == 0 {
44            return Poll::Ready(Ok(0));
45        }
46
47        let max = std::cmp::min(buf.len() as u64, self.limit) as usize;
48        let ret = Pin::new(self.reader.as_mut()).poll_read(cx, &mut buf[..max]);
49        match ret {
50            Poll::Ready(Ok(n)) => {
51                self.limit -= n as u64;
52                Poll::Ready(Ok(n))
53            }
54            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
55            Poll::Pending => Poll::Pending,
56        }
57    }
58}
59
60impl async_std::io::Seek for ReaderWithLimit {
61    fn poll_seek(
62        mut self: Pin<&mut Self>,
63        cx: &mut Context<'_>,
64        pos: SeekFrom,
65    ) -> Poll<std::io::Result<u64>> {
66        let pos = match pos {
67            SeekFrom::Start(pos) => SeekFrom::Start(self.range.start + pos),
68            SeekFrom::End(offset) => SeekFrom::Start((self.range.end as i64 + offset) as u64),
69            SeekFrom::Current(offset) => SeekFrom::Current(offset),
70        };
71
72        // println!("pos={:?}, range={:?}", pos, self.range);
73        match Pin::new(self.reader.as_mut()).poll_seek(cx, pos) {
74            Poll::Pending => Poll::Pending,
75            Poll::Ready(Ok(mut pos)) => {
76                // println!("pos ret={}", pos);
77                if pos < self.range.start {
78                    let msg = format!("seek beyond the begin: {} < {}", pos, self.range.start);
79                    let err = BuckyError::new(BuckyErrorCode::InvalidInput, msg);
80                    return Poll::Ready(Err(err.into()));
81                } else if pos > self.range.end {
82                    pos = self.range.end;
83                }
84
85                self.limit = self.range.end - pos;
86                Poll::Ready(Ok(pos - self.range.start))
87            }
88            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
89        }
90    }
91}
92
// Marker impl: lets a `ReaderWithLimit` be boxed as `dyn AsyncReadWithSeek`, e.g. to nest it in another wrapper.
impl AsyncReadWithSeek for ReaderWithLimit {}
94
/// Callback used by `ChunkReaderWithHash` to report a chunk whose content
/// does not hash to its expected chunk id.
// NOTE(review): the trait has no async methods, so the `async_trait`
// attribute looks unnecessary — confirm against implementors before removing.
#[async_trait::async_trait]
pub trait ChunkHashErrorHandler: Send + Sync {
    /// Called with the expected chunk id and the path label of the reader
    /// that detected the mismatch.
    fn on_hash_error(&self, chunk_id: &ChunkId, path: &str);
}
99
/// A reader that hashes everything read through it and, at end of stream,
/// checks the digest against the expected chunk id.
pub struct ChunkReaderWithHash {
    // Label of the data source; used in log messages and passed to the error handler.
    path: String,
    // The chunk id the content is expected to hash to.
    chunk_id: ChunkId,
    // The wrapped reader that supplies the chunk content.
    reader: Box<dyn AsyncReadWithSeek + Unpin + Send + Sync>,
    // Running SHA-256 state over the bytes read so far.
    hash: sha2::Sha256,
    // Optional callback notified once when a hash mismatch is detected.
    error_handler: Option<Box<dyn ChunkHashErrorHandler>>,
    // Set once a seek was requested; verification is skipped from then on.
    seeked: bool,
    // Number of bytes fed into `hash`.
    hashed_len: usize,
}
109
110impl ChunkReaderWithHash {
111    pub fn new(
112        path: String,
113        chunk_id: ChunkId,
114        reader: Box<dyn AsyncReadWithSeek + Unpin + Send + Sync>,
115        error_handler: Option<Box<dyn ChunkHashErrorHandler>>,
116    ) -> Self {
117        Self {
118            path,
119            chunk_id,
120            reader,
121            hash: sha2::Sha256::new(),
122            error_handler,
123            seeked: false,
124            hashed_len: 0,
125        }
126    }
127}
128
129impl async_std::io::Read for ChunkReaderWithHash {
130    fn poll_read(
131        mut self: Pin<&mut Self>,
132        cx: &mut Context<'_>,
133        buf: &mut [u8],
134    ) -> Poll<std::io::Result<usize>> {
135        let ret = Pin::new(self.reader.as_mut()).poll_read(cx, buf);
136        match ret {
137            Poll::Ready(ret) => match ret {
138                Ok(size) => {
139                    if size > 0 {
140                        self.hashed_len += size;
141                        self.hash.input(&buf[0..size]);
142                        Poll::Ready(Ok(size))
143                    } else {
144                        if self.seeked {
145                            warn!(
146                                "read chunk with hash but seeked already! chunk={}",
147                                self.chunk_id
148                            );
149                            Poll::Ready(Ok(0))
150                        } else if self.hashed_len != self.chunk_id.len() {
151                            error!("read chunk with hash but ended with unmatched length! chunk={}, len={}, read len={}", 
152                                self.chunk_id, self.chunk_id.len(), self.hashed_len,);
153                            // FIXME what should we do?
154                            Poll::Ready(Ok(0))
155                        } else {
156                            let hash_value = self.hash.clone().result().into();
157                            let actual_id = ChunkId::new(&hash_value, self.chunk_id.len() as u32);
158
159                            if actual_id.eq(&self.chunk_id) {
160                                debug!(
161                                    "read chunk from file complete! chunk={}, file={}",
162                                    self.chunk_id, self.path
163                                );
164                                Poll::Ready(Ok(0))
165                            } else {
166                                let msg = format!(
167                                    "content in file not match chunk id: chunk={}, file={}, expect hash={}, got={}",
168                                    self.chunk_id, self.path, self.chunk_id, actual_id
169                                );
170                                error!("{}", msg);
171
172                                if let Some(error_handler) = self.error_handler.take() {
173                                    error_handler.on_hash_error(&self.chunk_id, &self.path);
174                                }
175
176                                let err = BuckyError::new(BuckyErrorCode::InvalidData, msg);
177                                Poll::Ready(Err(err.into()))
178                            }
179                        }
180                    }
181                }
182                Err(e) => Poll::Ready(Err(e)),
183            },
184            Poll::Pending => Poll::Pending,
185        }
186    }
187}
188
189impl async_std::io::Seek for ChunkReaderWithHash {
190    fn poll_seek(
191        mut self: Pin<&mut Self>,
192        cx: &mut Context<'_>,
193        pos: SeekFrom,
194    ) -> Poll<std::io::Result<u64>> {
195        self.seeked = true;
196        Pin::new(self.reader.as_mut()).poll_seek(cx, pos)
197    }
198}
199
// Marker impl: lets a `ChunkReaderWithHash` be boxed as `dyn AsyncReadWithSeek`.
impl AsyncReadWithSeek for ChunkReaderWithHash {}
201
#[cfg(test)]
mod tests {
    use super::{ChunkReaderWithHash, ReaderWithLimit};
    use async_std::io::prelude::*;
    use async_std::task::block_on;
    use cyfs_base::*;
    use std::io::SeekFrom;
    use std::str::FromStr;

    /// Builds the 3000-byte random payload every in-memory test reads from.
    fn random_data() -> Vec<u8> {
        (0..3000).map(|_| rand::random::<u8>()).collect()
    }

    /// Manual check against a chunk file on a developer machine: reading the
    /// file through the verifying reader is expected to fail.
    #[test]
    #[ignore = "needs a chunk file that only exists on a developer machine"]
    fn file_read_is_rejected() {
        block_on(async {
            let file = "C:\\cyfs\\data\\test\\2JtHrtiW4J";
            let chunk_id =
                ChunkId::from_str("7C8WXUGiYVyag6WXdsFz6B8JgpedMMgkng3MRM4XoPrX").unwrap();

            let reader = async_std::fs::File::open(file).await.unwrap();
            let mut reader =
                ChunkReaderWithHash::new("test1".to_owned(), chunk_id, Box::new(reader), None);

            let mut buf2 = vec![];
            reader.read_to_end(&mut buf2).await.unwrap_err();
        });
    }

    /// The first 1000 bytes, limited by `ReaderWithLimit`, verify against
    /// the chunk id of exactly those bytes.
    #[test]
    fn limited_prefix_passes_hash_check() {
        block_on(async {
            let buf = random_data();
            let chunk_id = ChunkId::calculate(&buf[0..1000]).await.unwrap();
            let buf_reader = Box::new(async_std::io::Cursor::new(buf.clone()));

            let sub_reader = ReaderWithLimit::new(1000, buf_reader).await.unwrap();
            let mut reader = ChunkReaderWithHash::new(
                "test2".to_owned(),
                chunk_id.clone(),
                Box::new(sub_reader),
                None,
            );

            let mut buf2 = vec![];
            reader.read_to_end(&mut buf2).await.unwrap();
            assert_eq!(buf2.len(), 1000);
        });
    }

    /// The whole buffer verifies against its own chunk id.
    #[test]
    fn whole_buffer_passes_hash_check() {
        block_on(async {
            let buf = random_data();
            let chunk_id = ChunkId::calculate(&buf).await.unwrap();

            let buf_reader = Box::new(async_std::io::Cursor::new(buf.clone()));
            let mut reader =
                ChunkReaderWithHash::new("test1".to_owned(), chunk_id.clone(), buf_reader, None);

            let mut buf2 = vec![];
            reader.read_to_end(&mut buf2).await.unwrap();
            assert_eq!(buf, buf2);
        });
    }

    /// A window that starts at offset 1000 of the inner reader: seeking past
    /// the end and back to the start still yields exactly the window.
    #[test]
    fn window_from_offset_reads_after_seeks() {
        block_on(async {
            let buf = random_data();
            let sub = &buf[1000..2000];
            let sub_chunk_id = ChunkId::calculate(&buf[1000..2000]).await.unwrap();

            let mut buf_reader = Box::new(async_std::io::Cursor::new(buf.clone()));
            buf_reader.seek(SeekFrom::Start(1000)).await.unwrap();

            let mut sub_reader = ReaderWithLimit::new(1000, buf_reader).await.unwrap();
            sub_reader.seek(SeekFrom::End(500)).await.unwrap();
            sub_reader.seek(SeekFrom::End(0)).await.unwrap();
            sub_reader.seek(SeekFrom::Start(0)).await.unwrap();

            let mut reader = ChunkReaderWithHash::new(
                "test2".to_owned(),
                sub_chunk_id.clone(),
                Box::new(sub_reader),
                None,
            );

            let mut buf2 = vec![];
            reader.read_to_end(&mut buf2).await.unwrap();
            assert_eq!(sub, buf2);
        });
    }

    /// A 2000-byte window from offset 0: after seeking to 1000 the rest of
    /// the window is the second 1000-byte chunk.
    #[test]
    fn window_tail_after_seek_passes_hash_check() {
        block_on(async {
            let buf = random_data();
            let sub = &buf[1000..2000];
            let sub_chunk_id = ChunkId::calculate(&buf[1000..2000]).await.unwrap();

            let buf_reader = Box::new(async_std::io::Cursor::new(buf.clone()));

            let mut sub_reader = ReaderWithLimit::new(2000, buf_reader).await.unwrap();
            sub_reader.seek(SeekFrom::End(500)).await.unwrap();
            sub_reader.seek(SeekFrom::End(0)).await.unwrap();
            sub_reader.seek(SeekFrom::Start(1000)).await.unwrap();

            let mut reader = ChunkReaderWithHash::new(
                "test2".to_owned(),
                sub_chunk_id.clone(),
                Box::new(sub_reader),
                None,
            );

            let mut buf2 = vec![];
            reader.read_to_end(&mut buf2).await.unwrap();
            assert_eq!(sub, buf2);
        });
    }

    /// `SeekFrom::End` with a negative offset is relative to the window end.
    #[test]
    fn seek_from_end_is_relative_to_window() {
        block_on(async {
            let buf = random_data();
            let buf_reader = Box::new(async_std::io::Cursor::new(buf.clone()));

            let mut sub_reader = ReaderWithLimit::new(2000, buf_reader).await.unwrap();
            let pos = sub_reader.seek(SeekFrom::End(-500)).await.unwrap();
            assert_eq!(pos, 1500);

            let mut buf2 = vec![];
            sub_reader.read_to_end(&mut buf2).await.unwrap();

            let sub = &buf[1500..2000];
            assert_eq!(sub, buf2);
        });
    }

    /// Positions returned by every seek mode are relative to the window
    /// start, here 500 bytes into the inner reader.
    #[test]
    fn seek_positions_are_window_relative() {
        block_on(async {
            let buf = random_data();
            let mut buf_reader = Box::new(async_std::io::Cursor::new(buf.clone()));
            buf_reader.seek(SeekFrom::Start(500)).await.unwrap();

            let mut sub_reader = ReaderWithLimit::new(2000, buf_reader).await.unwrap();

            let pos = sub_reader.seek(SeekFrom::Start(0)).await.unwrap();
            assert_eq!(pos, 0);
            let pos = sub_reader.seek(SeekFrom::Current(1000)).await.unwrap();
            assert_eq!(pos, 1000);
            let pos = sub_reader.seek(SeekFrom::Current(1000)).await.unwrap();
            assert_eq!(pos, 2000);

            let pos = sub_reader.seek(SeekFrom::End(-500)).await.unwrap();
            assert_eq!(pos, 1500);

            let mut buf2 = vec![];
            sub_reader.read_to_end(&mut buf2).await.unwrap();

            let sub = &buf[2000..2500];
            assert_eq!(sub, buf2);
        });
    }
}