cyfs_lib/traversal/
traverser.rs

1use super::adapter::ObjectMapNOCCacheTranverseAdapter;
2use super::object::*;
3use crate::*;
4use cyfs_base::*;
5use cyfs_util::AsyncReadWithSeek;
6
7use std::collections::HashSet;
8use std::collections::VecDeque;
9use std::sync::{Arc, Mutex};
10
/// Filter abstraction for object traversal.
///
/// The trait currently declares no methods.
#[async_trait::async_trait]
pub trait ObjectTraverserFilter {}
13
/// Reference-counted handle to a boxed [`ObjectTraverserFilter`] trait object.
pub type ObjectTraverserFilterRef = Arc<Box<dyn ObjectTraverserFilter>>;
15
/// Verdict returned by the filter callbacks of [`ObjectTraverserHandler`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ObjectTraverseFilterResult {
    /// Do not traverse into the object: `ObjectTraverser` does not queue it.
    Skip,
    /// Keep the object. `Some(depth)` replaces the item's `config_ref_depth`;
    /// `ObjectTraverser` skips the object when that depth is `0` or when the item's
    /// `ref_depth` has already reached it. For the root, `None` falls back to a depth of `1`.
    Keep(Option<u32>),
}
21
/// Callbacks through which a traversal asks what to follow and reports what it finds.
///
/// `ObjectTraverser` propagates any `Err` returned from the reporting callbacks with `?`,
/// which stops the traversal.
#[async_trait::async_trait]
pub trait ObjectTraverserHandler: Send + Sync {
    /// Decides whether the node at `path` should be traversed.
    ///
    /// `ObjectTraverser` calls this with `"/"` for the root and with the item's path for
    /// objects in the `Middle` position.
    async fn filter_path(&self, path: &str) -> ObjectTraverseFilterResult;

    /// Decides whether a loaded object should be traversed.
    ///
    /// `ObjectTraverser` calls this for objects in the `Leaf` or `Assoc` position whose
    /// `ref_depth` is still below their `config_ref_depth`.
    async fn filter_object(
        &self,
        object: &NONObjectInfo,
        meta: Option<&NamedObjectMetaData>,
    ) -> ObjectTraverseFilterResult;

    /// Reports that loading the object `id` failed with error `e`.
    async fn on_error(&self, id: &ObjectId, e: BuckyError) -> BuckyResult<()>;

    /// Reports that the loader has no object for `id`.
    async fn on_missing(&self, id: &ObjectId) -> BuckyResult<()>;

    /// Reports a successfully loaded object together with its optional metadata.
    ///
    /// `ObjectTraverser` invokes this before any filter is consulted for the object.
    async fn on_object(
        &self,
        object: &NONObjectInfo,
        meta: &Option<NamedObjectMetaData>,
    ) -> BuckyResult<()>;

    /// Reports a chunk encountered during traversal.
    async fn on_chunk(&self, chunk_id: &ChunkId) -> BuckyResult<()>;
}
41
/// Reference-counted handle to a boxed [`ObjectTraverserHandler`] trait object.
pub type ObjectTraverserHandlerRef = Arc<Box<dyn ObjectTraverserHandler>>;
43
/// An object returned by [`ObjectTraverserLoader::get_object`].
pub struct ObjectTraverserLoaderObjectData {
    /// The loaded object.
    pub object: NONObjectInfo,
    /// Metadata for the object, when the loader has any.
    pub meta: Option<NamedObjectMetaData>,
}
48
/// Source of the objects and chunks visited by a traversal.
#[async_trait::async_trait]
pub trait ObjectTraverserLoader: Send + Sync {
    /// Loads the object identified by `object_id`.
    ///
    /// `ObjectTraverser` treats `Ok(None)` as "object missing" and reports it through
    /// `ObjectTraverserHandler::on_missing`; an `Err` is reported through `on_error`
    /// (or, for the root object, returned from `run`).
    async fn get_object(
        &self,
        object_id: &ObjectId,
    ) -> BuckyResult<Option<ObjectTraverserLoaderObjectData>>;

