1use std::fs::{File, OpenOptions};
2use std::io::{SeekFrom};
3use std::ops::{Deref, DerefMut};
4use std::path::Path;
5use cyfs_base::{BuckyError, BuckyErrorCode, BuckyResult};
6use memmap2::{MmapMut, MmapOptions};
7use crate::{Chunk, ChunkMeta, ChunkMut};
8
9pub struct MMapChunk {
10 file_path: String,
11 mmap: memmap2::Mmap,
12 read_pos: usize,
13 data_len: usize,
14}
15
16impl MMapChunk {
17 pub async fn open<P: AsRef<Path>>(path: P, data_len: Option<u32>) -> BuckyResult<Self> {
18 let file_path = path.as_ref().to_string_lossy().to_string();
19 log::info!("MMapChunk open {}", file_path.as_str());
20 let tmp_path = path.as_ref().to_path_buf();
21 let ret: BuckyResult<Self> = async_std::task::spawn_blocking(move || {
22 unsafe {
23 let file = File::open(tmp_path.as_path()).map_err(|e| {
24 let msg = format!("[{}:{}] open {} failed.err {}", file!(), line!(), tmp_path.to_string_lossy().to_string(), e);
25 log::error!("{}", msg.as_str());
26 BuckyError::new(BuckyErrorCode::Failed, msg)
27 })?;
28 let mmap = MmapOptions::new().map(&file).map_err(|e| {
29 let msg = format!("[{}:{}] create file {} map failed.err {}", file!(), line!(), tmp_path.to_string_lossy().to_string(), e);
30 log::error!("{}", msg.as_str());
31 BuckyError::new(BuckyErrorCode::Failed, msg)
32 })?;
33 let data_len = if data_len.is_some() {
34 if data_len.unwrap() as usize > mmap.len() {
35 mmap.len()
36 } else {
37 data_len.unwrap() as usize
38 }
39 } else {
40 mmap.len()
41 };
42 Ok(Self {
43 file_path,
44 mmap,
45 read_pos: 0,
46 data_len
47 })
48 }
49 }).await;
50 log::info!("MMapChunk open {} success", path.as_ref().to_string_lossy().to_string());
51 ret
52 }
53}
54
55impl Deref for MMapChunk {
56 type Target = [u8];
57
58 fn deref(&self) -> &Self::Target {
59 &self.mmap
60 }
61}
62
63#[async_trait::async_trait]
64impl Chunk for MMapChunk {
65 fn get_chunk_meta(&self) -> ChunkMeta {
66 ChunkMeta::MMapChunk(self.file_path.clone(), Some(self.data_len as u32))
67 }
68
69 fn get_len(&self) -> usize {
70 self.mmap.len()
71 }
72
73 fn into_vec(self: Box<Self>) -> Vec<u8> {
74 self.mmap.to_vec()
75 }
76
77 async fn read(&mut self, buf: &mut [u8]) -> BuckyResult<usize> {
78 let this = self;
79 if this.read_pos >= this.data_len {
80 Ok(0)
81 } else if buf.len() > this.data_len - this.read_pos {
82 unsafe {std::ptr::copy::<u8>(this.mmap[this.read_pos..].as_ptr(), buf.as_mut_ptr(), this.data_len - this.read_pos)};
83 let read_len = this.data_len - this.read_pos;
84 this.read_pos = this.data_len;
85 Ok(read_len)
86 } else {
87 unsafe {std::ptr::copy::<u8>(this.mmap[this.read_pos..this.read_pos + buf.len()].as_ptr(), buf.as_mut_ptr(), buf.len())};
88 let read_len = buf.len();
89 this.read_pos = this.read_pos + read_len;
90 Ok(read_len)
91 }
92 }
93
94 async fn seek(&mut self, pos: SeekFrom) -> BuckyResult<u64> {
95 let this = self;
96 match pos {
97 SeekFrom::Start(pos) => {
98 this.read_pos = pos as usize;
99 Ok(pos)
100 },
101 SeekFrom::End(pos) => {
102 if this.data_len as i64 + pos < 0 {
103 return Err(BuckyError::new(BuckyErrorCode::Failed, format!("seek failed")));
104 }
105 this.read_pos = (this.data_len as i64 + pos) as usize;
106 Ok(this.read_pos as u64)
107 },
108 SeekFrom::Current(pos) => {
109 if this.read_pos as i64 + pos < 0 {
110 return Err(BuckyError::new(BuckyErrorCode::Failed, format!("seek failed")));
111 }
112 this.read_pos = (this.read_pos as i64 + pos) as usize;
113 Ok(this.read_pos as u64)
114 }
115 }
116 }
117}
118
119pub struct MMapChunkMut {
120 file_path: String,
121 mmap: memmap2::MmapMut,
122 cur_pos: usize,
123 data_len: usize
124}
125
126impl MMapChunkMut {
127 pub async fn open<P: AsRef<Path>>(path: P, capacity: u64, data_len: Option<u64>) -> BuckyResult<Self> {
128 let file_path = path.as_ref().to_string_lossy().to_string();
129 let path = path.as_ref().to_path_buf();
130 async_std::task::spawn_blocking(move || {
131 unsafe {
132 let file = OpenOptions::new().read(true).write(true).create(true).open(path.as_path()).map_err(|e| {
133 let msg = format!("[{}:{}] open {} failed.err {}", file!(), line!(), path.to_string_lossy().to_string(), e);
134 log::error!("{}", msg.as_str());
135 BuckyError::new(BuckyErrorCode::Failed, msg)
136 })?;
137
138 let data_len = if data_len.is_some() {
139 if data_len.unwrap() > capacity {
140 capacity
141 } else {
142 data_len.unwrap()
143 }
144 } else {
145 let mut data_len = file.metadata().map_err(|e| {
146 let msg = format!("[{}:{}] get {} meta failed.err {}", file!(), line!(), path.to_string_lossy().to_string(), e);
147 log::error!("{}", msg.as_str());
148 BuckyError::new(BuckyErrorCode::Failed, msg)
149 })?.len();
150 if data_len > capacity {
151 data_len = capacity;
152 }
153 data_len
154 };
155 file.set_len(capacity).map_err(|e| {
156 let msg = format!("[{}:{}] set file {} len {} failed.err {}", file!(), line!(), path.to_string_lossy().to_string(), capacity, e);
157 log::error!("{}", msg.as_str());
158 BuckyError::new(BuckyErrorCode::Failed, msg)
159 })?;
160 let mmap = MmapMut::map_mut(&file).map_err(|e| {
161 let msg = format!("[{}:{}] create file {} map failed.err {}", file!(), line!(), path.to_string_lossy().to_string(), e);
162 log::error!("{}", msg.as_str());
163 BuckyError::new(BuckyErrorCode::Failed, msg)
164 })?;
165
166 Ok(Self {
167 file_path,
168 mmap,
169 cur_pos: 0,
170 data_len: data_len as usize
171 })
172 }
173 }).await
174 }
175}
176
177impl Deref for MMapChunkMut {
178 type Target = [u8];
179
180 fn deref(&self) -> &Self::Target {
181 &self.mmap
182 }
183}
184
185impl DerefMut for MMapChunkMut {
186 fn deref_mut(&mut self) -> &mut Self::Target {
187 &mut self.mmap
188 }
189}
190
191#[async_trait::async_trait]
192impl Chunk for MMapChunkMut {
193 fn get_chunk_meta(&self) -> ChunkMeta {
194 ChunkMeta::MMapChunk(self.file_path.clone(), Some(self.data_len as u32))
195 }
196
197 fn get_len(&self) -> usize {
198 self.data_len
199 }
200
201 fn into_vec(self: Box<Self>) -> Vec<u8> {
202 self.mmap[..self.data_len].to_vec()
203 }
204
205 async fn read(&mut self, buf: &mut [u8]) -> BuckyResult<usize> {
206 let this = self;
207 if this.cur_pos >= this.data_len {
208 Ok(0)
209 } else if buf.len() > this.data_len - this.cur_pos {
210 unsafe {std::ptr::copy::<u8>(this.mmap[this.cur_pos..].as_ptr(), buf.as_mut_ptr(), this.data_len - this.cur_pos)};
211 let read_len = this.data_len - this.cur_pos;
212 this.cur_pos = this.data_len;
213 Ok(read_len)
214 } else {
215 unsafe {std::ptr::copy::<u8>(this.mmap[this.cur_pos..this.cur_pos + buf.len()].as_ptr(), buf.as_mut_ptr(), buf.len())};
216 let read_len = buf.len();
217 this.cur_pos = this.cur_pos + read_len;
218 Ok(read_len)
219 }
220 }
221
222 async fn seek(&mut self, pos: SeekFrom) -> BuckyResult<u64> {
223 let this = self;
224 match pos {
225 SeekFrom::Start(pos) => {
226 this.cur_pos = pos as usize;
227 Ok(pos)
228 },
229 SeekFrom::End(pos) => {
230 if this.data_len as i64 + pos < 0 {
231 return Err(BuckyError::new(BuckyErrorCode::Failed, format!("seek failed")));
232 }
233 this.cur_pos = (this.data_len as i64 + pos) as usize;
234 Ok(this.cur_pos as u64)
235 },
236 SeekFrom::Current(pos) => {
237 if this.cur_pos as i64 + pos < 0 {
238 return Err(BuckyError::new(BuckyErrorCode::Failed, format!("seek failed")));
239 }
240 this.cur_pos = (this.cur_pos as i64 + pos) as usize;
241 Ok(this.cur_pos as u64)
242 }
243 }
244 }
245}
246
247#[async_trait::async_trait]
248impl ChunkMut for MMapChunkMut {
249 async fn reset(&mut self) -> BuckyResult<()> {
250 self.cur_pos = 0;
251 self.data_len = 0;
252 Ok(())
253 }
254
255 async fn write(&mut self, buf: &[u8]) -> BuckyResult<usize> {
256 let this = self;
257 unsafe {
258 if this.cur_pos + buf.len() >= this.mmap.len() {
259 let write_size = this.mmap.len() - this.cur_pos;
260 std::ptr::copy(buf.as_ptr(), this.mmap[this.cur_pos..].as_mut_ptr(), write_size);
261 this.cur_pos = this.mmap.len();
262 if this.cur_pos > this.data_len {
263 this.data_len = this.cur_pos;
264 }
265 Ok(write_size)
266 } else {
267 std::ptr::copy(buf.as_ptr(), this.mmap[this.cur_pos..].as_mut_ptr(), buf.len());
268 this.cur_pos += buf.len();
269 if this.cur_pos > this.data_len {
270 this.data_len = this.cur_pos;
271 }
272 Ok(buf.len())
273 }
274 }
275 }
276
277 async fn flush(&mut self) -> BuckyResult<()> {
278 self.mmap.flush().map_err(|e| {
279 let msg = format!("flush err {}", e);
280 log::error!("{}", msg.as_str());
281 BuckyError::new(BuckyErrorCode::Failed, msg)
282 })
283 }
284}