cyfs_lib/storage/
storage.rs

1use crate::base::*;
2use crate::prelude::*;
3use crate::root_state::*;
4use crate::NONObjectInfo;
5use cyfs_base::*;
6use cyfs_core::*;
7
8use std::sync::atomic::{AtomicU64, Ordering};
9use async_std::sync::Mutex as AsyncMutex;
10
11struct NOCStorageRawHelper {
12    id: String,
13    noc: NamedObjectCacheRef,
14    last_update_time: AtomicU64,
15}
16
17impl NOCStorageRawHelper {
18    pub fn new(id: impl Into<String>, noc: NamedObjectCacheRef) -> Self {
19        Self {
20            id: id.into(),
21            noc,
22            last_update_time: AtomicU64::new(0),
23        }
24    }
25
26    pub async fn load(&self, object_id: &ObjectId) -> BuckyResult<Option<Vec<u8>>> {
27        let req = NamedObjectCacheGetObjectRequest {
28            source: RequestSourceInfo::new_local_system(),
29            object_id: object_id.to_owned(),
30            last_access_rpath: None,
31            flags: 0,
32        };
33
34        let resp = self.noc.get_object(&req).await?;
35        match resp {
36            Some(data) => {
37                match Storage::raw_decode(&data.object.object_raw) {
38                    Ok((storage, _)) => {
39                        // 缓存当前object的修改时间
40                        let update_time = storage.body().as_ref().unwrap().update_time();
41                        self.last_update_time.store(update_time, Ordering::Relaxed);
42
43                        Ok(Some(storage.into_value()))
44                    }
45                    Err(e) => {
46                        error!(
47                            "decode storage object error: id={}, storage={}, {}",
48                            self.id, object_id, e
49                        );
50                        Err(e)
51                    }
52                }
53            }
54
55            None => {
56                info!(
57                    "storage not found in noc: id={}, storage={}",
58                    self.id, object_id,
59                );
60                Ok(None)
61            }
62        }
63    }
64
65    pub async fn save(&self, buf: Vec<u8>, with_hash: bool) -> BuckyResult<StorageId> {
66        let mut storage: Storage = if with_hash {
67            StorageObj::create_with_hash(&self.id, buf)
68        } else {
69            StorageObj::create(&self.id, buf)
70        };
71
72        // 检查一下body的更新时间,确保更新
73        let old_update_time = self.last_update_time.load(Ordering::Relaxed);
74        let mut now = storage.body().as_ref().unwrap().update_time();
75        if now < old_update_time {
76            warn!(
77                "storage new time is older than current! now={}, cur={}",
78                now, old_update_time
79            );
80            now = old_update_time + 1;
81            storage.body_mut().as_mut().unwrap().set_update_time(now);
82        }
83
84        self.save_to_noc(storage).await
85    }
86
87    async fn save_to_noc(&self, storage: Storage) -> BuckyResult<StorageId> {
88        let storage_id = storage.storage_id();
89        info!(
90            "now will save storage to noc: id={}, storage={}",
91            self.id, storage_id
92        );
93
94        let object_raw = storage.to_vec().unwrap();
95        let object = NONObjectInfo::new_from_object_raw(object_raw)?;
96
97        let req = NamedObjectCachePutObjectRequest {
98            source: RequestSourceInfo::new_local_system(),
99            object,
100            storage_category: NamedObjectStorageCategory::Storage,
101            last_access_rpath: None,
102            context: None,
103            access_string: Some(AccessString::dec_default().value()),
104        };
105
106        match self.noc.put_object(&req).await {
107            Ok(resp) => {
108                match resp.result {
109                    NamedObjectCachePutObjectResult::Accept
110                    | NamedObjectCachePutObjectResult::Updated => {
111                        info!(
112                            "insert storage to noc success! id={}, storage={}",
113                            self.id, req.object.object_id
114                        );
115                        Ok(storage_id)
116                    }
117                    r @ _ => {
118                        // 不应该到这里?因为修改后的update_time已经会被更新
119                        // FIXME 如果触发了本地时间回滚之类的问题,这里是否需要强制delete然后再插入?
120                        error!(
121                            "update storage to noc but alreay exist! id={}, storage={}, result={:?}",
122                            self.id, req.object.object_id, r
123                        );
124
125                        Err(BuckyError::from(BuckyErrorCode::AlreadyExists))
126                    }
127                }
128            }
129            Err(e) => {
130                error!(
131                    "insert storage to noc error! id={}, storage={}, {}",
132                    self.id, req.object.object_id, e
133                );
134                Err(e)
135            }
136        }
137    }
138
139    // 从noc删除当前storage对象
140    pub async fn delete(&self, object_id: &ObjectId) -> BuckyResult<()> {
141        let req = NamedObjectCacheDeleteObjectRequest {
142            source: RequestSourceInfo::new_local_system(),
143            object_id: object_id.to_owned(),
144            flags: 0,
145        };
146
147        let resp = self.noc.delete_object(&req).await?;
148        if resp.deleted_count > 0 {
149            info!(
150                "delete storage object from noc successs: id={}, storage={}",
151                self.id, req.object_id
152            );
153        } else {
154            warn!(
155                "delete storage object but not found: id={}, storage={}",
156                self.id, req.object_id,
157            );
158        }
159
160        Ok(())
161    }
162}
163
164#[async_trait::async_trait]
165pub trait NOCStorage: Send + Sync {
166    fn id(&self) -> &str;
167    async fn load(&self) -> BuckyResult<Option<Vec<u8>>>;
168    async fn save(&self, buf: Vec<u8>) -> BuckyResult<()>;
169    async fn delete(&self) -> BuckyResult<()>;
170}
171
172pub struct NOCGlobalStateStorage {
173    global_state: GlobalStateOutputProcessorRef,
174    dec_id: Option<ObjectId>,
175    path: String,
176    target: Option<ObjectId>,
177    noc: NOCStorageRawHelper,
178    op_lock: AsyncMutex<u32>,
179}
180
181impl NOCGlobalStateStorage {
182    pub fn new(
183        global_state: GlobalStateOutputProcessorRef,
184        dec_id: Option<ObjectId>,
185        path: String,
186        target: Option<ObjectId>,
187        id: &str,
188        noc: NamedObjectCacheRef,
189    ) -> Self {
190        let noc = NOCStorageRawHelper::new(id, noc);
191
192        Self {
193            global_state,
194            dec_id,
195            path,
196            target,
197            noc,
198            op_lock: AsyncMutex::new(0),
199        }
200    }
201
202    fn create_global_stub(&self) -> GlobalStateStub {
203        let dec_id = match &self.dec_id {
204            Some(dec_id) => Some(dec_id.to_owned()),
205            None => Some(cyfs_core::get_system_dec_app().to_owned()),
206        };
207
208        let stub = GlobalStateStub::new(self.global_state.clone(), self.target.clone(), dec_id);
209        stub
210    }
211}
212
213#[async_trait::async_trait]
214impl NOCStorage for NOCGlobalStateStorage {
215    fn id(&self) -> &str {
216        &self.noc.id
217    }
218
219    async fn load(&self) -> BuckyResult<Option<Vec<u8>>> {
220        // info!("try load global state storage: path={}", self.path);
221        let _lock = self.op_lock.lock().await;
222
223        let stub = self.create_global_stub();
224
225        let path_stub = stub.create_path_op_env().await?;
226        let current = path_stub.get_by_path(&self.path).await?;
227        match current {
228            Some(id) => {
229                let ret = self.noc.load(&id).await.map_err(|mut e| {
230                    let msg = format!(
231                        "load storage from noc failed! id={}, stroage={}, path={}, dec={:?}, {}",
232                        self.noc.id, id, self.path, self.dec_id, e,
233                    );
234                    error!("{}", msg);
235                    e.set_msg(msg);
236                    e
237                })?;
238
239                match ret {
240                    Some(data) => Ok(Some(data)),
241                    None => {
242                        warn!("load storage from noc but not found! id={}, stroage={}, path={}, dec={:?}",
243                        self.noc.id, id, self.path, self.dec_id);
244
245                        Ok(None)
246                    }
247                }
248            }
249            None => {
250                warn!(
251                    "global state storage load from path but not found! id={}, path={}, dec={:?}",
252                    self.noc.id, self.path, self.dec_id
253                );
254                Ok(None)
255            }
256        }
257    }
258
259    async fn save(&self, buf: Vec<u8>) -> BuckyResult<()> {
260        // info!("try save global state storage: path={}", self.path);
261        let _lock = self.op_lock.lock().await;
262
263        // First save as storage object to noc
264        let storage_id = self.noc.save(buf, true).await.map_err(|mut e| {
265            let msg = format!(
266                "save storage to noc failed! id={}, path={}, dec={:?}, {}",
267                self.noc.id, self.path, self.dec_id, e,
268            );
269            error!("{}", msg);
270            e.set_msg(msg);
271            e
272        })?;
273
274        // Then update the global state to save the object_id
275        let stub = self.create_global_stub();
276        let path_stub = stub.create_path_op_env().await?;
277
278        path_stub
279            .set_with_path(&self.path, storage_id.object_id(), None, true)
280            .await
281            .map_err(|mut e| {
282                let msg = format!(
283                    "save storage to global state failed! id={}, path={}, dec={:?}, {}",
284                    self.noc.id, self.path, self.dec_id, e,
285                );
286                error!("{}", msg);
287                e.set_msg(msg);
288                e
289            })?;
290
291        path_stub.commit().await.map_err(|mut e| {
292            let msg = format!(
293                "commit storage to global state failed! id={}, path={}, dec={:?}, {}",
294                self.noc.id, self.path, self.dec_id, e,
295            );
296            error!("{}", msg);
297            e.set_msg(msg);
298            e
299        })?;
300
301        info!(
302            "save storage to global state success! id={}, path={}, dec={:?}",
303            self.noc.id, self.path, self.dec_id
304        );
305
306        Ok(())
307    }
308
309    async fn delete(&self) -> BuckyResult<()> {
310        // info!("try delete global state storage: path={}", self.path);
311        let _lock = self.op_lock.lock().await;
312
313        // First update the global state to save the object_id
314        let stub = self.create_global_stub();
315        let path_stub = stub.create_path_op_env().await?;
316
317        let ret = path_stub
318            .remove_with_path(&self.path, None)
319            .await
320            .map_err(|mut e| {
321                let msg = format!(
322                    "remove storage from global state failed! id={}, path={}, dec={:?}, {}",
323                    self.noc.id, self.path, self.dec_id, e,
324                );
325                error!("{}", msg);
326                e.set_msg(msg);
327                e
328            })?;
329
330        path_stub.commit().await.map_err(|mut e| {
331            let msg = format!(
332                "commit storage to global state failed! id={}, path={}, dec={:?}, {}",
333                self.noc.id, self.path, self.dec_id, e,
334            );
335            error!("{}", msg);
336            e.set_msg(msg);
337            e
338        })?;
339
340        match ret {
341            Some(object_id) => {
342                // Then delete object from noc
343                if let Err(e) = self.noc.delete(&object_id).await {
344                    error!("delete storage from noc but failed! id={}, path={}, dec={:?}, storage={}, {}", 
345                    self.noc.id, self.path, self.dec_id, object_id, e);
346                }
347            }
348            None => {
349                info!(
350                    "delete storage from global state but not found! id={}, path={}, dec={:?}",
351                    self.noc.id, self.path, self.dec_id,
352                );
353            }
354        }
355
356        Ok(())
357    }
358}
359
360pub struct NOCRawStorage {
361    noc: NOCStorageRawHelper,
362    storage_id: StorageId,
363}
364
365impl NOCRawStorage {
366    pub fn new(id: &str, noc: NamedObjectCacheRef) -> Self {
367        let storage: Storage = StorageObj::create(id, Vec::new());
368
369        let noc = NOCStorageRawHelper::new(id, noc);
370
371        Self {
372            noc,
373            storage_id: storage.storage_id(),
374        }
375    }
376    
377    pub async fn exists(id: &str, noc: &NamedObjectCacheRef) -> BuckyResult<bool> {
378        let storage: Storage = StorageObj::create(id, Vec::new());
379        let storage_id = storage.storage_id();
380
381        let noc_req = NamedObjectCacheExistsObjectRequest {
382            object_id: storage_id.object_id().clone(),
383            source: RequestSourceInfo::new_local_system(),
384        };
385
386        noc.exists_object(&noc_req).await.map(|resp| {
387            resp.meta && resp.object
388        })
389    }
390}
391
392#[async_trait::async_trait]
393impl NOCStorage for NOCRawStorage {
394    fn id(&self) -> &str {
395        &self.noc.id
396    }
397
398    async fn load(&self) -> BuckyResult<Option<Vec<u8>>> {
399        self.noc.load(self.storage_id.object_id()).await
400    }
401
402    async fn save(&self, buf: Vec<u8>) -> BuckyResult<()> {
403        match self.noc.save(buf, false).await {
404            Ok(id) => {
405                assert_eq!(id, self.storage_id);
406                Ok(())
407            }
408            Err(e) => Err(e),
409        }
410    }
411
412    async fn delete(&self) -> BuckyResult<()> {
413        self.noc.delete(self.storage_id.object_id()).await
414    }
415}