    /// Opens a reader over the chunk identified by `chunk_id`.
    ///
    /// NOTE(review): presumably `Ok(None)` means the chunk is not available, mirroring
    /// `get_object` — confirm against the implementations.
    async fn get_chunk(
        &self,
        chunk_id: &ChunkId,
    ) -> BuckyResult<Option<Box<dyn AsyncReadWithSeek + Unpin + Send + Sync>>>;
}
60
/// Reference-counted handle to a boxed [`ObjectTraverserLoader`] trait object.
pub type ObjectTraverserLoaderRef = Arc<Box<dyn ObjectTraverserLoader>>;
62
63struct ObjectTraverserColls {
64    index: HashSet<ObjectId>,
65    pending_items: VecDeque<NormalObject>,
66}
67
68impl ObjectTraverserColls {
69    pub fn new() -> Self {
70        Self {
71            index: HashSet::new(),
72            pending_items: VecDeque::new(),
73        }
74    }
75}
76
/// Breadth-first traverser over an object tree rooted at an object map.
///
/// Cloning is cheap with respect to the traversal state: `coll` sits behind an `Arc`,
/// so every clone works on the same dedup index and pending queue.
#[derive(Clone)]
pub struct ObjectTraverser {
    /// Shared dedup index and queue of pending items.
    coll: Arc<Mutex<ObjectTraverserColls>>,

    /// Source of the objects being traversed.
    loader: ObjectTraverserLoaderRef,
    /// Receiver of filter queries and traversal events.
    handler: ObjectTraverserHandlerRef,

