cyfs_util/util/
file_obj_builder.rs1use crate::*;
2use cyfs_base::*;
3
4use async_std::sync::{Mutex, MutexGuard};
5use futures::{AsyncReadExt, AsyncSeekExt};
6use cyfs_sha2::Digest;
7use std::io::SeekFrom;
8use std::path::Path;
9use std::sync::Arc;
10
11#[async_trait::async_trait]
12pub trait FileObjectBuilderState: Send + Clone {
13 async fn get_cur_state(&self) -> BuckyResult<(u64, (u64, [u32; 8]), Vec<ChunkId>)>;
14 async fn update(
15 &mut self,
16 pos: u64,
17 hash_state: (u64, &[u32; 8]),
18 chunk_id: ChunkId,
19 ) -> BuckyResult<()>;
20}
21
22#[derive(Clone)]
23pub struct FileObjectBuilderStateWrapper<T: FileObjectBuilderState> {
24 state: Arc<Mutex<T>>,
25}
26
27impl<T: FileObjectBuilderState> FileObjectBuilderStateWrapper<T> {
28 pub fn new(state: T) -> Self {
29 Self {
30 state: Arc::new(Mutex::new(state)),
31 }
32 }
33
34 async fn get_cur_state(&self) -> BuckyResult<(u64, (u64, [u32; 8]), Vec<ChunkId>)> {
35 let t = self.state.lock().await;
36 t.get_cur_state().await
37 }
38
39 async fn update(
40 &self,
41 pos: u64,
42 hash_state: (u64, &[u32; 8]),
43 chunk_id: ChunkId,
44 ) -> BuckyResult<()> {
45 let mut t = self.state.lock().await;
46 t.update(pos, hash_state, chunk_id).await
47 }
48
49 pub async fn get_state(&self) -> T {
50 let t = self.state.lock().await;
51 t.clone()
52 }
53
54 pub async fn get_state_mut(&self) -> MutexGuard<'_, T> {
55 self.state.lock().await
56 }
57}
58
59pub struct FileObjectBuilder<T: FileObjectBuilderState> {
60 local_path: String,
61 owner: ObjectId,
62 chunk_size: u32,
63 state: Option<FileObjectBuilderStateWrapper<T>>,
64}
65
66impl<T: FileObjectBuilderState> FileObjectBuilder<T> {
67 pub fn new(
68 local_path: String,
69 owner: ObjectId,
70 chunk_size: u32,
71 state: Option<FileObjectBuilderStateWrapper<T>>,
72 ) -> Self {
73 Self {
74 local_path,
75 owner,
76 chunk_size,
77 state,
78 }
79 }
80
81 async fn get_file_time(path: &Path) -> BuckyResult<(u64, u64, u64)> {
82 let metadata = async_std::fs::metadata(path).await?;
83 let modify_time = metadata.modified()?;
84 let create_time = match metadata.created() {
85 Ok(create_time) => create_time,
86 Err(_) => modify_time,
87 };
88 let modify_time = system_time_to_bucky_time(&modify_time);
89 let create_time = system_time_to_bucky_time(&create_time);
90 let access_time = metadata.accessed()?;
91 let access_time = system_time_to_bucky_time(&access_time);
92 Ok((create_time, modify_time, access_time))
93 }
94
95 pub async fn build(&self) -> BuckyResult<File> {
96 let path = Path::new(self.local_path.as_str());
97 if !path.is_file() {
98 let msg = format!("{} is not file", self.local_path.as_str());
99 log::error!("{}", msg.as_str());
100 return Err(BuckyError::new(BuckyErrorCode::InvalidParam, msg));
101 }
102
103 if self.chunk_size % 64 != 0 {
104 let msg = format!("chunk size {} mod 64 is not zero", self.chunk_size);
105 log::error!("{}", msg.as_str());
106 return Err(BuckyError::new(BuckyErrorCode::InvalidParam, msg));
107 }
108
109 let mut file = async_std::fs::File::open(self.local_path.as_str())
110 .await
111 .map_err(|e| {
112 let msg = format!(
113 "open file for calc chunk list error! file={}, {}",
114 self.local_path.as_str(),
115 e
116 );
117 error!("{}", msg);
118
119 BuckyError::new(BuckyErrorCode::IoError, msg)
120 })?;
121
122 let (pos, hash_state, mut list) = {
123 if self.state.is_some() {
124 self.state.as_ref().unwrap().get_cur_state().await?
125 } else {
126 (0, (0, [0u32; 8]), Vec::new())
127 }
128 };
129 let mut file_sha256 = if pos == 0 {
130 cyfs_sha2::Sha256::new()
131 } else {
132 file.seek(SeekFrom::Start(pos)).await.map_err(|e| {
133 let msg = format!(
134 "seek file {} to {} failed.{}",
135 self.local_path.as_str(),
136 pos,
137 e
138 );
139 log::error!("{}", msg.as_str());
140 BuckyError::new(BuckyErrorCode::IoError, msg)
141 })?;
142 cyfs_sha2::Sha256::from((hash_state.0, &hash_state.1))
143 };
144
145 let mut file_len = pos as usize;
146 let mut file_hash = None;
147 let mut buf = Vec::with_capacity(self.chunk_size as usize);
148
149 unsafe {
150 buf.set_len(self.chunk_size as usize);
151 }
152 loop {
153 let len = file.read(&mut buf).await.map_err(|e| {
154 let msg = format!("read file {} failed.{}", self.local_path.as_str(), e);
155 log::error!("{}", msg.as_str());
156 BuckyError::new(BuckyErrorCode::IoError, msg)
157 })?;
158 if len == 0 {
159 break;
160 }
161
162 let hash = hash_data(&buf[0..len]);
163 let chunk_id = ChunkId::new(&hash, len as u32);
164
165 debug!(
166 "got file chunk: id={}, len={}, file={}, ",
167 chunk_id,
168 len,
169 self.local_path.as_str()
170 );
171 list.push(chunk_id.clone());
172 file_len += len;
173
174 if len < self.chunk_size as usize {
176 if file_len == len {
177 assert!(file_hash.is_none());
179 file_hash = Some(hash);
180 } else {
181 file_sha256.input(&buf[0..len]);
182 }
183 break;
184 }
185
186 file_sha256.input(&buf[0..len]);
187
188 if self.state.is_some() {
189 self.state
190 .as_ref()
191 .unwrap()
192 .update(file_len as u64, file_sha256.get_state(), chunk_id)
193 .await?;
194 }
195 }
196
197 let file_hash: HashValue = match file_hash {
198 Some(v) => v,
199 None => file_sha256.result().into(),
200 };
201
202 log::info!("file_hash {}", file_hash.to_string());
203 let (create_time, _, _) = Self::get_file_time(Path::new(self.local_path.as_str())).await?;
204 let file = File::new(
205 self.owner.clone(),
206 file_len as u64,
207 file_hash,
208 ChunkList::ChunkInList(list),
209 )
210 .create_time(create_time)
211 .build();
212 Ok(file)
213 }
214}