cyfs_bdt/utils/ndn/
mem_tracker.rs1use std::{
2 collections::{BTreeMap, BTreeSet},
3 sync::RwLock,
4 str::FromStr
5};
6use async_std::{
7 sync::Arc
8};
9use cyfs_base::*;
10use cyfs_util::cache::*;
11
/// A byte range inside a file, identified by the file's path.
///
/// The derived ordering compares `path`, then `range_begin`, then `range_end`
/// (field declaration order); an ordered set of stubs iterates in that order,
/// so the fields must not be reordered.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
struct ChunkStoreStub {
    /// Path of the file holding the data.
    path: String,
    /// Beginning of the range within the file.
    range_begin: u64,
    /// End of the range within the file.
    range_end: u64,
}
18
19impl ChunkStoreStub {
20 fn to_cache_data(&self) -> TrackerPositionCacheData {
21 TrackerPositionCacheData {
22 direction: TrackerDirection::Store,
23 pos: TrackerPostion::FileRange(PostionFileRange {
24 path: self.path.clone(),
25 range_begin: self.range_begin,
26 range_end: self.range_end,
27 }),
28 insert_time: 0,
29 flags: 0,
30 }
31 }
32}
33
34struct ChunkStub {
35 state: ChunkState,
36 owners: BTreeSet<ObjectId>,
37 positions: BTreeSet<ChunkStoreStub>
38}
39
40impl ChunkStub {
41 fn to_cache_data(&self, chunk: &ChunkId) -> ChunkCacheData {
42 ChunkCacheData {
43 chunk_id: chunk.clone(),
44 state: self.state,
45 flags: 0,
46 insert_time: 0,
47 update_time: 0,
48 last_access_time: 0,
49 trans_sessions: None,
50 ref_objects: if self.owners.len() > 0 {
51 Some(self.owners.iter().map(|o| ChunkObjectRef {
52 object_id: o.clone(),
53 relation: ChunkObjectRelation::FileBody
54 }).collect())
55 } else {
56 None
57 },
58 }
59 }
60}
61
62struct TrackerImpl {
63 chunks: BTreeMap<ChunkId, ChunkStub>,
64}
65
66#[derive(Clone)]
67pub struct MemTracker(Arc<RwLock<TrackerImpl>>);
68
69impl MemTracker {
70 pub fn new() -> Self {
71 Self(Arc::new(RwLock::new(TrackerImpl {
72 chunks: BTreeMap::new()
73 })))
74 }
75}
76
77#[async_trait::async_trait]
78impl NamedDataCache for MemTracker {
79 fn clone(&self) -> Box<dyn NamedDataCache> {
80 Box::new(Self(self.0.clone()))
81 }
82
83 async fn insert_file(&self, _req: &InsertFileRequest) -> BuckyResult<()> {
85 Ok(())
86 }
87
88 async fn remove_file(&self, _req: &RemoveFileRequest) -> BuckyResult<usize> {
89 Ok(0)
90 }
91
92 async fn file_update_quick_hash(&self, _req: &FileUpdateQuickhashRequest) -> BuckyResult<()> {
93 Ok(())
94 }
95
96 async fn get_file_by_hash(&self, _req: &GetFileByHashRequest) -> BuckyResult<Option<FileCacheData>> {
97 Ok(None)
98 }
99
100 async fn get_file_by_file_id(&self, _req: &GetFileByFileIdRequest) -> BuckyResult<Option<FileCacheData>> {
101 Ok(None)
102 }
103
104 async fn get_files_by_quick_hash(&self, _req: &GetFileByQuickHashRequest) -> BuckyResult<Vec<FileCacheData>> {
105 Ok(vec![])
106 }
107
108 async fn get_files_by_chunk(&self, _req: &GetFileByChunkRequest) -> BuckyResult<Vec<FileCacheData>> {
109 Ok(vec![])
110 }
111
112 async fn get_dirs_by_file(&self, _req: &GetDirByFileRequest) -> BuckyResult<Vec<FileDirRef>> {
113 Ok(vec![])
114 }
115
116
117 async fn insert_chunk(&self, req: &InsertChunkRequest) -> BuckyResult<()> {
118 let mut tracker = self.0.write().unwrap();
119 tracker.chunks.entry(req.chunk_id.clone()).or_insert(ChunkStub {
120 state: req.state,
121 owners: {
122 let mut owners = BTreeSet::new();
123 if let Some(ref_objects) = req.ref_objects.as_ref() {
124 for r in ref_objects {
125 owners.insert(r.object_id.clone());
126 }
127 }
128 owners
129 },
130 positions: BTreeSet::new()
131 });
132 Ok(())
133 }
134
135 async fn remove_chunk(&self, req: &RemoveChunkRequest) -> BuckyResult<usize> {
136 let mut tracker = self.0.write().unwrap();
137 Ok(if tracker.chunks.remove(&req.chunk_id).is_some() {
138 1
139 } else {
140 0
141 })
142 }
143
144 async fn update_chunk_state(&self, req: &UpdateChunkStateRequest) -> BuckyResult<ChunkState> {
145 let mut tracker = self.0.write().unwrap();
146 if let Some(stub) = tracker.chunks.get_mut(&req.chunk_id) {
147 stub.state = req.state;
148 Ok(req.state)
149 } else {
150 Err(BuckyError::new(BuckyErrorCode::NotFound, "chunk not cached"))
151 }
152 }
153
154 async fn update_chunk_ref_objects(&self, req: &UpdateChunkRefsRequest) -> BuckyResult<()> {
155 let mut tracker = self.0.write().unwrap();
156 if let Some(stub) = tracker.chunks.get_mut(&req.chunk_id) {
157 for a in &req.add_list {
158 stub.owners.insert(a.object_id.clone());
159 }
160 for r in &req.remove_list {
161 stub.owners.remove(&r.object_id);
162 }
163 Ok(())
164 } else {
165 Err(BuckyError::new(BuckyErrorCode::NotFound, "chunk not cached"))
166 }
167 }
168
169 async fn exists_chunks(&self, req: &ExistsChunkRequest) -> BuckyResult<Vec<bool>> {
170 let inner = self.0.read().unwrap();
171 Ok(req.chunk_list.iter().map(|chunk_id| {
172 if let Some(info) = inner.chunks.get(chunk_id) {
173 req.states.iter().find(|&state| *state == info.state).is_some()
174 } else {
175 false
176 }
177 }).collect())
178 }
179
180 async fn get_chunk(&self, req: &GetChunkRequest) -> BuckyResult<Option<ChunkCacheData>> {
181 Ok(self.0.read().unwrap().chunks.get(&req.chunk_id).map(|stub| stub.to_cache_data(&req.chunk_id)))
182 }
183
184 async fn get_chunks(&self, req: &Vec<GetChunkRequest>) -> BuckyResult<Vec<Option<ChunkCacheData>>> {
185 let tracker = self.0.read().unwrap();
186 Ok(req.iter().map(|r| tracker.chunks.get(&r.chunk_id).map(|stub| stub.to_cache_data(&r.chunk_id))).collect())
187 }
188
189 async fn get_chunk_ref_objects(&self, req: &GetChunkRefObjectsRequest) -> BuckyResult<Vec<ChunkObjectRef>> {
190 self.0.read().unwrap().chunks.get(&req.chunk_id)
191 .ok_or_else(|| BuckyError::new(BuckyErrorCode::NotFound, "chunk not cached"))
192 .map(|stub| stub.owners.iter().map(|o| ChunkObjectRef {
193 object_id: o.clone(),
194 relation: ChunkObjectRelation::FileBody
195 }).collect())
196 }
197
198 async fn select_chunk(&self, _req: &SelectChunkRequest) -> BuckyResult<SelectChunkResponse> {
199 unimplemented!();
200 }
201
202 async fn stat(&self) -> BuckyResult<NamedDataCacheStat> {
203 unimplemented!();
204 }
205}
206
207#[async_trait::async_trait]
208impl TrackerCache for MemTracker {
209 fn clone(&self) -> Box<dyn TrackerCache> {
210 Box::new(Self(self.0.clone()))
211 }
212
213 async fn add_position(&self, req: &AddTrackerPositonRequest) -> BuckyResult<()> {
214 let chunk = ChunkId::from_str(req.id.as_str())?;
215 let mut tracker = self.0.write().unwrap();
216 tracker.chunks.get_mut(&chunk)
217 .ok_or_else(|| BuckyError::new(BuckyErrorCode::NotFound, "chunk not cached"))
218 .map(|stub| {
219 match &req.pos {
220 TrackerPostion::File(p) => {
221 let _ = stub.positions.insert(ChunkStoreStub {
222 path: p.clone(),
223 range_begin: 0,
224 range_end: chunk.len() as u64
225 });
226 },
227 TrackerPostion::FileRange(p) => {
228 let _ = stub.positions.insert(ChunkStoreStub {
229 path: p.path.clone(),
230 range_begin: p.range_begin,
231 range_end: p.range_end
232 });
233 },
234 _ => {}
235 }
236 ()
237 })
238 }
239
240 async fn remove_position(&self, req: &RemoveTrackerPositionRequest) -> BuckyResult<usize> {
241 let chunk = ChunkId::from_str(req.id.as_str())?;
242 let mut tracker = self.0.write().unwrap();
243 tracker.chunks.get_mut(&chunk)
244 .ok_or_else(|| BuckyError::new(BuckyErrorCode::NotFound, "chunk not cached"))
245 .map(|stub| if let Some(p) = &req.pos {
246 match p {
247 TrackerPostion::FileRange(p) => {
248 if stub.positions.remove(&ChunkStoreStub {
249 path: p.path.clone(),
250 range_begin: p.range_begin,
251 range_end: p.range_end
252 }) {
253 1
254 } else {
255 0
256 }
257 },
258 _ => 0
259 }
260
261 } else {
262 let mut empty = BTreeSet::new();
263 std::mem::swap(&mut stub.positions, &mut empty);
264 empty.len()
265 })
266 }
267
268 async fn get_position(
269 &self,
270 req: &GetTrackerPositionRequest,
271 ) -> BuckyResult<Vec<TrackerPositionCacheData>> {
272 let chunk = ChunkId::from_str(req.id.as_str())?;
273 self.0.read().unwrap().chunks.get(&chunk)
274 .ok_or_else(|| BuckyError::new(BuckyErrorCode::NotFound, "chunk not cached"))
275 .map(|stub| stub.positions.iter().map(|p| p.to_cache_data()).collect())
276 }
277}