1use super::access::OpEnvPathAccess;
2use super::cache::*;
3use super::lock::*;
4use super::path_env::*;
5use super::single_env::*;
6use super::isolate_path_env::*;
7use crate::*;
8
9use std::str::FromStr;
10use std::sync::atomic::AtomicU64;
11use std::sync::{Arc, Mutex};
12
13#[derive(Clone, Debug, Copy, Eq, PartialEq)]
14pub enum ObjectMapOpEnvType {
15 Path,
16 Single,
17 IsolatePath,
18}
19
20
21impl ObjectMapOpEnvType {
22 fn as_str(&self) -> &str {
23 match *self {
24 Self::Path => "path",
25 Self::Single => "single",
26 Self::IsolatePath => "isolate-path",
27 }
28 }
29}
30
31impl std::fmt::Display for ObjectMapOpEnvType {
32 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
33 write!(f, "{}", self.as_str())
34 }
35}
36
37impl FromStr for ObjectMapOpEnvType {
38 type Err = BuckyError;
39
40 fn from_str(value: &str) -> Result<Self, Self::Err> {
41 let ret = match value {
42 "path" => Self::Path,
43 "single" => Self::Single,
44 "isolate-path" => Self::IsolatePath,
45
46 v @ _ => {
47 let msg = format!("unknown op env type: {}", v);
48 error!("{}", msg);
49
50 return Err(BuckyError::new(BuckyErrorCode::InvalidData, msg));
51 }
52 };
53
54 Ok(ret)
55 }
56}
57
58#[derive(Clone, Debug, Copy)]
59pub struct OpEnvSessionIDHelper;
60
61const OP_ENV_PATH_FLAGS: u8 = 0b_00000000;
63const OP_ENV_SINGLE_FLAGS: u8 = 0b_00000001;
64const OP_ENV_ISOLATE_PATH_FLAGS: u8 = 0b_00000010;
65
66impl OpEnvSessionIDHelper {
67 pub fn get_flags(sid: u64) -> u8 {
68 (sid >> 62) as u8
69 }
70
71 pub fn get_type(sid: u64) -> BuckyResult<ObjectMapOpEnvType> {
72 let flags = Self::get_flags(sid);
73 if flags == OP_ENV_PATH_FLAGS {
74 Ok(ObjectMapOpEnvType::Path)
75 } else if flags == OP_ENV_SINGLE_FLAGS {
76 Ok(ObjectMapOpEnvType::Single)
77 } else if flags == OP_ENV_ISOLATE_PATH_FLAGS {
78 Ok(ObjectMapOpEnvType::IsolatePath)
79 } else {
80 let msg = format!("unknown op_ev sid flags: sid={}, flags={}", sid, flags);
81 error!("{}", msg);
82 Err(BuckyError::new(BuckyErrorCode::InvalidFormat, msg))
83 }
84 }
85
86 pub fn set_type(sid: u64, op_env_type: ObjectMapOpEnvType) -> u64 {
87 let flags = match op_env_type {
88 ObjectMapOpEnvType::Path => OP_ENV_PATH_FLAGS,
89 ObjectMapOpEnvType::Single => OP_ENV_SINGLE_FLAGS,
90 ObjectMapOpEnvType::IsolatePath => OP_ENV_ISOLATE_PATH_FLAGS,
91 };
92
93 let sid = sid & 0b_00111111_11111111_11111111_11111111_11111111_11111111_11111111_11111111;
96 let sid = sid | ((flags as u64) << 62);
99 sid
102 }
103}
104
105#[cfg(test)]
106mod test_sid {
107 use std::sync::atomic::AtomicU64;
108
109 use super::OpEnvSessionIDHelper;
110 use crate::*;
111
112 #[test]
113 fn test_sid() {
114 let sid = 123;
115 let t = OpEnvSessionIDHelper::get_type(sid).unwrap();
116 assert_eq!(t, ObjectMapOpEnvType::Path);
117 let sid = OpEnvSessionIDHelper::set_type(sid, ObjectMapOpEnvType::Single);
118 let t = OpEnvSessionIDHelper::get_type(sid).unwrap();
119 assert_eq!(t, ObjectMapOpEnvType::Single);
120
121 let sid = OpEnvSessionIDHelper::set_type(sid, ObjectMapOpEnvType::IsolatePath);
122 let t = OpEnvSessionIDHelper::get_type(sid).unwrap();
123 assert_eq!(t, ObjectMapOpEnvType::IsolatePath);
124
125 let sid = OpEnvSessionIDHelper::set_type(sid, ObjectMapOpEnvType::Path);
126 let t = OpEnvSessionIDHelper::get_type(sid).unwrap();
127 assert_eq!(t, ObjectMapOpEnvType::Path);
128
129 let sid = AtomicU64::new(u64::MAX);
130 let ret = sid.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
131 assert_eq!(ret, u64::MAX);
132 let ret = sid.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
133 assert_eq!(ret, 0);
134 }
135}
136
137#[derive(Clone)]
138pub enum ObjectMapOpEnv {
139 Path(ObjectMapPathOpEnvRef),
140 Single(ObjectMapSingleOpEnvRef),
141 IsolatePath(ObjectMapIsolatePathOpEnvRef)
142}
143
144impl ObjectMapOpEnv {
145 pub fn sid(&self) -> u64 {
146 match self {
147 Self::Path(value) => value.sid(),
148 Self::Single(value) => value.sid(),
149 Self::IsolatePath(value) => value.sid(),
150 }
151 }
152
153 pub fn op_env_type(&self) -> ObjectMapOpEnvType {
154 match self {
155 Self::Path(_) => ObjectMapOpEnvType::Path,
156 Self::Single(_) => ObjectMapOpEnvType::Single,
157 Self::IsolatePath(_) => ObjectMapOpEnvType::IsolatePath,
158 }
159 }
160
161 pub fn path_op_env(&self, sid: u64) -> BuckyResult<ObjectMapPathOpEnvRef> {
162 match self {
163 Self::Path(value) => Ok(value.clone()),
164 _ => {
165 let msg = format!(
166 "unmatch env type, path_op_env expected, got {}! sid={}",
167 self.op_env_type(),
168 sid
169 );
170 error!("{}", msg);
171 Err(BuckyError::new(BuckyErrorCode::Unmatch, msg))
172 }
173 }
174 }
175
176 pub fn single_op_env(&self, sid: u64) -> BuckyResult<ObjectMapSingleOpEnvRef> {
177 match self {
178 Self::Single(value) => Ok(value.clone()),
179 _ => {
180 let msg = format!(
181 "unmatch env type, single_op_env expected, got {}! sid={}",
182 self.op_env_type(),
183 sid
184 );
185 error!("{}", msg);
186 Err(BuckyError::new(BuckyErrorCode::Unmatch, msg))
187 }
188 }
189 }
190
191 pub fn isolate_path_op_env(&self, sid: u64) -> BuckyResult<ObjectMapIsolatePathOpEnvRef> {
192 match self {
193 Self::IsolatePath(value) => Ok(value.clone()),
194 _ => {
195 let msg = format!(
196 "unmatch env type, isolate_path_op_env expected, got {}! sid={}",
197 self.op_env_type(),
198 sid
199 );
200 error!("{}", msg);
201 Err(BuckyError::new(BuckyErrorCode::Unmatch, msg))
202 }
203 }
204 }
205
206 pub async fn get_current_root(&self) -> BuckyResult<ObjectId> {
207 match self {
208 ObjectMapOpEnv::Path(env) => Ok(env.root()),
209 ObjectMapOpEnv::Single(env) => match env.get_current_root().await {
210 Some(root) => Ok(root),
211 None => {
212 let msg = format!("single op_env root not been init yet! sid={}", env.sid());
213 error!("{}", msg);
214 Err(BuckyError::new(BuckyErrorCode::ErrorState, msg))
215 }
216 },
217 ObjectMapOpEnv::IsolatePath(env) => match env.root() {
218 Some(root) => Ok(root),
219 None => {
220 let msg = format!("isolate_path_op_env root not been init yet! sid={}", env.sid());
221 error!("{}", msg);
222 Err(BuckyError::new(BuckyErrorCode::ErrorState, msg))
223 }
224 },
225 }
226 }
227
228 pub async fn update(&self) -> BuckyResult<ObjectId> {
229 match self {
230 ObjectMapOpEnv::Path(env) => env.update().await,
231 ObjectMapOpEnv::Single(env) => env.update().await,
232 ObjectMapOpEnv::IsolatePath(env) => env.update().await,
233 }
234 }
235
236 pub async fn commit(self) -> BuckyResult<ObjectId> {
237 match self {
238 ObjectMapOpEnv::Path(env) => env.commit().await,
239 ObjectMapOpEnv::Single(env) => env.commit().await,
240 ObjectMapOpEnv::IsolatePath(env) => env.commit().await,
241 }
242 }
243
244 pub fn abort(self) -> BuckyResult<()> {
245 match self {
246 ObjectMapOpEnv::Path(env) => env.abort(),
247 ObjectMapOpEnv::Single(env) => env.abort(),
248 ObjectMapOpEnv::IsolatePath(env) => env.abort(),
249 }
250 }
251
252 pub fn is_dropable(&self) -> bool {
253 match self {
254 ObjectMapOpEnv::Path(env) => env.is_dropable(),
255 ObjectMapOpEnv::Single(env) => env.is_dropable(),
256 ObjectMapOpEnv::IsolatePath(env) => env.is_dropable(),
257 }
258 }
259}
260
261use std::collections::HashMap;
262
263pub struct OpEnvSourceInfo {
264 pub dec: ObjectId,
265 pub device: Option<DeviceId>,
266}
267
268impl std::fmt::Debug for OpEnvSourceInfo {
269 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
270 std::fmt::Display::fmt(self, f)
271 }
272}
273
274impl std::fmt::Display for OpEnvSourceInfo {
275 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276 write!(f, "dec={}, device={:?}", self.dec, self.device)
277 }
278}
279
280struct ObjectMapOpEnvHolder {
281 last_access: u64,
282 op_env: ObjectMapOpEnv,
283 source: Option<OpEnvSourceInfo>,
284}
285
286const OP_ENV_EXPIRED_DURATION: u64 = 1000 * 1000 * 60 * 60;
287
288impl ObjectMapOpEnvHolder {
289 fn new(op_env: ObjectMapOpEnv, source: Option<OpEnvSourceInfo>) -> Self {
290 Self {
291 last_access: bucky_time_now(),
292 op_env,
293 source,
294 }
295 }
296
297 fn op_env(&self) -> &ObjectMapOpEnv {
298 &self.op_env
299 }
300
301 fn into_op_env(self) -> ObjectMapOpEnv {
302 self.op_env
303 }
304
305 fn is_gc_able(&self, now: u64) -> bool {
306 if self.op_env.is_dropable() {
307 if now - self.last_access > OP_ENV_EXPIRED_DURATION {
308 true
309 } else {
310 false
311 }
312 } else {
313 false
314 }
315 }
316
317 fn touch(&mut self) {
318 self.last_access = bucky_time_now();
319 }
320
321 fn compare_source(&self, source: Option<&OpEnvSourceInfo>) -> bool {
322 match &self.source {
323 Some(this) => match source {
324 Some(source) => {
325 if this.dec == source.dec && this.device == source.device {
326 true
327 } else {
328 false
329 }
330 }
331 None => false,
332 },
333 None => true,
334 }
335 }
336}
337
338#[derive(Clone)]
339pub struct ObjectMapOpEnvContainer {
340 all: Arc<Mutex<HashMap<u64, ObjectMapOpEnvHolder>>>,
341}
342
343impl ObjectMapOpEnvContainer {
344 pub(crate) fn new() -> Self {
345 let ret = Self {
346 all: Arc::new(Mutex::new(HashMap::new())),
347 };
348
349 ret.start_monitor();
351
352 ret
353 }
354
355 pub fn start_monitor(&self) {
356 let this = self.clone();
357 async_std::task::spawn(async move {
358 loop {
359 async_std::task::sleep(std::time::Duration::from_secs(60)).await;
360 this.gc_once();
361 }
362 });
363 }
364
365 fn gc_once(&self) {
366 let mut expired_list = vec![];
367 let now = bucky_time_now();
368 self.all.lock().unwrap().retain(|sid, op_env| {
369 if op_env.is_gc_able(now) {
370 expired_list.push((*sid, op_env.op_env().to_owned()));
371 false
372 } else {
373 true
374 }
375 });
376
377 self.gc_list(expired_list);
378 }
379
380 fn gc_list(&self, expired_list: Vec<(u64, ObjectMapOpEnv)>) {
382 for (sid, op_env) in expired_list {
383 warn!("will gc managed op_env on timeout: sid={}", sid);
384 if let Err(e) = op_env.abort() {
385 error!("op_env abort error! sid={}, {}", sid, e);
386 }
387 }
388 }
389
390 pub fn add_env(&self, env: ObjectMapOpEnv, source: Option<OpEnvSourceInfo>) {
391 let sid = env.sid();
392 let holder = ObjectMapOpEnvHolder::new(env, source);
393 let prev = self.all.lock().unwrap().insert(sid, holder);
394 assert!(prev.is_none());
395 }
396
397 pub fn get_op_env(
398 &self,
399 sid: u64,
400 source: Option<&OpEnvSourceInfo>,
401 ) -> BuckyResult<ObjectMapOpEnv> {
402 let mut list = self.all.lock().unwrap();
403 let ret = list.get_mut(&sid);
404 match ret {
405 Some(value) => {
406 if !value.compare_source(source) {
407 let msg = format!(
408 "get op_env but source does not match! sid={}, source={:?}, current={:?}",
409 sid, source, value.source
410 );
411 error!("{}", msg);
412 return Err(BuckyError::new(BuckyErrorCode::PermissionDenied, msg));
413 }
414
415 value.touch();
416 Ok(value.op_env().to_owned())
417 }
418 None => {
419 let msg = format!("op_env not found! sid={}", sid);
420 error!("{}", msg);
421 Err(BuckyError::new(BuckyErrorCode::NotFound, msg))
422 }
423 }
424 }
425
426 pub fn get_path_op_env(
427 &self,
428 sid: u64,
429 source: Option<&OpEnvSourceInfo>,
430 ) -> BuckyResult<ObjectMapPathOpEnvRef> {
431 let op_env = self.get_op_env(sid, source)?;
432 op_env.path_op_env(sid)
433 }
434
435 pub fn get_single_op_env(
436 &self,
437 sid: u64,
438 source: Option<&OpEnvSourceInfo>,
439 ) -> BuckyResult<ObjectMapSingleOpEnvRef> {
440 let op_env = self.get_op_env(sid, source)?;
441 op_env.single_op_env(sid)
442 }
443
444 pub fn get_isolate_path_op_env(
445 &self,
446 sid: u64,
447 source: Option<&OpEnvSourceInfo>,
448 ) -> BuckyResult<ObjectMapIsolatePathOpEnvRef> {
449 let op_env = self.get_op_env(sid, source)?;
450 op_env.isolate_path_op_env(sid)
451 }
452
453 pub async fn get_current_root(
454 &self,
455 sid: u64,
456 source: Option<&OpEnvSourceInfo>,
457 ) -> BuckyResult<ObjectId> {
458 let op_env = self.get_op_env(sid, source)?;
459
460 op_env.get_current_root().await
461 }
462
463 pub async fn update(
464 &self,
465 sid: u64,
466 source: Option<&OpEnvSourceInfo>,
467 ) -> BuckyResult<ObjectId> {
468 let op_env = self.get_op_env(sid, source)?;
469
470 op_env.update().await
471 }
472
473 pub async fn commit(
474 &self,
475 sid: u64,
476 source: Option<&OpEnvSourceInfo>,
477 ) -> BuckyResult<ObjectId> {
478 let item = self.remove(sid, source)?;
479
480 item.into_op_env().commit().await
481 }
482
483 pub fn abort(&self, sid: u64, source: Option<&OpEnvSourceInfo>) -> BuckyResult<()> {
484 let item = self.remove(sid, source)?;
485
486 item.into_op_env().abort()
487 }
488
489 fn remove(
490 &self,
491 sid: u64,
492 source: Option<&OpEnvSourceInfo>,
493 ) -> BuckyResult<ObjectMapOpEnvHolder> {
494 let mut all = self.all.lock().unwrap();
495 let ret = all.get(&sid);
496 if ret.is_none() {
497 let msg = format!("op_env not found! sid={}", sid);
498 error!("{}", msg);
499 return Err(BuckyError::new(BuckyErrorCode::NotFound, msg));
500 }
501 let value = ret.unwrap();
502 if !value.compare_source(source) {
503 let msg = format!(
504 "get op_env but source does not match! sid={}, source={:?}, current={:?}",
505 sid, source, value.source
506 );
507 error!("{}", msg);
508 return Err(BuckyError::new(BuckyErrorCode::PermissionDenied, msg));
509 }
510
511 drop(value);
512
513 let ret = all.remove(&sid);
514 Ok(ret.unwrap())
515 }
516}
517
518pub struct ObjectMapRootManager {
520 owner: Option<ObjectId>,
522 dec_id: Option<ObjectId>,
523
524 next_sid: AtomicU64,
526
527 root: ObjectMapRootHolder,
529
530 lock: ObjectMapPathLock,
532
533 cache: ObjectMapRootCacheRef,
535
536 all_envs: ObjectMapOpEnvContainer,
538}
539
540impl ObjectMapRootManager {
541 pub fn new(
542 owner: Option<ObjectId>,
543 dec_id: Option<ObjectId>,
544 noc: ObjectMapNOCCacheRef,
545 root: ObjectMapRootHolder,
546 ) -> Self {
547 use rand::Rng;
548 let mut rng = rand::thread_rng();
549 let sid1 = rng.gen::<u32>();
550 let sid2 = rng.gen::<u16>();
551 let begin_sid = sid1 as u64 * sid2 as u64;
552
553 let lock = ObjectMapPathLock::new();
554 let cache = ObjectMapRootMemoryCache::new_ref(dec_id.clone(), noc, 60 * 5, 1024);
555 Self {
556 owner,
557 dec_id,
558 next_sid: AtomicU64::new(begin_sid),
559 root,
560 lock,
561 cache,
562 all_envs: ObjectMapOpEnvContainer::new(),
563 }
564 }
565
566 fn next_sid(&self, op_env_type: ObjectMapOpEnvType) -> u64 {
567 let sid = self
568 .next_sid
569 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
570 OpEnvSessionIDHelper::set_type(sid, op_env_type)
572 }
573
574 pub fn owner(&self) -> &Option<ObjectId> {
575 &self.owner
576 }
577
578 pub fn dec_id(&self) -> &Option<ObjectId> {
579 &self.dec_id
580 }
581
582 pub fn get_current_root(&self) -> ObjectId {
583 self.root.get_current_root()
584 }
585
586 pub fn root_holder(&self) -> &ObjectMapRootHolder {
587 &self.root
588 }
589
590 pub fn root_cache(&self) -> &ObjectMapRootCacheRef {
591 &self.cache
592 }
593
594 pub fn managed_envs(&self) -> &ObjectMapOpEnvContainer {
595 &self.all_envs
596 }
597
598 pub fn create_op_env(
599 &self,
600 access: Option<OpEnvPathAccess>,
601 ) -> BuckyResult<ObjectMapPathOpEnvRef> {
602 let sid = self.next_sid(ObjectMapOpEnvType::Path);
603 let env = ObjectMapPathOpEnv::new(sid, &self.root, &self.lock, &self.cache, access);
604 let env = ObjectMapPathOpEnvRef::new(env);
605
606 Ok(env)
607 }
608
609 pub fn create_managed_op_env(
610 &self,
611 access: Option<OpEnvPathAccess>,
612 source: Option<OpEnvSourceInfo>,
613 ) -> BuckyResult<ObjectMapPathOpEnvRef> {
614 let env = self.create_op_env(access)?;
615
616 self.all_envs
617 .add_env(ObjectMapOpEnv::Path(env.clone()), source);
618
619 Ok(env)
620 }
621
622 pub fn create_single_op_env(
623 &self,
624 access: Option<OpEnvPathAccess>,
625 ) -> BuckyResult<ObjectMapSingleOpEnvRef> {
626 let sid = self.next_sid(ObjectMapOpEnvType::Single);
627 let env = ObjectMapSingleOpEnv::new(
628 sid,
629 &self.root,
630 &self.cache,
631 access,
632 );
633 let env = ObjectMapSingleOpEnvRef::new(env);
634
635 Ok(env)
636 }
637
638 pub fn create_managed_single_op_env(
639 &self,
640 access: Option<OpEnvPathAccess>,
641 source: Option<OpEnvSourceInfo>,
642 ) -> BuckyResult<ObjectMapSingleOpEnvRef> {
643 let env = self.create_single_op_env(access)?;
644 self.all_envs
645 .add_env(ObjectMapOpEnv::Single(env.clone()), source);
646
647 Ok(env)
648 }
649
650 pub fn create_isolate_path_op_env(
651 &self,
652 access: Option<OpEnvPathAccess>,
653 ) -> BuckyResult<ObjectMapIsolatePathOpEnvRef> {
654 let sid = self.next_sid(ObjectMapOpEnvType::IsolatePath);
655 let env = ObjectMapIsolatePathOpEnv::new(
656 sid,
657 &self.root,
658 &self.cache,
659 access,
660 );
661 let env = ObjectMapIsolatePathOpEnvRef::new(env);
662
663 Ok(env)
664 }
665
666 pub fn create_managed_isolate_path_op_env(
667 &self,
668 access: Option<OpEnvPathAccess>,
669 source: Option<OpEnvSourceInfo>,
670 ) -> BuckyResult<ObjectMapIsolatePathOpEnvRef> {
671 let env = self.create_isolate_path_op_env(access)?;
672 self.all_envs
673 .add_env(ObjectMapOpEnv::IsolatePath(env.clone()), source);
674
675 Ok(env)
676 }
677}
678
679pub type ObjectMapRootManagerRef = Arc<ObjectMapRootManager>;
680
681mod test_root {
682 use crate::*;
683 use std::future::Future;
684
685 async fn update_root<F, Fut>(update_root_fn: F) -> BuckyResult<()>
686 where
687 F: FnOnce(i32, i32) -> Fut,
688 Fut: Future<Output = BuckyResult<i32>>,
689 {
690 info!("begin exec update fn...");
691 let result = update_root_fn(1, 2).await?;
692 info!("end exec update fn: {}", result);
693
694 assert_eq!(result, 3);
695 Ok(())
696 }
697
698 #[test]
699 fn test_fn() {
700 crate::init_simple_log("test-root-fn", Some("debug"));
701
702 let update = |first: i32, second: i32| async move {
703 info!("will exec add: {} + {}", first, second);
704 Ok(first + second)
705 };
706
707 async_std::task::block_on(async move {
708 update_root(update).await.unwrap();
709 });
710 }
711}