    /// Object-map root cache built on top of `loader` in `new`.
    objectmap_cache: ObjectMapRootCacheRef,
}
86
87impl ObjectTraverser {
88    pub fn new(loader: ObjectTraverserLoaderRef, handler: ObjectTraverserHandlerRef) -> Self {
89        let cache = ObjectMapNOCCacheTranverseAdapter::new_noc_cache(loader.clone());
90        let objectmap_cache = ObjectMapRootMemoryCache::new_default_ref(None, cache);
91
92        Self {
93            coll: Arc::new(Mutex::new(ObjectTraverserColls::new())),
94            loader,
95            handler,
96            objectmap_cache,
97        }
98    }
99
100    pub async fn run(&self, root: ObjectId) -> BuckyResult<()> {
101        assert_eq!(root.obj_type_code(), ObjectTypeCode::ObjectMap);
102
103        let ret = self.loader.get_object(&root).await?;
104        if ret.is_none() {
105            warn!("root object missing! root={}", root);
106            self.handler.on_missing(&root).await?;
107
108            return Ok(());
109        }
110
111        let data = ret.unwrap();
112        self.handler.on_object(&data.object, &data.meta).await?;
113
114        let filter_ret = self.handler.filter_path("/").await;
115        let config_ref_depth = match filter_ret {
116            ObjectTraverseFilterResult::Keep(config_ref_depth) => config_ref_depth.unwrap_or(1),
117            ObjectTraverseFilterResult::Skip => {
118                return Ok(());
119            }
120        };
121
122        let item = NormalObject {
123            pos: NormalObjectPostion::Middle,
124            path: "/".to_owned(),
125            object: data.object.into(),
126            config_ref_depth,
127            ref_depth: 0,
128        };
129        self.append(item);
130
131        let op_env_cache = ObjectMapOpEnvMemoryCache::new_ref(self.objectmap_cache.clone());
132        let cb = Arc::new(Box::new(self.clone()) as Box<dyn ObjectTraverserCallBack>);
133
134        loop {
135            let next = self.fetch();
136            if next.is_none() {
137                break Ok(());
138            }
139
140            let item = next.unwrap();
141            match item.object.object_id.obj_type_code() {
142                ObjectTypeCode::ObjectMap => {
143                    let traverser = ObjectMapTraverser::new(op_env_cache.clone(), item, cb.clone());
144                    traverser.tranverse().await?;
145                }
146                _ => {
147                    assert!(!item.object.is_empty());
148                    /*
149                    let data = match self.load_object(&item.object.object_id).await {
150                        Ok(Some(info)) => info,
151                        Ok(None) => {
152                            self.handler.on_missing(&item.object.object_id).await;
153                            continue;
154                        }
155                        Err(e) => {
156                            self.handler.on_error(&item.object.object_id, e).await;
157                            continue;
158                        }
159                    };
160
161                    item.object = data.object.into();
162                    */
163
164                    let traverser = CommonObjectTraverser::new(item, cb.clone());
165                    traverser.tranverse().await?;
166
167                    let item = traverser.finish();
168
169                    match item.object.object_id.obj_type_code() {
170                        ObjectTypeCode::File => {
171                            let traverser = FileObjectTraverser::new(item, cb.clone());
172                            traverser.tranverse().await?;
173                        }
174                        ObjectTypeCode::Dir => {
175                            let mut traverser =
176                                DirObjectTraverser::new(self.loader.clone(), item, cb.clone());
177                            traverser.tranverse().await?;
178                        }
179                        _ => {}
180                    }
181                }
182            }
183        }
184    }
185
186    fn dedup(&self, object_id: &ObjectId) -> bool {
187        let mut coll = self.coll.lock().unwrap();
188        !coll.index.insert(object_id.to_owned())
189    }
190
191    fn append(&self, item: NormalObject) {
192        assert!(!item.object.is_empty());
193
194        self.coll.lock().unwrap().pending_items.push_back(item);
195    }
196
197    fn fetch(&self) -> Option<NormalObject> {
198        self.coll.lock().unwrap().pending_items.pop_front()
199    }
200
201    async fn process_object(&self) {}
202}
203
204#[async_trait::async_trait]
205impl ObjectTraverserCallBack for ObjectTraverser {
206    async fn on_object(&self, item: TraverseObjectItem) -> BuckyResult<()> {
207        let id = item.object_id();
208        if !self.dedup(id) {
209            return Ok(());
210        }
211
212        match self.loader.get_object(id).await {
213            Ok(Some(data)) => {
214                self.handler.on_object(&data.object, &data.meta).await?;
215
216                match item {
217                    TraverseObjectItem::Normal(mut item) => {
218                        assert!(item.object.is_empty());
219
220                        let filter_ret = match item.pos {
221                            NormalObjectPostion::Middle => {
222                                self.handler.filter_path(&item.path).await
223                            }
224                            NormalObjectPostion::Leaf | NormalObjectPostion::Assoc => {
225                                if item.ref_depth >= item.config_ref_depth {
226                                    info!(
227                                        "will skip object on ref_depth > config_ref_depth: {:?}",
228                                        item
229                                    );
230                                    return Ok(());
231                                }
232
233                                self.handler
234                                    .filter_object(&data.object, data.meta.as_ref())
235                                    .await
236                            }
237                        };
238
239                        match filter_ret {
240                            ObjectTraverseFilterResult::Keep(config_ref_depth) => {
241                                if let Some(config_ref_depth) = config_ref_depth {
242                                    if config_ref_depth == 0 {
243                                        info!("will skip object on filter's config_ref_depth is 0: {:?}", item);
244                                        return Ok(());
245                                    }
246
247                                    item.config_ref_depth = config_ref_depth;
248                                    if item.ref_depth >= item.config_ref_depth {
249                                        info!("will skip object on ref_depth >= config_ref_depth: {:?}", item);
250                                        return Ok(());
251                                    }
252                                }
253
254                                item.object = data.object.into();
255                                self.append(item);
256                            }
257                            ObjectTraverseFilterResult::Skip => {
258                                info!("will skip object: {:?}", item);
259                            }
260                        }
261                    }
262                    TraverseObjectItem::Sub(_) => {
263                        // do nothing
264                    }
265                }
266            }
267            Ok(None) => {
268                self.handler.on_missing(id).await?;
269            }
270            Err(e) => {
271                self.handler.on_error(id, e).await?;
272            }
273        }
274
275        Ok(())
276    }
277
278    async fn on_chunk(&self, item: TraverseChunkItem) -> BuckyResult<()> {
279        self.handler.on_chunk(&item.chunk_id).await
280    }
281
282    async fn on_error(&self, id: &ObjectId, e: BuckyError) -> BuckyResult<()> {
283        self.handler.on_error(id, e).await
284    }
285
286    async fn on_missing(&self, id: &ObjectId) -> BuckyResult<()> {
287        self.handler.on_missing(id).await
288    }
289}