1use super::storage::*;
2use crate::prelude::NamedObjectCacheRef;
3use crate::root_state::*;
4use cyfs_base::*;
5use cyfs_debug::Mutex;
6
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::Arc;
9
10pub trait CollectionCodec<T> {
18 fn encode(&self) -> BuckyResult<Vec<u8>>;
19 fn decode(buf: &[u8]) -> BuckyResult<T>;
20}
21
22impl<T> CollectionCodec<T> for T
23where
24 T: for<'de> RawDecode<'de> + RawEncode,
25{
26 fn encode(&self) -> BuckyResult<Vec<u8>> {
27 self.to_vec()
28 }
29
30 fn decode(buf: &[u8]) -> BuckyResult<T> {
31 T::clone_from_slice(&buf)
32 }
33}
34
/// Implements `CollectionCodec<$T>` for `$T` by round-tripping through JSON
/// with `serde_json`.
///
/// The expansion refers to `CollectionCodec` unqualified and to `serde_json`,
/// `log` and `cyfs_base` by crate path, so all of them must be resolvable at
/// the invocation site. Failures are logged and reported as
/// `BuckyErrorCode::InvalidFormat`.
#[macro_export]
macro_rules! declare_collection_codec_for_serde {
    ($T:ty) => {
        impl CollectionCodec<$T> for $T {
            fn encode(&self) -> cyfs_base::BuckyResult<Vec<u8>> {
                // Serialize straight to bytes; the output is the same UTF-8 JSON text.
                match serde_json::to_vec(self) {
                    Ok(bytes) => Ok(bytes),
                    Err(e) => {
                        let msg = format!("encode to json error! {}", e);
                        log::error!("{}", msg);
                        Err(cyfs_base::BuckyError::new(
                            cyfs_base::BuckyErrorCode::InvalidFormat,
                            msg,
                        ))
                    }
                }
            }
            fn decode(buf: &[u8]) -> cyfs_base::BuckyResult<$T> {
                let parsed: Result<$T, _> = serde_json::from_slice(buf);
                match parsed {
                    Ok(value) => Ok(value),
                    Err(e) => {
                        let msg = format!("decode from json error! {}", e);
                        log::error!("{}", msg);
                        Err(cyfs_base::BuckyError::new(
                            cyfs_base::BuckyErrorCode::InvalidFormat,
                            msg,
                        ))
                    }
                }
            }
        }
    };
}
57
/// Implements `CollectionCodec<$T>` for `$T` on top of the type's own
/// `encode_string` / `decode_string` methods.
///
/// The stored bytes are the bytes of the string returned by `encode_string`;
/// decoding first checks that the buffer is valid UTF-8 and reports
/// `BuckyErrorCode::InvalidFormat` (after logging) when it is not.
/// `CollectionCodec`, `log` and `cyfs_base` must be resolvable at the
/// invocation site.
#[macro_export]
macro_rules! declare_collection_codec_for_json_codec {
    ($T:ty) => {
        impl CollectionCodec<$T> for $T {
            fn encode(&self) -> cyfs_base::BuckyResult<Vec<u8>> {
                let bytes: Vec<u8> = self.encode_string().into();
                Ok(bytes)
            }
            fn decode(buf: &[u8]) -> cyfs_base::BuckyResult<$T> {
                match std::str::from_utf8(buf) {
                    Ok(text) => Self::decode_string(text),
                    Err(e) => {
                        let msg = format!("not valid utf8 string format: {}", e);
                        log::error!("{}", msg);
                        Err(cyfs_base::BuckyError::new(
                            cyfs_base::BuckyErrorCode::InvalidFormat,
                            msg,
                        ))
                    }
                }
            }
        }
    };
}
77
78pub struct NOCStorageWrapper {
79 storage: Box<dyn NOCStorage>,
80}
81
82impl NOCStorageWrapper {
83 pub fn new(id: &str, noc: NamedObjectCacheRef) -> Self {
84 Self {
85 storage: Box::new(NOCRawStorage::new(id, noc)),
86 }
87 }
88
89 pub fn new_global_state(
90 global_state: GlobalStateOutputProcessorRef,
91 dec_id: Option<ObjectId>,
92 path: String,
93 target: Option<ObjectId>,
94 id: &str,
95 noc: NamedObjectCacheRef,
96 ) -> Self {
97 Self {
98 storage: Box::new(NOCGlobalStateStorage::new(
99 global_state, dec_id, path, target, id, noc,
100 )),
101 }
102 }
103
104 pub async fn exists(id: &str, noc: &NamedObjectCacheRef) -> BuckyResult<bool> {
105 NOCRawStorage::exists(id, noc).await
106 }
107
108 pub fn id(&self) -> &str {
109 self.storage.id()
110 }
111
112 pub async fn load<T>(&self) -> BuckyResult<Option<T>>
113 where
114 T: CollectionCodec<T>,
115 {
116 match self.storage.load().await? {
117 Some(buf) => {
118 let coll = T::decode(&buf).map_err(|e| {
119 error!(
120 "decode storage buf to collection failed! id={}, {}",
121 self.id(),
122 e
123 );
124 e
125 })?;
126
127 Ok(Some(coll))
128 }
129 None => Ok(None),
130 }
131 }
132
133 pub async fn save<T>(&self, data: &T) -> BuckyResult<()>
134 where
135 T: CollectionCodec<T>,
136 {
137 let buf = data.encode().map_err(|e| {
138 error!(
139 "convert collection to buf failed! id={}, {}",
140 self.storage.id(),
141 e
142 );
143 e
144 })?;
145
146 self.storage.save(buf).await
147 }
148
149 pub async fn delete(&self) -> BuckyResult<()> {
150 self.storage.delete().await
151 }
152}
153
154pub struct NOCCollection<T>
155where
156 T: Default + CollectionCodec<T>,
157{
158 coll: T,
159 storage: NOCStorageWrapper,
160 dirty: bool,
161}
162
163impl<T> NOCCollection<T>
164where
165 T: Default + CollectionCodec<T>,
166{
167 pub fn new(id: &str, noc: NamedObjectCacheRef) -> Self {
168 Self {
169 coll: T::default(),
170 storage: NOCStorageWrapper::new(id, noc),
171 dirty: false,
172 }
173 }
174
175 pub fn id(&self) -> &str {
176 self.storage.id()
177 }
178
179 pub fn coll(&self) -> &T {
180 &self.coll
181 }
182
183 pub fn is_dirty(&self) -> bool {
184 self.dirty
185 }
186
187 pub fn set_dirty(&mut self, dirty: bool) {
188 self.dirty = dirty;
189 }
190
191 pub fn swap(&mut self, mut value: T) -> T {
192 std::mem::swap(&mut self.coll, &mut value);
193 value
194 }
195
196 pub async fn exists(id: &str, noc: &NamedObjectCacheRef) -> BuckyResult<bool> {
197 NOCStorageWrapper::exists(id, noc).await
198 }
199
200 pub async fn load(&mut self) -> BuckyResult<()> {
201 match self.storage.load().await? {
202 Some(coll) => {
203 self.coll = coll;
204 Ok(())
205 }
206 None => Ok(()),
207 }
208 }
209
210 pub async fn save(&mut self) -> BuckyResult<()> {
211 if self.is_dirty() {
212 self.set_dirty(false);
213
214 self.storage.save(&self.coll).await.map_err(|e| {
215 self.set_dirty(true);
216 e
217 })
218 } else {
219 Ok(())
220 }
221 }
222
223 pub async fn delete(&mut self) -> BuckyResult<()> {
224 self.storage.delete().await?;
225
226 Ok(())
230 }
231}
232
233use std::ops::Deref;
234use std::ops::DerefMut;
235
/// Uniform access to a lock-protected `T`.
///
/// Both accessors return a boxed guard-like object that borrows from `self`
/// and dereferences to the protected value.
pub trait NOCCollectionWithLock<T: Default + ?Sized + Send + 'static> {
    /// Returns a handle giving shared (`Deref`) access to the value.
    fn read(&self) -> Box<dyn Deref<Target = T> + '_>;

    /// Returns a handle giving mutable (`DerefMut`) access to the value.
    fn write(&self) -> Box<dyn DerefMut<Target = T> + '_>;
}
244
245struct NOCCollectionWithMutex<T>
246where
247 T: Default + ?Sized + Send + 'static,
248{
249 coll: Mutex<T>,
250}
251
252impl<T> NOCCollectionWithMutex<T>
253where
254 T: Default + ?Sized + Send + 'static,
255{
256 fn new() -> Self {
257 Self {
258 coll: Mutex::new(T::default()),
259 }
260 }
261}
262
263impl<T> NOCCollectionWithLock<T> for NOCCollectionWithMutex<T>
264where
265 T: Default + ?Sized + Send + 'static,
266{
267 fn read(&self) -> Box<dyn Deref<Target = T> + '_> {
268 Box::new(self.coll.lock().unwrap())
269 }
270 fn write(&self) -> Box<dyn DerefMut<Target = T> + '_> {
271 Box::new(self.coll.lock().unwrap())
272 }
273}
274
275use std::sync::RwLock;
276
277struct NOCCollectionWithRWLock<T>
278where
279 T: Default + ?Sized + Send + 'static,
280{
281 coll: RwLock<T>,
282}
283
284impl<T> NOCCollectionWithRWLock<T>
285where
286 T: Default + ?Sized + Send + 'static,
287{
288 fn new() -> Self {
289 Self {
290 coll: RwLock::new(T::default()),
291 }
292 }
293}
294
295impl<T> NOCCollectionWithLock<T> for NOCCollectionWithRWLock<T>
296where
297 T: Default + ?Sized + Send + 'static,
298{
299 fn read(&self) -> Box<dyn Deref<Target = T> + '_> {
300 Box::new(self.coll.read().unwrap())
301 }
302 fn write(&self) -> Box<dyn DerefMut<Target = T> + '_> {
303 Box::new(self.coll.write().unwrap())
304 }
305}
306
307pub struct NOCCollectionSync<T>
308where
309 T: Default + CollectionCodec<T> + Send + 'static,
310{
311 coll: Arc<Mutex<T>>,
312 storage: Arc<Box<dyn NOCStorage>>,
313
314 dirty: Arc<AtomicBool>,
315 auto_save: Arc<AtomicBool>,
316}
317
318impl<T> Clone for NOCCollectionSync<T>
319where
320 T: Default + CollectionCodec<T> + Send + 'static,
321{
322 fn clone(&self) -> Self {
323 Self {
324 coll: self.coll.clone(),
325 storage: self.storage.clone(),
326 dirty: self.dirty.clone(),
327 auto_save: self.auto_save.clone(),
328 }
329 }
330}
331
332impl<T> NOCCollectionSync<T>
333where
334 T: Default + CollectionCodec<T> + Send + 'static,
335{
336 pub fn new(id: &str, noc: NamedObjectCacheRef) -> Self {
337 let storage = NOCRawStorage::new(id, noc);
338
339 Self {
340 coll: Arc::new(Mutex::new(T::default())),
341 storage: Arc::new(Box::new(storage)),
342 dirty: Arc::new(AtomicBool::new(false)),
343 auto_save: Arc::new(AtomicBool::new(false)),
344 }
345 }
346
347 pub fn new_global_state(
348 global_state: GlobalStateOutputProcessorRef,
349 dec_id: Option<ObjectId>,
350 path: String,
351 target: Option<ObjectId>,
352 id: &str,
353 noc: NamedObjectCacheRef,
354 ) -> Self {
355 let storage = NOCGlobalStateStorage::new(
356 global_state, dec_id, path, target, id, noc,
357 );
358
359 Self {
360 coll: Arc::new(Mutex::new(T::default())),
361 storage: Arc::new(Box::new(storage)),
362 dirty: Arc::new(AtomicBool::new(false)),
363 auto_save: Arc::new(AtomicBool::new(false)),
364 }
365 }
366
367 pub fn is_dirty(&self) -> bool {
368 self.dirty.load(Ordering::SeqCst)
369 }
370
371 pub fn set_dirty(&self, dirty: bool) -> bool {
372 self.dirty.swap(dirty, Ordering::SeqCst)
373 }
374
375 pub fn coll(&self) -> &Arc<Mutex<T>> {
376 &self.coll
377 }
378
379 pub fn id(&self) -> &str {
380 self.storage.id()
381 }
382
383 pub fn swap(&mut self, mut value: T) -> T {
384 {
385 let mut cur = self.coll.lock().unwrap();
386 std::mem::swap(&mut *cur, &mut value);
387 }
388
389 self.set_dirty(true);
390 value
391 }
392
393 pub async fn load(&self) -> BuckyResult<()> {
394 match self.storage.load().await? {
395 Some(buf) => {
396 let coll = T::decode(&buf).map_err(|e| {
397 error!(
398 "decode storage buf to collection failed! id={}, {}",
399 self.id(),
400 e
401 );
402 e
403 })?;
404
405 *self.coll.lock().unwrap() = coll;
406 Ok(())
407 }
408 None => Ok(()),
409 }
410 }
411
412 pub async fn save(&self) -> BuckyResult<()> {
414 if self.set_dirty(false) {
415 self.save_impl().await.map_err(|e| {
416 self.set_dirty(true);
417 e
418 })
419 } else {
420 Ok(())
421 }
422 }
423
424 pub fn async_save(&self) {
426 let this = self.clone();
427 async_std::task::spawn(async move {
428 let _r = this.save().await;
429 });
430 }
431
432 async fn save_impl(&self) -> BuckyResult<()> {
433 let buf = {
434 let coll = self.coll.lock().unwrap();
435 coll.encode().map_err(|e| {
436 error!(
437 "convert collection to buf failed! id={}, {}",
438 self.storage.id(),
439 e
440 );
441 e
442 })?
443 };
444
445 self.storage.save(buf).await
446 }
447
448 pub async fn delete(&self) -> BuckyResult<()> {
449 self.storage.delete().await?;
450
451 self.stop_save();
453
454 Ok(())
458 }
459
460 pub fn start_save(&self, dur: std::time::Duration) {
462 use async_std::prelude::*;
463
464 let ret = self
465 .auto_save
466 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire);
467 if ret.is_err() {
468 warn!("storage already in saving state! id={}", self.id());
469 return;
470 }
471
472 let this = self.clone();
473 async_std::task::spawn(async move {
474 let mut interval = async_std::stream::interval(dur);
475 while let Some(_) = interval.next().await {
476 if !this.auto_save.load(Ordering::SeqCst) {
477 warn!("storage auto save stopped! id={}", this.id());
478 break;
479 }
480 let _ = this.save().await;
481 }
482 });
483 }
484
485 pub fn stop_save(&self) {
486 if let Ok(_) =
487 self.auto_save
488 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
489 {
490 info!("will stop storage auto save! id={}", self.id());
491 }
492 }
493}
494
495pub struct NOCCollectionRWSync<T>
496where
497 T: Default + CollectionCodec<T> + Send + Sync + 'static,
498{
499 coll: Arc<RwLock<T>>,
500 storage: Arc<Box<dyn NOCStorage>>,
501
502 dirty: Arc<AtomicBool>,
503
504 auto_save: Arc<AtomicBool>,
505}
506
507
508impl<T> Clone for NOCCollectionRWSync<T>
509where
510 T: Default + CollectionCodec<T> + Send + Sync + 'static,
511{
512 fn clone(&self) -> Self {
513 Self {
514 coll: self.coll.clone(),
515 storage: self.storage.clone(),
516 dirty: self.dirty.clone(),
517 auto_save: self.auto_save.clone(),
518 }
519 }
520}
521
522impl<T> NOCCollectionRWSync<T>
523where
524 T: Default + CollectionCodec<T> + Send + Sync + 'static,
525{
526 pub fn new(id: &str, noc: NamedObjectCacheRef) -> Self {
527 let noc = NOCRawStorage::new(id, noc);
528 Self {
529 coll: Arc::new(RwLock::new(T::default())),
530 storage: Arc::new(Box::new(noc)),
531 dirty: Arc::new(AtomicBool::new(false)),
532 auto_save: Arc::new(AtomicBool::new(false)),
533 }
534 }
535
536 pub fn new_global_state(
537 global_state: GlobalStateOutputProcessorRef,
538 dec_id: Option<ObjectId>,
539 path: String,
540 target: Option<ObjectId>,
541 id: &str,
542 noc: NamedObjectCacheRef,
543 ) -> Self {
544 let storage = NOCGlobalStateStorage::new(
545 global_state, dec_id, path, target, id, noc,
546 );
547
548 Self {
549 coll: Arc::new(RwLock::new(T::default())),
550 storage: Arc::new(Box::new(storage)),
551 dirty: Arc::new(AtomicBool::new(false)),
552 auto_save: Arc::new(AtomicBool::new(false)),
553 }
554 }
555
556 pub fn is_dirty(&self) -> bool {
557 self.dirty.load(Ordering::SeqCst)
558 }
559
560 pub fn set_dirty(&self, dirty: bool) {
561 self.dirty.store(dirty, Ordering::SeqCst);
562 }
563
564 pub fn coll(&self) -> &Arc<RwLock<T>> {
565 &self.coll
566 }
567
568 pub fn id(&self) -> &str {
569 self.storage.id()
570 }
571
572 pub fn swap(&self, mut value: T) -> T {
573 {
574 let mut cur = self.coll.write().unwrap();
575 std::mem::swap(&mut *cur, &mut value);
576 }
577
578 self.set_dirty(true);
579
580 value
581 }
582
583 pub async fn load(&self) -> BuckyResult<()> {
584 match self.storage.load().await? {
585 Some(buf) => {
586 let coll = T::decode(&buf).map_err(|e| {
587 error!(
588 "decode storage buf to collection failed! id={}, {}",
589 self.id(),
590 e
591 );
592 e
593 })?;
594
595 *self.coll.write().unwrap() = coll;
596 Ok(())
597 }
598 None => Ok(()),
599 }
600 }
601
602 pub async fn save(&self) -> BuckyResult<()> {
603 if self.is_dirty() {
604 self.set_dirty(false);
605
606 self.save_impl().await.map_err(|e| {
607 self.set_dirty(true);
608 e
609 })
610 } else {
611 Ok(())
612 }
613 }
614
615 pub async fn save_impl(&self) -> BuckyResult<()> {
616 let buf = {
617 let coll = self.coll.read().unwrap();
618 coll.encode().map_err(|e| {
619 error!(
620 "convert collection to buf failed! id={}, {}",
621 self.storage.id(),
622 e
623 );
624 e
625 })?
626 };
627
628 self.storage.save(buf).await
629 }
630
631 pub async fn delete(&mut self) -> BuckyResult<()> {
632 self.storage.delete().await?;
633
634 self.stop_save();
636
637 Ok(())
641 }
642
643 pub fn start_save(&self, dur: std::time::Duration) {
645 use async_std::prelude::*;
646
647 let ret = self
648 .auto_save
649 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire);
650 if ret.is_err() {
651 warn!("storage already in saving state! id={}", self.id());
652 return;
653 }
654
655 let this = self.clone();
656 async_std::task::spawn(async move {
657 let mut interval = async_std::stream::interval(dur);
658 while let Some(_) = interval.next().await {
659 if !this.auto_save.load(Ordering::SeqCst) {
660 warn!("storage auto save stopped! id={}", this.id());
661 break;
662 }
663
664 let _ = this.save().await;
665 }
666 });
667 }
668
669 pub fn stop_save(&self) {
670 if let Ok(_) =
671 self.auto_save
672 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
673 {
674 info!("will stop storage auto save! id={}", self.id());
675 }
676 }
677}