cyfs_chunk_lib/
mmap_chunk.rs

1use std::fs::{File, OpenOptions};
2use std::io::{SeekFrom};
3use std::ops::{Deref, DerefMut};
4use std::path::Path;
5use cyfs_base::{BuckyError, BuckyErrorCode, BuckyResult};
6use memmap2::{MmapMut, MmapOptions};
7use crate::{Chunk, ChunkMeta, ChunkMut};
8
9pub struct MMapChunk {
10    file_path: String,
11    mmap: memmap2::Mmap,
12    read_pos: usize,
13    data_len: usize,
14}
15
16impl MMapChunk {
17    pub async fn open<P: AsRef<Path>>(path: P, data_len: Option<u32>) -> BuckyResult<Self> {
18        let file_path = path.as_ref().to_string_lossy().to_string();
19        log::info!("MMapChunk open {}", file_path.as_str());
20        let tmp_path = path.as_ref().to_path_buf();
21        let ret: BuckyResult<Self> = async_std::task::spawn_blocking(move || {
22            unsafe {
23                let file = File::open(tmp_path.as_path()).map_err(|e| {
24                    let msg = format!("[{}:{}] open {} failed.err {}", file!(), line!(), tmp_path.to_string_lossy().to_string(), e);
25                    log::error!("{}", msg.as_str());
26                    BuckyError::new(BuckyErrorCode::Failed, msg)
27                })?;
28                let mmap = MmapOptions::new().map(&file).map_err(|e| {
29                    let msg = format!("[{}:{}] create file {} map failed.err {}", file!(), line!(), tmp_path.to_string_lossy().to_string(), e);
30                    log::error!("{}", msg.as_str());
31                    BuckyError::new(BuckyErrorCode::Failed, msg)
32                })?;
33                let data_len = if data_len.is_some() {
34                    if data_len.unwrap() as usize > mmap.len() {
35                        mmap.len()
36                    } else {
37                        data_len.unwrap() as usize
38                    }
39                } else {
40                    mmap.len()
41                };
42                Ok(Self {
43                    file_path,
44                    mmap,
45                    read_pos: 0,
46                    data_len
47                })
48            }
49        }).await;
50        log::info!("MMapChunk open {} success", path.as_ref().to_string_lossy().to_string());
51        ret
52    }
53}
54
55impl Deref for MMapChunk {
56    type Target = [u8];
57
58    fn deref(&self) -> &Self::Target {
59        &self.mmap
60    }
61}
62
63#[async_trait::async_trait]
64impl Chunk for MMapChunk {
65    fn get_chunk_meta(&self) -> ChunkMeta {
66        ChunkMeta::MMapChunk(self.file_path.clone(), Some(self.data_len as u32))
67    }
68
69    fn get_len(&self) -> usize {
70        self.mmap.len()
71    }
72
73    fn into_vec(self: Box<Self>) -> Vec<u8> {
74        self.mmap.to_vec()
75    }
76
77    async fn read(&mut self, buf: &mut [u8]) -> BuckyResult<usize> {
78        let this = self;
79        if this.read_pos >= this.data_len {
80            Ok(0)
81        } else if buf.len() > this.data_len - this.read_pos {
82            unsafe {std::ptr::copy::<u8>(this.mmap[this.read_pos..].as_ptr(), buf.as_mut_ptr(), this.data_len - this.read_pos)};
83            let read_len = this.data_len - this.read_pos;
84            this.read_pos = this.data_len;
85            Ok(read_len)
86        } else {
87            unsafe {std::ptr::copy::<u8>(this.mmap[this.read_pos..this.read_pos + buf.len()].as_ptr(), buf.as_mut_ptr(), buf.len())};
88            let read_len = buf.len();
89            this.read_pos = this.read_pos + read_len;
90            Ok(read_len)
91        }
92    }
93
94    async fn seek(&mut self, pos: SeekFrom) -> BuckyResult<u64> {
95        let this = self;
96        match pos {
97            SeekFrom::Start(pos) => {
98                this.read_pos = pos as usize;
99                Ok(pos)
100            },
101            SeekFrom::End(pos) => {
102                if this.data_len as i64 + pos < 0 {
103                    return Err(BuckyError::new(BuckyErrorCode::Failed, format!("seek failed")));
104                }
105                this.read_pos = (this.data_len as i64 + pos) as usize;
106                Ok(this.read_pos as u64)
107            },
108            SeekFrom::Current(pos) => {
109                if this.read_pos as i64 + pos < 0 {
110                    return Err(BuckyError::new(BuckyErrorCode::Failed, format!("seek failed")));
111                }
112                this.read_pos = (this.read_pos as i64 + pos) as usize;
113                Ok(this.read_pos as u64)
114            }
115        }
116    }
117}
118
119pub struct MMapChunkMut {
120    file_path: String,
121    mmap: memmap2::MmapMut,
122    cur_pos: usize,
123    data_len: usize
124}
125
126impl MMapChunkMut {
127    pub async fn open<P: AsRef<Path>>(path: P, capacity: u64, data_len: Option<u64>) -> BuckyResult<Self> {
128        let file_path = path.as_ref().to_string_lossy().to_string();
129        let path = path.as_ref().to_path_buf();
130        async_std::task::spawn_blocking(move || {
131            unsafe {
132                let file = OpenOptions::new().read(true).write(true).create(true).open(path.as_path()).map_err(|e| {
133                    let msg = format!("[{}:{}] open {} failed.err {}", file!(), line!(), path.to_string_lossy().to_string(), e);
134                    log::error!("{}", msg.as_str());
135                    BuckyError::new(BuckyErrorCode::Failed, msg)
136                })?;
137
138                let data_len = if data_len.is_some() {
139                    if data_len.unwrap() > capacity {
140                        capacity
141                    } else {
142                        data_len.unwrap()
143                    }
144                } else {
145                    let mut data_len = file.metadata().map_err(|e| {
146                        let msg = format!("[{}:{}] get {} meta failed.err {}", file!(), line!(), path.to_string_lossy().to_string(), e);
147                        log::error!("{}", msg.as_str());
148                        BuckyError::new(BuckyErrorCode::Failed, msg)
149                    })?.len();
150                    if data_len > capacity {
151                        data_len = capacity;
152                    }
153                    data_len
154                };
155                file.set_len(capacity).map_err(|e| {
156                    let msg = format!("[{}:{}] set file {}  len {} failed.err {}", file!(), line!(), path.to_string_lossy().to_string(), capacity, e);
157                    log::error!("{}", msg.as_str());
158                    BuckyError::new(BuckyErrorCode::Failed, msg)
159                })?;
160                let mmap = MmapMut::map_mut(&file).map_err(|e| {
161                    let msg = format!("[{}:{}] create file {} map failed.err {}", file!(), line!(), path.to_string_lossy().to_string(), e);
162                    log::error!("{}", msg.as_str());
163                    BuckyError::new(BuckyErrorCode::Failed, msg)
164                })?;
165
166                Ok(Self {
167                    file_path,
168                    mmap,
169                    cur_pos: 0,
170                    data_len: data_len as usize
171                })
172            }
173        }).await
174    }
175}
176
177impl Deref for MMapChunkMut {
178    type Target = [u8];
179
180    fn deref(&self) -> &Self::Target {
181        &self.mmap
182    }
183}
184
185impl DerefMut for MMapChunkMut {
186    fn deref_mut(&mut self) -> &mut Self::Target {
187        &mut self.mmap
188    }
189}
190
191#[async_trait::async_trait]
192impl Chunk for MMapChunkMut {
193    fn get_chunk_meta(&self) -> ChunkMeta {
194        ChunkMeta::MMapChunk(self.file_path.clone(), Some(self.data_len as u32))
195    }
196
197    fn get_len(&self) -> usize {
198        self.data_len
199    }
200
201    fn into_vec(self: Box<Self>) -> Vec<u8> {
202        self.mmap[..self.data_len].to_vec()
203    }
204
205    async fn read(&mut self, buf: &mut [u8]) -> BuckyResult<usize> {
206        let this = self;
207        if this.cur_pos >= this.data_len {
208            Ok(0)
209        } else if buf.len() > this.data_len - this.cur_pos {
210            unsafe {std::ptr::copy::<u8>(this.mmap[this.cur_pos..].as_ptr(), buf.as_mut_ptr(), this.data_len - this.cur_pos)};
211            let read_len = this.data_len - this.cur_pos;
212            this.cur_pos = this.data_len;
213            Ok(read_len)
214        } else {
215            unsafe {std::ptr::copy::<u8>(this.mmap[this.cur_pos..this.cur_pos + buf.len()].as_ptr(), buf.as_mut_ptr(), buf.len())};
216            let read_len = buf.len();
217            this.cur_pos = this.cur_pos + read_len;
218            Ok(read_len)
219        }
220    }
221
222    async fn seek(&mut self, pos: SeekFrom) -> BuckyResult<u64> {
223        let this = self;
224        match pos {
225            SeekFrom::Start(pos) => {
226                this.cur_pos = pos as usize;
227                Ok(pos)
228            },
229            SeekFrom::End(pos) => {
230                if this.data_len as i64 + pos < 0 {
231                    return Err(BuckyError::new(BuckyErrorCode::Failed, format!("seek failed")));
232                }
233                this.cur_pos = (this.data_len as i64 + pos) as usize;
234                Ok(this.cur_pos as u64)
235            },
236            SeekFrom::Current(pos) => {
237                if this.cur_pos as i64 + pos < 0 {
238                    return Err(BuckyError::new(BuckyErrorCode::Failed, format!("seek failed")));
239                }
240                this.cur_pos = (this.cur_pos as i64 + pos) as usize;
241                Ok(this.cur_pos as u64)
242            }
243        }
244    }
245}
246
247#[async_trait::async_trait]
248impl ChunkMut for MMapChunkMut {
249    async fn reset(&mut self) -> BuckyResult<()> {
250        self.cur_pos = 0;
251        self.data_len = 0;
252        Ok(())
253    }
254
255    async fn write(&mut self, buf: &[u8]) -> BuckyResult<usize> {
256        let this = self;
257        unsafe {
258            if this.cur_pos + buf.len() >= this.mmap.len() {
259                let write_size = this.mmap.len() - this.cur_pos;
260                std::ptr::copy(buf.as_ptr(), this.mmap[this.cur_pos..].as_mut_ptr(), write_size);
261                this.cur_pos = this.mmap.len();
262                if this.cur_pos > this.data_len {
263                    this.data_len = this.cur_pos;
264                }
265                Ok(write_size)
266            } else {
267                std::ptr::copy(buf.as_ptr(), this.mmap[this.cur_pos..].as_mut_ptr(), buf.len());
268                this.cur_pos += buf.len();
269                if this.cur_pos > this.data_len {
270                    this.data_len = this.cur_pos;
271                }
272                Ok(buf.len())
273            }
274        }
275    }
276
277    async fn flush(&mut self) -> BuckyResult<()> {
278        self.mmap.flush().map_err(|e| {
279            let msg = format!("flush err {}", e);
280            log::error!("{}", msg.as_str());
281            BuckyError::new(BuckyErrorCode::Failed, msg)
282        })
283    }
284}