1use super::access::OpEnvPathAccess;
2use super::cache::*;
3use super::iterator::*;
4use super::object_map::*;
5use super::path::*;
6use crate::*;
7
8use async_std::sync::Mutex as AsyncMutex;
9use once_cell::sync::OnceCell;
10use std::sync::Arc;
11
12pub struct ObjectMapSingleOpEnv {
13 sid: u64,
14
15 root_holder: ObjectMapRootHolder,
17
18 root: AsyncMutex<Option<ObjectMap>>,
20
21 cache: ObjectMapOpEnvCacheRef,
23
24 iterator: OnceCell<AsyncMutex<ObjectMapIterator>>,
25
26 access: Option<OpEnvPathAccess>,
28}
29
30impl ObjectMapSingleOpEnv {
31 pub(crate) fn new(
32 sid: u64,
33 root_holder: &ObjectMapRootHolder,
34 root_cache: &ObjectMapRootCacheRef,
35 access: Option<OpEnvPathAccess>,
36 ) -> Self {
37 let cache = ObjectMapOpEnvMemoryCache::new_ref(root_cache.clone());
38
39 Self {
40 sid,
41 root_holder: root_holder.clone(),
42 root: AsyncMutex::new(None),
43 cache,
44 iterator: OnceCell::new(),
45 access,
46 }
47 }
48
49 pub fn sid(&self) -> u64 {
50 self.sid
51 }
52
53 pub async fn get_current_root(&self) -> Option<ObjectId> {
55 let ret = self.root.lock().await;
56 ret.as_ref().map(|v| v.cached_object_id().unwrap())
57 }
58
59 async fn set_root(&self, obj_map: ObjectMap) -> BuckyResult<()> {
60 let mut current = self.root.lock().await;
61 if current.is_some() {
62 let msg = format!(
63 "single op_env root already been set! id={}",
64 current.as_ref().unwrap().cached_object_id().unwrap()
65 );
66 error!("{}", msg);
67 return Err(BuckyError::new(BuckyErrorCode::AlreadyExists, msg));
68 }
69
70 info!(
71 "single op_env root init: id={}",
72 obj_map.cached_object_id().unwrap()
73 );
74
75 *current = Some(obj_map);
76
77 Ok(())
78 }
79
80 pub async fn create_new(&self, content_type: ObjectMapSimpleContentType, owner: Option<ObjectId>, dec_id: Option<ObjectId>,) -> BuckyResult<()> {
82 let obj = ObjectMap::new(
83 content_type.clone(),
84 owner,
85 dec_id,
86 )
87 .no_create_time()
88 .build();
89 let id = obj.flush_id();
90 info!(
91 "create new objectmap for single op_env: content_type={:?}, id={}",
92 content_type, id
93 );
94
95 self.set_root(obj).await?;
96
97 Ok(())
98 }
99
100 pub async fn load(&self, obj_map_id: &ObjectId) -> BuckyResult<()> {
102 let ret = self.cache.get_object_map(obj_map_id).await?;
103 if ret.is_none() {
104 let msg = format!(
105 "load single op_env object_id but not found! id={}",
106 obj_map_id,
107 );
108 error!("{}", msg);
109 return Err(BuckyError::new(BuckyErrorCode::NotFound, msg));
110 }
111
112 debug!("load objectmap for single op_env: id={}", obj_map_id,);
113
114 let obj_map = ret.unwrap().lock().await.clone();
116 self.set_root(obj_map).await?;
117
118 Ok(())
119 }
120
121 pub async fn load_by_path(&self, full_path: &str) -> BuckyResult<()> {
122 let (path, key) = ObjectMapPath::parse_path_allow_empty_key(full_path)?;
123
124 self.load_by_key(path, key).await
125 }
126
127 pub async fn load_with_inner_path(&self, obj_map_id: &ObjectId, inner_path: Option<String>) -> BuckyResult<()> {
128 let value = match &inner_path {
129 Some(inner_path) if inner_path.len() > 0 => {
130 let object_path = ObjectMapPath::new(obj_map_id.clone(), self.cache.clone(), false);
131 let value = object_path.get_by_path(&inner_path).await?;
132 if value.is_none() {
133 let msg = format!(
134 "load single_op_env with inner_path but not found! root={}, inner_path={}",
135 obj_map_id, inner_path,
136 );
137 warn!("{}", msg);
138 return Err(BuckyError::new(BuckyErrorCode::NotFound, msg));
139 }
140
141 value.unwrap()
142 }
143 _ => {
144 obj_map_id.to_owned()
145 }
146 };
147
148 info!(
149 "will load single_op_env with inner_path! root={}, inner_path={:?}, target={}",
150 obj_map_id, inner_path, value,
151 );
152
153 self.load(&value).await
154 }
155
156 pub async fn load_by_key(&self, path: &str, key: &str) -> BuckyResult<()> {
159 if let Some(access) = &self.access {
161 access.check_path_key(path, key, RequestOpType::Read)?;
162 }
163
164 let root = self.root_holder.get_current_root();
165
166 let value = if key.len() > 0 {
167 let object_path = ObjectMapPath::new(root.clone(), self.cache.clone(), false);
168 let value = object_path.get_by_key(path, key).await?;
169 if value.is_none() {
170 let msg = format!(
171 "load single_op_env by path but not found! root={}, path={}, key={}",
172 root, path, key
173 );
174 warn!("{}", msg);
175 return Err(BuckyError::new(BuckyErrorCode::NotFound, msg));
176 }
177
178 value.unwrap()
179 } else {
180 assert_eq!(path, "/");
181 root
182 };
183
184 info!(
185 "will load single_op_env by path! root={}, path={}, key={}, value={}",
186 root, path, key, value
187 );
188
189 self.load(&value).await
190 }
191
192 pub async fn list(&self) -> BuckyResult<ObjectMapContentList> {
194 let ret = self.root.lock().await;
195 if ret.is_none() {
196 let msg = format!("single op_env root not been init yet! sid={}", self.sid);
197 error!("{}", msg);
198 return Err(BuckyError::new(BuckyErrorCode::ErrorState, msg));
199 }
200
201 let obj = ret.as_ref().unwrap();
202 let mut list = ObjectMapContentList::new(obj.count() as usize);
203 ret.as_ref().unwrap().list(&self.cache, &mut list).await?;
204
205 Ok(list)
206 }
207
208 pub async fn next(&self, step: usize) -> BuckyResult<ObjectMapContentList> {
210 let ret = self.root.lock().await;
211 if ret.is_none() {
212 let msg = format!("single op_env root not been init yet! sid={}", self.sid);
213 error!("{}", msg);
214 return Err(BuckyError::new(BuckyErrorCode::ErrorState, msg));
215 }
216
217 let obj = ret.as_ref().unwrap();
218
219 let iterator = self.iterator.get_or_init(|| {
220 let ret = ObjectMapIterator::new(false, &obj, self.cache.clone());
221 AsyncMutex::new(ret)
222 });
223
224 let mut it = iterator.lock().await;
225 it.next(&obj, step).await
226 }
227
228 pub async fn reset(&self) {
230 if self.iterator.get().is_none() {
231 return;
232 }
233
234 let ret = self.root.lock().await;
235 if ret.is_none() {
236 let msg = format!("single op_env root not been init yet! sid={}", self.sid);
237 error!("{}", msg);
238 return;
239 }
240
241 let obj = ret.as_ref().unwrap();
242
243 let ret = self.iterator.get();
244 if ret.is_none() {
245 return;
246 }
247
248 let new_it = ObjectMapIterator::new(false, &obj, self.cache.clone());
249
250 info!(
251 "will reset single op_env iterator: root={}",
252 obj.cached_object_id().unwrap()
253 );
254
255 let iterator = ret.unwrap();
256 *iterator.lock().await = new_it;
257 }
258
259 pub async fn metadata(&self) -> BuckyResult<ObjectMapMetaData> {
260 let ret = self.root.lock().await;
261 if ret.is_none() {
262 let msg = format!("single op_env root not been init yet! sid={}", self.sid);
263 error!("{}", msg);
264 return Err(BuckyError::new(BuckyErrorCode::ErrorState, msg));
265 }
266
267 let obj = ret.as_ref().unwrap();
268 Ok(obj.metadata())
269 }
270
271 pub async fn get_by_key(&self, key: &str) -> BuckyResult<Option<ObjectId>> {
273 let ret = self.root.lock().await;
274 if ret.is_none() {
275 let msg = format!("single op_env root not been init yet! sid={}", self.sid);
276 error!("{}", msg);
277 return Err(BuckyError::new(BuckyErrorCode::ErrorState, msg));
278 }
279
280 ret.as_ref().unwrap().get_by_key(&self.cache, key).await
281 }
282
283 pub async fn insert_with_key(&self, key: &str, value: &ObjectId) -> BuckyResult<()> {
284 let mut ret = self.root.lock().await;
285 if ret.is_none() {
286 let msg = format!(
287 "single op_env root not been init yet! key={}, value={}",
288 key, value
289 );
290 error!("{}", msg);
291 return Err(BuckyError::new(BuckyErrorCode::ErrorState, msg));
292 }
293
294 ret.as_mut()
295 .unwrap()
296 .insert_with_key(&self.cache, key, value)
297 .await
298 }
299
300 pub async fn set_with_key(
301 &self,
302 key: &str,
303 value: &ObjectId,
304 prev_value: &Option<ObjectId>,
305 auto_insert: bool,
306 ) -> BuckyResult<Option<ObjectId>> {
307 let mut ret = self.root.lock().await;
308 if ret.is_none() {
309 let msg = format!(
310 "single op_env root not been init yet! sid={}, key={}, value={}",
311 self.sid, key, value
312 );
313 error!("{}", msg);
314 return Err(BuckyError::new(BuckyErrorCode::ErrorState, msg));
315 }
316
317 ret.as_mut()
318 .unwrap()
319 .set_with_key(&self.cache, key, value, prev_value, auto_insert)
320 .await
321 }
322
323 pub async fn remove_with_key(
324 &self,
325 key: &str,
326 prev_value: &Option<ObjectId>,
327 ) -> BuckyResult<Option<ObjectId>> {
328 let mut ret = self.root.lock().await;
329 if ret.is_none() {
330 let msg = format!(
331 "single op_env root not been init yet! sid={}, key={}",
332 self.sid, key
333 );
334 error!("{}", msg);
335 return Err(BuckyError::new(BuckyErrorCode::ErrorState, msg));
336 }
337
338 ret.as_mut()
339 .unwrap()
340 .remove_with_key(&self.cache, key, prev_value)
341 .await
342 }
343
344 pub async fn contains(&self, object_id: &ObjectId) -> BuckyResult<bool> {
346 let ret = self.root.lock().await;
347 if ret.is_none() {
348 let msg = format!(
349 "single op_env root not been init yet! sid={}, value={}",
350 self.sid, object_id
351 );
352 error!("{}", msg);
353 return Err(BuckyError::new(BuckyErrorCode::ErrorState, msg));
354 }
355
356 ret.as_ref().unwrap().contains(&self.cache, object_id).await
357 }
358
359 pub async fn insert(&self, object_id: &ObjectId) -> BuckyResult<bool> {
360 let mut ret = self.root.lock().await;
361 if ret.is_none() {
362 let msg = format!(
363 "single op_env root not been init yet! sid={}, value={}",
364 self.sid, object_id
365 );
366 error!("{}", msg);
367 return Err(BuckyError::new(BuckyErrorCode::ErrorState, msg));
368 }
369
370 ret.as_mut().unwrap().insert(&self.cache, object_id).await
371 }
372
373 pub async fn remove(&self, object_id: &ObjectId) -> BuckyResult<bool> {
374 let mut ret = self.root.lock().await;
375 if ret.is_none() {
376 let msg = format!(
377 "single op_env root not been init yet! sid={}, value={}",
378 self.sid, object_id
379 );
380 error!("{}", msg);
381 return Err(BuckyError::new(BuckyErrorCode::ErrorState, msg));
382 }
383
384 ret.as_mut().unwrap().remove(&self.cache, object_id).await
385 }
386
387 async fn update_root(&self, finish: bool) -> BuckyResult<ObjectId> {
388 let mut root_slot = self.root.lock().await;
389 if root_slot.is_none() {
390 let msg = format!(
391 "update root error, single op_env root not been init yet! sid={}",
392 self.sid
393 );
394 error!("{}", msg);
395 return Err(BuckyError::new(BuckyErrorCode::ErrorState, msg));
396 }
397
398 let root = root_slot.as_ref().unwrap();
399 let object_id = root.cached_object_id().unwrap();
400 let new_id = root.flush_id();
401 if object_id == new_id {
402 info!(
403 "single op_env update root but object_id unchanged! id={}",
404 object_id
405 );
406 return Ok(new_id);
407 }
408
409 info!(
411 "single op_env root object changed! sid={}, {} => {}",
412 self.sid, object_id, new_id
413 );
414
415 let root = if finish {
416 root_slot.take().unwrap()
417 } else {
418 root.clone()
419 };
420
421 self.cache.put_object_map(&new_id, root, None)?;
422
423 if let Err(e) = self.cache.gc(true, &new_id).await {
424 error!("single env's cache gc error! root={}, {}", new_id, e);
425 }
426
427 self.cache.commit().await?;
428
429 info!(
430 "single op_env update root success! sid={}, root=={}",
431 self.sid, new_id
432 );
433 Ok(new_id)
434 }
435
436 pub async fn update(&self) -> BuckyResult<ObjectId> {
437 self.update_root(false).await
438 }
439
440 pub async fn commit(self) -> BuckyResult<ObjectId> {
441 self.update_root(true).await
442 }
443
444 pub fn abort(self) -> BuckyResult<()> {
445 info!("will abort single_op_env: sid={}", self.sid);
446 self.cache.abort();
447
448 Ok(())
449 }
450}
451
452#[derive(Clone)]
453pub struct ObjectMapSingleOpEnvRef(Arc<ObjectMapSingleOpEnv>);
454
455impl ObjectMapSingleOpEnvRef {
456 pub fn new(env: ObjectMapSingleOpEnv) -> Self {
457 Self(Arc::new(env))
458 }
459
460 fn into_raw(self) -> BuckyResult<ObjectMapSingleOpEnv> {
461 let sid = self.sid();
462 let env = Arc::try_unwrap(self.0).map_err(|this| {
463 let msg = format!(
464 "single_op_env's ref_count is more than one! sid={}, ref={}",
465 sid,
466 Arc::strong_count(&this)
467 );
468 error!("{}", msg);
469 BuckyError::new(BuckyErrorCode::ErrorState, msg)
470 })?;
471
472 Ok(env)
473 }
474
475 pub fn is_dropable(&self) -> bool {
476 Arc::strong_count(&self.0) == 1
477 }
478
479 pub async fn commit(self) -> BuckyResult<ObjectId> {
480 let env = self.into_raw()?;
481
482 env.commit().await
483 }
484
485 pub fn abort(self) -> BuckyResult<()> {
486 let env = self.into_raw()?;
487
488 env.abort()
489 }
490}
491
492impl std::ops::Deref for ObjectMapSingleOpEnvRef {
493 type Target = Arc<ObjectMapSingleOpEnv>;
494 fn deref(&self) -> &Arc<ObjectMapSingleOpEnv> {
495 &self.0
496 }
497}