cyfs_util/util/
file_obj_builder.rs

1use crate::*;
2use cyfs_base::*;
3
4use async_std::sync::{Mutex, MutexGuard};
5use futures::{AsyncReadExt, AsyncSeekExt};
6use cyfs_sha2::Digest;
7use std::io::SeekFrom;
8use std::path::Path;
9use std::sync::Arc;
10
11#[async_trait::async_trait]
12pub trait FileObjectBuilderState: Send + Clone {
13    async fn get_cur_state(&self) -> BuckyResult<(u64, (u64, [u32; 8]), Vec<ChunkId>)>;
14    async fn update(
15        &mut self,
16        pos: u64,
17        hash_state: (u64, &[u32; 8]),
18        chunk_id: ChunkId,
19    ) -> BuckyResult<()>;
20}
21
22#[derive(Clone)]
23pub struct FileObjectBuilderStateWrapper<T: FileObjectBuilderState> {
24    state: Arc<Mutex<T>>,
25}
26
27impl<T: FileObjectBuilderState> FileObjectBuilderStateWrapper<T> {
28    pub fn new(state: T) -> Self {
29        Self {
30            state: Arc::new(Mutex::new(state)),
31        }
32    }
33
34    async fn get_cur_state(&self) -> BuckyResult<(u64, (u64, [u32; 8]), Vec<ChunkId>)> {
35        let t = self.state.lock().await;
36        t.get_cur_state().await
37    }
38
39    async fn update(
40        &self,
41        pos: u64,
42        hash_state: (u64, &[u32; 8]),
43        chunk_id: ChunkId,
44    ) -> BuckyResult<()> {
45        let mut t = self.state.lock().await;
46        t.update(pos, hash_state, chunk_id).await
47    }
48
49    pub async fn get_state(&self) -> T {
50        let t = self.state.lock().await;
51        t.clone()
52    }
53
54    pub async fn get_state_mut(&self) -> MutexGuard<'_, T> {
55        self.state.lock().await
56    }
57}
58
59pub struct FileObjectBuilder<T: FileObjectBuilderState> {
60    local_path: String,
61    owner: ObjectId,
62    chunk_size: u32,
63    state: Option<FileObjectBuilderStateWrapper<T>>,
64}
65
66impl<T: FileObjectBuilderState> FileObjectBuilder<T> {
67    pub fn new(
68        local_path: String,
69        owner: ObjectId,
70        chunk_size: u32,
71        state: Option<FileObjectBuilderStateWrapper<T>>,
72    ) -> Self {
73        Self {
74            local_path,
75            owner,
76            chunk_size,
77            state,
78        }
79    }
80
81    async fn get_file_time(path: &Path) -> BuckyResult<(u64, u64, u64)> {
82        let metadata = async_std::fs::metadata(path).await?;
83        let modify_time = metadata.modified()?;
84        let create_time = match metadata.created() {
85            Ok(create_time) => create_time,
86            Err(_) => modify_time,
87        };
88        let modify_time = system_time_to_bucky_time(&modify_time);
89        let create_time = system_time_to_bucky_time(&create_time);
90        let access_time = metadata.accessed()?;
91        let access_time = system_time_to_bucky_time(&access_time);
92        Ok((create_time, modify_time, access_time))
93    }
94
95    pub async fn build(&self) -> BuckyResult<File> {
96        let path = Path::new(self.local_path.as_str());
97        if !path.is_file() {
98            let msg = format!("{} is not file", self.local_path.as_str());
99            log::error!("{}", msg.as_str());
100            return Err(BuckyError::new(BuckyErrorCode::InvalidParam, msg));
101        }
102
103        if self.chunk_size % 64 != 0 {
104            let msg = format!("chunk size {} mod 64 is not zero", self.chunk_size);
105            log::error!("{}", msg.as_str());
106            return Err(BuckyError::new(BuckyErrorCode::InvalidParam, msg));
107        }
108
109        let mut file = async_std::fs::File::open(self.local_path.as_str())
110            .await
111            .map_err(|e| {
112                let msg = format!(
113                    "open file for calc chunk list error! file={}, {}",
114                    self.local_path.as_str(),
115                    e
116                );
117                error!("{}", msg);
118
119                BuckyError::new(BuckyErrorCode::IoError, msg)
120            })?;
121
122        let (pos, hash_state, mut list) = {
123            if self.state.is_some() {
124                self.state.as_ref().unwrap().get_cur_state().await?
125            } else {
126                (0, (0, [0u32; 8]), Vec::new())
127            }
128        };
129        let mut file_sha256 = if pos == 0 {
130            cyfs_sha2::Sha256::new()
131        } else {
132            file.seek(SeekFrom::Start(pos)).await.map_err(|e| {
133                let msg = format!(
134                    "seek file {} to {} failed.{}",
135                    self.local_path.as_str(),
136                    pos,
137                    e
138                );
139                log::error!("{}", msg.as_str());
140                BuckyError::new(BuckyErrorCode::IoError, msg)
141            })?;
142            cyfs_sha2::Sha256::from((hash_state.0, &hash_state.1))
143        };
144
145        let mut file_len = pos as usize;
146        let mut file_hash = None;
147        let mut buf = Vec::with_capacity(self.chunk_size as usize);
148
149        unsafe {
150            buf.set_len(self.chunk_size as usize);
151        }
152        loop {
153            let len = file.read(&mut buf).await.map_err(|e| {
154                let msg = format!("read file {} failed.{}", self.local_path.as_str(), e);
155                log::error!("{}", msg.as_str());
156                BuckyError::new(BuckyErrorCode::IoError, msg)
157            })?;
158            if len == 0 {
159                break;
160            }
161
162            let hash = hash_data(&buf[0..len]);
163            let chunk_id = ChunkId::new(&hash, len as u32);
164
165            debug!(
166                "got file chunk: id={}, len={}, file={}, ",
167                chunk_id,
168                len,
169                self.local_path.as_str()
170            );
171            list.push(chunk_id.clone());
172            file_len += len;
173
174            // 判断是不是最后一个chunk
175            if len < self.chunk_size as usize {
176                if file_len == len {
177                    // 只有一个block的情况,不需要再hash一次了
178                    assert!(file_hash.is_none());
179                    file_hash = Some(hash);
180                } else {
181                    file_sha256.input(&buf[0..len]);
182                }
183                break;
184            }
185
186            file_sha256.input(&buf[0..len]);
187
188            if self.state.is_some() {
189                self.state
190                    .as_ref()
191                    .unwrap()
192                    .update(file_len as u64, file_sha256.get_state(), chunk_id)
193                    .await?;
194            }
195        }
196
197        let file_hash: HashValue = match file_hash {
198            Some(v) => v,
199            None => file_sha256.result().into(),
200        };
201
202        log::info!("file_hash {}", file_hash.to_string());
203        let (create_time, _, _) = Self::get_file_time(Path::new(self.local_path.as_str())).await?;
204        let file = File::new(
205            self.owner.clone(),
206            file_len as u64,
207            file_hash,
208            ChunkList::ChunkInList(list),
209        )
210        .create_time(create_time)
211        .build();
212        Ok(file)
213    }
214}