cyfs_bdt/ndn/chunk/cache/raw_cache/
mem.rs1use std::{
2 sync::{Arc, RwLock}, usize,
3 io::SeekFrom,
4};
5use async_std::{
6 pin::{Pin},
7 task::{Context, Poll}
8};
9use cyfs_base::*;
10use cyfs_util::{
11 AsyncWriteWithSeek,
12 AsyncReadWithSeek,
13 SyncWriteWithSeek,
14 SyncReadWithSeek
15};
16use super::{
17 common::*,
18 manager::*
19};
20
21struct CacheImpl {
22 manager: Option<RawCacheManager>,
23 cache: RwLock<Vec<u8>>
24}
25
26impl CacheImpl {
27 fn capacity(&self) -> usize {
28 self.cache.read().unwrap().len()
29 }
30}
31
32impl Drop for CacheImpl {
33 fn drop(&mut self) {
34 if let Some(manager) = self.manager.as_ref() {
35 manager.release_mem(self.capacity())
36 }
37 }
38}
39
40#[derive(Clone)]
41pub struct MemCache(Arc<CacheImpl>);
42
43impl MemCache {
44 pub fn with_capacity(capacity: usize) -> Self {
45 Self::new(capacity, None)
46 }
47
48 pub async fn from_reader(capacity: usize, reader: impl async_std::io::Read + Unpin) -> BuckyResult<Self> {
49 let cache = Self::with_capacity(capacity);
50 let read = async_std::io::copy(reader, SeekWrapper::new(&cache)).await? as usize;
51 if read != capacity {
52 Err(BuckyError::new(BuckyErrorCode::InvalidData, "misatch read length"))
53 } else {
54 Ok(cache)
55 }
56 }
57
58 pub(super) fn new(capacity: usize, manager: Option<RawCacheManager>) -> Self {
59 Self(Arc::new(CacheImpl {
60 manager,
61 cache: RwLock::new(vec![0u8; capacity])
62 }))
63 }
64
65 fn seek(&self, cur: usize, pos: SeekFrom) -> usize {
66 let capacity = self.capacity();
67 match pos {
68 SeekFrom::Start(offset) => capacity.min(offset as usize),
69 SeekFrom::Current(offset) => {
70 let offset = (cur as i64) + offset;
71 let offset = offset.max(0);
72 capacity.min(offset as usize)
73 },
74 SeekFrom::End(offset) => {
75 let offset = (capacity as i64) + offset;
76 let offset = offset.max(0);
77 capacity.min(offset as usize)
78 }
79 }
80 }
81
82 fn read(&self, offset: usize, buffer: &mut [u8]) -> usize {
83 let capacity = self.capacity();
84 let start = offset.min(capacity);
85 let end = (offset + buffer.len()).min(capacity);
86 let len = end - start;
87 if len > 0 {
88 buffer[0..len].copy_from_slice(&self.0.cache.read().unwrap()[start..end]);
89 len
90 } else {
91 0
92 }
93 }
94
95 fn write(&self, offset: usize, buffer: &[u8]) -> usize {
96 let capacity = self.capacity();
97 let start = offset.min(capacity);
98 let end = (offset + buffer.len()).min(capacity);
99 let len = end - start;
100 if len > 0 {
101 self.0.cache.write().unwrap()[start..end].copy_from_slice(&buffer[0..len]);
102 len
103 } else {
104 0
105 }
106 }
107}
108
109struct SeekWrapper {
110 cache: MemCache,
111 offset: usize
112}
113
114impl SeekWrapper {
115 fn new(cache: &MemCache) -> Self {
116 Self {
117 cache: cache.clone(),
118 offset: 0
119 }
120 }
121}
122
123impl async_std::io::Seek for SeekWrapper {
124 fn poll_seek(
125 self: Pin<&mut Self>,
126 _cx: &mut Context<'_>,
127 pos: SeekFrom,
128 ) -> Poll<std::io::Result<u64>> {
129 let pined = self.get_mut();
130 pined.offset = pined.cache.seek(pined.offset, pos);
131 Poll::Ready(Ok(pined.offset as u64))
132 }
133}
134
135impl async_std::io::Read for SeekWrapper {
136 fn poll_read(
137 self: Pin<&mut Self>,
138 _cx: &mut Context<'_>,
139 buf: &mut [u8],
140 ) -> Poll<std::io::Result<usize>> {
141 let pined = self.get_mut();
142 let read = pined.cache.read(pined.offset, buf);
143 pined.offset += read;
144 Poll::Ready(Ok(read))
145 }
146}
147
148impl AsyncReadWithSeek for SeekWrapper {}
149
150impl std::io::Seek for SeekWrapper {
151 fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
152 self.offset = self.cache.seek(self.offset, pos);
153 Ok(self.offset as u64)
154 }
155}
156
157impl std::io::Read for SeekWrapper {
158 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
159 let read = self.cache.read(self.offset, buf);
160 self.offset += read;
161 Ok(read)
162 }
163}
164
165impl SyncReadWithSeek for SeekWrapper {}
166
167impl async_std::io::Write for SeekWrapper {
168 fn poll_write(
169 self: Pin<&mut Self>,
170 _cx: &mut Context<'_>,
171 buf: &[u8],
172 ) -> Poll<std::io::Result<usize>> {
173 let pined = self.get_mut();
174 let written = pined.cache.write(pined.offset, buf);
175 pined.offset += written;
176 Poll::Ready(Ok(written))
177 }
178
179 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
180 Poll::Ready(Ok(()))
181 }
182
183 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
184 Poll::Ready(Ok(()))
185 }
186}
187
188impl AsyncWriteWithSeek for SeekWrapper {}
189
190impl std::io::Write for SeekWrapper {
191 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
192 let written = self.cache.write(self.offset, buf);
193 self.offset += written;
194 Ok(written)
195 }
196
197 fn flush(&mut self) -> std::io::Result<()> {
198 Ok(())
199 }
200}
201
202impl SyncWriteWithSeek for SeekWrapper {}
203
204
205#[async_trait::async_trait]
206impl RawCache for MemCache {
207 fn capacity(&self) -> usize {
208 self.0.capacity()
209 }
210
211 fn clone_as_raw_cache(&self) -> Box<dyn RawCache> {
212 Box::new(self.clone())
213 }
214
215 async fn async_reader(&self) -> BuckyResult<Box<dyn Unpin + Send + Sync + AsyncReadWithSeek>> {
216 Ok(Box::new(SeekWrapper::new(self)))
217 }
218
219 fn sync_reader(&self) -> BuckyResult<Box<dyn SyncReadWithSeek + Send + Sync>> {
220 Ok(Box::new(SeekWrapper::new(self)))
221 }
222
223 async fn async_writer(&self) -> BuckyResult<Box<dyn Unpin + Send + Sync + AsyncWriteWithSeek>> {
224 Ok(Box::new(SeekWrapper::new(self)))
225 }
226
227 fn sync_writer(&self) -> BuckyResult<Box<dyn SyncWriteWithSeek>> {
228 Ok(Box::new(SeekWrapper::new(self)))
229 }
230}
231
232