cyfs_bdt/ndn/chunk/cache/raw_cache/
file.rs

1use std::{
2    sync::{Arc, RwLock}, usize, 
3    io::SeekFrom, 
4    ops::Range, 
5    path::{Path, PathBuf}
6};
7use async_std::{
8    pin::Pin, 
9    task::{self, Context, Poll},
10    fs
11};
12use cyfs_base::*;
13use cyfs_util::{
14    AsyncWriteWithSeek, 
15    AsyncReadWithSeek, 
16    SyncWriteWithSeek, 
17    SyncReadWithSeek
18};
19use crate::{
20    types::*
21};
22use super::{
23    common::*, 
24};
25
26enum CacheState {
27    Creating(StateWaiter),
28    Created(async_std::fs::File), 
29    Error(BuckyError)
30}
31
32struct CacheImpl {
33    state: RwLock<CacheState>, 
34    path: PathBuf,
35    range: Range<u64>, 
36    to_remove: bool
37}
38
39
40impl Drop for FileCache {
41    fn drop(&mut self) {
42        if self.0.to_remove {
43            let to_remove = {
44                let state = &mut *self.0.state.write().unwrap();
45                let to_remove = match state {
46                    CacheState::Created(_) => true, 
47                    _ => false
48                };
49                *state = CacheState::Error(BuckyError::new(BuckyErrorCode::NotInit, "closed"));
50                to_remove
51            };
52            
53            if to_remove {
54                let path = self.0.path.clone();
55                task::spawn(async move {
56                    let _ = fs::remove_file(path).await;
57                });
58            }
59        }
60    }
61}
62
63
64#[derive(Clone)]
65pub struct FileCache(Arc<CacheImpl>);
66
67
68impl FileCache {
69    pub fn from_path(path: PathBuf, range: Range<u64>) -> Self {
70        Self::new(path, range, false)
71    }
72
73    pub(super) fn new(path: PathBuf, range: Range<u64>, to_remove: bool) -> Self {
74        let cache = Self(Arc::new(CacheImpl {
75            state: RwLock::new(CacheState::Creating(StateWaiter::new())), 
76            path,
77            range, 
78            to_remove
79        }));
80        
81        {
82            let cache = cache.clone();
83            task::spawn(async move {
84                let ret = cache.create().await;
85
86                let new_state = match ret {
87                    Ok(file) => CacheState::Created(file), 
88                    Err(err) => CacheState::Error(err)
89                };
90                let waiters = {
91                    let state = &mut *cache.0.state.write().unwrap();
92                    match state {
93                        CacheState::Creating(waiters) => {
94                            let waiters = waiters.transfer();
95                            *state = new_state;
96                            waiters
97                        },
98                        _ => unreachable!()
99                    }
100                };
101                
102                waiters.wake();
103            });
104        }
105        
106        cache
107    }
108
109    async fn create(&self) -> BuckyResult<async_std::fs::File> {
110        let mut file = async_std::fs::File::open(self.path()).await?;
111        use async_std::io::prelude::SeekExt;
112        let offset = file.seek(SeekFrom::Start(self.0.range.start)).await?;
113        if offset == self.range().start {
114            Ok(file)
115        } else {
116            Err(BuckyError::new(BuckyErrorCode::InvalidData,"offset to range failed"))
117        }
118    }
119
120    fn seek(&self, cur: usize, pos: SeekFrom) -> usize {
121        let capacity = self.capacity();
122        match pos {
123            SeekFrom::Start(offset) => capacity.min(offset as usize), 
124            SeekFrom::Current(offset) => {
125                let offset = (cur as i64) + offset;
126                let offset = offset.max(0);
127                capacity.min(offset as usize)
128            },
129            SeekFrom::End(offset) => {
130                let offset = (capacity as i64) + offset;
131                let offset = offset.max(0);
132                capacity.min(offset as usize)
133            }
134        }
135    }
136
137    fn path(&self) -> &Path {
138        self.0.path.as_path()
139    }
140
141    fn range(&self) -> &Range<u64> {
142        &self.0.range
143    }
144
145
146    fn is_created(&self) -> BuckyResult<async_std::fs::File> {
147        match &*self.0.state.read().unwrap() {
148            CacheState::Creating(_) => Err(BuckyError::new(BuckyErrorCode::WouldBlock, "")), 
149            CacheState::Created(file) => Ok(file.clone()), 
150            CacheState::Error(err) => Err(err.clone())
151        }
152    }
153
154    async fn wait_created(&self) -> BuckyResult<async_std::fs::File> {
155        let (ret, waiter) = {
156            match &mut *self.0.state.write().unwrap() {
157                CacheState::Creating(waiters) => (None, Some(waiters.new_waiter())),
158                CacheState::Created(file) => (Some(Ok(file.clone())), None), 
159                CacheState::Error(err) => (Some(Err(err.clone())), None)
160            }
161        };
162        if let Some(ret) = ret {
163            ret 
164        } else if let Some(waiter) = waiter {
165            StateWaiter::wait(waiter, || {
166                match &*self.0.state.read().unwrap() {
167                    CacheState::Creating(_) => unreachable!(),
168                    CacheState::Created(file) => Ok(file.clone()), 
169                    CacheState::Error(err) => Err(err.clone())
170                }
171            }).await
172        } else {
173            unreachable!()
174        }
175    }
176}
177
178
179pub struct FileCacheAsyncReader {
180    file: async_std::fs::File, 
181    cache: FileCache, 
182    offset: usize
183}
184
185
186impl async_std::io::Seek for FileCacheAsyncReader {
187    fn poll_seek(
188        self: Pin<&mut Self>,
189        cx: &mut Context<'_>,
190        pos: SeekFrom,
191    ) -> Poll<std::io::Result<u64>> {
192        let reader = self.get_mut();
193        let file_offset = reader.cache.seek(reader.offset, pos) as u64 + reader.cache.range().start;
194
195        let ret = async_std::io::Seek::poll_seek(Pin::new(&mut reader.file), cx, SeekFrom::Start(file_offset));
196
197        match ret {
198            Poll::Ready(ret) => {
199                match ret {
200                    Ok(file_offset) => {
201                        let offset = file_offset - reader.cache.range().start;
202                        reader.offset = offset as usize;
203                        Poll::Ready(Ok(offset))
204                    }, 
205                    Err(err) => Poll::Ready(Err(err))
206                } 
207            },
208            Poll::Pending => Poll::Pending
209        }
210    }
211}
212
213impl async_std::io::Read for FileCacheAsyncReader {
214    fn poll_read(
215            self: Pin<&mut Self>,
216            cx: &mut Context<'_>,
217            buf: &mut [u8],
218        ) -> Poll<std::io::Result<usize>> {
219        let reader = self.get_mut();
220        let new_offset = reader.cache.seek(reader.offset, SeekFrom::Current(buf.len() as i64));
221        let cliped = &mut buf[0..new_offset - reader.offset];
222
223        let ret = async_std::io::Read::poll_read(Pin::new(&mut reader.file), cx, cliped);
224
225        match ret {
226            Poll::Ready(ret) => {
227                match ret {
228                    Ok(read) => {
229                        reader.offset += read;
230                        Poll::Ready(Ok(read))
231                    }, 
232                    Err(err) => Poll::Ready(Err(err))
233                }
234            },
235            Poll::Pending => Poll::Pending
236        }
237    }
238}
239
240impl AsyncReadWithSeek for FileCacheAsyncReader {}
241
242
243pub struct FileCacheSyncReader {
244    file: std::fs::File, 
245    cache: FileCache, 
246    offset: usize
247}
248
249
250impl std::io::Seek for FileCacheSyncReader {
251    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
252        let file_offset = self.cache.seek(self.offset, pos) as u64 + self.cache.range().start;
253
254        let file_offset = std::io::Seek::seek(&mut self.file, SeekFrom::Start(file_offset))?;
255
256        let offset = file_offset - self.cache.range().start;
257
258        self.offset = offset as usize;
259
260        Ok(offset)
261    }
262}
263
264impl std::io::Read for FileCacheSyncReader {
265    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
266        let new_offset = self.cache.seek(self.offset, SeekFrom::Current(buf.len() as i64));
267        let cliped = &mut buf[0..new_offset - self.offset];
268
269        let read = std::io::Read::read(&mut self.file, cliped)?;
270
271        self.offset += read;
272        Ok(read)
273    }
274}
275
276impl SyncReadWithSeek for FileCacheSyncReader {}
277
278#[async_trait::async_trait]
279impl RawCache for FileCache {
280    fn capacity(&self) -> usize {
281        (self.range().end - self.range().start) as usize
282    }
283
284    fn clone_as_raw_cache(&self) -> Box<dyn RawCache> {
285        Box::new(self.clone())
286    }
287
288    async fn async_reader(&self) -> BuckyResult<Box<dyn Unpin + Send + Sync + AsyncReadWithSeek>> {
289        let file = self.wait_created().await?;
290        
291        Ok(Box::new(FileCacheAsyncReader {
292            file, 
293            cache: self.clone(),
294            offset: 0
295        }))
296    }
297
298    fn sync_reader(&self) -> BuckyResult<Box<dyn SyncReadWithSeek + Send + Sync>> {
299        let _ = self.is_created()?;
300
301        let mut file = std::fs::File::open(self.path())?;
302
303        use std::io::Seek;
304        let offset = file.seek(SeekFrom::Start(self.range().start))?;
305        if offset == self.range().start {
306            Ok(Box::new(FileCacheSyncReader {
307                file, 
308                cache: self.clone(),
309                offset: 0
310            }))
311        } else {
312            Err(BuckyError::new(BuckyErrorCode::InvalidData,"offset to range failed"))
313        }   
314    }
315    
316    async fn async_writer(&self) -> BuckyResult<Box<dyn  Unpin + Send + Sync + AsyncWriteWithSeek>> {
317        Err(BuckyError::new(BuckyErrorCode::NotSupport, "file cache does not support sync reader"))
318    }   
319
320    fn sync_writer(&self) -> BuckyResult<Box<dyn SyncWriteWithSeek>> {
321        Err(BuckyError::new(BuckyErrorCode::NotSupport, "file cache does not support sync reader"))
322    }
323}
324
325