cyfs_lib/storage/
collection.rs

1use super::storage::*;
2use crate::prelude::NamedObjectCacheRef;
3use crate::root_state::*;
4use cyfs_base::*;
5use cyfs_debug::Mutex;
6
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::Arc;
9
10/*
11基于storage的编码兼容处理
12一般有三种编码格式:
131. 使用jsoncodec手工编解码,对于增加的字段,自己手动处理
142. 使用serde_json自动编解码,对于新增加的字段,要使用Option选项,否则会导致出现missing field导致无法解码
153. 使用raw_codec自动编解码,不支持增删字段后的编解码,需要小心,改变结构定义后,需要处理解码失败导致load失败的情况
16*/
17pub trait CollectionCodec<T> {
18    fn encode(&self) -> BuckyResult<Vec<u8>>;
19    fn decode(buf: &[u8]) -> BuckyResult<T>;
20}
21
22impl<T> CollectionCodec<T> for T
23where
24    T: for<'de> RawDecode<'de> + RawEncode,
25{
26    fn encode(&self) -> BuckyResult<Vec<u8>> {
27        self.to_vec()
28    }
29
30    fn decode(buf: &[u8]) -> BuckyResult<T> {
31        T::clone_from_slice(&buf)
32    }
33}
34
/// Implements `CollectionCodec<$T>` for `$T` on top of `serde_json`.
///
/// The expansion refers to `CollectionCodec`, `serde_json`, `log` and
/// `cyfs_base` by path, so all of them must be resolvable at the call site.
/// Both directions report failures as `BuckyErrorCode::InvalidFormat` after
/// logging the message.
#[macro_export]
macro_rules! declare_collection_codec_for_serde {
    ($T:ty) => {
        impl CollectionCodec<$T> for $T {
            fn encode(&self) -> cyfs_base::BuckyResult<Vec<u8>> {
                // Serialize straight into bytes; this yields the same output as
                // building the JSON string first and taking its bytes.
                match serde_json::to_vec(self) {
                    Ok(bytes) => Ok(bytes),
                    Err(e) => {
                        let msg = format!("encode to json error! {}", e);
                        log::error!("{}", msg);
                        Err(cyfs_base::BuckyError::new(
                            cyfs_base::BuckyErrorCode::InvalidFormat,
                            msg,
                        ))
                    }
                }
            }
            fn decode(buf: &[u8]) -> cyfs_base::BuckyResult<$T> {
                match serde_json::from_slice::<$T>(buf) {
                    Ok(value) => Ok(value),
                    Err(e) => {
                        let msg = format!("decode from json error! {}", e);
                        log::error!("{}", msg);
                        Err(cyfs_base::BuckyError::new(
                            cyfs_base::BuckyErrorCode::InvalidFormat,
                            msg,
                        ))
                    }
                }
            }
        }
    };
}
57
/// Implements `CollectionCodec<$T>` for `$T` by delegating to the type's own
/// `encode_string` / `decode_string` methods.
///
/// The expansion refers to `CollectionCodec`, `log` and `cyfs_base` by path,
/// so all of them must be resolvable at the call site.
#[macro_export]
macro_rules! declare_collection_codec_for_json_codec {
    ($T:ty) => {
        impl CollectionCodec<$T> for $T {
            fn encode(&self) -> cyfs_base::BuckyResult<Vec<u8>> {
                let text = self.encode_string();
                Ok(text.into())
            }
            fn decode(buf: &[u8]) -> cyfs_base::BuckyResult<$T> {
                // The stored bytes must be valid UTF-8 before they can be
                // handed to the string decoder.
                let text = match std::str::from_utf8(buf) {
                    Ok(text) => text,
                    Err(e) => {
                        let msg = format!("not valid utf8 string format: {}", e);
                        log::error!("{}", msg);
                        return Err(cyfs_base::BuckyError::new(
                            cyfs_base::BuckyErrorCode::InvalidFormat,
                            msg,
                        ));
                    }
                };
                Self::decode_string(text)
            }
        }
    };
}
77
78pub struct NOCStorageWrapper {
79    storage: Box<dyn NOCStorage>,
80}
81
82impl NOCStorageWrapper {
83    pub fn new(id: &str, noc: NamedObjectCacheRef) -> Self {
84        Self {
85            storage: Box::new(NOCRawStorage::new(id, noc)),
86        }
87    }
88
89    pub fn new_global_state(
90        global_state: GlobalStateOutputProcessorRef,
91        dec_id: Option<ObjectId>,
92        path: String,
93        target: Option<ObjectId>,
94        id: &str,
95        noc: NamedObjectCacheRef,
96    ) -> Self {
97        Self {
98            storage: Box::new(NOCGlobalStateStorage::new(
99                global_state, dec_id, path, target, id, noc,
100            )),
101        }
102    }
103
104    pub async fn exists(id: &str, noc: &NamedObjectCacheRef) -> BuckyResult<bool> {
105        NOCRawStorage::exists(id, noc).await
106    }
107
108    pub fn id(&self) -> &str {
109        self.storage.id()
110    }
111
112    pub async fn load<T>(&self) -> BuckyResult<Option<T>>
113    where
114        T: CollectionCodec<T>,
115    {
116        match self.storage.load().await? {
117            Some(buf) => {
118                let coll = T::decode(&buf).map_err(|e| {
119                    error!(
120                        "decode storage buf to collection failed! id={}, {}",
121                        self.id(),
122                        e
123                    );
124                    e
125                })?;
126
127                Ok(Some(coll))
128            }
129            None => Ok(None),
130        }
131    }
132
133    pub async fn save<T>(&self, data: &T) -> BuckyResult<()>
134    where
135        T: CollectionCodec<T>,
136    {
137        let buf = data.encode().map_err(|e| {
138            error!(
139                "convert collection to buf failed! id={}, {}",
140                self.storage.id(),
141                e
142            );
143            e
144        })?;
145
146        self.storage.save(buf).await
147    }
148
149    pub async fn delete(&self) -> BuckyResult<()> {
150        self.storage.delete().await
151    }
152}
153
154pub struct NOCCollection<T>
155where
156    T: Default + CollectionCodec<T>,
157{
158    coll: T,
159    storage: NOCStorageWrapper,
160    dirty: bool,
161}
162
163impl<T> NOCCollection<T>
164where
165    T: Default + CollectionCodec<T>,
166{
167    pub fn new(id: &str, noc: NamedObjectCacheRef) -> Self {
168        Self {
169            coll: T::default(),
170            storage: NOCStorageWrapper::new(id, noc),
171            dirty: false,
172        }
173    }
174
175    pub fn id(&self) -> &str {
176        self.storage.id()
177    }
178
179    pub fn coll(&self) -> &T {
180        &self.coll
181    }
182
183    pub fn is_dirty(&self) -> bool {
184        self.dirty
185    }
186
187    pub fn set_dirty(&mut self, dirty: bool) {
188        self.dirty = dirty;
189    }
190
191    pub fn swap(&mut self, mut value: T) -> T {
192        std::mem::swap(&mut self.coll, &mut value);
193        value
194    }
195
196    pub async fn exists(id: &str, noc: &NamedObjectCacheRef) -> BuckyResult<bool> {
197        NOCStorageWrapper::exists(id, noc).await
198    }
199
200    pub async fn load(&mut self) -> BuckyResult<()> {
201        match self.storage.load().await? {
202            Some(coll) => {
203                self.coll = coll;
204                Ok(())
205            }
206            None => Ok(()),
207        }
208    }
209
210    pub async fn save(&mut self) -> BuckyResult<()> {
211        if self.is_dirty() {
212            self.set_dirty(false);
213
214            self.storage.save(&self.coll).await.map_err(|e| {
215                self.set_dirty(true);
216                e
217            })
218        } else {
219            Ok(())
220        }
221    }
222
223    pub async fn delete(&mut self) -> BuckyResult<()> {
224        self.storage.delete().await?;
225
226        // FIXME 删除后是否要置空?
227        // self.coll = T::default();
228
229        Ok(())
230    }
231}
232
233use std::ops::Deref;
234use std::ops::DerefMut;
235
/// Abstraction over a lock-protected value of type `T`, handing out boxed
/// guards so the concrete lock type stays hidden from callers.
pub trait NOCCollectionWithLock<T>
where
    T: Default + ?Sized + Send + 'static,
{
    /// Returns a boxed guard giving shared (`Deref`) access to the value.
    fn read(&self) -> Box<dyn Deref<Target = T> + '_>;

    /// Returns a boxed guard giving mutable (`DerefMut`) access to the value.
    fn write(&self) -> Box<dyn DerefMut<Target = T> + '_>;
    //fn replace(&self, value: T);
}
244
245struct NOCCollectionWithMutex<T>
246where
247    T: Default + ?Sized + Send + 'static,
248{
249    coll: Mutex<T>,
250}
251
252impl<T> NOCCollectionWithMutex<T>
253where
254    T: Default + ?Sized + Send + 'static,
255{
256    fn new() -> Self {
257        Self {
258            coll: Mutex::new(T::default()),
259        }
260    }
261}
262
263impl<T> NOCCollectionWithLock<T> for NOCCollectionWithMutex<T>
264where
265    T: Default + ?Sized + Send + 'static,
266{
267    fn read(&self) -> Box<dyn Deref<Target = T> + '_> {
268        Box::new(self.coll.lock().unwrap())
269    }
270    fn write(&self) -> Box<dyn DerefMut<Target = T> + '_> {
271        Box::new(self.coll.lock().unwrap())
272    }
273}
274
275use std::sync::RwLock;
276
/// Holder that guards a `T` with a `std::sync::RwLock`.
struct NOCCollectionWithRWLock<T: Default + ?Sized + Send + 'static> {
    /// The protected value.
    coll: RwLock<T>,
}
283
284impl<T> NOCCollectionWithRWLock<T>
285where
286    T: Default + ?Sized + Send + 'static,
287{
288    fn new() -> Self {
289        Self {
290            coll: RwLock::new(T::default()),
291        }
292    }
293}
294
295impl<T> NOCCollectionWithLock<T> for NOCCollectionWithRWLock<T>
296where
297    T: Default + ?Sized + Send + 'static,
298{
299    fn read(&self) -> Box<dyn Deref<Target = T> + '_> {
300        Box::new(self.coll.read().unwrap())
301    }
302    fn write(&self) -> Box<dyn DerefMut<Target = T> + '_> {
303        Box::new(self.coll.write().unwrap())
304    }
305}
306
307pub struct NOCCollectionSync<T>
308where
309    T: Default + CollectionCodec<T> + Send + 'static,
310{
311    coll: Arc<Mutex<T>>,
312    storage: Arc<Box<dyn NOCStorage>>,
313
314    dirty: Arc<AtomicBool>,
315    auto_save: Arc<AtomicBool>,
316}
317
318impl<T> Clone for NOCCollectionSync<T>
319where
320    T: Default + CollectionCodec<T> + Send + 'static,
321{
322    fn clone(&self) -> Self {
323        Self {
324            coll: self.coll.clone(),
325            storage: self.storage.clone(),
326            dirty: self.dirty.clone(),
327            auto_save: self.auto_save.clone(),
328        }
329    }
330}
331
332impl<T> NOCCollectionSync<T>
333where
334    T: Default + CollectionCodec<T> + Send + 'static,
335{
336    pub fn new(id: &str, noc: NamedObjectCacheRef) -> Self {
337        let storage = NOCRawStorage::new(id, noc);
338
339        Self {
340            coll: Arc::new(Mutex::new(T::default())),
341            storage: Arc::new(Box::new(storage)),
342            dirty: Arc::new(AtomicBool::new(false)),
343            auto_save: Arc::new(AtomicBool::new(false)),
344        }
345    }
346
347    pub fn new_global_state(
348        global_state: GlobalStateOutputProcessorRef,
349        dec_id: Option<ObjectId>,
350        path: String,
351        target: Option<ObjectId>,
352        id: &str,
353        noc: NamedObjectCacheRef,
354    ) -> Self {
355        let storage = NOCGlobalStateStorage::new(
356            global_state, dec_id, path, target, id, noc,
357        );
358
359        Self {
360            coll: Arc::new(Mutex::new(T::default())),
361            storage: Arc::new(Box::new(storage)),
362            dirty: Arc::new(AtomicBool::new(false)),
363            auto_save: Arc::new(AtomicBool::new(false)),
364        }
365    }
366
367    pub fn is_dirty(&self) -> bool {
368        self.dirty.load(Ordering::SeqCst)
369    }
370
371    pub fn set_dirty(&self, dirty: bool) -> bool {
372        self.dirty.swap(dirty, Ordering::SeqCst)
373    }
374
375    pub fn coll(&self) -> &Arc<Mutex<T>> {
376        &self.coll
377    }
378
379    pub fn id(&self) -> &str {
380        self.storage.id()
381    }
382
383    pub fn swap(&mut self, mut value: T) -> T {
384        {
385            let mut cur = self.coll.lock().unwrap();
386            std::mem::swap(&mut *cur, &mut value);
387        }
388
389        self.set_dirty(true);
390        value
391    }
392
393    pub async fn load(&self) -> BuckyResult<()> {
394        match self.storage.load().await? {
395            Some(buf) => {
396                let coll = T::decode(&buf).map_err(|e| {
397                    error!(
398                        "decode storage buf to collection failed! id={}, {}",
399                        self.id(),
400                        e
401                    );
402                    e
403                })?;
404
405                *self.coll.lock().unwrap() = coll;
406                Ok(())
407            }
408            None => Ok(()),
409        }
410    }
411
412    // 保存,必须正确的调用set_dirty才会发起真正的保存操作
413    pub async fn save(&self) -> BuckyResult<()> {
414        if self.set_dirty(false) {
415            self.save_impl().await.map_err(|e| {
416                self.set_dirty(true);
417                e
418            })
419        } else {
420            Ok(())
421        }
422    }
423
424    // 异步的保存,必须正确的调用set_dirty才会发起真正的保存操作
425    pub fn async_save(&self) {
426        let this = self.clone();
427        async_std::task::spawn(async move {
428            let _r = this.save().await;
429        });
430    }
431
432    async fn save_impl(&self) -> BuckyResult<()> {
433        let buf = {
434            let coll = self.coll.lock().unwrap();
435            coll.encode().map_err(|e| {
436                error!(
437                    "convert collection to buf failed! id={}, {}",
438                    self.storage.id(),
439                    e
440                );
441                e
442            })?
443        };
444
445        self.storage.save(buf).await
446    }
447
448    pub async fn delete(&self) -> BuckyResult<()> {
449        self.storage.delete().await?;
450
451        // 删除后需要停止自动保存
452        self.stop_save();
453
454        // FIXME 删除后是否要置空?
455        // self.coll = T::default();
456
457        Ok(())
458    }
459
460    // 开始定时保存操作
461    pub fn start_save(&self, dur: std::time::Duration) {
462        use async_std::prelude::*;
463
464        let ret = self
465            .auto_save
466            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire);
467        if ret.is_err() {
468            warn!("storage already in saving state! id={}", self.id());
469            return;
470        }
471
472        let this = self.clone();
473        async_std::task::spawn(async move {
474            let mut interval = async_std::stream::interval(dur);
475            while let Some(_) = interval.next().await {
476                if !this.auto_save.load(Ordering::SeqCst) {
477                    warn!("storage auto save stopped! id={}", this.id());
478                    break;
479                }
480                let _ = this.save().await;
481            }
482        });
483    }
484
485    pub fn stop_save(&self) {
486        if let Ok(_) =
487            self.auto_save
488                .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
489        {
490            info!("will stop storage auto save! id={}", self.id());
491        }
492    }
493}
494
495pub struct NOCCollectionRWSync<T>
496where
497    T: Default + CollectionCodec<T> + Send + Sync + 'static,
498{
499    coll: Arc<RwLock<T>>,
500    storage: Arc<Box<dyn NOCStorage>>,
501
502    dirty: Arc<AtomicBool>,
503
504    auto_save: Arc<AtomicBool>,
505}
506
507
508impl<T> Clone for NOCCollectionRWSync<T>
509where
510    T: Default + CollectionCodec<T> + Send + Sync + 'static,
511{
512    fn clone(&self) -> Self {
513        Self {
514            coll: self.coll.clone(),
515            storage: self.storage.clone(),
516            dirty: self.dirty.clone(),
517            auto_save: self.auto_save.clone(),
518        }
519    }
520}
521
522impl<T> NOCCollectionRWSync<T>
523where
524    T: Default + CollectionCodec<T> + Send + Sync + 'static,
525{
526    pub fn new(id: &str, noc: NamedObjectCacheRef) -> Self {
527        let noc = NOCRawStorage::new(id, noc);
528        Self {
529            coll: Arc::new(RwLock::new(T::default())),
530            storage: Arc::new(Box::new(noc)),
531            dirty: Arc::new(AtomicBool::new(false)),
532            auto_save: Arc::new(AtomicBool::new(false)),
533        }
534    }
535
536    pub fn new_global_state(
537        global_state: GlobalStateOutputProcessorRef,
538        dec_id: Option<ObjectId>,
539        path: String,
540        target: Option<ObjectId>,
541        id: &str,
542        noc: NamedObjectCacheRef,
543    ) -> Self {
544        let storage = NOCGlobalStateStorage::new(
545            global_state, dec_id, path, target, id, noc,
546        );
547
548        Self {
549            coll: Arc::new(RwLock::new(T::default())),
550            storage: Arc::new(Box::new(storage)),
551            dirty: Arc::new(AtomicBool::new(false)),
552            auto_save: Arc::new(AtomicBool::new(false)),
553        }
554    }
555
556    pub fn is_dirty(&self) -> bool {
557        self.dirty.load(Ordering::SeqCst)
558    }
559
560    pub fn set_dirty(&self, dirty: bool) {
561        self.dirty.store(dirty, Ordering::SeqCst);
562    }
563
564    pub fn coll(&self) -> &Arc<RwLock<T>> {
565        &self.coll
566    }
567
568    pub fn id(&self) -> &str {
569        self.storage.id()
570    }
571
572    pub fn swap(&self, mut value: T) -> T {
573        {
574            let mut cur = self.coll.write().unwrap();
575            std::mem::swap(&mut *cur, &mut value);
576        }
577
578        self.set_dirty(true);
579
580        value
581    }
582
583    pub async fn load(&self) -> BuckyResult<()> {
584        match self.storage.load().await? {
585            Some(buf) => {
586                let coll = T::decode(&buf).map_err(|e| {
587                    error!(
588                        "decode storage buf to collection failed! id={}, {}",
589                        self.id(),
590                        e
591                    );
592                    e
593                })?;
594
595                *self.coll.write().unwrap() = coll;
596                Ok(())
597            }
598            None => Ok(()),
599        }
600    }
601
602    pub async fn save(&self) -> BuckyResult<()> {
603        if self.is_dirty() {
604            self.set_dirty(false);
605
606            self.save_impl().await.map_err(|e| {
607                self.set_dirty(true);
608                e
609            })
610        } else {
611            Ok(())
612        }
613    }
614
615    pub async fn save_impl(&self) -> BuckyResult<()> {
616        let buf = {
617            let coll = self.coll.read().unwrap();
618            coll.encode().map_err(|e| {
619                error!(
620                    "convert collection to buf failed! id={}, {}",
621                    self.storage.id(),
622                    e
623                );
624                e
625            })?
626        };
627
628        self.storage.save(buf).await
629    }
630
631    pub async fn delete(&mut self) -> BuckyResult<()> {
632        self.storage.delete().await?;
633
634        // 删除后需要停止自动保存
635        self.stop_save();
636
637        // FIXME 删除后是否要置空?
638        // self.coll = T::default();
639
640        Ok(())
641    }
642
643    // 开始定时保存操作
644    pub fn start_save(&self, dur: std::time::Duration) {
645        use async_std::prelude::*;
646
647        let ret = self
648            .auto_save
649            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire);
650        if ret.is_err() {
651            warn!("storage already in saving state! id={}", self.id());
652            return;
653        }
654
655        let this = self.clone();
656        async_std::task::spawn(async move {
657            let mut interval = async_std::stream::interval(dur);
658            while let Some(_) = interval.next().await {
659                if !this.auto_save.load(Ordering::SeqCst) {
660                    warn!("storage auto save stopped! id={}", this.id());
661                    break;
662                }
663
664                let _ = this.save().await;
665            }
666        });
667    }
668
669    pub fn stop_save(&self) {
670        if let Ok(_) =
671            self.auto_save
672                .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
673        {
674            info!("will stop storage auto save! id={}", self.id());
675        }
676    }
677}