1use crate::root_state::*;
2use crate::UniCyfsStackRef;
3use cyfs_base::*;
4
5use async_std::sync::Mutex as AsyncMutex;
6use once_cell::sync::OnceCell;
7use std::sync::{
8 atomic::{AtomicBool, Ordering},
9 Arc,
10};
11
12#[derive(Clone)]
13struct StorageOpData {
14 path_stub: PathOpEnvStub,
15 single_stub: SingleOpEnvStub,
16 current: Arc<AsyncMutex<Option<ObjectId>>>,
17}
18
19pub struct StateStorage {
20 path: String,
21 content_type: ObjectMapSimpleContentType,
22 global_state: GlobalStateOutputProcessorRef,
23 target: Option<ObjectId>,
24 dec_id: Option<ObjectId>,
25
26 dirty: Arc<AtomicBool>,
27 auto_save: Arc<AtomicBool>,
28 op_data: OnceCell<StorageOpData>,
29}
30
31impl Drop for StateStorage {
32 fn drop(&mut self) {
33 async_std::task::block_on(async move {
34 self.abort().await;
35 })
36 }
37}
38
39impl StateStorage {
40 pub fn new(
41 global_state: GlobalStateOutputProcessorRef,
42 path: impl Into<String>,
43 content_type: ObjectMapSimpleContentType,
44 target: Option<ObjectId>,
45 dec_id: Option<ObjectId>,
46 ) -> Self {
47 Self {
48 global_state,
49 path: path.into(),
50 content_type,
51 target,
52 dec_id,
53
54 dirty: Arc::new(AtomicBool::new(false)),
55 auto_save: Arc::new(AtomicBool::new(false)),
56 op_data: OnceCell::new(),
57 }
58 }
59
60 pub fn new_with_stack(
61 stack: UniCyfsStackRef,
62 category: GlobalStateCategory,
63 path: impl Into<String>,
64 content_type: ObjectMapSimpleContentType,
65 target: Option<ObjectId>,
66 dec_id: Option<ObjectId>,
67 ) -> Self {
68 let global_state = match category {
69 GlobalStateCategory::RootState => stack.root_state().clone(),
70 GlobalStateCategory::LocalCache => stack.local_cache().clone(),
71 };
72
73 Self {
74 global_state,
75 path: path.into(),
76 content_type,
77 target,
78 dec_id,
79
80 dirty: Arc::new(AtomicBool::new(false)),
81 auto_save: Arc::new(AtomicBool::new(false)),
82 op_data: OnceCell::new(),
83 }
84 }
85
86 pub fn path(&self) -> &str {
87 &self.path
88 }
89
90 pub fn stub(&self) -> &SingleOpEnvStub {
91 &self.op_data.get().unwrap().single_stub
92 }
93
94 pub fn is_dirty(&self) -> bool {
95 self.dirty.load(Ordering::SeqCst)
96 }
97
98 pub fn set_dirty(&self, dirty: bool) {
99 self.dirty.store(dirty, Ordering::SeqCst);
100 }
101
102 pub async fn init(&self) -> BuckyResult<()> {
103 assert!(self.op_data.get().is_none());
104
105 let op_data = self.load().await?;
106 if let Err(_) = self.op_data.set(op_data) {
107 unreachable!();
108 }
109
110 Ok(())
111 }
112
113 pub fn start_save(&self, dur: std::time::Duration) {
114 use async_std::prelude::*;
115
116 let ret = self
117 .auto_save
118 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire);
119 if ret.is_err() {
120 warn!("storage already in saving state! path={}", self.path);
121 return;
122 }
123
124 let auto_save = self.auto_save.clone();
125 let path = self.path.clone();
126 let dirty = self.dirty.clone();
127 let op_data = self.op_data.get().unwrap().clone();
128
129 async_std::task::spawn(async move {
130 let mut interval = async_std::stream::interval(dur);
131 while let Some(_) = interval.next().await {
132 if !auto_save.load(Ordering::SeqCst) {
133 warn!("storage auto save stopped! path={}", path);
134 break;
135 }
136
137 let _ = Self::save_impl(&path, &dirty, &op_data).await;
138 }
139 });
140 }
141
142 pub fn stop_save(&self) {
143 if let Ok(_) =
144 self.auto_save
145 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
146 {
147 info!("stop state storage auto save! path={}", self.path);
148 }
149 }
150
151 async fn load(&self) -> BuckyResult<StorageOpData> {
152 let dec_id = match &self.dec_id {
153 Some(dec_id) => Some(dec_id.to_owned()),
154 None => Some(cyfs_core::get_system_dec_app().to_owned()),
155 };
156
157 let stub = GlobalStateStub::new(self.global_state.clone(), self.target.clone(), dec_id);
158
159 let path_stub = stub.create_path_op_env().await?;
160 path_stub
161 .lock(vec![self.path.clone()], u64::MAX)
162 .await
163 .unwrap();
164
165 let single_stub = stub.create_single_op_env().await?;
166
167 let current = path_stub.get_by_path(&self.path).await?;
168 match current {
169 Some(ref obj) => {
170 single_stub.load(obj.clone()).await?;
171 }
172 None => {
173 single_stub.create_new(self.content_type).await?;
174 }
175 }
176
177 let op_data = StorageOpData {
178 path_stub,
179 single_stub,
180 current: Arc::new(AsyncMutex::new(current)),
181 };
182
183 Ok(op_data)
184 }
185
186 pub async fn reload(&self) -> BuckyResult<bool> {
188 let op_data = self.op_data.get().unwrap();
189
190 let new = op_data.path_stub.get_by_path(&self.path).await?;
191
192 let mut current = op_data.current.lock().await;
193 if *current == new {
194 return Ok(false);
195 }
196
197 match new {
198 Some(ref obj) => {
199 op_data.single_stub.load(obj.clone()).await?;
200 }
201 None => {
202 op_data.single_stub.create_new(self.content_type).await?;
203 }
204 }
205
206 *current = new;
207 Ok(true)
208 }
209
210 pub async fn save(&self) -> BuckyResult<()> {
211 if let Some(op_data) = self.op_data.get() {
212 Self::save_impl(&self.path, &self.dirty, op_data).await
213 } else {
214 Ok(())
215 }
216 }
217
218 async fn save_impl(
219 path: &str,
220 dirty: &Arc<AtomicBool>,
221 op_data: &StorageOpData,
222 ) -> BuckyResult<()> {
223 let ret = dirty.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire);
224 if ret.is_err() {
225 return Ok(());
226 }
227
228 let ret = Self::commit_impl(path, op_data).await;
229
230 if ret.is_err() {
231 dirty.store(true, Ordering::SeqCst);
232 }
233
234 ret
235 }
236
237 pub async fn abort(&mut self) {
238 self.stop_save();
239
240 if let Some(op_data) = self.op_data.take() {
241 self.abort_impl(op_data).await;
242 }
243 }
244
245 async fn abort_impl(&self, op_data: StorageOpData) {
246 info!("will abort state storage: path={}", self.path);
247
248 let mut _current = op_data.current.lock().await;
250
251 if let Err(e) = op_data.single_stub.abort().await {
252 error!(
253 "abort state storage single stub error! path={}, {}",
254 self.path, e
255 );
256 }
257
258 if let Err(e) = op_data.path_stub.abort().await {
259 error!(
260 "abort state storage path stub error! path={}, {}",
261 self.path, e
262 );
263 }
264
265 self.set_dirty(false);
266 }
267
268 async fn commit_impl(path: &str, op_data: &StorageOpData) -> BuckyResult<()> {
269 let mut current = op_data.current.lock().await;
271
272 let new = op_data.single_stub.update().await.map_err(|e| {
273 error!("commit state storage failed! path={}, {}", path, e);
274 e
275 })?;
276
277 if Some(new) == *current {
278 debug!(
279 "commit state storage but not changed! path={}, current={}",
280 path, new
281 );
282 return Ok(());
283 }
284
285 match op_data
286 .path_stub
287 .set_with_path(path, &new, current.clone(), true)
288 .await
289 {
290 Ok(_) => {
291 info!(
292 "update state storage success! path={}, current={}, prev={:?}",
293 path, new, current
294 );
295 }
296 Err(e) => {
297 error!(
298 "update state storage but failed! path={}, current={}, prev={:?}, {}",
299 path, new, current, e
300 );
301
302 return Err(e);
303 }
304 }
305
306 op_data.path_stub.update().await.map_err(|e| {
307 error!(
308 "commit state storage to global state failed! path={}, {}",
309 path, e
310 );
311 e
312 })?;
313
314 *current = Some(new);
315
316 info!(
317 "commit state storage to global state success! path={}",
318 path
319 );
320
321 Ok(())
322 }
323}
324
325pub struct StateStorageMap {
326 storage: StateStorage,
327}
328
329impl StateStorageMap {
330 pub fn new(storage: StateStorage) -> Self {
331 Self { storage }
332 }
333
334 pub fn storage(&self) -> &StateStorage {
335 &self.storage
336 }
337
338 pub fn into_storage(self) -> StateStorage {
339 self.storage
340 }
341
342 pub async fn save(&self) -> BuckyResult<()> {
343 self.storage.save().await
344 }
345
346 pub async fn abort(mut self) {
347 self.storage.abort().await
348 }
349
350 pub async fn get(&self, key: impl Into<String>) -> BuckyResult<Option<ObjectId>> {
351 self.storage.stub().get_by_key(key).await
352 }
353
354 pub async fn set(
355 &self,
356 key: impl Into<String>,
357 value: &ObjectId,
358 ) -> BuckyResult<Option<ObjectId>> {
359 self.set_ex(key, value, None, true).await
360 }
361
362 pub async fn set_ex(
363 &self,
364 key: impl Into<String>,
365 value: &ObjectId,
366 prev_value: Option<ObjectId>,
367 auto_insert: bool,
368 ) -> BuckyResult<Option<ObjectId>> {
369 let ret = self
370 .storage
371 .stub()
372 .set_with_key(key, value, prev_value.clone(), auto_insert)
373 .await?;
374
375 if Some(*value) != ret {
376 self.storage.set_dirty(true);
377 }
378
379 Ok(ret)
380 }
381
382 pub async fn insert(&self, key: impl Into<String>, value: &ObjectId) -> BuckyResult<()> {
383 let ret = self.storage.stub().insert_with_key(key, value).await?;
384 self.storage.set_dirty(true);
385
386 Ok(ret)
387 }
388
389 pub async fn remove(&self, key: impl Into<String>) -> BuckyResult<Option<ObjectId>> {
390 self.remove_ex(key, None).await
391 }
392
393 pub async fn remove_ex(
394 &self,
395 key: impl Into<String>,
396 prev_value: Option<ObjectId>,
397 ) -> BuckyResult<Option<ObjectId>> {
398 let ret = self.storage.stub().remove_with_key(key, prev_value).await?;
399
400 if ret.is_some() {
401 self.storage.set_dirty(true);
402 }
403
404 Ok(ret)
405 }
406
407 pub async fn next(&self, step: u32) -> BuckyResult<Vec<(String, ObjectId)>> {
408 let list = self.storage.stub().next(step).await?;
409
410 self.convert_list(list)
411 }
412
413 pub async fn reset(&self) -> BuckyResult<()> {
414 self.storage.stub().reset().await
415 }
416
417 pub async fn list(&self) -> BuckyResult<Vec<(String, ObjectId)>> {
418 let list = self.storage.stub().list().await?;
419
420 self.convert_list(list)
421 }
422
423 fn convert_list(
424 &self,
425 list: Vec<ObjectMapContentItem>,
426 ) -> BuckyResult<Vec<(String, ObjectId)>> {
427 if list.is_empty() {
428 return Ok(vec![]);
429 }
430
431 if list[0].content_type() != ObjectMapSimpleContentType::Map {
432 let msg = format!(
433 "state storage is not valid map type! path={}, type={}",
434 self.storage().path,
435 list[0].content_type().as_str()
436 );
437 error!("{}", msg);
438 return Err(BuckyError::new(BuckyErrorCode::InvalidFormat, msg));
439 }
440
441 let list = list
442 .into_iter()
443 .map(|item| match item {
444 ObjectMapContentItem::Map(kp) => kp,
445 _ => unreachable!(),
446 })
447 .collect();
448
449 Ok(list)
450 }
451}
452
453pub struct StateStorageSet {
454 storage: StateStorage,
455}
456
457impl StateStorageSet {
458 pub fn new(storage: StateStorage) -> Self {
459 Self { storage }
460 }
461
462 pub fn storage(&self) -> &StateStorage {
463 &self.storage
464 }
465
466 pub fn into_storage(self) -> StateStorage {
467 self.storage
468 }
469
470 pub async fn save(&self) -> BuckyResult<()> {
471 self.storage.save().await
472 }
473
474 pub async fn abort(mut self) {
475 self.storage.abort().await
476 }
477
478 pub async fn contains(&self, object_id: &ObjectId) -> BuckyResult<bool> {
479 self.storage.stub().contains(object_id).await
480 }
481
482 pub async fn insert(&self, object_id: &ObjectId) -> BuckyResult<bool> {
483 let ret = self.storage.stub().insert(object_id).await?;
484 if ret {
485 self.storage.set_dirty(true);
486 }
487
488 Ok(ret)
489 }
490
491 pub async fn remove(&self, object_id: &ObjectId) -> BuckyResult<bool> {
492 let ret = self.storage.stub().remove(object_id).await?;
493 if ret {
494 self.storage.set_dirty(true);
495 }
496
497 Ok(ret)
498 }
499
500 pub async fn next(&self, step: u32) -> BuckyResult<Vec<ObjectId>> {
501 let list = self.storage.stub().next(step).await?;
502
503 self.convert_list(list)
504 }
505
506 pub async fn reset(&self) -> BuckyResult<()> {
507 self.storage.stub().reset().await
508 }
509
510 pub async fn list(&self) -> BuckyResult<Vec<ObjectId>> {
511 let list = self.storage.stub().list().await?;
512
513 self.convert_list(list)
514 }
515
516 fn convert_list(&self, list: Vec<ObjectMapContentItem>) -> BuckyResult<Vec<ObjectId>> {
517 if list.is_empty() {
518 return Ok(vec![]);
519 }
520
521 if list[0].content_type() != ObjectMapSimpleContentType::Set {
522 let msg = format!(
523 "state storage is not valid set type! path={}, type={}",
524 self.storage().path,
525 list[0].content_type().as_str()
526 );
527 error!("{}", msg);
528 return Err(BuckyError::new(BuckyErrorCode::InvalidFormat, msg));
529 }
530
531 let list = list
532 .into_iter()
533 .map(|item| match item {
534 ObjectMapContentItem::Set(id) => id,
535 _ => unreachable!(),
536 })
537 .collect();
538
539 Ok(list)
540 }
541}