cyfs_lib/traversal/object/
dir.rs

1use super::super::traverser::ObjectTraverserLoaderRef;
2use super::def::*;
3use cyfs_base::*;
4
5use async_std::io::prelude::*;
6use std::borrow::Cow;
7
8pub struct DirObjectTraverser {
9    loader: ObjectTraverserLoaderRef,
10    current: NormalObject,
11    cb: ObjectTraverserCallBackRef,
12
13    body_obj_list: Option<DirBodyContentObjectList>,
14}
15
16impl DirObjectTraverser {
17    pub fn new(
18        loader: ObjectTraverserLoaderRef,
19        current: NormalObject,
20        cb: ObjectTraverserCallBackRef,
21    ) -> Self {
22        Self {
23            loader,
24            current,
25            cb,
26            body_obj_list: None,
27        }
28    }
29
30    fn dir_id(&self) -> &ObjectId {
31        &self.current.object.object_id
32    }
33
34    fn dir(&self) -> &Dir {
35        let object = self.current.object.object.as_ref().unwrap();
36        object.as_dir()
37    }
38
39    pub async fn tranverse(&mut self) -> BuckyResult<()> {
40        // First init body chunk
41        if let Err(e) = self.load_body_chunk().await {
42            self.cb.on_error(self.dir_id(), e).await?;
43            return Ok(());
44        }
45
46        // Init desc obj list
47        let dir = self.dir();
48        let desc_obj_list = match dir.desc().content().obj_list() {
49            NDNObjectInfo::Chunk(chunk_id) => {
50                match self.load_chunk_from_body_and_reader(chunk_id).await {
51                    Ok(chunk) => {
52                        match &chunk {
53                            Cow::Owned(_) => {
54                                let item = TraverseChunkItem {
55                                    chunk_id: chunk_id.to_owned(),
56                                };
57                                self.cb.on_chunk(item).await?;
58                            }
59                            Cow::Borrowed(_) => {}
60                        }
61
62                        match NDNObjectList::clone_from_slice(&chunk) {
63                            Ok(obj_list) => Cow::Owned(obj_list),
64                            Err(e) => {
65                                let msg = format!("decode dir desc chunk to obj list failed! dir={}, chunk={}, {}", self.current.object.object_id, chunk_id, e);
66                                error!("{}", msg);
67                                let e = BuckyError::new(BuckyErrorCode::InvalidData, msg);
68                                self.cb.on_error(self.dir_id(), e).await?;
69                                return Ok(());
70                            }
71                        }
72                    }
73                    Err(e) => {
74                        self.cb.on_error(self.dir_id(), e).await?;
75                        return Ok(());
76                    }
77                }
78            }
79            NDNObjectInfo::ObjList(list) => Cow::Borrowed(list),
80        };
81
82        self.append_desc_obj_list(&desc_obj_list).await
83    }
84
85    async fn load_body_chunk(&mut self) -> BuckyResult<()> {
86        let dir = self.dir();
87
88        if let Some(body) = dir.body() {
89            match body.content() {
90                DirBodyContent::Chunk(chunk_id) => {
91                    let item = TraverseChunkItem {
92                        chunk_id: chunk_id.to_owned(),
93                    };
94                    self.cb.on_chunk(item).await?;
95
96                    match self.loader.get_chunk(chunk_id).await {
97                        Ok(Some(mut reader)) => {
98                            let mut buf = Vec::with_capacity(chunk_id.len());
99                            reader.read_to_end(&mut buf).await.map_err(|e| {
100                                let msg = format!(
101                                    "read dir body's chunk to buf failed! dir={}, chunk={}, {}",
102                                    self.dir_id(),
103                                    chunk_id,
104                                    e
105                                );
106                                error!("{}", msg);
107                                let e: BuckyError = e.into();
108                                BuckyError::new(e.code(), msg)
109                            })?;
110
111                            let obj_list = DirBodyContentObjectList::clone_from_slice(&buf).map_err(|e|{
112                                let msg = format!(
113                                    "decode dir body's chunk to object list failed! dir={}, chunk={}, {}",
114                                    self.dir_id(),
115                                    chunk_id,
116                                    e
117                                );
118                                error!("{}", msg);
119                                BuckyError::new(e.code(), msg)
120                            })?;
121
122                            assert!(self.body_obj_list.is_none());
123                            self.body_obj_list = Some(obj_list);
124                            Ok(())
125                        }
126                        Ok(None) => {
127                            let msg = format!(
128                                "get dir body's chunk but not found! dir={}, chunk={}",
129                                self.dir_id(),
130                                chunk_id
131                            );
132                            error!("{}", msg);
133                            self.cb.on_missing(chunk_id.as_object_id()).await?;
134
135                            Err(BuckyError::new(BuckyErrorCode::NotFound, msg))
136                        }
137                        Err(e) => {
138                            let msg = format!(
139                                "get dir body's chunk failed! dir={}, chunk={}, {}",
140                                self.dir_id(),
141                                chunk_id,
142                                e
143                            );
144                            error!("{}", msg);
145                            Err(BuckyError::new(e.code(), msg))
146                        }
147                    }
148                }
149                DirBodyContent::ObjList(_list) => Ok(()),
150            }
151        } else {
152            Ok(())
153        }
154    }
155
156    async fn load_chunk_from_body_and_reader<'a>(
157        &'a self,
158        chunk_id: &ChunkId,
159    ) -> BuckyResult<Cow<'a, Vec<u8>>> {
160        let dir = self.dir();
161        let chunk = if let Some(body) = dir.body() {
162            match body.content() {
163                DirBodyContent::Chunk(chunk_id) => self
164                    .body_obj_list
165                    .as_ref()
166                    .unwrap()
167                    .get(chunk_id.as_object_id()),
168                DirBodyContent::ObjList(list) => list.get(chunk_id.as_object_id()),
169            }
170        } else {
171            None
172        };
173
174        if let Some(chunk) = chunk {
175            return Ok(Cow::Borrowed(chunk));
176        }
177
178        // load from reader
179        match self.loader.get_chunk(chunk_id).await {
180            Ok(Some(mut reader)) => {
181                let mut buf = Vec::with_capacity(chunk_id.len());
182                reader.read_to_end(&mut buf).await.map_err(|e| {
183                    let msg = format!(
184                        "read dir desc's chunk to buf failed! dir={}, chunk={}, {}",
185                        self.dir_id(),
186                        chunk_id,
187                        e
188                    );
189                    error!("{}", msg);
190                    let e: BuckyError = e.into();
191                    BuckyError::new(e.code(), msg)
192                })?;
193
194                Ok(Cow::Owned(buf))
195            }
196            Ok(None) => {
197                let msg = format!(
198                    "get dir body's chunk but not found! dir={}, chunk={}",
199                    self.dir_id(),
200                    chunk_id
201                );
202                error!("{}", msg);
203                self.cb.on_missing(chunk_id.as_object_id()).await?;
204
205                Err(BuckyError::new(BuckyErrorCode::NotFound, msg))
206            }
207            Err(e) => {
208                let msg = format!(
209                    "get dir body's chunk failed! dir={}, chunk={}, {}",
210                    self.dir_id(),
211                    chunk_id,
212                    e
213                );
214                error!("{}", msg);
215                Err(BuckyError::new(e.code(), msg))
216            }
217        }
218    }
219
220    async fn append_desc_obj_list(&self, list: &NDNObjectList) -> BuckyResult<()> {
221        let body_map = self.body_map();
222
223        if let Some(ref parent) = list.parent_chunk {
224            self.append_object(parent.as_object_id(), &body_map).await?;
225        }
226
227        for (_k, v) in &list.object_map {
228            match v.node() {
229                InnerNode::ObjId(id) => {
230                    self.append_object(id, &body_map).await?;
231                }
232                InnerNode::Chunk(id) => {
233                    self.append_object(id.as_object_id(), &body_map).await?;
234                }
235                InnerNode::IndexInParentChunk(_, _) => {}
236            }
237        }
238
239        Ok(())
240    }
241
242    async fn append_object(
243        &self,
244        id: &ObjectId,
245        body_map: &Option<&DirBodyContentObjectList>,
246    ) -> BuckyResult<()> {
247        if let Some(body_map) = body_map {
248            if body_map.contains_key(id) {
249                return Ok(());
250            }
251        }
252
253        debug!(
254            "dir assoc object not exists in body and local: dir={}, object={}",
255            self.dir_id(),
256            id
257        );
258
259        match id.obj_type_code() {
260            ObjectTypeCode::Chunk => {
261                let item = TraverseChunkItem {
262                    chunk_id: id.as_chunk_id().to_owned(),
263                };
264
265                self.cb.on_chunk(item).await
266            }
267            _ => {
268                let item = self
269                    .current
270                    .derive_normal(self.dir_id().to_owned(), None, true);
271                let item = TraverseObjectItem::Normal(item);
272                self.cb.on_object(item).await
273            }
274        }
275    }
276
277    fn body_map(&self) -> Option<&DirBodyContentObjectList> {
278        if let Some(body) = self.dir().body() {
279            match body.content() {
280                DirBodyContent::Chunk(_) => {
281                    assert!(self.body_obj_list.is_some());
282                    self.body_obj_list.as_ref()
283                }
284                DirBodyContent::ObjList(list) => Some(list),
285            }
286        } else {
287            None
288        }
289    }
290}