cyfs_bdt/utils/ndn/
mem_tracker.rs

1use std::{
2    collections::{BTreeMap, BTreeSet}, 
3    sync::RwLock, 
4    str::FromStr
5};
6use async_std::{
7    sync::Arc
8};
9use cyfs_base::*;
10use cyfs_util::cache::*;
11
12#[derive(PartialEq, Eq, PartialOrd, Ord)]
13struct ChunkStoreStub {
14    path: String, 
15    range_begin: u64, 
16    range_end: u64
17}
18
19impl ChunkStoreStub {
20    fn to_cache_data(&self) -> TrackerPositionCacheData {
21        TrackerPositionCacheData {
22            direction: TrackerDirection::Store,
23            pos: TrackerPostion::FileRange(PostionFileRange {
24                path: self.path.clone(),
25                range_begin: self.range_begin,
26                range_end: self.range_end,
27            }),
28            insert_time: 0,
29            flags: 0,
30        }
31    }
32}
33
34struct ChunkStub {
35    state: ChunkState, 
36    owners: BTreeSet<ObjectId>, 
37    positions: BTreeSet<ChunkStoreStub>
38}
39
40impl ChunkStub {
41    fn to_cache_data(&self, chunk: &ChunkId) -> ChunkCacheData {
42        ChunkCacheData {
43            chunk_id: chunk.clone(), 
44            state: self.state, 
45            flags: 0, 
46            insert_time: 0, 
47            update_time: 0, 
48            last_access_time: 0, 
49            trans_sessions: None, 
50            ref_objects: if self.owners.len() > 0 {
51                Some(self.owners.iter().map(|o| ChunkObjectRef {
52                    object_id: o.clone(), 
53                    relation: ChunkObjectRelation::FileBody
54                }).collect())
55            } else {
56                None
57            },
58        }
59    }
60}
61
62struct TrackerImpl {
63    chunks: BTreeMap<ChunkId, ChunkStub>, 
64}
65
66#[derive(Clone)]
67pub struct MemTracker(Arc<RwLock<TrackerImpl>>);
68
69impl MemTracker {
70    pub fn new() -> Self {
71        Self(Arc::new(RwLock::new(TrackerImpl {
72            chunks: BTreeMap::new()
73        })))
74    }
75}
76
77#[async_trait::async_trait]
78impl NamedDataCache for MemTracker {
79    fn clone(&self) -> Box<dyn NamedDataCache> {
80        Box::new(Self(self.0.clone()))
81    }
82
83    // file相关接口
84    async fn insert_file(&self, _req: &InsertFileRequest) -> BuckyResult<()> {
85        Ok(())
86    }
87
88    async fn remove_file(&self, _req: &RemoveFileRequest) -> BuckyResult<usize> {
89        Ok(0)
90    }
91
92    async fn file_update_quick_hash(&self, _req: &FileUpdateQuickhashRequest) -> BuckyResult<()> {
93        Ok(())
94    }
95
96    async fn get_file_by_hash(&self, _req: &GetFileByHashRequest) -> BuckyResult<Option<FileCacheData>> {
97        Ok(None)
98    }
99
100    async fn get_file_by_file_id(&self, _req: &GetFileByFileIdRequest) -> BuckyResult<Option<FileCacheData>> {
101        Ok(None)
102    }
103
104    async fn get_files_by_quick_hash(&self, _req: &GetFileByQuickHashRequest) -> BuckyResult<Vec<FileCacheData>> {
105        Ok(vec![])
106    }
107
108    async fn get_files_by_chunk(&self, _req: &GetFileByChunkRequest) -> BuckyResult<Vec<FileCacheData>> {
109        Ok(vec![])
110    }
111
112    async fn get_dirs_by_file(&self, _req: &GetDirByFileRequest) -> BuckyResult<Vec<FileDirRef>> {
113        Ok(vec![])
114    }
115
116
117    async fn insert_chunk(&self, req: &InsertChunkRequest) -> BuckyResult<()> {
118        let mut tracker = self.0.write().unwrap();
119        tracker.chunks.entry(req.chunk_id.clone()).or_insert(ChunkStub {
120            state: req.state, 
121            owners: {
122                let mut owners = BTreeSet::new();
123                if let Some(ref_objects) = req.ref_objects.as_ref() {
124                    for r in ref_objects {
125                        owners.insert(r.object_id.clone());
126                    }
127                } 
128                owners
129            }, 
130            positions: BTreeSet::new()
131        });
132        Ok(())
133    }
134
135    async fn remove_chunk(&self, req: &RemoveChunkRequest) -> BuckyResult<usize> {
136        let mut tracker = self.0.write().unwrap();
137        Ok(if tracker.chunks.remove(&req.chunk_id).is_some() {
138            1
139        } else {
140            0
141        })
142    }
143
144    async fn update_chunk_state(&self, req: &UpdateChunkStateRequest) -> BuckyResult<ChunkState> {
145        let mut tracker = self.0.write().unwrap();
146        if let Some(stub) = tracker.chunks.get_mut(&req.chunk_id) {
147            stub.state = req.state;
148            Ok(req.state)
149        } else {
150            Err(BuckyError::new(BuckyErrorCode::NotFound, "chunk not cached"))
151        }
152    }
153    
154    async fn update_chunk_ref_objects(&self, req: &UpdateChunkRefsRequest) -> BuckyResult<()> {
155        let mut tracker = self.0.write().unwrap();
156        if let Some(stub) = tracker.chunks.get_mut(&req.chunk_id) {
157            for a in &req.add_list {
158                stub.owners.insert(a.object_id.clone());
159            }
160            for r in &req.remove_list {
161                stub.owners.remove(&r.object_id);
162            }
163            Ok(())
164        } else {
165            Err(BuckyError::new(BuckyErrorCode::NotFound, "chunk not cached"))
166        }
167    }
168
169    async fn exists_chunks(&self, req: &ExistsChunkRequest) -> BuckyResult<Vec<bool>> {
170        let inner = self.0.read().unwrap();
171        Ok(req.chunk_list.iter().map(|chunk_id| {
172            if let Some(info) = inner.chunks.get(chunk_id) {
173                req.states.iter().find(|&state| *state == info.state).is_some()
174            } else {
175                false
176            }
177        }).collect())
178    }
179
180    async fn get_chunk(&self, req: &GetChunkRequest) -> BuckyResult<Option<ChunkCacheData>> {
181        Ok(self.0.read().unwrap().chunks.get(&req.chunk_id).map(|stub| stub.to_cache_data(&req.chunk_id)))
182    }
183
184    async fn get_chunks(&self, req: &Vec<GetChunkRequest>) -> BuckyResult<Vec<Option<ChunkCacheData>>> {
185        let tracker = self.0.read().unwrap();
186        Ok(req.iter().map(|r| tracker.chunks.get(&r.chunk_id).map(|stub| stub.to_cache_data(&r.chunk_id))).collect())
187    }
188
189    async fn get_chunk_ref_objects(&self, req: &GetChunkRefObjectsRequest) -> BuckyResult<Vec<ChunkObjectRef>> {
190        self.0.read().unwrap().chunks.get(&req.chunk_id)
191            .ok_or_else(|| BuckyError::new(BuckyErrorCode::NotFound, "chunk not cached"))
192            .map(|stub| stub.owners.iter().map(|o| ChunkObjectRef {
193                object_id: o.clone(), 
194                relation: ChunkObjectRelation::FileBody
195            }).collect())
196    }
197
198    async fn select_chunk(&self, _req: &SelectChunkRequest) -> BuckyResult<SelectChunkResponse> {
199        unimplemented!();
200    }
201
202    async fn stat(&self) -> BuckyResult<NamedDataCacheStat> {
203        unimplemented!();
204    }
205}
206
207#[async_trait::async_trait]
208impl TrackerCache for MemTracker {
209    fn clone(&self) -> Box<dyn TrackerCache> {
210        Box::new(Self(self.0.clone()))
211    }
212
213    async fn add_position(&self, req: &AddTrackerPositonRequest) -> BuckyResult<()> {
214        let chunk = ChunkId::from_str(req.id.as_str())?;
215        let mut tracker = self.0.write().unwrap();
216        tracker.chunks.get_mut(&chunk)
217            .ok_or_else(|| BuckyError::new(BuckyErrorCode::NotFound, "chunk not cached"))
218            .map(|stub| {
219                match &req.pos {
220                    TrackerPostion::File(p) => {
221                        let _ = stub.positions.insert(ChunkStoreStub {
222                            path: p.clone(), 
223                            range_begin: 0, 
224                            range_end: chunk.len() as u64
225                        });
226                    },
227                    TrackerPostion::FileRange(p) => {
228                        let _ = stub.positions.insert(ChunkStoreStub {
229                            path: p.path.clone(), 
230                            range_begin: p.range_begin, 
231                            range_end: p.range_end
232                        });
233                    },
234                    _ => {}
235                }
236                ()
237            })
238    }
239
240    async fn remove_position(&self, req: &RemoveTrackerPositionRequest) -> BuckyResult<usize> {
241        let chunk = ChunkId::from_str(req.id.as_str())?;
242        let mut tracker = self.0.write().unwrap();
243        tracker.chunks.get_mut(&chunk)
244            .ok_or_else(|| BuckyError::new(BuckyErrorCode::NotFound, "chunk not cached"))
245            .map(|stub| if let Some(p) = &req.pos {
246                match p {
247                    TrackerPostion::FileRange(p) => {
248                        if stub.positions.remove(&ChunkStoreStub {
249                            path: p.path.clone(), 
250                            range_begin: p.range_begin, 
251                            range_end: p.range_end
252                        }) {
253                            1
254                        } else {
255                            0
256                        }
257                    },
258                    _ => 0
259                }
260                
261            } else {
262                let mut empty = BTreeSet::new();
263                std::mem::swap(&mut stub.positions, &mut empty);
264                empty.len()
265            })
266    }
267
268    async fn get_position(
269        &self,
270        req: &GetTrackerPositionRequest,
271    ) -> BuckyResult<Vec<TrackerPositionCacheData>> {
272        let chunk = ChunkId::from_str(req.id.as_str())?;
273        self.0.read().unwrap().chunks.get(&chunk)
274            .ok_or_else(|| BuckyError::new(BuckyErrorCode::NotFound, "chunk not cached"))
275            .map(|stub| stub.positions.iter().map(|p| p.to_cache_data()).collect())
276    }
277}