cyfs_bdt/ndn/chunk/cache/
cache.rs1use log::*;
2use std::{
3 sync::{Arc, Weak, Mutex},
4 ops::Range,
5};
6use async_std::{
7 task
8};
9use cyfs_base::*;
10use crate::{
11 types::*,
12 stack::{Stack, WeakStack}
13};
14use super::super::super::{
15 types::*,
16 channel::protocol::v0::PieceData,
17};
18use super::super::{
19 storage::*
20};
21use super::{
22 encode::*,
23 stream::*,
24 raw_cache::*
25};
26
/// Loading lifecycle of a chunk cache entry.
enum CacheState {
    /// Background load still in progress; holds waiters to wake on completion.
    Loading(StateWaiter),
    /// Load attempt finished; `true` iff the whole chunk was read from local storage.
    Loaded(bool)
}
31
/// Shared inner state behind a `ChunkCache` handle.
struct CacheImpl {
    // Identity of the cached chunk.
    chunk: ChunkId,
    // Guarded load state; transitions Loading -> Loaded exactly once
    // (in the task spawned by `ChunkCache::new`).
    state: Mutex<CacheState>,
    // Piece-level stream view over the raw cache storage.
    stream_cache: ChunkStreamCache,
}
37
/// Ref-counted handle to a per-chunk cache entry; cloning is cheap (Arc bump).
#[derive(Clone)]
pub struct ChunkCache(Arc<CacheImpl>);
40
41
impl std::fmt::Display for CacheImpl {
    // Rendered in logs as "ChunkCache{chunk:<id>}".
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "ChunkCache{{chunk:{}}}", self.chunk)
    }
}
47
impl Drop for CacheImpl {
    // Log release of the last strong reference, for cache-lifetime tracing.
    fn drop(&mut self) {
        info!("{} released", self);
    }
}
53
54
/// Non-owning handle to a cache entry; upgrade with `to_strong` before use.
pub struct WeakChunkCache(Weak<CacheImpl>);
56
57impl WeakChunkCache {
58 pub fn to_strong(&self) -> Option<ChunkCache> {
59 Weak::upgrade(&self.0).map(|arc| ChunkCache(arc))
60 }
61}
62
impl ChunkCache {
    /// Downgrade to a weak handle that does not keep the cache entry alive.
    pub fn to_weak(&self) -> WeakChunkCache {
        WeakChunkCache(Arc::downgrade(&self.0))
    }
}
68
69
impl std::fmt::Display for ChunkCache {
    // Delegate to the inner `CacheImpl` display ("ChunkCache{chunk:<id>}").
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}
75
76impl ChunkCache {
77 pub fn new(stack: WeakStack, chunk: ChunkId) -> Self {
78 let cache = Self(Arc::new(CacheImpl {
79 stream_cache: ChunkStreamCache::new(&chunk),
80 chunk,
81 state: Mutex::new(CacheState::Loading(StateWaiter::new()))
82 }));
83
84 {
85 let stack = Stack::from(&stack);
86 let cache = cache.clone();
87
88 task::spawn(async move {
89 let raw_cache = stack.ndn().chunk_manager().raw_caches().alloc(cache.chunk().len()).await;
90 let finished = cache.load(raw_cache.as_ref(), stack.ndn().chunk_manager().store()).await.is_ok();
91 let _ = cache.stream().load(finished, raw_cache);
92 let waiters = {
93 let state = &mut *cache.0.state.lock().unwrap();
94 match state {
95 CacheState::Loading(waiters) => {
96 let waiters = waiters.transfer();
97 *state = CacheState::Loaded(finished);
98 waiters
99 },
100 _ => unreachable!()
101 }
102 };
103 waiters.wake();
104 });
105 }
106
107 cache
108 }
109
110
111 async fn load(&self, cache: &dyn RawCache, storage: &dyn ChunkReader) -> BuckyResult<()> {
112 let reader = storage.get(self.chunk()).await?;
113
114 let writer = cache.async_writer().await?;
115
116 let written = async_std::io::copy(reader, writer).await? as usize;
117
118 if written != self.chunk().len() {
119 Err(BuckyError::new(BuckyErrorCode::InvalidInput, ""))
120 } else {
121 Ok(())
122 }
123 }
124
125 pub async fn wait_loaded(&self) -> bool {
126 let (waiter, finished) = {
127 let state = &mut *self.0.state.lock().unwrap();
128 match state {
129 CacheState::Loading(waiters) => (Some(waiters.new_waiter()), None),
130 CacheState::Loaded(finished) => (None, Some(*finished))
131 }
132 };
133
134 if let Some(waiter) = waiter {
135 StateWaiter::wait(waiter, || {
136 let state = &*self.0.state.lock().unwrap();
137 if let CacheState::Loaded(finished) = state {
138 *finished
139 } else {
140 unreachable!()
141 }
142 }).await
143 } else {
144 finished.unwrap()
145 }
146
147 }
148
    /// Identity of the chunk this cache holds.
    pub fn chunk(&self) -> &ChunkId {
        &self.0.chunk
    }
152
    /// Piece-level stream view over the cached data.
    pub fn stream(&self) -> &ChunkStreamCache {
        &self.0.stream_cache
    }
156
    /// Build a piece encoder over this cache for the given codec description.
    pub fn create_encoder(&self, desc: &ChunkCodecDesc) -> Box<dyn ChunkEncoder> {
        self.stream().create_encoder(desc).clone_as_encoder()
    }
160
161 pub fn exists(&self, range: Range<usize>) -> Option<Range<usize>> {
162 if range.start >= self.chunk().len() {
163 return Some(self.chunk().len()..self.chunk().len());
164 }
165 if range.end == 0 {
166 return Some(0..0);
167 }
168 let range = usize::min(range.start, self.chunk().len())..usize::min(range.end, self.chunk().len());
169 let index_start = (range.start / PieceData::max_payload()) as u32;
170 let index_end = ((range.end - 1) / PieceData::max_payload()) as u32;
171 for index in index_start..index_end + 1 {
172 if !self.stream().exists(index).unwrap() {
173 return None;
174 }
175 }
176 return Some(range);
177 }
178
179 pub async fn wait_exists<T: futures::Future<Output=BuckyError>, A: Fn() -> T>(
180 &self,
181 range: Range<usize>,
182 abort: A
183 ) -> BuckyResult<Range<usize>> {
184 trace!("{} wait_exists {:?}", self, range);
185 if range.start >= self.chunk().len() {
186 let r = self.chunk().len()..self.chunk().len();
187 trace!("{} wait_exists {:?} return {:?}", self, range, r);
188 return Ok(r);
189 }
190 if range.end == 0 {
191 let r = 0..0;
192 trace!("{} wait_exists {:?} return {:?}", self, range, r);
193 return Ok(r);
194 }
195 let range = usize::min(range.start, self.chunk().len())..usize::min(range.end, self.chunk().len());
196 let index_start = (range.start / PieceData::max_payload()) as u32;
197 let index_end = ((range.end - 1) / PieceData::max_payload()) as u32;
198 for index in index_start..index_end + 1 {
199 self.stream().wait_exists(index, abort()).await?;
200 }
201 trace!("{} wait_exists {:?} return {:?}", self, range, range);
202 Ok(range)
203 }
204
205 pub async fn read<T: futures::Future<Output=BuckyError>, A: Fn() -> T>(
206 &self,
207 offset: usize,
208 buffer: &mut [u8],
209 abort: A
210 ) -> BuckyResult<usize> {
211 let (desc, mut offset) = PieceDesc::from_stream_offset(PieceData::max_payload(), offset as u32);
212 let (mut index, range) = desc.unwrap_as_stream();
213 let mut read = 0;
214 loop {
215 let this_read = self.stream().async_read(
216 &PieceDesc::Range(index, range),
217 offset as usize,
218 &mut buffer[read..],
219 abort()).await?;
220 read += this_read;
221 if this_read == 0
222 || read >= buffer.len() {
223 break;
224 }
225 index += 1;
226 offset = 0;
227 }
228 Ok(read)
229 }
230}
231