1use crate::base::*;
2use crate::prelude::*;
3use crate::root_state::*;
4use crate::NONObjectInfo;
5use cyfs_base::*;
6use cyfs_core::*;
7
8use std::sync::atomic::{AtomicU64, Ordering};
9use async_std::sync::Mutex as AsyncMutex;
10
11struct NOCStorageRawHelper {
12 id: String,
13 noc: NamedObjectCacheRef,
14 last_update_time: AtomicU64,
15}
16
17impl NOCStorageRawHelper {
18 pub fn new(id: impl Into<String>, noc: NamedObjectCacheRef) -> Self {
19 Self {
20 id: id.into(),
21 noc,
22 last_update_time: AtomicU64::new(0),
23 }
24 }
25
26 pub async fn load(&self, object_id: &ObjectId) -> BuckyResult<Option<Vec<u8>>> {
27 let req = NamedObjectCacheGetObjectRequest {
28 source: RequestSourceInfo::new_local_system(),
29 object_id: object_id.to_owned(),
30 last_access_rpath: None,
31 flags: 0,
32 };
33
34 let resp = self.noc.get_object(&req).await?;
35 match resp {
36 Some(data) => {
37 match Storage::raw_decode(&data.object.object_raw) {
38 Ok((storage, _)) => {
39 let update_time = storage.body().as_ref().unwrap().update_time();
41 self.last_update_time.store(update_time, Ordering::Relaxed);
42
43 Ok(Some(storage.into_value()))
44 }
45 Err(e) => {
46 error!(
47 "decode storage object error: id={}, storage={}, {}",
48 self.id, object_id, e
49 );
50 Err(e)
51 }
52 }
53 }
54
55 None => {
56 info!(
57 "storage not found in noc: id={}, storage={}",
58 self.id, object_id,
59 );
60 Ok(None)
61 }
62 }
63 }
64
65 pub async fn save(&self, buf: Vec<u8>, with_hash: bool) -> BuckyResult<StorageId> {
66 let mut storage: Storage = if with_hash {
67 StorageObj::create_with_hash(&self.id, buf)
68 } else {
69 StorageObj::create(&self.id, buf)
70 };
71
72 let old_update_time = self.last_update_time.load(Ordering::Relaxed);
74 let mut now = storage.body().as_ref().unwrap().update_time();
75 if now < old_update_time {
76 warn!(
77 "storage new time is older than current! now={}, cur={}",
78 now, old_update_time
79 );
80 now = old_update_time + 1;
81 storage.body_mut().as_mut().unwrap().set_update_time(now);
82 }
83
84 self.save_to_noc(storage).await
85 }
86
87 async fn save_to_noc(&self, storage: Storage) -> BuckyResult<StorageId> {
88 let storage_id = storage.storage_id();
89 info!(
90 "now will save storage to noc: id={}, storage={}",
91 self.id, storage_id
92 );
93
94 let object_raw = storage.to_vec().unwrap();
95 let object = NONObjectInfo::new_from_object_raw(object_raw)?;
96
97 let req = NamedObjectCachePutObjectRequest {
98 source: RequestSourceInfo::new_local_system(),
99 object,
100 storage_category: NamedObjectStorageCategory::Storage,
101 last_access_rpath: None,
102 context: None,
103 access_string: Some(AccessString::dec_default().value()),
104 };
105
106 match self.noc.put_object(&req).await {
107 Ok(resp) => {
108 match resp.result {
109 NamedObjectCachePutObjectResult::Accept
110 | NamedObjectCachePutObjectResult::Updated => {
111 info!(
112 "insert storage to noc success! id={}, storage={}",
113 self.id, req.object.object_id
114 );
115 Ok(storage_id)
116 }
117 r @ _ => {
118 error!(
121 "update storage to noc but alreay exist! id={}, storage={}, result={:?}",
122 self.id, req.object.object_id, r
123 );
124
125 Err(BuckyError::from(BuckyErrorCode::AlreadyExists))
126 }
127 }
128 }
129 Err(e) => {
130 error!(
131 "insert storage to noc error! id={}, storage={}, {}",
132 self.id, req.object.object_id, e
133 );
134 Err(e)
135 }
136 }
137 }
138
139 pub async fn delete(&self, object_id: &ObjectId) -> BuckyResult<()> {
141 let req = NamedObjectCacheDeleteObjectRequest {
142 source: RequestSourceInfo::new_local_system(),
143 object_id: object_id.to_owned(),
144 flags: 0,
145 };
146
147 let resp = self.noc.delete_object(&req).await?;
148 if resp.deleted_count > 0 {
149 info!(
150 "delete storage object from noc successs: id={}, storage={}",
151 self.id, req.object_id
152 );
153 } else {
154 warn!(
155 "delete storage object but not found: id={}, storage={}",
156 self.id, req.object_id,
157 );
158 }
159
160 Ok(())
161 }
162}
163
164#[async_trait::async_trait]
165pub trait NOCStorage: Send + Sync {
166 fn id(&self) -> &str;
167 async fn load(&self) -> BuckyResult<Option<Vec<u8>>>;
168 async fn save(&self, buf: Vec<u8>) -> BuckyResult<()>;
169 async fn delete(&self) -> BuckyResult<()>;
170}
171
172pub struct NOCGlobalStateStorage {
173 global_state: GlobalStateOutputProcessorRef,
174 dec_id: Option<ObjectId>,
175 path: String,
176 target: Option<ObjectId>,
177 noc: NOCStorageRawHelper,
178 op_lock: AsyncMutex<u32>,
179}
180
181impl NOCGlobalStateStorage {
182 pub fn new(
183 global_state: GlobalStateOutputProcessorRef,
184 dec_id: Option<ObjectId>,
185 path: String,
186 target: Option<ObjectId>,
187 id: &str,
188 noc: NamedObjectCacheRef,
189 ) -> Self {
190 let noc = NOCStorageRawHelper::new(id, noc);
191
192 Self {
193 global_state,
194 dec_id,
195 path,
196 target,
197 noc,
198 op_lock: AsyncMutex::new(0),
199 }
200 }
201
202 fn create_global_stub(&self) -> GlobalStateStub {
203 let dec_id = match &self.dec_id {
204 Some(dec_id) => Some(dec_id.to_owned()),
205 None => Some(cyfs_core::get_system_dec_app().to_owned()),
206 };
207
208 let stub = GlobalStateStub::new(self.global_state.clone(), self.target.clone(), dec_id);
209 stub
210 }
211}
212
213#[async_trait::async_trait]
214impl NOCStorage for NOCGlobalStateStorage {
215 fn id(&self) -> &str {
216 &self.noc.id
217 }
218
219 async fn load(&self) -> BuckyResult<Option<Vec<u8>>> {
220 let _lock = self.op_lock.lock().await;
222
223 let stub = self.create_global_stub();
224
225 let path_stub = stub.create_path_op_env().await?;
226 let current = path_stub.get_by_path(&self.path).await?;
227 match current {
228 Some(id) => {
229 let ret = self.noc.load(&id).await.map_err(|mut e| {
230 let msg = format!(
231 "load storage from noc failed! id={}, stroage={}, path={}, dec={:?}, {}",
232 self.noc.id, id, self.path, self.dec_id, e,
233 );
234 error!("{}", msg);
235 e.set_msg(msg);
236 e
237 })?;
238
239 match ret {
240 Some(data) => Ok(Some(data)),
241 None => {
242 warn!("load storage from noc but not found! id={}, stroage={}, path={}, dec={:?}",
243 self.noc.id, id, self.path, self.dec_id);
244
245 Ok(None)
246 }
247 }
248 }
249 None => {
250 warn!(
251 "global state storage load from path but not found! id={}, path={}, dec={:?}",
252 self.noc.id, self.path, self.dec_id
253 );
254 Ok(None)
255 }
256 }
257 }
258
259 async fn save(&self, buf: Vec<u8>) -> BuckyResult<()> {
260 let _lock = self.op_lock.lock().await;
262
263 let storage_id = self.noc.save(buf, true).await.map_err(|mut e| {
265 let msg = format!(
266 "save storage to noc failed! id={}, path={}, dec={:?}, {}",
267 self.noc.id, self.path, self.dec_id, e,
268 );
269 error!("{}", msg);
270 e.set_msg(msg);
271 e
272 })?;
273
274 let stub = self.create_global_stub();
276 let path_stub = stub.create_path_op_env().await?;
277
278 path_stub
279 .set_with_path(&self.path, storage_id.object_id(), None, true)
280 .await
281 .map_err(|mut e| {
282 let msg = format!(
283 "save storage to global state failed! id={}, path={}, dec={:?}, {}",
284 self.noc.id, self.path, self.dec_id, e,
285 );
286 error!("{}", msg);
287 e.set_msg(msg);
288 e
289 })?;
290
291 path_stub.commit().await.map_err(|mut e| {
292 let msg = format!(
293 "commit storage to global state failed! id={}, path={}, dec={:?}, {}",
294 self.noc.id, self.path, self.dec_id, e,
295 );
296 error!("{}", msg);
297 e.set_msg(msg);
298 e
299 })?;
300
301 info!(
302 "save storage to global state success! id={}, path={}, dec={:?}",
303 self.noc.id, self.path, self.dec_id
304 );
305
306 Ok(())
307 }
308
309 async fn delete(&self) -> BuckyResult<()> {
310 let _lock = self.op_lock.lock().await;
312
313 let stub = self.create_global_stub();
315 let path_stub = stub.create_path_op_env().await?;
316
317 let ret = path_stub
318 .remove_with_path(&self.path, None)
319 .await
320 .map_err(|mut e| {
321 let msg = format!(
322 "remove storage from global state failed! id={}, path={}, dec={:?}, {}",
323 self.noc.id, self.path, self.dec_id, e,
324 );
325 error!("{}", msg);
326 e.set_msg(msg);
327 e
328 })?;
329
330 path_stub.commit().await.map_err(|mut e| {
331 let msg = format!(
332 "commit storage to global state failed! id={}, path={}, dec={:?}, {}",
333 self.noc.id, self.path, self.dec_id, e,
334 );
335 error!("{}", msg);
336 e.set_msg(msg);
337 e
338 })?;
339
340 match ret {
341 Some(object_id) => {
342 if let Err(e) = self.noc.delete(&object_id).await {
344 error!("delete storage from noc but failed! id={}, path={}, dec={:?}, storage={}, {}",
345 self.noc.id, self.path, self.dec_id, object_id, e);
346 }
347 }
348 None => {
349 info!(
350 "delete storage from global state but not found! id={}, path={}, dec={:?}",
351 self.noc.id, self.path, self.dec_id,
352 );
353 }
354 }
355
356 Ok(())
357 }
358}
359
360pub struct NOCRawStorage {
361 noc: NOCStorageRawHelper,
362 storage_id: StorageId,
363}
364
365impl NOCRawStorage {
366 pub fn new(id: &str, noc: NamedObjectCacheRef) -> Self {
367 let storage: Storage = StorageObj::create(id, Vec::new());
368
369 let noc = NOCStorageRawHelper::new(id, noc);
370
371 Self {
372 noc,
373 storage_id: storage.storage_id(),
374 }
375 }
376
377 pub async fn exists(id: &str, noc: &NamedObjectCacheRef) -> BuckyResult<bool> {
378 let storage: Storage = StorageObj::create(id, Vec::new());
379 let storage_id = storage.storage_id();
380
381 let noc_req = NamedObjectCacheExistsObjectRequest {
382 object_id: storage_id.object_id().clone(),
383 source: RequestSourceInfo::new_local_system(),
384 };
385
386 noc.exists_object(&noc_req).await.map(|resp| {
387 resp.meta && resp.object
388 })
389 }
390}
391
392#[async_trait::async_trait]
393impl NOCStorage for NOCRawStorage {
394 fn id(&self) -> &str {
395 &self.noc.id
396 }
397
398 async fn load(&self) -> BuckyResult<Option<Vec<u8>>> {
399 self.noc.load(self.storage_id.object_id()).await
400 }
401
402 async fn save(&self, buf: Vec<u8>) -> BuckyResult<()> {
403 match self.noc.save(buf, false).await {
404 Ok(id) => {
405 assert_eq!(id, self.storage_id);
406 Ok(())
407 }
408 Err(e) => Err(e),
409 }
410 }
411
412 async fn delete(&self) -> BuckyResult<()> {
413 self.noc.delete(self.storage_id.object_id()).await
414 }
415}