cyfs_bdt/ndn/chunk/cache/
cache.rs

1use log::*;
2use std::{
3    sync::{Arc, Weak, Mutex}, 
4    ops::Range, 
5};
6use async_std::{
7    task
8};
9use cyfs_base::*;
10use crate::{
11    types::*,
12    stack::{Stack, WeakStack}
13};
14use super::super::super::{
15    types::*, 
16    channel::protocol::v0::PieceData,
17};
18use super::super::{
19    storage::*
20};
21use super::{
22    encode::*, 
23    stream::*, 
24    raw_cache::*
25};
26
27enum CacheState {
28    Loading(StateWaiter),
29    Loaded(bool)
30}
31
32struct CacheImpl {
33    chunk: ChunkId,  
34    state: Mutex<CacheState>, 
35    stream_cache: ChunkStreamCache, 
36}
37
38#[derive(Clone)]
39pub struct ChunkCache(Arc<CacheImpl>);
40
41
42impl std::fmt::Display for CacheImpl {
43    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44        write!(f, "ChunkCache{{chunk:{}}}", self.chunk)
45    }
46}
47
48impl Drop for CacheImpl {
49    fn drop(&mut self) {
50        info!("{} released", self);
51    }
52}
53
54
55pub struct WeakChunkCache(Weak<CacheImpl>);
56
57impl WeakChunkCache {
58    pub fn to_strong(&self) -> Option<ChunkCache> {
59        Weak::upgrade(&self.0).map(|arc| ChunkCache(arc))
60    }
61}
62
63impl ChunkCache {
64    pub fn to_weak(&self) -> WeakChunkCache {
65        WeakChunkCache(Arc::downgrade(&self.0))
66    }
67}
68
69
70impl std::fmt::Display for ChunkCache {
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        write!(f, "{}", self.0)
73    }
74}
75
76impl ChunkCache {
77    pub fn new(stack: WeakStack, chunk: ChunkId) -> Self {
78        let cache = Self(Arc::new(CacheImpl {
79            stream_cache: ChunkStreamCache::new(&chunk), 
80            chunk, 
81            state: Mutex::new(CacheState::Loading(StateWaiter::new()))
82        }));
83
84        {
85            let stack = Stack::from(&stack);
86            let cache = cache.clone();
87
88            task::spawn(async move {
89                let raw_cache = stack.ndn().chunk_manager().raw_caches().alloc(cache.chunk().len()).await;
90                let finished = cache.load(raw_cache.as_ref(), stack.ndn().chunk_manager().store()).await.is_ok();
91                let _ = cache.stream().load(finished, raw_cache);
92                let waiters = {
93                    let state = &mut *cache.0.state.lock().unwrap();
94                    match state {
95                        CacheState::Loading(waiters) => {
96                            let waiters = waiters.transfer(); 
97                            *state = CacheState::Loaded(finished);
98                            waiters
99                        },
100                        _ => unreachable!()
101                    }
102                };
103                waiters.wake();
104            });
105        }
106        
107        cache
108    }
109
110
111    async fn load(&self, cache: &dyn RawCache, storage: &dyn ChunkReader) -> BuckyResult<()> {
112        let reader = storage.get(self.chunk()).await?;
113
114        let writer = cache.async_writer().await?;
115
116        let written = async_std::io::copy(reader, writer).await? as usize;
117        
118        if written != self.chunk().len() {
119            Err(BuckyError::new(BuckyErrorCode::InvalidInput, ""))
120        } else {
121            Ok(())
122        }
123    } 
124
125    pub async fn wait_loaded(&self) -> bool {
126        let (waiter, finished) = {
127            let state = &mut *self.0.state.lock().unwrap();
128            match state {
129                CacheState::Loading(waiters) => (Some(waiters.new_waiter()), None), 
130                CacheState::Loaded(finished) => (None, Some(*finished))
131            }
132        };
133
134        if let Some(waiter) = waiter {
135            StateWaiter::wait(waiter, || {
136                let state = &*self.0.state.lock().unwrap();
137                if let CacheState::Loaded(finished) = state {
138                    *finished
139                } else {
140                    unreachable!()
141                }
142            }).await
143        } else {
144            finished.unwrap()
145        }
146        
147    }
148
149    pub fn chunk(&self) -> &ChunkId {
150        &self.0.chunk
151    }
152
153    pub fn stream(&self) -> &ChunkStreamCache {
154        &self.0.stream_cache
155    }
156
157    pub fn create_encoder(&self, desc: &ChunkCodecDesc) -> Box<dyn ChunkEncoder> {
158        self.stream().create_encoder(desc).clone_as_encoder()
159    }
160
161    pub fn exists(&self, range: Range<usize>) -> Option<Range<usize>> {
162        if range.start >= self.chunk().len() {
163            return Some(self.chunk().len()..self.chunk().len());
164        }
165        if range.end == 0 {
166            return Some(0..0);
167        }
168        let range = usize::min(range.start, self.chunk().len())..usize::min(range.end, self.chunk().len());
169        let index_start = (range.start / PieceData::max_payload()) as u32;
170        let index_end = ((range.end - 1) / PieceData::max_payload()) as u32;
171        for index in index_start..index_end + 1 {
172            if !self.stream().exists(index).unwrap() {
173                return None;
174            }
175        }
176        return Some(range);
177    }
178
179    pub async fn wait_exists<T: futures::Future<Output=BuckyError>, A: Fn() -> T>(
180        &self, 
181        range: Range<usize>, 
182        abort: A
183    ) -> BuckyResult<Range<usize>> {
184        trace!("{} wait_exists {:?}", self, range);
185        if range.start >= self.chunk().len() {
186            let r = self.chunk().len()..self.chunk().len();
187            trace!("{} wait_exists {:?} return {:?}", self, range, r);
188            return Ok(r);
189        }
190        if range.end == 0 {
191            let r = 0..0;
192            trace!("{} wait_exists {:?} return {:?}", self, range, r);
193            return Ok(r);
194        }
195        let range = usize::min(range.start, self.chunk().len())..usize::min(range.end, self.chunk().len());
196        let index_start = (range.start / PieceData::max_payload()) as u32;
197        let index_end = ((range.end - 1) / PieceData::max_payload()) as u32;
198        for index in index_start..index_end + 1 {
199            self.stream().wait_exists(index, abort()).await?;
200        }
201        trace!("{} wait_exists {:?} return {:?}", self, range, range);
202        Ok(range)
203    }
204    
205    pub async fn read<T: futures::Future<Output=BuckyError>, A: Fn() -> T>(
206        &self, 
207        offset: usize, 
208        buffer: &mut [u8], 
209        abort: A
210    ) -> BuckyResult<usize> {
211        let (desc, mut offset) = PieceDesc::from_stream_offset(PieceData::max_payload(), offset as u32);
212        let (mut index, range) = desc.unwrap_as_stream();
213        let mut read = 0;
214        loop {
215            let this_read = self.stream().async_read(
216                &PieceDesc::Range(index, range), 
217                offset as usize, 
218                &mut buffer[read..], 
219                abort()).await?;
220            read += this_read;
221            if this_read == 0 
222                || read >= buffer.len() {
223                break;
224            }
225            index += 1;
226            offset = 0;
227        }
228        Ok(read)
229    }
230}
231