1#[cfg(any(target_os = "windows", target_os = "linux", target_os = "macos"))]
2use crate::SharedMemChunk;
3use crate::{MMapChunk, MemChunk};
4use cyfs_base::*;
5use cyfs_util::AsyncReadWithSeek;
6use cyfs_debug::Mutex;
7
8use std::future::Future;
9use std::io::SeekFrom;
10use std::ops::Deref;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13
14pub const CHUNK_SIZE: u64 = 4 * 1024 * 1024;
15
16#[derive(RawEncode, RawDecode)]
17pub enum ChunkMeta {
18 MMapChunk(String, Option<u32>),
19 SharedMemChunk(String, u32, u32),
20 MemChunk(Vec<u8>),
21}
22
23impl ChunkMeta {
24 #[cfg(any(target_os = "windows", target_os = "linux", target_os = "macos"))]
25 pub async fn to_chunk(self) -> BuckyResult<Box<dyn Chunk>> {
26 match self {
27 ChunkMeta::SharedMemChunk(share_id, capacity, data_len) => Ok(Box::new(
28 SharedMemChunk::new(capacity as usize, data_len as usize, share_id.as_str())?,
29 )),
30 ChunkMeta::MMapChunk(mmap_id, len) => {
31 Ok(Box::new(MMapChunk::open(mmap_id, len).await?))
32 }
33 ChunkMeta::MemChunk(data) => Ok(Box::new(MemChunk::from(data))),
34 }
35 }
36
37 #[cfg(not(any(target_os = "windows", target_os = "linux", target_os = "macos")))]
38 pub async fn to_chunk(self) -> BuckyResult<Box<dyn Chunk>> {
39 match self {
40 ChunkMeta::SharedMemChunk(share_id, capacity, data_len) => {
41 let msg = format!("unsupport share mem chunk in android or ios");
42 log::error!("{}", msg.as_str());
43 Err(BuckyError::new(BuckyErrorCode::NotSupport, msg))
44 }
45 ChunkMeta::MMapChunk(mmap_id, len) => {
46 Ok(Box::new(MMapChunk::open(mmap_id, len).await?))
47 }
48 ChunkMeta::MemChunk(data) => Ok(Box::new(MemChunk::from(data))),
49 }
50 }
51}
52
53#[async_trait::async_trait]
54pub trait Chunk: Deref<Target = [u8]> + Send + Sync {
55 fn calculate_id(&self) -> ChunkId {
56 let hash = hash_data(&self[..self.get_len() as usize]);
57 ChunkId::new(&hash, self.get_len() as u32)
58 }
59
60 fn get_chunk_meta(&self) -> ChunkMeta;
61 fn get_len(&self) -> usize;
62 fn into_vec(self: Box<Self>) -> Vec<u8>;
63
64 async fn read(&mut self, buf: &mut [u8]) -> BuckyResult<usize>;
65 async fn seek(&mut self, pos: SeekFrom) -> BuckyResult<u64>;
66}
67
68pub struct ChunkRead {
69 chunk: Box<dyn Chunk>,
70 read_future: Mutex<Option<Pin<Box<dyn Future<Output = BuckyResult<usize>> + Send>>>>,
71 seek_future: Mutex<Option<Pin<Box<dyn Future<Output = BuckyResult<u64>> + Send>>>>,
72}
73
74impl ChunkRead {
75 pub fn new(chunk: Box<dyn Chunk>) -> Self {
76 Self {
77 chunk,
78 read_future: Mutex::new(None),
79 seek_future: Mutex::new(None),
80 }
81 }
82}
83
84impl async_std::io::Read for ChunkRead {
85 fn poll_read(
86 self: Pin<&mut Self>,
87 cx: &mut Context<'_>,
88 buf: &mut [u8],
89 ) -> Poll<std::io::Result<usize>> {
90 unsafe {
91 let this: &'static mut Self = std::mem::transmute(self.get_unchecked_mut());
92 let buf: &'static mut [u8] = std::mem::transmute(buf);
93 let mut future = this.read_future.lock().unwrap();
94 if future.is_none() {
95 *future = Some(Box::pin(this.chunk.read(buf)));
96 }
97 match future.as_mut().unwrap().as_mut().poll(cx) {
98 Poll::Ready(ret) => {
99 *future = None;
100 match ret {
101 Ok(ret) => Poll::Ready(Ok(ret)),
102 Err(e) => Poll::Ready(Err(e.into())),
103 }
104 }
105 Poll::Pending => Poll::Pending,
106 }
107 }
108 }
109}
110
111impl async_std::io::Seek for ChunkRead {
112 fn poll_seek(
113 self: Pin<&mut Self>,
114 cx: &mut Context<'_>,
115 pos: SeekFrom,
116 ) -> Poll<std::io::Result<u64>> {
117 unsafe {
118 let this: &'static mut Self = std::mem::transmute(self.get_unchecked_mut());
119 let mut future = this.seek_future.lock().unwrap();
120 if future.is_none() {
121 *future = Some(Box::pin(this.chunk.seek(pos)));
122 }
123 match future.as_mut().unwrap().as_mut().poll(cx) {
124 Poll::Ready(ret) => {
125 *future = None;
126 match ret {
127 Ok(ret) => Poll::Ready(Ok(ret)),
128 Err(e) => Poll::Ready(Err(e.into())),
129 }
130 }
131 Poll::Pending => Poll::Pending,
132 }
133 }
134 }
135}
136
137use async_std::io::{Seek, Read};
138use std::ops::Range;
139
140impl AsyncReadWithSeek for ChunkRead {}
141
142pub struct ChunkReadWithRanges {
143 reader: Box<dyn AsyncReadWithSeek + Unpin + Send + Sync + 'static>,
144 ranges: Vec<Range<u64>>,
145}
146
147impl ChunkReadWithRanges {
148 pub fn new(
149 reader: Box<dyn AsyncReadWithSeek + Unpin + Send + Sync + 'static>,
150 ranges: Vec<Range<u64>>,
151 ) -> Self {
152 Self { reader, ranges }
153 }
154}
155
156impl Read for ChunkReadWithRanges {
157 fn poll_read(
158 mut self: Pin<&mut Self>,
159 cx: &mut Context<'_>,
160 buf: &mut [u8],
161 ) -> Poll<std::io::Result<usize>> {
162 loop {
163 if self.ranges.len() == 0 {
164 break Poll::Ready(Ok(0));
165 }
166
167 let mut range = self.as_ref().ranges[0].clone();
168 if range.is_empty() {
169 self.ranges.remove(0);
170 continue;
171 }
172
173 match Pin::new(&mut self.reader).poll_seek(cx, SeekFrom::Start(range.start)) {
175 Poll::Ready(ret) => {
176 match ret {
177 Ok(pos) => {
178 if pos != range.start {
179 let msg = format!("poll seek with range but ret pos not match! range={:?}, pos={}", range, pos);
180 log::error!("{}", msg);
181
182 let e = BuckyError::new(BuckyErrorCode::IoError, msg);
183 break Poll::Ready(Err(e.into()));
184 }
185 }
186 Err(e) => {
187 break Poll::Ready(Err(e));
188 }
189 }
190 }
191 Poll::Pending => {
192 break Poll::Pending;
193 }
194 }
195
196 let range_len = (range.end - range.start) as usize;
198 let range_buf = if buf.len() > range_len {
199 &mut buf[..range_len]
200 } else {
201 buf
202 };
203
204 match Pin::new(&mut self.reader).poll_read(cx, range_buf) {
205 Poll::Ready(ret) => match ret {
206 Ok(mut size) => {
207 assert!(size <= range_len);
208
209 if size > range_len {
210 size = range_len;
211 }
212
213 range.start += size as u64;
214 if range.is_empty() {
215 self.ranges.remove(0);
217 } else {
218 self.ranges[0] = range;
220 }
221
222 break Poll::Ready(Ok(size));
223 }
224 Err(e) => {
225 break Poll::Ready(Err(e));
226 }
227 },
228 Poll::Pending => {
229 break Poll::Pending;
230 }
231 }
232 }
233 }
234}
235
236#[async_trait::async_trait]
237pub trait ChunkMut: Chunk {
238 async fn reset(&mut self) -> BuckyResult<()>;
239 async fn write(&mut self, buf: &[u8]) -> BuckyResult<usize>;
240 async fn flush(&mut self) -> BuckyResult<()>;
241}
242
243pub struct ChunkWrite {
244 chunk: Box<dyn ChunkMut>,
245 future: Mutex<Option<Pin<Box<dyn Future<Output = BuckyResult<usize>>>>>>,
246 flush_future: Mutex<Option<Pin<Box<dyn Future<Output = BuckyResult<()>>>>>>,
247}
248
249impl ChunkWrite {
250 pub fn new(chunk: Box<dyn ChunkMut>) -> Self {
251 Self {
252 chunk,
253 future: Mutex::new(None),
254 flush_future: Mutex::new(None),
255 }
256 }
257}
258
259impl async_std::io::Write for ChunkWrite {
260 fn poll_write(
261 self: Pin<&mut Self>,
262 cx: &mut Context<'_>,
263 buf: &[u8],
264 ) -> Poll<std::io::Result<usize>> {
265 unsafe {
266 let this: &'static mut Self = std::mem::transmute(self.get_unchecked_mut());
267 let buf: &'static [u8] = std::mem::transmute(buf);
268 let mut future = this.future.lock().unwrap();
269 if future.is_none() {
270 *future = Some(Box::pin(this.chunk.write(buf)));
271 }
272 match future.as_mut().unwrap().as_mut().poll(cx) {
273 Poll::Ready(ret) => {
274 *future = None;
275 match ret {
276 Ok(ret) => Poll::Ready(Ok(ret)),
277 Err(e) => Poll::Ready(Err(std::io::Error::new(
278 std::io::ErrorKind::InvalidInput,
279 format!("{}", e),
280 ))),
281 }
282 }
283 Poll::Pending => Poll::Pending,
284 }
285 }
286 }
287
288 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
289 unsafe {
290 let this: &'static mut Self = std::mem::transmute(self.get_unchecked_mut());
291 let mut flush_future = this.flush_future.lock().unwrap();
292 if flush_future.is_none() {
293 *flush_future = Some(Box::pin(this.chunk.flush()));
294 }
295 match flush_future.as_mut().unwrap().as_mut().poll(cx) {
296 Poll::Ready(ret) => {
297 *flush_future = None;
298 match ret {
299 Ok(ret) => Poll::Ready(Ok(ret)),
300 Err(e) => Poll::Ready(Err(e.into())),
301 }
302 }
303 Poll::Pending => Poll::Pending,
304 }
305 }
306 }
307
308 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
309 Poll::Ready(Ok(()))
310 }
311}