cyfs_bdt/ndn/chunk/cache/raw_cache/
mem.rs

1use std::{
2    sync::{Arc, RwLock}, usize, 
3    io::SeekFrom, 
4};
5use async_std::{
6    pin::{Pin}, 
7    task::{Context, Poll}
8};
9use cyfs_base::*;
10use cyfs_util::{
11    AsyncWriteWithSeek, 
12    AsyncReadWithSeek, 
13    SyncWriteWithSeek, 
14    SyncReadWithSeek
15};
16use super::{
17    common::*, 
18    manager::*
19};
20
21struct CacheImpl {
22    manager: Option<RawCacheManager>, 
23    cache: RwLock<Vec<u8>>
24}
25
26impl CacheImpl {
27    fn capacity(&self) -> usize {
28        self.cache.read().unwrap().len()
29    } 
30}
31
32impl Drop for CacheImpl {
33    fn drop(&mut self) {
34        if let Some(manager) = self.manager.as_ref() {
35            manager.release_mem(self.capacity())
36        }
37    }
38}
39
40#[derive(Clone)]
41pub struct MemCache(Arc<CacheImpl>);
42
43impl MemCache {
44    pub fn with_capacity(capacity: usize) -> Self {
45        Self::new(capacity, None)
46    }
47
48    pub async fn from_reader(capacity: usize, reader: impl async_std::io::Read + Unpin) -> BuckyResult<Self> {
49        let cache = Self::with_capacity(capacity);
50        let read = async_std::io::copy(reader, SeekWrapper::new(&cache)).await? as usize;
51        if read != capacity {
52            Err(BuckyError::new(BuckyErrorCode::InvalidData, "misatch read length"))
53        } else {
54            Ok(cache)
55        }
56    }
57
58    pub(super) fn new(capacity: usize, manager: Option<RawCacheManager>) -> Self {
59        Self(Arc::new(CacheImpl {
60            manager, 
61            cache: RwLock::new(vec![0u8; capacity])
62        }))
63    }
64
65    fn seek(&self, cur: usize, pos: SeekFrom) -> usize {
66        let capacity = self.capacity();
67        match pos {
68            SeekFrom::Start(offset) => capacity.min(offset as usize), 
69            SeekFrom::Current(offset) => {
70                let offset = (cur as i64) + offset;
71                let offset = offset.max(0);
72                capacity.min(offset as usize)
73            },
74            SeekFrom::End(offset) => {
75                let offset = (capacity as i64) + offset;
76                let offset = offset.max(0);
77                capacity.min(offset as usize)
78            }
79        }
80    }
81
82    fn read(&self, offset: usize, buffer: &mut [u8]) -> usize {
83        let capacity = self.capacity();
84        let start = offset.min(capacity);
85        let end = (offset + buffer.len()).min(capacity);
86        let len = end - start;
87        if len > 0 {
88            buffer[0..len].copy_from_slice(&self.0.cache.read().unwrap()[start..end]);
89            len
90        } else {
91            0
92        }
93    }
94
95    fn write(&self, offset: usize, buffer: &[u8]) -> usize {
96        let capacity = self.capacity();
97        let start = offset.min(capacity);
98        let end = (offset + buffer.len()).min(capacity);
99        let len = end - start;
100        if len > 0 {
101            self.0.cache.write().unwrap()[start..end].copy_from_slice(&buffer[0..len]);
102            len
103        } else {
104            0
105        }
106    }
107}
108
109struct SeekWrapper {
110    cache: MemCache, 
111    offset: usize
112}
113
114impl SeekWrapper {
115    fn new(cache: &MemCache) -> Self {
116        Self {
117            cache: cache.clone(), 
118            offset: 0
119        }
120    }
121}
122
123impl async_std::io::Seek for SeekWrapper {
124    fn poll_seek(
125        self: Pin<&mut Self>,
126        _cx: &mut Context<'_>,
127        pos: SeekFrom,
128    ) -> Poll<std::io::Result<u64>> {
129        let pined = self.get_mut();
130        pined.offset = pined.cache.seek(pined.offset, pos);
131        Poll::Ready(Ok(pined.offset as u64)) 
132    }
133}
134
135impl async_std::io::Read for SeekWrapper {
136    fn poll_read(
137            self: Pin<&mut Self>,
138            _cx: &mut Context<'_>,
139            buf: &mut [u8],
140        ) -> Poll<std::io::Result<usize>> {
141        let pined = self.get_mut();
142        let read = pined.cache.read(pined.offset, buf);
143        pined.offset += read;
144        Poll::Ready(Ok(read))
145    }
146}
147
148impl AsyncReadWithSeek for SeekWrapper {}
149
150impl std::io::Seek for SeekWrapper {
151    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
152        self.offset = self.cache.seek(self.offset, pos);
153        Ok(self.offset as u64)
154    }
155}
156
157impl std::io::Read for SeekWrapper {
158    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
159        let read = self.cache.read(self.offset, buf);
160        self.offset += read;
161        Ok(read)
162    }
163}
164
165impl SyncReadWithSeek for SeekWrapper {}
166
167impl async_std::io::Write for SeekWrapper {
168    fn poll_write(
169        self: Pin<&mut Self>,
170        _cx: &mut Context<'_>,
171        buf: &[u8],
172    ) -> Poll<std::io::Result<usize>> {
173        let pined = self.get_mut();
174        let written = pined.cache.write(pined.offset, buf);
175        pined.offset += written;
176        Poll::Ready(Ok(written))
177    }
178
179    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
180        Poll::Ready(Ok(()))
181    }
182
183    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
184        Poll::Ready(Ok(()))
185    }
186}
187
188impl AsyncWriteWithSeek for SeekWrapper {}
189
190impl std::io::Write for SeekWrapper {
191    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
192        let written = self.cache.write(self.offset, buf);
193        self.offset += written;
194        Ok(written)
195    }
196
197    fn flush(&mut self) -> std::io::Result<()> {
198        Ok(())
199    }
200}
201
202impl SyncWriteWithSeek for SeekWrapper {}
203
204
205#[async_trait::async_trait]
206impl RawCache for MemCache {
207    fn capacity(&self) -> usize {
208        self.0.capacity()
209    }
210
211    fn clone_as_raw_cache(&self) -> Box<dyn RawCache> {
212        Box::new(self.clone())
213    }
214
215    async fn async_reader(&self) -> BuckyResult<Box<dyn Unpin + Send + Sync + AsyncReadWithSeek>> {
216        Ok(Box::new(SeekWrapper::new(self)))
217    }
218
219    fn sync_reader(&self) -> BuckyResult<Box<dyn SyncReadWithSeek + Send + Sync>> {
220        Ok(Box::new(SeekWrapper::new(self)))
221    }
222    
223    async fn async_writer(&self) -> BuckyResult<Box<dyn  Unpin + Send + Sync + AsyncWriteWithSeek>> {
224        Ok(Box::new(SeekWrapper::new(self)))
225    }   
226
227    fn sync_writer(&self) -> BuckyResult<Box<dyn SyncWriteWithSeek>> {
228        Ok(Box::new(SeekWrapper::new(self)))
229    }
230}
231
232