1use super::access::OpEnvPathAccess;
2use super::cache::*;
3use super::iterator::*;
4use super::lock::*;
5use super::path::*;
6use super::root::ObjectMapRootHolder;
7use crate::*;
8
9use async_std::sync::Mutex as AsyncMutex;
10use once_cell::sync::OnceCell;
11use std::sync::{Arc, RwLock};
12
13pub struct ObjectMapPathSnapshot {
16 root: RwLock<ObjectId>,
18
19 path: ObjectMapPath,
21}
22
23pub struct ObjectMapPathOpEnv {
25 sid: u64,
27
28 root_holder: ObjectMapRootHolder,
30
31 path: OnceCell<ObjectMapPathSnapshot>,
32
33 lock: ObjectMapPathLock,
35
36 cache: ObjectMapOpEnvCacheRef,
38
39 write_lock: AsyncMutex<()>,
41
42 access: Option<OpEnvPathAccess>,
44}
45
46impl Drop for ObjectMapPathOpEnv {
47 fn drop(&mut self) {
48 async_std::task::block_on(self.unlock());
49 }
50}
51
52impl ObjectMapPathOpEnv {
53 pub(crate) fn new(
54 sid: u64,
55 root_holder: &ObjectMapRootHolder,
56 lock: &ObjectMapPathLock,
57 root_cache: &ObjectMapRootCacheRef,
58 access: Option<OpEnvPathAccess>,
59 ) -> Self {
60 debug!("new path_op_env: sid={},", sid);
61 let cache = ObjectMapOpEnvMemoryCache::new_ref(root_cache.clone());
62
63 Self {
64 sid,
65 root_holder: root_holder.clone(),
66 path: OnceCell::new(),
67 cache,
68 lock: lock.clone(),
69 write_lock: AsyncMutex::new(()),
70 access,
71 }
72 }
73
74 fn path_snapshot(&self) -> &ObjectMapPathSnapshot {
76 self.path.get_or_init(|| {
77 let root = self.root_holder.get_current_root();
79 info!(
80 "path_op_env bind root snapshot: sid={}, root={}",
81 self.sid, root
82 );
83 let path = ObjectMapPath::new(root.clone(), self.cache.clone(), true);
84
85 ObjectMapPathSnapshot {
86 root: RwLock::new(root),
87 path,
88 }
89 })
90 }
91
92 pub fn cache(&self) -> &ObjectMapOpEnvCacheRef {
93 &self.cache
94 }
95
96 pub fn sid(&self) -> u64 {
97 self.sid
98 }
99
100 pub fn root(&self) -> ObjectId {
102 self.path_snapshot().root.read().unwrap().to_owned()
103 }
104
105 fn path(&self) -> &ObjectMapPath {
106 &self.path_snapshot().path
107 }
108
109 pub async fn lock_path(
110 &self,
111 path_list: Vec<String>,
112 duration_in_millsecs: u64,
113 as_try: bool,
114 ) -> BuckyResult<()> {
115 info!(
116 "path_op_env lock_path: sid={}, path_list={:?}, duration_in_millsecs={}",
117 self.sid, path_list, duration_in_millsecs
118 );
119
120 if let Some(access) = &self.access {
122 access.check_full_path_list(&path_list, RequestOpType::Write)?;
123 }
124
125 let mut req_list = vec![];
126 let expired = if duration_in_millsecs > 0 {
127 let now = bucky_time_now();
128 if duration_in_millsecs < (u64::MAX - now) / 1000 {
129 now + duration_in_millsecs * 1000
130 } else {
131 duration_in_millsecs
132 }
133 } else {
134 0
135 };
136
137 for path in path_list {
138 let req = PathLockRequest {
139 path,
140 sid: self.sid,
141 expired,
142 };
143
144 req_list.push(req);
145 }
146
147 if as_try {
148 self.lock.try_lock_list(req_list).await
149 } else {
150 self.lock.lock_list(req_list).await;
151 Ok(())
152 }
153 }
154
155 pub async fn list(&self, path: &str) -> BuckyResult<ObjectMapContentList> {
157 if let Some(access) = &self.access {
159 access.check_full_path(path, RequestOpType::Read)?;
160 }
161 self.path().list(path).await
162 }
163
164 pub async fn metadata(&self, path: &str) -> BuckyResult<ObjectMapMetaData> {
166 if let Some(access) = &self.access {
168 access.check_full_path(path, RequestOpType::Read)?;
169 }
170 self.path().metadata(path).await
171 }
172
173 pub async fn get_by_path(&self, full_path: &str) -> BuckyResult<Option<ObjectId>> {
175 if let Some(access) = &self.access {
177 access.check_full_path(full_path, RequestOpType::Read)?;
178 }
179
180 self.path().get_by_path(full_path).await
181 }
182
183 pub async fn create_new_with_path(
184 &self,
185 full_path: &str,
186 content_type: ObjectMapSimpleContentType,
187 ) -> BuckyResult<()> {
188 info!(
189 "op_path_env create_new_with_path: sid={}, path={}, content_type={:?}",
190 self.sid, full_path, content_type,
191 );
192
193 if let Some(access) = &self.access {
195 access.check_full_path(full_path, RequestOpType::Write)?;
196 }
197
198 let _write_lock = self.write_lock.lock().await;
199 self.lock.try_enter_path(full_path, self.sid).await?;
200 self.path()
201 .create_new_with_path(full_path, content_type)
202 .await
203 }
204
205 pub async fn insert_with_path(&self, full_path: &str, value: &ObjectId) -> BuckyResult<()> {
206 info!(
207 "op_path_env insert_with_path: sid={}, full_path={}, value={}",
208 self.sid, full_path, value
209 );
210
211 if let Some(access) = &self.access {
213 access.check_full_path(full_path, RequestOpType::Write)?;
214 }
215
216 let _write_lock = self.write_lock.lock().await;
217 self.lock.try_enter_path(full_path, self.sid).await?;
218 self.path().insert_with_path(full_path, value).await
219 }
220
221 pub async fn set_with_path(
222 &self,
223 full_path: &str,
224 value: &ObjectId,
225 prev_value: &Option<ObjectId>,
226 auto_insert: bool,
227 ) -> BuckyResult<Option<ObjectId>> {
228 info!(
229 "op_path_env set_with_path: sid={}, full_path={}, value={}, prev_value={:?}, auto_insert={}",
230 self.sid, full_path, value, prev_value, auto_insert,
231 );
232
233 if let Some(access) = &self.access {
235 access.check_full_path(full_path, RequestOpType::Write)?;
236 }
237
238 let _write_lock = self.write_lock.lock().await;
239 self.lock.try_enter_path(full_path, self.sid).await?;
240 self.path()
241 .set_with_path(full_path, value, prev_value, auto_insert)
242 .await
243 }
244
245 pub async fn remove_with_path(
246 &self,
247 full_path: &str,
248 prev_value: &Option<ObjectId>,
249 ) -> BuckyResult<Option<ObjectId>> {
250 info!(
251 "op_path_env remove_with_path: sid={}, full_path={}, prev_value={:?}",
252 self.sid, full_path, prev_value,
253 );
254
255 if let Some(access) = &self.access {
257 access.check_full_path(full_path, RequestOpType::Write)?;
258 }
259
260 let _write_lock = self.write_lock.lock().await;
261 self.lock.try_enter_path(full_path, self.sid).await?;
262 self.path().remove_with_path(full_path, prev_value).await
263 }
264
265 pub async fn get_by_key(&self, path: &str, key: &str) -> BuckyResult<Option<ObjectId>> {
267 if let Some(access) = &self.access {
269 access.check_path_key(path, key, RequestOpType::Read)?;
270 }
271
272 self.path().get_by_key(path, key).await
273 }
274
275 pub async fn create_new(
276 &self,
277 path: &str,
278 key: &str,
279 content_type: ObjectMapSimpleContentType,
280 ) -> BuckyResult<()> {
281 info!(
282 "op_path_env create_new: sid={}, path={}, key={}, content_type={:?}",
283 self.sid, path, key, content_type,
284 );
285
286 if let Some(access) = &self.access {
288 access.check_path_key(path, key, RequestOpType::Write)?;
289 }
290
291 let _write_lock = self.write_lock.lock().await;
292 self.lock
293 .try_enter_path_and_key(path, key, self.sid)
294 .await?;
295
296 self.path().create_new(path, key, content_type).await
297 }
298
299 pub async fn insert_with_key(
300 &self,
301 path: &str,
302 key: &str,
303 value: &ObjectId,
304 ) -> BuckyResult<()> {
305 info!(
306 "op_path_env insert_with_key: sid={}, path={}, key={}, value={}",
307 self.sid, path, key, value
308 );
309
310 if let Some(access) = &self.access {
312 access.check_path_key(path, key, RequestOpType::Write)?;
313 }
314
315 let _write_lock = self.write_lock.lock().await;
316 self.lock
317 .try_enter_path_and_key(path, key, self.sid)
318 .await?;
319 self.path().insert_with_key(path, key, value).await
320 }
321
322 pub async fn set_with_key(
323 &self,
324 path: &str,
325 key: &str,
326 value: &ObjectId,
327 prev_value: &Option<ObjectId>,
328 auto_insert: bool,
329 ) -> BuckyResult<Option<ObjectId>> {
330 info!(
331 "op_path_env set_with_key: sid={}, path={}, key={}, value={}, prev_value={:?}, auto_insert={}",
332 self.sid, path, key, value, prev_value, auto_insert,
333 );
334
335 if let Some(access) = &self.access {
337 access.check_path_key(path, key, RequestOpType::Write)?;
338 }
339
340 let _write_lock = self.write_lock.lock().await;
341 self.lock
342 .try_enter_path_and_key(path, key, self.sid)
343 .await?;
344 self.path()
345 .set_with_key(path, key, value, prev_value, auto_insert)
346 .await
347 }
348
349 pub async fn remove_with_key(
350 &self,
351 path: &str,
352 key: &str,
353 prev_value: &Option<ObjectId>,
354 ) -> BuckyResult<Option<ObjectId>> {
355 info!(
356 "op_path_env remove_with_key: sid={}, path={}, key={}, prev_value={:?}",
357 self.sid, path, key, prev_value,
358 );
359
360 if let Some(access) = &self.access {
362 access.check_path_key(path, key, RequestOpType::Write)?;
363 }
364
365 let _write_lock = self.write_lock.lock().await;
366 self.lock
367 .try_enter_path_and_key(path, key, self.sid)
368 .await?;
369 self.path().remove_with_key(path, key, prev_value).await
370 }
371
372 pub async fn contains(&self, path: &str, object_id: &ObjectId) -> BuckyResult<bool> {
374 if let Some(access) = &self.access {
376 access.check_full_path(path, RequestOpType::Read)?;
377 }
378
379 self.path().contains(path, object_id).await
380 }
381
382 pub async fn insert(&self, path: &str, object_id: &ObjectId) -> BuckyResult<bool> {
383 info!(
384 "op_path_env insert: sid={}, path={}, value={}",
385 self.sid, path, object_id,
386 );
387
388 if let Some(access) = &self.access {
390 access.check_full_path(path, RequestOpType::Write)?;
391 }
392
393 let _write_lock = self.write_lock.lock().await;
394 self.lock.try_enter_path(path, self.sid).await?;
395 self.path().insert(path, object_id).await
396 }
397
398 pub async fn remove(&self, path: &str, object_id: &ObjectId) -> BuckyResult<bool> {
399 info!(
400 "op_path_env remove: sid={}, path={}, value={}",
401 self.sid, path, object_id,
402 );
403
404 if let Some(access) = &self.access {
406 access.check_full_path(path, RequestOpType::Write)?;
407 }
408
409 let _write_lock = self.write_lock.lock().await;
410 self.lock.try_enter_path(path, self.sid).await?;
411 self.path().remove(path, object_id).await
412 }
413
414 async fn update_root(&self) -> BuckyResult<ObjectId> {
415 let new_root = self.path().root();
417 let current_root = self.root();
418 if new_root == current_root {
419 info!(
420 "op env commit but root not changed! sid={}, root={}",
421 self.sid, current_root
422 );
423 return Ok(new_root);
424 }
425
426 let this = &self;
427 let update = |root: ObjectId| async move {
428 if root != current_root {
432 info!("path_op_env commit but root changed, now will redo op list! sid={}, current_root={}, new_root={}",
433 this.sid, current_root, root);
434
435 this.cache.abort();
436
437 this.path().update_root(root.clone(), &new_root)?;
439
440 info!(
441 "will commit op list on root changed: {} -> {}",
442 current_root, root
443 );
444 this.path().commit_op_list().await?;
445 } else {
446 info!("will clear op list because root not changed during the operations: {}", root);
448 this.path().clear_op_list();
449 }
450
451 let new_root = this.path().root();
453 *this.path_snapshot().root.write().unwrap() = new_root.clone();
454
455 if let Err(e) = this.cache.gc(false, &new_root).await {
457 error!("path env's cache gc error! root={}, {}", root, e);
458 }
459
460 this.cache.commit().await?;
461
462 Ok(new_root)
463 };
464
465 let new_root = self.root_holder.update_root(Box::new(update)).await?;
466
467 Ok(new_root)
468 }
469
470 pub async fn update(&self) -> BuckyResult<ObjectId> {
471 let _write_lock = self.write_lock.lock().await;
472 self.update_root().await
473 }
474
475 pub async fn commit(self) -> BuckyResult<ObjectId> {
478 self.update_root().await
479 }
480
481 async fn unlock(&self) {
483 let req = PathUnlockRequest {
484 path: None,
485 sid: self.sid,
486 };
487
488 self.lock.unlock(req).await.unwrap();
489 }
490
491 pub fn abort(self) -> BuckyResult<()> {
492 info!("will abort path_op_env: sid={}", self.sid);
493
494 self.cache.abort();
496
497 Ok(())
498 }
499}
500
501#[derive(Clone)]
502pub struct ObjectMapPathOpEnvRef(Arc<ObjectMapPathOpEnv>);
503
504impl ObjectMapPathOpEnvRef {
505 pub fn new(env: ObjectMapPathOpEnv) -> Self {
506 Self(Arc::new(env))
507 }
508
509 fn into_raw(self) -> BuckyResult<ObjectMapPathOpEnv> {
510 let sid = self.sid();
511 let env = Arc::try_unwrap(self.0).map_err(|this| {
512 let msg = format!(
513 "path_op_env's ref_count is more than one! sid={}, ref={}",
514 sid,
515 Arc::strong_count(&this)
516 );
517 error!("{}", msg);
518 BuckyError::new(BuckyErrorCode::ErrorState, msg)
519 })?;
520
521 Ok(env)
522 }
523
524 pub fn is_dropable(&self) -> bool {
525 Arc::strong_count(&self.0) == 1
526 }
527
528 pub async fn commit(self) -> BuckyResult<ObjectId> {
529 let env = self.into_raw()?;
530
531 env.commit().await
532 }
533
534 pub fn abort(self) -> BuckyResult<()> {
535 let env = self.into_raw()?;
536
537 env.abort()
538 }
539}
540
541impl std::ops::Deref for ObjectMapPathOpEnvRef {
542 type Target = Arc<ObjectMapPathOpEnv>;
543 fn deref(&self) -> &Arc<ObjectMapPathOpEnv> {
544 &self.0
545 }
546}