cyfs_lib/traversal/
traverser.rs1use super::adapter::ObjectMapNOCCacheTranverseAdapter;
2use super::object::*;
3use crate::*;
4use cyfs_base::*;
5use cyfs_util::AsyncReadWithSeek;
6
7use std::collections::HashSet;
8use std::collections::VecDeque;
9use std::sync::{Arc, Mutex};
10
11#[async_trait::async_trait]
12pub trait ObjectTraverserFilter {}
13
14pub type ObjectTraverserFilterRef = Arc<Box<dyn ObjectTraverserFilter>>;
15
/// Verdict returned by the handler's `filter_path` / `filter_object` hooks.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ObjectTraverseFilterResult {
    /// Do not queue the object for further traversal.
    Skip,

    /// Queue the object for further traversal.
    ///
    /// The payload optionally overrides the object's `config_ref_depth`:
    /// * `Some(0)` makes the traverser skip the object after all;
    /// * `Some(n)` replaces the configured depth with `n`;
    /// * `None` keeps the depth the item already carries (for the root path
    ///   `"/"` the traverser falls back to `1`).
    Keep(Option<u32>),
}
21
22#[async_trait::async_trait]
23pub trait ObjectTraverserHandler: Send + Sync {
24 async fn filter_path(&self, path: &str) -> ObjectTraverseFilterResult;
25 async fn filter_object(
26 &self,
27 object: &NONObjectInfo,
28 meta: Option<&NamedObjectMetaData>,
29 ) -> ObjectTraverseFilterResult;
30
31 async fn on_error(&self, id: &ObjectId, e: BuckyError) -> BuckyResult<()>;
32 async fn on_missing(&self, id: &ObjectId) -> BuckyResult<()>;
33
34 async fn on_object(
35 &self,
36 object: &NONObjectInfo,
37 meta: &Option<NamedObjectMetaData>,
38 ) -> BuckyResult<()>;
39 async fn on_chunk(&self, chunk_id: &ChunkId) -> BuckyResult<()>;
40}
41
42pub type ObjectTraverserHandlerRef = Arc<Box<dyn ObjectTraverserHandler>>;
43
44pub struct ObjectTraverserLoaderObjectData {
45 pub object: NONObjectInfo,
46 pub meta: Option<NamedObjectMetaData>,
47}
48
49#[async_trait::async_trait]
50pub trait ObjectTraverserLoader: Send + Sync {
51 async fn get_object(
52 &self,
53 object_id: &ObjectId,
54 ) -> BuckyResult<Option<ObjectTraverserLoaderObjectData>>;
55 async fn get_chunk(
56 &self,
57 chunk_id: &ChunkId,
58 ) -> BuckyResult<Option<Box<dyn AsyncReadWithSeek + Unpin + Send + Sync>>>;
59}
60
61pub type ObjectTraverserLoaderRef = Arc<Box<dyn ObjectTraverserLoader>>;
62
63struct ObjectTraverserColls {
64 index: HashSet<ObjectId>,
65 pending_items: VecDeque<NormalObject>,
66}
67
68impl ObjectTraverserColls {
69 pub fn new() -> Self {
70 Self {
71 index: HashSet::new(),
72 pending_items: VecDeque::new(),
73 }
74 }
75}
76
77#[derive(Clone)]
78pub struct ObjectTraverser {
79 coll: Arc<Mutex<ObjectTraverserColls>>,
80
81 loader: ObjectTraverserLoaderRef,
82 handler: ObjectTraverserHandlerRef,
83
84 objectmap_cache: ObjectMapRootCacheRef,
85}
86
87impl ObjectTraverser {
88 pub fn new(loader: ObjectTraverserLoaderRef, handler: ObjectTraverserHandlerRef) -> Self {
89 let cache = ObjectMapNOCCacheTranverseAdapter::new_noc_cache(loader.clone());
90 let objectmap_cache = ObjectMapRootMemoryCache::new_default_ref(None, cache);
91
92 Self {
93 coll: Arc::new(Mutex::new(ObjectTraverserColls::new())),
94 loader,
95 handler,
96 objectmap_cache,
97 }
98 }
99
100 pub async fn run(&self, root: ObjectId) -> BuckyResult<()> {
101 assert_eq!(root.obj_type_code(), ObjectTypeCode::ObjectMap);
102
103 let ret = self.loader.get_object(&root).await?;
104 if ret.is_none() {
105 warn!("root object missing! root={}", root);
106 self.handler.on_missing(&root).await?;
107
108 return Ok(());
109 }
110
111 let data = ret.unwrap();
112 self.handler.on_object(&data.object, &data.meta).await?;
113
114 let filter_ret = self.handler.filter_path("/").await;
115 let config_ref_depth = match filter_ret {
116 ObjectTraverseFilterResult::Keep(config_ref_depth) => config_ref_depth.unwrap_or(1),
117 ObjectTraverseFilterResult::Skip => {
118 return Ok(());
119 }
120 };
121
122 let item = NormalObject {
123 pos: NormalObjectPostion::Middle,
124 path: "/".to_owned(),
125 object: data.object.into(),
126 config_ref_depth,
127 ref_depth: 0,
128 };
129 self.append(item);
130
131 let op_env_cache = ObjectMapOpEnvMemoryCache::new_ref(self.objectmap_cache.clone());
132 let cb = Arc::new(Box::new(self.clone()) as Box<dyn ObjectTraverserCallBack>);
133
134 loop {
135 let next = self.fetch();
136 if next.is_none() {
137 break Ok(());
138 }
139
140 let item = next.unwrap();
141 match item.object.object_id.obj_type_code() {
142 ObjectTypeCode::ObjectMap => {
143 let traverser = ObjectMapTraverser::new(op_env_cache.clone(), item, cb.clone());
144 traverser.tranverse().await?;
145 }
146 _ => {
147 assert!(!item.object.is_empty());
148 let traverser = CommonObjectTraverser::new(item, cb.clone());
165 traverser.tranverse().await?;
166
167 let item = traverser.finish();
168
169 match item.object.object_id.obj_type_code() {
170 ObjectTypeCode::File => {
171 let traverser = FileObjectTraverser::new(item, cb.clone());
172 traverser.tranverse().await?;
173 }
174 ObjectTypeCode::Dir => {
175 let mut traverser =
176 DirObjectTraverser::new(self.loader.clone(), item, cb.clone());
177 traverser.tranverse().await?;
178 }
179 _ => {}
180 }
181 }
182 }
183 }
184 }
185
186 fn dedup(&self, object_id: &ObjectId) -> bool {
187 let mut coll = self.coll.lock().unwrap();
188 !coll.index.insert(object_id.to_owned())
189 }
190
191 fn append(&self, item: NormalObject) {
192 assert!(!item.object.is_empty());
193
194 self.coll.lock().unwrap().pending_items.push_back(item);
195 }
196
197 fn fetch(&self) -> Option<NormalObject> {
198 self.coll.lock().unwrap().pending_items.pop_front()
199 }
200
201 async fn process_object(&self) {}
202}
203
204#[async_trait::async_trait]
205impl ObjectTraverserCallBack for ObjectTraverser {
206 async fn on_object(&self, item: TraverseObjectItem) -> BuckyResult<()> {
207 let id = item.object_id();
208 if !self.dedup(id) {
209 return Ok(());
210 }
211
212 match self.loader.get_object(id).await {
213 Ok(Some(data)) => {
214 self.handler.on_object(&data.object, &data.meta).await?;
215
216 match item {
217 TraverseObjectItem::Normal(mut item) => {
218 assert!(item.object.is_empty());
219
220 let filter_ret = match item.pos {
221 NormalObjectPostion::Middle => {
222 self.handler.filter_path(&item.path).await
223 }
224 NormalObjectPostion::Leaf | NormalObjectPostion::Assoc => {
225 if item.ref_depth >= item.config_ref_depth {
226 info!(
227 "will skip object on ref_depth > config_ref_depth: {:?}",
228 item
229 );
230 return Ok(());
231 }
232
233 self.handler
234 .filter_object(&data.object, data.meta.as_ref())
235 .await
236 }
237 };
238
239 match filter_ret {
240 ObjectTraverseFilterResult::Keep(config_ref_depth) => {
241 if let Some(config_ref_depth) = config_ref_depth {
242 if config_ref_depth == 0 {
243 info!("will skip object on filter's config_ref_depth is 0: {:?}", item);
244 return Ok(());
245 }
246
247 item.config_ref_depth = config_ref_depth;
248 if item.ref_depth >= item.config_ref_depth {
249 info!("will skip object on ref_depth >= config_ref_depth: {:?}", item);
250 return Ok(());
251 }
252 }
253
254 item.object = data.object.into();
255 self.append(item);
256 }
257 ObjectTraverseFilterResult::Skip => {
258 info!("will skip object: {:?}", item);
259 }
260 }
261 }
262 TraverseObjectItem::Sub(_) => {
263 }
265 }
266 }
267 Ok(None) => {
268 self.handler.on_missing(id).await?;
269 }
270 Err(e) => {
271 self.handler.on_error(id, e).await?;
272 }
273 }
274
275 Ok(())
276 }
277
278 async fn on_chunk(&self, item: TraverseChunkItem) -> BuckyResult<()> {
279 self.handler.on_chunk(&item.chunk_id).await
280 }
281
282 async fn on_error(&self, id: &ObjectId, e: BuckyError) -> BuckyResult<()> {
283 self.handler.on_error(id, e).await
284 }
285
286 async fn on_missing(&self, id: &ObjectId) -> BuckyResult<()> {
287 self.handler.on_missing(id).await
288 }
289}