cyfs_bdt/ndn/chunk/
manager.rs1use std::{
2 collections::{BTreeMap, LinkedList},
3 sync::{Mutex},
4};
5use async_std::{
6 io::Cursor
7};
8use async_trait::async_trait;
9use cyfs_base::*;
10use cyfs_util::*;
11use crate::{
12 types::*,
13 stack::{WeakStack, Stack},
14};
15use super::super::{
16 download::*
17};
18use super::{
19 storage::*,
20 cache::*,
21 download::*
22};
23
24#[derive(Clone)]
25pub struct Config {
26 pub raw_caches: RawCacheConfig
27}
28
29struct Downloaders(LinkedList<WeakChunkDownloader>);
30
31impl Downloaders {
32 fn new() -> Self {
33 Self(Default::default())
34 }
35
36 fn create_downloader(
37 &mut self,
38 stack: &WeakStack,
39 cache: ChunkCache,
40 task: Box<dyn LeafDownloadTask>
41 ) -> ChunkDownloader {
42 let downloader = ChunkDownloader::new(stack.clone(), cache, task);
43 self.0.push_back(downloader.to_weak());
44 downloader
45 }
46
47 fn get_all(&mut self) -> LinkedList<ChunkDownloader> {
48 let mut all = LinkedList::new();
49 let mut remain = LinkedList::new();
50 for weak in &self.0 {
51 if let Some(downloader) = weak.to_strong() {
52 remain.push_back(weak.clone());
53 all.push_back(downloader);
54 }
55 }
56 std::mem::swap(&mut self.0, &mut remain);
57 all
58 }
59}
60
61
62pub struct ChunkManager {
63 stack: WeakStack,
64 store: Box<dyn ChunkReader>,
65 raw_caches: RawCacheManager,
66 caches: Mutex<BTreeMap<ChunkId, WeakChunkCache>>,
67 downloaders: Mutex<Downloaders>
68}
69
70impl std::fmt::Display for ChunkManager {
71 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 write!(f, "ChunkManager:{{local:{}}}", Stack::from(&self.stack).local_device_id())
73 }
74}
75
76
77struct EmptyChunkWrapper(Box<dyn ChunkReader>);
78
79impl EmptyChunkWrapper {
80 fn new(non_empty: Box<dyn ChunkReader>) -> Self {
81 Self(non_empty)
82 }
83}
84
85#[async_trait]
86impl ChunkReader for EmptyChunkWrapper {
87 fn clone_as_reader(&self) -> Box<dyn ChunkReader> {
88 Box::new(Self(self.0.clone_as_reader()))
89 }
90
91 async fn exists(&self, chunk: &ChunkId) -> bool {
92 if chunk.len() == 0 {
93 true
94 } else {
95 self.0.exists(chunk).await
96 }
97 }
98
99 async fn get(&self, chunk: &ChunkId) -> BuckyResult<Box<dyn AsyncReadWithSeek + Unpin + Send + Sync>> {
100 if chunk.len() == 0 {
101 Ok(Box::new(Cursor::new(vec![0u8; 0])))
102 } else {
103 self.0.get(chunk).await
104 }
105 }
106}
107
108
109
110impl ChunkManager {
111 pub(crate) fn new(
112 weak_stack: WeakStack,
113 store: Box<dyn ChunkReader>
114 ) -> Self {
115 let stack = Stack::from(&weak_stack);
116 Self {
117 stack: weak_stack,
118 store: Box::new(EmptyChunkWrapper::new(store)),
119 raw_caches: RawCacheManager::new(stack.local_device_id().clone(), stack.config().ndn.chunk.raw_caches.clone()),
120 caches: Mutex::new(Default::default()),
121 downloaders: Mutex::new(Downloaders::new())
122 }
123 }
124
125 pub(crate) fn on_statistic(&self) -> String {
126 format!("ChunkCacheCount:{}, UsedMem: {}", self.caches.lock().unwrap().len(), self.raw_caches().used_mem())
127 }
128
129 pub fn store(&self) -> &dyn ChunkReader {
130 self.store.as_ref()
131 }
132
133 pub fn raw_caches(&self) -> &RawCacheManager {
134 &self.raw_caches
135 }
136
137 pub fn create_cache(&self, chunk: &ChunkId) -> ChunkCache {
138 let mut caches = self.caches.lock().unwrap();
139 if let Some(weak) = caches.get(chunk) {
140 if let Some(cache) = weak.to_strong().clone() {
141 return cache;
142 }
143 caches.remove(chunk);
144 }
145 let cache = ChunkCache::new(self.stack.clone(), chunk.clone());
146 info!("{} create new cache {}", self, cache);
147 caches.insert(chunk.clone(), cache.to_weak());
148 cache
149 }
150
151 pub fn create_downloader(&self, chunk: &ChunkId, task: Box<dyn LeafDownloadTask>) -> ChunkDownloader {
152 let cache = self.create_cache(chunk);
153 let mut downloaders = self.downloaders.lock().unwrap();
154 let downloader = downloaders.create_downloader(&self.stack, cache, task);
155 info!("{} create new downloader {}", self, downloader);
156 downloader
157 }
158
159 pub(in super::super) fn on_schedule(&self, _now: Timestamp) {
160 let downloaders = {
161 let mut downloaders = self.downloaders.lock().unwrap();
162 downloaders.get_all()
163 };
164 for downloader in downloaders {
165 downloader.on_drain(0);
166 }
167
168 {
169 let mut remove = LinkedList::new();
170 let mut caches = self.caches.lock().unwrap();
171 for (chunk, cache) in caches.iter() {
172 if cache.to_strong().is_none() {
173 remove.push_back(chunk.clone());
174 }
175 }
176
177 for chunk in remove {
178 caches.remove(&chunk);
179 }
180 }
181 }
182}