cyfs_chunk_lib/
chunk.rs

1#[cfg(any(target_os = "windows", target_os = "linux", target_os = "macos"))]
2use crate::SharedMemChunk;
3use crate::{MMapChunk, MemChunk};
4use cyfs_base::*;
5use cyfs_util::AsyncReadWithSeek;
6use cyfs_debug::Mutex;
7
8use std::future::Future;
9use std::io::SeekFrom;
10use std::ops::Deref;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13
14pub const CHUNK_SIZE: u64 = 4 * 1024 * 1024;
15
16#[derive(RawEncode, RawDecode)]
17pub enum ChunkMeta {
18    MMapChunk(String, Option<u32>),
19    SharedMemChunk(String, u32, u32),
20    MemChunk(Vec<u8>),
21}
22
23impl ChunkMeta {
24    #[cfg(any(target_os = "windows", target_os = "linux", target_os = "macos"))]
25    pub async fn to_chunk(self) -> BuckyResult<Box<dyn Chunk>> {
26        match self {
27            ChunkMeta::SharedMemChunk(share_id, capacity, data_len) => Ok(Box::new(
28                SharedMemChunk::new(capacity as usize, data_len as usize, share_id.as_str())?,
29            )),
30            ChunkMeta::MMapChunk(mmap_id, len) => {
31                Ok(Box::new(MMapChunk::open(mmap_id, len).await?))
32            }
33            ChunkMeta::MemChunk(data) => Ok(Box::new(MemChunk::from(data))),
34        }
35    }
36
37    #[cfg(not(any(target_os = "windows", target_os = "linux", target_os = "macos")))]
38    pub async fn to_chunk(self) -> BuckyResult<Box<dyn Chunk>> {
39        match self {
40            ChunkMeta::SharedMemChunk(share_id, capacity, data_len) => {
41                let msg = format!("unsupport share mem chunk in android or ios");
42                log::error!("{}", msg.as_str());
43                Err(BuckyError::new(BuckyErrorCode::NotSupport, msg))
44            }
45            ChunkMeta::MMapChunk(mmap_id, len) => {
46                Ok(Box::new(MMapChunk::open(mmap_id, len).await?))
47            }
48            ChunkMeta::MemChunk(data) => Ok(Box::new(MemChunk::from(data))),
49        }
50    }
51}
52
53#[async_trait::async_trait]
54pub trait Chunk: Deref<Target = [u8]> + Send + Sync {
55    fn calculate_id(&self) -> ChunkId {
56        let hash = hash_data(&self[..self.get_len() as usize]);
57        ChunkId::new(&hash, self.get_len() as u32)
58    }
59
60    fn get_chunk_meta(&self) -> ChunkMeta;
61    fn get_len(&self) -> usize;
62    fn into_vec(self: Box<Self>) -> Vec<u8>;
63
64    async fn read(&mut self, buf: &mut [u8]) -> BuckyResult<usize>;
65    async fn seek(&mut self, pos: SeekFrom) -> BuckyResult<u64>;
66}
67
68pub struct ChunkRead {
69    chunk: Box<dyn Chunk>,
70    read_future: Mutex<Option<Pin<Box<dyn Future<Output = BuckyResult<usize>> + Send>>>>,
71    seek_future: Mutex<Option<Pin<Box<dyn Future<Output = BuckyResult<u64>> + Send>>>>,
72}
73
74impl ChunkRead {
75    pub fn new(chunk: Box<dyn Chunk>) -> Self {
76        Self {
77            chunk,
78            read_future: Mutex::new(None),
79            seek_future: Mutex::new(None),
80        }
81    }
82}
83
84impl async_std::io::Read for ChunkRead {
85    fn poll_read(
86        self: Pin<&mut Self>,
87        cx: &mut Context<'_>,
88        buf: &mut [u8],
89    ) -> Poll<std::io::Result<usize>> {
90        unsafe {
91            let this: &'static mut Self = std::mem::transmute(self.get_unchecked_mut());
92            let buf: &'static mut [u8] = std::mem::transmute(buf);
93            let mut future = this.read_future.lock().unwrap();
94            if future.is_none() {
95                *future = Some(Box::pin(this.chunk.read(buf)));
96            }
97            match future.as_mut().unwrap().as_mut().poll(cx) {
98                Poll::Ready(ret) => {
99                    *future = None;
100                    match ret {
101                        Ok(ret) => Poll::Ready(Ok(ret)),
102                        Err(e) => Poll::Ready(Err(e.into())),
103                    }
104                }
105                Poll::Pending => Poll::Pending,
106            }
107        }
108    }
109}
110
111impl async_std::io::Seek for ChunkRead {
112    fn poll_seek(
113        self: Pin<&mut Self>,
114        cx: &mut Context<'_>,
115        pos: SeekFrom,
116    ) -> Poll<std::io::Result<u64>> {
117        unsafe {
118            let this: &'static mut Self = std::mem::transmute(self.get_unchecked_mut());
119            let mut future = this.seek_future.lock().unwrap();
120            if future.is_none() {
121                *future = Some(Box::pin(this.chunk.seek(pos)));
122            }
123            match future.as_mut().unwrap().as_mut().poll(cx) {
124                Poll::Ready(ret) => {
125                    *future = None;
126                    match ret {
127                        Ok(ret) => Poll::Ready(Ok(ret)),
128                        Err(e) => Poll::Ready(Err(e.into())),
129                    }
130                }
131                Poll::Pending => Poll::Pending,
132            }
133        }
134    }
135}
136
137use async_std::io::{Seek, Read};
138use std::ops::Range;
139
140impl AsyncReadWithSeek for ChunkRead {}
141
142pub struct ChunkReadWithRanges {
143    reader: Box<dyn AsyncReadWithSeek + Unpin + Send + Sync + 'static>,
144    ranges: Vec<Range<u64>>,
145}
146
147impl ChunkReadWithRanges {
148    pub fn new(
149        reader: Box<dyn AsyncReadWithSeek + Unpin + Send + Sync + 'static>,
150        ranges: Vec<Range<u64>>,
151    ) -> Self {
152        Self { reader, ranges }
153    }
154}
155
156impl Read for ChunkReadWithRanges {
157    fn poll_read(
158        mut self: Pin<&mut Self>,
159        cx: &mut Context<'_>,
160        buf: &mut [u8],
161    ) -> Poll<std::io::Result<usize>> {
162        loop {
163            if self.ranges.len() == 0 {
164                break Poll::Ready(Ok(0));
165            }
166
167            let mut range = self.as_ref().ranges[0].clone();
168            if range.is_empty() {
169                self.ranges.remove(0);
170                continue;
171            }
172
173            // seek with the current range
174            match Pin::new(&mut self.reader).poll_seek(cx, SeekFrom::Start(range.start)) {
175                Poll::Ready(ret) => {
176                    match ret {
177                        Ok(pos) => {
178                            if pos != range.start {
179                                let msg = format!("poll seek with range but ret pos not match! range={:?}, pos={}", range, pos);
180                                log::error!("{}", msg);
181
182                                let e = BuckyError::new(BuckyErrorCode::IoError, msg);
183                                break Poll::Ready(Err(e.into()));
184                            }
185                        }
186                        Err(e) => {
187                            break Poll::Ready(Err(e));
188                        }
189                    }
190                }
191                Poll::Pending => {
192                    break Poll::Pending;
193                }
194            }
195
196            // read max bytes as range_len
197            let range_len = (range.end - range.start) as usize;
198            let range_buf = if buf.len() > range_len {
199                &mut buf[..range_len]
200            } else {
201                buf
202            };
203
204            match Pin::new(&mut self.reader).poll_read(cx, range_buf) {
205                Poll::Ready(ret) => match ret {
206                    Ok(mut size) => {
207                        assert!(size <= range_len);
208
209                        if size > range_len {
210                            size = range_len;
211                        }
212
213                        range.start += size as u64;
214                        if range.is_empty() {
215                            // current range is completed
216                            self.ranges.remove(0);
217                        } else {
218                            // current range updated
219                            self.ranges[0] = range;
220                        }
221
222                        break Poll::Ready(Ok(size));
223                    }
224                    Err(e) => {
225                        break Poll::Ready(Err(e));
226                    }
227                },
228                Poll::Pending => {
229                    break Poll::Pending;
230                }
231            }
232        }
233    }
234}
235
236#[async_trait::async_trait]
237pub trait ChunkMut: Chunk {
238    async fn reset(&mut self) -> BuckyResult<()>;
239    async fn write(&mut self, buf: &[u8]) -> BuckyResult<usize>;
240    async fn flush(&mut self) -> BuckyResult<()>;
241}
242
243pub struct ChunkWrite {
244    chunk: Box<dyn ChunkMut>,
245    future: Mutex<Option<Pin<Box<dyn Future<Output = BuckyResult<usize>>>>>>,
246    flush_future: Mutex<Option<Pin<Box<dyn Future<Output = BuckyResult<()>>>>>>,
247}
248
249impl ChunkWrite {
250    pub fn new(chunk: Box<dyn ChunkMut>) -> Self {
251        Self {
252            chunk,
253            future: Mutex::new(None),
254            flush_future: Mutex::new(None),
255        }
256    }
257}
258
259impl async_std::io::Write for ChunkWrite {
260    fn poll_write(
261        self: Pin<&mut Self>,
262        cx: &mut Context<'_>,
263        buf: &[u8],
264    ) -> Poll<std::io::Result<usize>> {
265        unsafe {
266            let this: &'static mut Self = std::mem::transmute(self.get_unchecked_mut());
267            let buf: &'static [u8] = std::mem::transmute(buf);
268            let mut future = this.future.lock().unwrap();
269            if future.is_none() {
270                *future = Some(Box::pin(this.chunk.write(buf)));
271            }
272            match future.as_mut().unwrap().as_mut().poll(cx) {
273                Poll::Ready(ret) => {
274                    *future = None;
275                    match ret {
276                        Ok(ret) => Poll::Ready(Ok(ret)),
277                        Err(e) => Poll::Ready(Err(std::io::Error::new(
278                            std::io::ErrorKind::InvalidInput,
279                            format!("{}", e),
280                        ))),
281                    }
282                }
283                Poll::Pending => Poll::Pending,
284            }
285        }
286    }
287
288    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
289        unsafe {
290            let this: &'static mut Self = std::mem::transmute(self.get_unchecked_mut());
291            let mut flush_future = this.flush_future.lock().unwrap();
292            if flush_future.is_none() {
293                *flush_future = Some(Box::pin(this.chunk.flush()));
294            }
295            match flush_future.as_mut().unwrap().as_mut().poll(cx) {
296                Poll::Ready(ret) => {
297                    *flush_future = None;
298                    match ret {
299                        Ok(ret) => Poll::Ready(Ok(ret)),
300                        Err(e) => Poll::Ready(Err(e.into())),
301                    }
302                }
303                Poll::Pending => Poll::Pending,
304            }
305        }
306    }
307
308    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
309        Poll::Ready(Ok(()))
310    }
311}