cyfs_bdt/ndn/chunk/
manager.rs

1use std::{
2    collections::{BTreeMap, LinkedList}, 
3    sync::{Mutex},
4};
5use async_std::{
6    io::Cursor
7};
8use async_trait::async_trait;
9use cyfs_base::*;
10use cyfs_util::*;
11use crate::{
12    types::*, 
13    stack::{WeakStack, Stack},
14};
15use super::super::{
16    download::*
17};
18use super::{
19    storage::*,  
20    cache::*,
21    download::*
22};
23
24#[derive(Clone)]
25pub struct Config {
26    pub raw_caches: RawCacheConfig
27}
28
29struct Downloaders(LinkedList<WeakChunkDownloader>);
30
31impl Downloaders {
32    fn new() -> Self {
33        Self(Default::default())
34    }
35
36    fn create_downloader(
37        &mut self, 
38        stack: &WeakStack, 
39        cache: ChunkCache, 
40        task: Box<dyn LeafDownloadTask>
41    ) -> ChunkDownloader {
42        let downloader = ChunkDownloader::new(stack.clone(), cache, task);
43        self.0.push_back(downloader.to_weak());
44        downloader
45    }
46
47    fn get_all(&mut self) -> LinkedList<ChunkDownloader> {
48        let mut all = LinkedList::new();
49        let mut remain = LinkedList::new();
50        for weak in &self.0 {
51            if let Some(downloader) = weak.to_strong() {
52                remain.push_back(weak.clone());
53                all.push_back(downloader);
54            } 
55        }
56        std::mem::swap(&mut self.0, &mut remain);
57        all
58    }
59}
60
61
62pub struct ChunkManager {
63    stack: WeakStack, 
64    store: Box<dyn ChunkReader>, 
65    raw_caches: RawCacheManager, 
66    caches: Mutex<BTreeMap<ChunkId, WeakChunkCache>>, 
67    downloaders: Mutex<Downloaders>
68}
69
70impl std::fmt::Display for ChunkManager {
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        write!(f, "ChunkManager:{{local:{}}}", Stack::from(&self.stack).local_device_id())
73    }
74}
75
76
77struct EmptyChunkWrapper(Box<dyn ChunkReader>);
78
79impl EmptyChunkWrapper {
80    fn new(non_empty: Box<dyn ChunkReader>) -> Self {
81        Self(non_empty)
82    }
83}
84
85#[async_trait]
86impl ChunkReader for EmptyChunkWrapper {
87    fn clone_as_reader(&self) -> Box<dyn ChunkReader> {
88        Box::new(Self(self.0.clone_as_reader()))
89    }
90
91    async fn exists(&self, chunk: &ChunkId) -> bool {
92        if chunk.len() == 0 {
93            true
94        } else {
95            self.0.exists(chunk).await
96        }
97    }
98
99    async fn get(&self, chunk: &ChunkId) -> BuckyResult<Box<dyn AsyncReadWithSeek + Unpin + Send + Sync>> {
100        if chunk.len() == 0 {
101            Ok(Box::new(Cursor::new(vec![0u8; 0])))
102        } else {
103            self.0.get(chunk).await
104        }
105    }
106}
107
108
109
110impl ChunkManager {
111    pub(crate) fn new(
112        weak_stack: WeakStack, 
113        store: Box<dyn ChunkReader>
114    ) -> Self {
115        let stack = Stack::from(&weak_stack);
116        Self { 
117            stack: weak_stack, 
118            store: Box::new(EmptyChunkWrapper::new(store)), 
119            raw_caches: RawCacheManager::new(stack.local_device_id().clone(), stack.config().ndn.chunk.raw_caches.clone()), 
120            caches: Mutex::new(Default::default()), 
121            downloaders: Mutex::new(Downloaders::new())
122        }
123    }
124
125    pub(crate) fn on_statistic(&self) -> String {
126        format!("ChunkCacheCount:{}, UsedMem: {}",  self.caches.lock().unwrap().len(), self.raw_caches().used_mem())
127    }
128
129    pub fn store(&self) -> &dyn ChunkReader {
130        self.store.as_ref()
131    }
132
133    pub fn raw_caches(&self) -> &RawCacheManager {
134        &self.raw_caches
135    }
136
137    pub fn create_cache(&self, chunk: &ChunkId) -> ChunkCache {
138        let mut caches = self.caches.lock().unwrap();
139        if let Some(weak) = caches.get(chunk) {
140            if let Some(cache) = weak.to_strong().clone() {
141                return cache;
142            }
143            caches.remove(chunk);
144        } 
145        let cache = ChunkCache::new(self.stack.clone(), chunk.clone());
146        info!("{} create new cache {}", self, cache);
147        caches.insert(chunk.clone(), cache.to_weak());
148        cache
149    }
150
151    pub fn create_downloader(&self, chunk: &ChunkId, task: Box<dyn LeafDownloadTask>) -> ChunkDownloader {
152        let cache = self.create_cache(chunk);
153        let mut downloaders = self.downloaders.lock().unwrap();
154        let downloader = downloaders.create_downloader(&self.stack, cache, task);
155        info!("{} create new downloader {}", self, downloader);
156        downloader
157    }
158
159    pub(in super::super) fn on_schedule(&self, _now: Timestamp) {
160        let downloaders = {
161            let mut downloaders = self.downloaders.lock().unwrap();
162            downloaders.get_all()
163        };
164        for downloader in downloaders {
165            downloader.on_drain(0);
166        }
167
168        {
169            let mut remove = LinkedList::new();
170            let mut caches = self.caches.lock().unwrap();
171            for (chunk, cache) in caches.iter() {
172                if cache.to_strong().is_none() {
173                    remove.push_back(chunk.clone());
174                }
175            }
176
177            for chunk in remove {
178                caches.remove(&chunk);
179            }
180        }
181    } 
182}