cyfs_lib/storage/
state_storage.rs

1use crate::root_state::*;
2use crate::UniCyfsStackRef;
3use cyfs_base::*;
4
5use async_std::sync::Mutex as AsyncMutex;
6use once_cell::sync::OnceCell;
7use std::sync::{
8    atomic::{AtomicBool, Ordering},
9    Arc,
10};
11
12#[derive(Clone)]
13struct StorageOpData {
14    path_stub: PathOpEnvStub,
15    single_stub: SingleOpEnvStub,
16    current: Arc<AsyncMutex<Option<ObjectId>>>,
17}
18
19pub struct StateStorage {
20    path: String,
21    content_type: ObjectMapSimpleContentType,
22    global_state: GlobalStateOutputProcessorRef,
23    target: Option<ObjectId>,
24    dec_id: Option<ObjectId>,
25
26    dirty: Arc<AtomicBool>,
27    auto_save: Arc<AtomicBool>,
28    op_data: OnceCell<StorageOpData>,
29}
30
31impl Drop for StateStorage {
32    fn drop(&mut self) {
33        async_std::task::block_on(async move {
34            self.abort().await;
35        })
36    }
37}
38
39impl StateStorage {
40    pub fn new(
41        global_state: GlobalStateOutputProcessorRef,
42        path: impl Into<String>,
43        content_type: ObjectMapSimpleContentType,
44        target: Option<ObjectId>,
45        dec_id: Option<ObjectId>,
46    ) -> Self {
47        Self {
48            global_state,
49            path: path.into(),
50            content_type,
51            target,
52            dec_id,
53
54            dirty: Arc::new(AtomicBool::new(false)),
55            auto_save: Arc::new(AtomicBool::new(false)),
56            op_data: OnceCell::new(),
57        }
58    }
59
60    pub fn new_with_stack(
61        stack: UniCyfsStackRef,
62        category: GlobalStateCategory,
63        path: impl Into<String>,
64        content_type: ObjectMapSimpleContentType,
65        target: Option<ObjectId>,
66        dec_id: Option<ObjectId>,
67    ) -> Self {
68        let global_state = match category {
69            GlobalStateCategory::RootState => stack.root_state().clone(),
70            GlobalStateCategory::LocalCache => stack.local_cache().clone(),
71        };
72
73        Self {
74            global_state,
75            path: path.into(),
76            content_type,
77            target,
78            dec_id,
79
80            dirty: Arc::new(AtomicBool::new(false)),
81            auto_save: Arc::new(AtomicBool::new(false)),
82            op_data: OnceCell::new(),
83        }
84    }
85
86    pub fn path(&self) -> &str {
87        &self.path
88    }
89
90    pub fn stub(&self) -> &SingleOpEnvStub {
91        &self.op_data.get().unwrap().single_stub
92    }
93
94    pub fn is_dirty(&self) -> bool {
95        self.dirty.load(Ordering::SeqCst)
96    }
97
98    pub fn set_dirty(&self, dirty: bool) {
99        self.dirty.store(dirty, Ordering::SeqCst);
100    }
101
102    pub async fn init(&self) -> BuckyResult<()> {
103        assert!(self.op_data.get().is_none());
104
105        let op_data = self.load().await?;
106        if let Err(_) = self.op_data.set(op_data) {
107            unreachable!();
108        }
109
110        Ok(())
111    }
112
113    pub fn start_save(&self, dur: std::time::Duration) {
114        use async_std::prelude::*;
115
116        let ret = self
117            .auto_save
118            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire);
119        if ret.is_err() {
120            warn!("storage already in saving state! path={}", self.path);
121            return;
122        }
123
124        let auto_save = self.auto_save.clone();
125        let path = self.path.clone();
126        let dirty = self.dirty.clone();
127        let op_data = self.op_data.get().unwrap().clone();
128
129        async_std::task::spawn(async move {
130            let mut interval = async_std::stream::interval(dur);
131            while let Some(_) = interval.next().await {
132                if !auto_save.load(Ordering::SeqCst) {
133                    warn!("storage auto save stopped! path={}", path);
134                    break;
135                }
136
137                let _ = Self::save_impl(&path, &dirty, &op_data).await;
138            }
139        });
140    }
141
142    pub fn stop_save(&self) {
143        if let Ok(_) =
144            self.auto_save
145                .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
146        {
147            info!("stop state storage auto save! path={}", self.path);
148        }
149    }
150
151    async fn load(&self) -> BuckyResult<StorageOpData> {
152        let dec_id = match &self.dec_id {
153            Some(dec_id) => Some(dec_id.to_owned()),
154            None => Some(cyfs_core::get_system_dec_app().to_owned()),
155        };
156
157        let stub = GlobalStateStub::new(self.global_state.clone(), self.target.clone(), dec_id);
158
159        let path_stub = stub.create_path_op_env().await?;
160        path_stub
161            .lock(vec![self.path.clone()], u64::MAX)
162            .await
163            .unwrap();
164
165        let single_stub = stub.create_single_op_env().await?;
166
167        let current = path_stub.get_by_path(&self.path).await?;
168        match current {
169            Some(ref obj) => {
170                single_stub.load(obj.clone()).await?;
171            }
172            None => {
173                single_stub.create_new(self.content_type).await?;
174            }
175        }
176
177        let op_data = StorageOpData {
178            path_stub,
179            single_stub,
180            current: Arc::new(AsyncMutex::new(current)),
181        };
182
183        Ok(op_data)
184    }
185
186    // reload the target object and ignore all the unsaved changes!
187    pub async fn reload(&self) -> BuckyResult<bool> {
188        let op_data = self.op_data.get().unwrap();
189
190        let new = op_data.path_stub.get_by_path(&self.path).await?;
191
192        let mut current = op_data.current.lock().await;
193        if *current == new {
194            return Ok(false);
195        }
196
197        match new {
198            Some(ref obj) => {
199                op_data.single_stub.load(obj.clone()).await?;
200            }
201            None => {
202                op_data.single_stub.create_new(self.content_type).await?;
203            }
204        }
205
206        *current = new;
207        Ok(true)
208    }
209
210    pub async fn save(&self) -> BuckyResult<()> {
211        if let Some(op_data) = self.op_data.get() {
212            Self::save_impl(&self.path, &self.dirty, op_data).await
213        } else {
214            Ok(())
215        }
216    }
217
218    async fn save_impl(
219        path: &str,
220        dirty: &Arc<AtomicBool>,
221        op_data: &StorageOpData,
222    ) -> BuckyResult<()> {
223        let ret = dirty.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire);
224        if ret.is_err() {
225            return Ok(());
226        }
227
228        let ret = Self::commit_impl(path, op_data).await;
229
230        if ret.is_err() {
231            dirty.store(true, Ordering::SeqCst);
232        }
233
234        ret
235    }
236
237    pub async fn abort(&mut self) {
238        self.stop_save();
239
240        if let Some(op_data) = self.op_data.take() {
241            self.abort_impl(op_data).await;
242        }
243    }
244
245    async fn abort_impl(&self, op_data: StorageOpData) {
246        info!("will abort state storage: path={}", self.path);
247
248        // first hold the lock for update
249        let mut _current = op_data.current.lock().await;
250
251        if let Err(e) = op_data.single_stub.abort().await {
252            error!(
253                "abort state storage single stub error! path={}, {}",
254                self.path, e
255            );
256        }
257
258        if let Err(e) = op_data.path_stub.abort().await {
259            error!(
260                "abort state storage path stub error! path={}, {}",
261                self.path, e
262            );
263        }
264
265        self.set_dirty(false);
266    }
267
268    async fn commit_impl(path: &str, op_data: &StorageOpData) -> BuckyResult<()> {
269        // first hold the lock for update
270        let mut current = op_data.current.lock().await;
271
272        let new = op_data.single_stub.update().await.map_err(|e| {
273            error!("commit state storage failed! path={}, {}", path, e);
274            e
275        })?;
276
277        if Some(new) == *current {
278            debug!(
279                "commit state storage but not changed! path={}, current={}",
280                path, new
281            );
282            return Ok(());
283        }
284
285        match op_data
286            .path_stub
287            .set_with_path(path, &new, current.clone(), true)
288            .await
289        {
290            Ok(_) => {
291                info!(
292                    "update state storage success! path={}, current={}, prev={:?}",
293                    path, new, current
294                );
295            }
296            Err(e) => {
297                error!(
298                    "update state storage but failed! path={}, current={}, prev={:?}, {}",
299                    path, new, current, e
300                );
301
302                return Err(e);
303            }
304        }
305
306        op_data.path_stub.update().await.map_err(|e| {
307            error!(
308                "commit state storage to global state failed! path={}, {}",
309                path, e
310            );
311            e
312        })?;
313
314        *current = Some(new);
315
316        info!(
317            "commit state storage to global state success! path={}",
318            path
319        );
320
321        Ok(())
322    }
323}
324
325pub struct StateStorageMap {
326    storage: StateStorage,
327}
328
329impl StateStorageMap {
330    pub fn new(storage: StateStorage) -> Self {
331        Self { storage }
332    }
333
334    pub fn storage(&self) -> &StateStorage {
335        &self.storage
336    }
337
338    pub fn into_storage(self) -> StateStorage {
339        self.storage
340    }
341
342    pub async fn save(&self) -> BuckyResult<()> {
343        self.storage.save().await
344    }
345
346    pub async fn abort(mut self) {
347        self.storage.abort().await
348    }
349
350    pub async fn get(&self, key: impl Into<String>) -> BuckyResult<Option<ObjectId>> {
351        self.storage.stub().get_by_key(key).await
352    }
353
354    pub async fn set(
355        &self,
356        key: impl Into<String>,
357        value: &ObjectId,
358    ) -> BuckyResult<Option<ObjectId>> {
359        self.set_ex(key, value, None, true).await
360    }
361
362    pub async fn set_ex(
363        &self,
364        key: impl Into<String>,
365        value: &ObjectId,
366        prev_value: Option<ObjectId>,
367        auto_insert: bool,
368    ) -> BuckyResult<Option<ObjectId>> {
369        let ret = self
370            .storage
371            .stub()
372            .set_with_key(key, value, prev_value.clone(), auto_insert)
373            .await?;
374
375        if Some(*value) != ret {
376            self.storage.set_dirty(true);
377        }
378
379        Ok(ret)
380    }
381
382    pub async fn insert(&self, key: impl Into<String>, value: &ObjectId) -> BuckyResult<()> {
383        let ret = self.storage.stub().insert_with_key(key, value).await?;
384        self.storage.set_dirty(true);
385
386        Ok(ret)
387    }
388
389    pub async fn remove(&self, key: impl Into<String>) -> BuckyResult<Option<ObjectId>> {
390        self.remove_ex(key, None).await
391    }
392
393    pub async fn remove_ex(
394        &self,
395        key: impl Into<String>,
396        prev_value: Option<ObjectId>,
397    ) -> BuckyResult<Option<ObjectId>> {
398        let ret = self.storage.stub().remove_with_key(key, prev_value).await?;
399
400        if ret.is_some() {
401            self.storage.set_dirty(true);
402        }
403
404        Ok(ret)
405    }
406
407    pub async fn next(&self, step: u32) -> BuckyResult<Vec<(String, ObjectId)>> {
408        let list = self.storage.stub().next(step).await?;
409
410        self.convert_list(list)
411    }
412
413    pub async fn reset(&self) -> BuckyResult<()> {
414        self.storage.stub().reset().await
415    }
416
417    pub async fn list(&self) -> BuckyResult<Vec<(String, ObjectId)>> {
418        let list = self.storage.stub().list().await?;
419
420        self.convert_list(list)
421    }
422
423    fn convert_list(
424        &self,
425        list: Vec<ObjectMapContentItem>,
426    ) -> BuckyResult<Vec<(String, ObjectId)>> {
427        if list.is_empty() {
428            return Ok(vec![]);
429        }
430
431        if list[0].content_type() != ObjectMapSimpleContentType::Map {
432            let msg = format!(
433                "state storage is not valid map type! path={}, type={}",
434                self.storage().path,
435                list[0].content_type().as_str()
436            );
437            error!("{}", msg);
438            return Err(BuckyError::new(BuckyErrorCode::InvalidFormat, msg));
439        }
440
441        let list = list
442            .into_iter()
443            .map(|item| match item {
444                ObjectMapContentItem::Map(kp) => kp,
445                _ => unreachable!(),
446            })
447            .collect();
448
449        Ok(list)
450    }
451}
452
453pub struct StateStorageSet {
454    storage: StateStorage,
455}
456
457impl StateStorageSet {
458    pub fn new(storage: StateStorage) -> Self {
459        Self { storage }
460    }
461
462    pub fn storage(&self) -> &StateStorage {
463        &self.storage
464    }
465
466    pub fn into_storage(self) -> StateStorage {
467        self.storage
468    }
469
470    pub async fn save(&self) -> BuckyResult<()> {
471        self.storage.save().await
472    }
473
474    pub async fn abort(mut self) {
475        self.storage.abort().await
476    }
477
478    pub async fn contains(&self, object_id: &ObjectId) -> BuckyResult<bool> {
479        self.storage.stub().contains(object_id).await
480    }
481
482    pub async fn insert(&self, object_id: &ObjectId) -> BuckyResult<bool> {
483        let ret = self.storage.stub().insert(object_id).await?;
484        if ret {
485            self.storage.set_dirty(true);
486        }
487
488        Ok(ret)
489    }
490
491    pub async fn remove(&self, object_id: &ObjectId) -> BuckyResult<bool> {
492        let ret = self.storage.stub().remove(object_id).await?;
493        if ret {
494            self.storage.set_dirty(true);
495        }
496
497        Ok(ret)
498    }
499
500    pub async fn next(&self, step: u32) -> BuckyResult<Vec<ObjectId>> {
501        let list = self.storage.stub().next(step).await?;
502
503        self.convert_list(list)
504    }
505
506    pub async fn reset(&self) -> BuckyResult<()> {
507        self.storage.stub().reset().await
508    }
509
510    pub async fn list(&self) -> BuckyResult<Vec<ObjectId>> {
511        let list = self.storage.stub().list().await?;
512
513        self.convert_list(list)
514    }
515
516    fn convert_list(&self, list: Vec<ObjectMapContentItem>) -> BuckyResult<Vec<ObjectId>> {
517        if list.is_empty() {
518            return Ok(vec![]);
519        }
520
521        if list[0].content_type() != ObjectMapSimpleContentType::Set {
522            let msg = format!(
523                "state storage is not valid set type! path={}, type={}",
524                self.storage().path,
525                list[0].content_type().as_str()
526            );
527            error!("{}", msg);
528            return Err(BuckyError::new(BuckyErrorCode::InvalidFormat, msg));
529        }
530
531        let list = list
532            .into_iter()
533            .map(|item| match item {
534                ObjectMapContentItem::Set(id) => id,
535                _ => unreachable!(),
536            })
537            .collect();
538
539        Ok(list)
540    }
541}