cyfs_bdt/ndn/chunk/cache/raw_cache/
file.rs1use std::{
2 sync::{Arc, RwLock}, usize,
3 io::SeekFrom,
4 ops::Range,
5 path::{Path, PathBuf}
6};
7use async_std::{
8 pin::Pin,
9 task::{self, Context, Poll},
10 fs
11};
12use cyfs_base::*;
13use cyfs_util::{
14 AsyncWriteWithSeek,
15 AsyncReadWithSeek,
16 SyncWriteWithSeek,
17 SyncReadWithSeek
18};
19use crate::{
20 types::*
21};
22use super::{
23 common::*,
24};
25
26enum CacheState {
27 Creating(StateWaiter),
28 Created(async_std::fs::File),
29 Error(BuckyError)
30}
31
32struct CacheImpl {
33 state: RwLock<CacheState>,
34 path: PathBuf,
35 range: Range<u64>,
36 to_remove: bool
37}
38
39
40impl Drop for FileCache {
41 fn drop(&mut self) {
42 if self.0.to_remove {
43 let to_remove = {
44 let state = &mut *self.0.state.write().unwrap();
45 let to_remove = match state {
46 CacheState::Created(_) => true,
47 _ => false
48 };
49 *state = CacheState::Error(BuckyError::new(BuckyErrorCode::NotInit, "closed"));
50 to_remove
51 };
52
53 if to_remove {
54 let path = self.0.path.clone();
55 task::spawn(async move {
56 let _ = fs::remove_file(path).await;
57 });
58 }
59 }
60 }
61}
62
63
64#[derive(Clone)]
65pub struct FileCache(Arc<CacheImpl>);
66
67
68impl FileCache {
69 pub fn from_path(path: PathBuf, range: Range<u64>) -> Self {
70 Self::new(path, range, false)
71 }
72
73 pub(super) fn new(path: PathBuf, range: Range<u64>, to_remove: bool) -> Self {
74 let cache = Self(Arc::new(CacheImpl {
75 state: RwLock::new(CacheState::Creating(StateWaiter::new())),
76 path,
77 range,
78 to_remove
79 }));
80
81 {
82 let cache = cache.clone();
83 task::spawn(async move {
84 let ret = cache.create().await;
85
86 let new_state = match ret {
87 Ok(file) => CacheState::Created(file),
88 Err(err) => CacheState::Error(err)
89 };
90 let waiters = {
91 let state = &mut *cache.0.state.write().unwrap();
92 match state {
93 CacheState::Creating(waiters) => {
94 let waiters = waiters.transfer();
95 *state = new_state;
96 waiters
97 },
98 _ => unreachable!()
99 }
100 };
101
102 waiters.wake();
103 });
104 }
105
106 cache
107 }
108
109 async fn create(&self) -> BuckyResult<async_std::fs::File> {
110 let mut file = async_std::fs::File::open(self.path()).await?;
111 use async_std::io::prelude::SeekExt;
112 let offset = file.seek(SeekFrom::Start(self.0.range.start)).await?;
113 if offset == self.range().start {
114 Ok(file)
115 } else {
116 Err(BuckyError::new(BuckyErrorCode::InvalidData,"offset to range failed"))
117 }
118 }
119
120 fn seek(&self, cur: usize, pos: SeekFrom) -> usize {
121 let capacity = self.capacity();
122 match pos {
123 SeekFrom::Start(offset) => capacity.min(offset as usize),
124 SeekFrom::Current(offset) => {
125 let offset = (cur as i64) + offset;
126 let offset = offset.max(0);
127 capacity.min(offset as usize)
128 },
129 SeekFrom::End(offset) => {
130 let offset = (capacity as i64) + offset;
131 let offset = offset.max(0);
132 capacity.min(offset as usize)
133 }
134 }
135 }
136
137 fn path(&self) -> &Path {
138 self.0.path.as_path()
139 }
140
141 fn range(&self) -> &Range<u64> {
142 &self.0.range
143 }
144
145
146 fn is_created(&self) -> BuckyResult<async_std::fs::File> {
147 match &*self.0.state.read().unwrap() {
148 CacheState::Creating(_) => Err(BuckyError::new(BuckyErrorCode::WouldBlock, "")),
149 CacheState::Created(file) => Ok(file.clone()),
150 CacheState::Error(err) => Err(err.clone())
151 }
152 }
153
154 async fn wait_created(&self) -> BuckyResult<async_std::fs::File> {
155 let (ret, waiter) = {
156 match &mut *self.0.state.write().unwrap() {
157 CacheState::Creating(waiters) => (None, Some(waiters.new_waiter())),
158 CacheState::Created(file) => (Some(Ok(file.clone())), None),
159 CacheState::Error(err) => (Some(Err(err.clone())), None)
160 }
161 };
162 if let Some(ret) = ret {
163 ret
164 } else if let Some(waiter) = waiter {
165 StateWaiter::wait(waiter, || {
166 match &*self.0.state.read().unwrap() {
167 CacheState::Creating(_) => unreachable!(),
168 CacheState::Created(file) => Ok(file.clone()),
169 CacheState::Error(err) => Err(err.clone())
170 }
171 }).await
172 } else {
173 unreachable!()
174 }
175 }
176}
177
178
179pub struct FileCacheAsyncReader {
180 file: async_std::fs::File,
181 cache: FileCache,
182 offset: usize
183}
184
185
186impl async_std::io::Seek for FileCacheAsyncReader {
187 fn poll_seek(
188 self: Pin<&mut Self>,
189 cx: &mut Context<'_>,
190 pos: SeekFrom,
191 ) -> Poll<std::io::Result<u64>> {
192 let reader = self.get_mut();
193 let file_offset = reader.cache.seek(reader.offset, pos) as u64 + reader.cache.range().start;
194
195 let ret = async_std::io::Seek::poll_seek(Pin::new(&mut reader.file), cx, SeekFrom::Start(file_offset));
196
197 match ret {
198 Poll::Ready(ret) => {
199 match ret {
200 Ok(file_offset) => {
201 let offset = file_offset - reader.cache.range().start;
202 reader.offset = offset as usize;
203 Poll::Ready(Ok(offset))
204 },
205 Err(err) => Poll::Ready(Err(err))
206 }
207 },
208 Poll::Pending => Poll::Pending
209 }
210 }
211}
212
213impl async_std::io::Read for FileCacheAsyncReader {
214 fn poll_read(
215 self: Pin<&mut Self>,
216 cx: &mut Context<'_>,
217 buf: &mut [u8],
218 ) -> Poll<std::io::Result<usize>> {
219 let reader = self.get_mut();
220 let new_offset = reader.cache.seek(reader.offset, SeekFrom::Current(buf.len() as i64));
221 let cliped = &mut buf[0..new_offset - reader.offset];
222
223 let ret = async_std::io::Read::poll_read(Pin::new(&mut reader.file), cx, cliped);
224
225 match ret {
226 Poll::Ready(ret) => {
227 match ret {
228 Ok(read) => {
229 reader.offset += read;
230 Poll::Ready(Ok(read))
231 },
232 Err(err) => Poll::Ready(Err(err))
233 }
234 },
235 Poll::Pending => Poll::Pending
236 }
237 }
238}
239
240impl AsyncReadWithSeek for FileCacheAsyncReader {}
241
242
243pub struct FileCacheSyncReader {
244 file: std::fs::File,
245 cache: FileCache,
246 offset: usize
247}
248
249
250impl std::io::Seek for FileCacheSyncReader {
251 fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
252 let file_offset = self.cache.seek(self.offset, pos) as u64 + self.cache.range().start;
253
254 let file_offset = std::io::Seek::seek(&mut self.file, SeekFrom::Start(file_offset))?;
255
256 let offset = file_offset - self.cache.range().start;
257
258 self.offset = offset as usize;
259
260 Ok(offset)
261 }
262}
263
264impl std::io::Read for FileCacheSyncReader {
265 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
266 let new_offset = self.cache.seek(self.offset, SeekFrom::Current(buf.len() as i64));
267 let cliped = &mut buf[0..new_offset - self.offset];
268
269 let read = std::io::Read::read(&mut self.file, cliped)?;
270
271 self.offset += read;
272 Ok(read)
273 }
274}
275
276impl SyncReadWithSeek for FileCacheSyncReader {}
277
278#[async_trait::async_trait]
279impl RawCache for FileCache {
280 fn capacity(&self) -> usize {
281 (self.range().end - self.range().start) as usize
282 }
283
284 fn clone_as_raw_cache(&self) -> Box<dyn RawCache> {
285 Box::new(self.clone())
286 }
287
288 async fn async_reader(&self) -> BuckyResult<Box<dyn Unpin + Send + Sync + AsyncReadWithSeek>> {
289 let file = self.wait_created().await?;
290
291 Ok(Box::new(FileCacheAsyncReader {
292 file,
293 cache: self.clone(),
294 offset: 0
295 }))
296 }
297
298 fn sync_reader(&self) -> BuckyResult<Box<dyn SyncReadWithSeek + Send + Sync>> {
299 let _ = self.is_created()?;
300
301 let mut file = std::fs::File::open(self.path())?;
302
303 use std::io::Seek;
304 let offset = file.seek(SeekFrom::Start(self.range().start))?;
305 if offset == self.range().start {
306 Ok(Box::new(FileCacheSyncReader {
307 file,
308 cache: self.clone(),
309 offset: 0
310 }))
311 } else {
312 Err(BuckyError::new(BuckyErrorCode::InvalidData,"offset to range failed"))
313 }
314 }
315
316 async fn async_writer(&self) -> BuckyResult<Box<dyn Unpin + Send + Sync + AsyncWriteWithSeek>> {
317 Err(BuckyError::new(BuckyErrorCode::NotSupport, "file cache does not support sync reader"))
318 }
319
320 fn sync_writer(&self) -> BuckyResult<Box<dyn SyncWriteWithSeek>> {
321 Err(BuckyError::new(BuckyErrorCode::NotSupport, "file cache does not support sync reader"))
322 }
323}
324
325