bucky_objects/objects/object_map/
op_env.rs

1use super::access::OpEnvPathAccess;
2use super::cache::*;
3use super::lock::*;
4use super::path_env::*;
5use super::single_env::*;
6use super::isolate_path_env::*;
7use crate::*;
8
9use std::str::FromStr;
10use std::sync::atomic::AtomicU64;
11use std::sync::{Arc, Mutex};
12
/// The kind of op_env a session refers to.
///
/// Each variant has a stable textual name (`"path"`, `"single"`,
/// `"isolate-path"`) used by the `Display` and `FromStr` implementations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ObjectMapOpEnvType {
    /// Textual name: `"path"`.
    Path,
    /// Textual name: `"single"`.
    Single,
    /// Textual name: `"isolate-path"`.
    IsolatePath,
}
19
20
21impl ObjectMapOpEnvType {
22    fn as_str(&self) -> &str {
23        match *self {
24            Self::Path => "path",
25            Self::Single => "single",
26            Self::IsolatePath => "isolate-path",
27        }
28    }
29}
30
31impl std::fmt::Display for ObjectMapOpEnvType {
32    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
33		write!(f, "{}", self.as_str())
34	}
35}
36
37impl FromStr for ObjectMapOpEnvType {
38    type Err = BuckyError;
39
40    fn from_str(value: &str) -> Result<Self, Self::Err> {
41        let ret = match value {
42            "path" => Self::Path,
43            "single" => Self::Single,
44            "isolate-path" => Self::IsolatePath,
45
46            v @ _ => {
47                let msg = format!("unknown op env type: {}", v);
48                error!("{}", msg);
49
50                return Err(BuckyError::new(BuckyErrorCode::InvalidData, msg));
51            }
52        };
53
54        Ok(ret)
55    }
56}
57
/// Stateless namespace for the helpers that read and write the op_env type
/// flags embedded in a session id (`sid`).
#[derive(Debug, Clone, Copy)]
pub struct OpEnvSessionIDHelper;
60
// The two most significant bits of a session id encode the op_env type;
// these are the values stored in those two bits.
const OP_ENV_PATH_FLAGS: u8 = 0b00;
const OP_ENV_SINGLE_FLAGS: u8 = 0b01;
const OP_ENV_ISOLATE_PATH_FLAGS: u8 = 0b10;
65
66impl OpEnvSessionIDHelper {
67    pub fn get_flags(sid: u64) -> u8 {
68        (sid >> 62) as u8
69    }
70
71    pub fn get_type(sid: u64) -> BuckyResult<ObjectMapOpEnvType> {
72        let flags = Self::get_flags(sid);
73        if flags == OP_ENV_PATH_FLAGS {
74            Ok(ObjectMapOpEnvType::Path)
75        } else if flags == OP_ENV_SINGLE_FLAGS {
76            Ok(ObjectMapOpEnvType::Single)
77        } else if flags == OP_ENV_ISOLATE_PATH_FLAGS {
78            Ok(ObjectMapOpEnvType::IsolatePath)
79        } else {
80            let msg = format!("unknown op_ev sid flags: sid={}, flags={}", sid, flags);
81            error!("{}", msg);
82            Err(BuckyError::new(BuckyErrorCode::InvalidFormat, msg))
83        }
84    }
85
86    pub fn set_type(sid: u64, op_env_type: ObjectMapOpEnvType) -> u64 {
87        let flags = match op_env_type {
88            ObjectMapOpEnvType::Path => OP_ENV_PATH_FLAGS,
89            ObjectMapOpEnvType::Single => OP_ENV_SINGLE_FLAGS,
90            ObjectMapOpEnvType::IsolatePath => OP_ENV_ISOLATE_PATH_FLAGS,
91        };
92
93        //assert!(Self::get_flags(sid) == 0);
94        //println!("prev clear: {:#x}", sid);
95        let sid = sid & 0b_00111111_11111111_11111111_11111111_11111111_11111111_11111111_11111111;
96        //println!("after clear: {:#x}", sid);
97
98        let sid = sid | ((flags as u64) << 62);
99        //println!("after set: {:#x}", sid);
100
101        sid
102    }
103}
104
#[cfg(test)]
mod test_sid {
    use std::sync::atomic::AtomicU64;

    use super::OpEnvSessionIDHelper;
    use crate::*;

    /// Round-trips every op_env type through the sid flag bits and checks
    /// that an `AtomicU64` counter wraps around to zero on overflow.
    #[test]
    fn test_sid() {
        // A small raw id has both flag bits clear, which decodes as `Path`.
        let mut sid: u64 = 123;
        assert_eq!(
            OpEnvSessionIDHelper::get_type(sid).unwrap(),
            ObjectMapOpEnvType::Path
        );

        // Re-tagging the same sid must always decode to the latest type.
        let sequence = [
            ObjectMapOpEnvType::Single,
            ObjectMapOpEnvType::IsolatePath,
            ObjectMapOpEnvType::Path,
        ];
        for expected in sequence.iter() {
            sid = OpEnvSessionIDHelper::set_type(sid, *expected);
            assert_eq!(OpEnvSessionIDHelper::get_type(sid).unwrap(), *expected);
        }

        // `fetch_add` returns the previous value and wraps on overflow.
        let counter = AtomicU64::new(u64::MAX);
        let previous = counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        assert_eq!(previous, u64::MAX);
        let previous = counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        assert_eq!(previous, 0);
    }
}
136
137#[derive(Clone)]
138pub enum ObjectMapOpEnv {
139    Path(ObjectMapPathOpEnvRef),
140    Single(ObjectMapSingleOpEnvRef),
141    IsolatePath(ObjectMapIsolatePathOpEnvRef)
142}
143
144impl ObjectMapOpEnv {
145    pub fn sid(&self) -> u64 {
146        match self {
147            Self::Path(value) => value.sid(),
148            Self::Single(value) => value.sid(),
149            Self::IsolatePath(value) => value.sid(),
150        }
151    }
152
153    pub fn op_env_type(&self) -> ObjectMapOpEnvType {
154        match self {
155            Self::Path(_) => ObjectMapOpEnvType::Path,
156            Self::Single(_) => ObjectMapOpEnvType::Single,
157            Self::IsolatePath(_) => ObjectMapOpEnvType::IsolatePath,
158        }
159    }
160
161    pub fn path_op_env(&self, sid: u64) -> BuckyResult<ObjectMapPathOpEnvRef> {
162        match self {
163            Self::Path(value) => Ok(value.clone()),
164            _ => {
165                let msg = format!(
166                    "unmatch env type, path_op_env expected, got {}! sid={}",
167                    self.op_env_type(),
168                    sid
169                );
170                error!("{}", msg);
171                Err(BuckyError::new(BuckyErrorCode::Unmatch, msg))
172            }
173        }
174    }
175
176    pub fn single_op_env(&self, sid: u64) -> BuckyResult<ObjectMapSingleOpEnvRef> {
177        match self {
178            Self::Single(value) => Ok(value.clone()),
179            _ => {
180                let msg = format!(
181                    "unmatch env type, single_op_env expected, got {}! sid={}",
182                    self.op_env_type(),
183                    sid
184                );
185                error!("{}", msg);
186                Err(BuckyError::new(BuckyErrorCode::Unmatch, msg))
187            }
188        }
189    }
190
191    pub fn isolate_path_op_env(&self, sid: u64) -> BuckyResult<ObjectMapIsolatePathOpEnvRef> {
192        match self {
193            Self::IsolatePath(value) => Ok(value.clone()),
194            _ => {
195                let msg = format!(
196                    "unmatch env type, isolate_path_op_env expected, got {}! sid={}",
197                    self.op_env_type(),
198                    sid
199                );
200                error!("{}", msg);
201                Err(BuckyError::new(BuckyErrorCode::Unmatch, msg))
202            }
203        }
204    }
205
206    pub async fn get_current_root(&self) -> BuckyResult<ObjectId> {
207        match self {
208            ObjectMapOpEnv::Path(env) => Ok(env.root()),
209            ObjectMapOpEnv::Single(env) => match env.get_current_root().await {
210                Some(root) => Ok(root),
211                None => {
212                    let msg = format!("single op_env root not been init yet! sid={}", env.sid());
213                    error!("{}", msg);
214                    Err(BuckyError::new(BuckyErrorCode::ErrorState, msg))
215                }
216            },
217            ObjectMapOpEnv::IsolatePath(env) => match env.root() {
218                Some(root) => Ok(root),
219                None => {
220                    let msg = format!("isolate_path_op_env root not been init yet! sid={}", env.sid());
221                    error!("{}", msg);
222                    Err(BuckyError::new(BuckyErrorCode::ErrorState, msg))
223                }
224            },
225        }
226    }
227
228    pub async fn update(&self) -> BuckyResult<ObjectId> {
229        match self {
230            ObjectMapOpEnv::Path(env) => env.update().await,
231            ObjectMapOpEnv::Single(env) => env.update().await,
232            ObjectMapOpEnv::IsolatePath(env) => env.update().await,
233        }
234    }
235
236    pub async fn commit(self) -> BuckyResult<ObjectId> {
237        match self {
238            ObjectMapOpEnv::Path(env) => env.commit().await,
239            ObjectMapOpEnv::Single(env) => env.commit().await,
240            ObjectMapOpEnv::IsolatePath(env) => env.commit().await,
241        }
242    }
243
244    pub fn abort(self) -> BuckyResult<()> {
245        match self {
246            ObjectMapOpEnv::Path(env) => env.abort(),
247            ObjectMapOpEnv::Single(env) => env.abort(),
248            ObjectMapOpEnv::IsolatePath(env) => env.abort(),
249        }
250    }
251
252    pub fn is_dropable(&self) -> bool {
253        match self {
254            ObjectMapOpEnv::Path(env) => env.is_dropable(),
255            ObjectMapOpEnv::Single(env) => env.is_dropable(),
256            ObjectMapOpEnv::IsolatePath(env) => env.is_dropable(),
257        }
258    }
259}
260
261use std::collections::HashMap;
262
/// Identifies the source that owns or accesses a managed op_env.
///
/// Two sources are treated as the same when both `dec` and `device` are equal.
pub struct OpEnvSourceInfo {
    /// Object id of the source's dec (presumably the dec-app id — confirm against callers).
    pub dec: ObjectId,
    /// Device the request originated from, if any.
    pub device: Option<DeviceId>,
}
267
268impl std::fmt::Debug for OpEnvSourceInfo {
269    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
270        std::fmt::Display::fmt(self, f)
271    }
272}
273
274impl std::fmt::Display for OpEnvSourceInfo {
275    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276        write!(f, "dec={}, device={:?}", self.dec, self.device)
277    }
278}
279
/// Book-keeping entry for one managed op_env.
struct ObjectMapOpEnvHolder {
    /// Value of `bucky_time_now()` at creation or at the most recent `touch()`.
    last_access: u64,
    /// The managed op_env itself.
    op_env: ObjectMapOpEnv,
    /// Source that registered the op_env; `None` means no source check is applied.
    source: Option<OpEnvSourceInfo>,
}
285
/// Idle time after which a dropable managed op_env may be garbage collected.
///
/// Expressed in the unit of `bucky_time_now()` — presumably microseconds,
/// which would make this one hour; confirm against `bucky_time_now`.
const OP_ENV_EXPIRED_DURATION: u64 = 60 * 60 * 1_000_000;
287
288impl ObjectMapOpEnvHolder {
289    fn new(op_env: ObjectMapOpEnv, source: Option<OpEnvSourceInfo>) -> Self {
290        Self {
291            last_access: bucky_time_now(),
292            op_env,
293            source,
294        }
295    }
296
297    fn op_env(&self) -> &ObjectMapOpEnv {
298        &self.op_env
299    }
300
301    fn into_op_env(self) -> ObjectMapOpEnv {
302        self.op_env
303    }
304
305    fn is_gc_able(&self, now: u64) -> bool {
306        if self.op_env.is_dropable() {
307            if now - self.last_access > OP_ENV_EXPIRED_DURATION {
308                true
309            } else {
310                false
311            }
312        } else {
313            false
314        }
315    }
316
317    fn touch(&mut self) {
318        self.last_access = bucky_time_now();
319    }
320
321    fn compare_source(&self, source: Option<&OpEnvSourceInfo>) -> bool {
322        match &self.source {
323            Some(this) => match source {
324                Some(source) => {
325                    if this.dec == source.dec && this.device == source.device {
326                        true
327                    } else {
328                        false
329                    }
330                }
331                None => false,
332            },
333            None => true,
334        }
335    }
336}
337
/// Registry of managed op_envs, keyed by session id.
///
/// Cloning is cheap: all clones share the same underlying map.
#[derive(Clone)]
pub struct ObjectMapOpEnvContainer {
    /// sid -> holder, shared between clones and guarded by a mutex.
    all: Arc<Mutex<HashMap<u64, ObjectMapOpEnvHolder>>>,
}
342
343impl ObjectMapOpEnvContainer {
344    pub(crate) fn new() -> Self {
345        let ret = Self {
346            all: Arc::new(Mutex::new(HashMap::new())),
347        };
348
349        // 自动启动定期gc
350        ret.start_monitor();
351
352        ret
353    }
354
355    pub fn start_monitor(&self) {
356        let this = self.clone();
357        async_std::task::spawn(async move {
358            loop {
359                async_std::task::sleep(std::time::Duration::from_secs(60)).await;
360                this.gc_once();
361            }
362        });
363    }
364
365    fn gc_once(&self) {
366        let mut expired_list = vec![];
367        let now = bucky_time_now();
368        self.all.lock().unwrap().retain(|sid, op_env| {
369            if op_env.is_gc_able(now) {
370                expired_list.push((*sid, op_env.op_env().to_owned()));
371                false
372            } else {
373                true
374            }
375        });
376
377        self.gc_list(expired_list);
378    }
379
380    // 回收超时的op_env列表
381    fn gc_list(&self, expired_list: Vec<(u64, ObjectMapOpEnv)>) {
382        for (sid, op_env) in expired_list {
383            warn!("will gc managed op_env on timeout: sid={}", sid);
384            if let Err(e) = op_env.abort() {
385                error!("op_env abort error! sid={}, {}", sid, e);
386            }
387        }
388    }
389
390    pub fn add_env(&self, env: ObjectMapOpEnv, source: Option<OpEnvSourceInfo>) {
391        let sid = env.sid();
392        let holder = ObjectMapOpEnvHolder::new(env, source);
393        let prev = self.all.lock().unwrap().insert(sid, holder);
394        assert!(prev.is_none());
395    }
396
397    pub fn get_op_env(
398        &self,
399        sid: u64,
400        source: Option<&OpEnvSourceInfo>,
401    ) -> BuckyResult<ObjectMapOpEnv> {
402        let mut list = self.all.lock().unwrap();
403        let ret = list.get_mut(&sid);
404        match ret {
405            Some(value) => {
406                if !value.compare_source(source) {
407                    let msg = format!(
408                        "get op_env but source does not match! sid={}, source={:?}, current={:?}",
409                        sid, source, value.source
410                    );
411                    error!("{}", msg);
412                    return Err(BuckyError::new(BuckyErrorCode::PermissionDenied, msg));
413                }
414
415                value.touch();
416                Ok(value.op_env().to_owned())
417            }
418            None => {
419                let msg = format!("op_env not found! sid={}", sid);
420                error!("{}", msg);
421                Err(BuckyError::new(BuckyErrorCode::NotFound, msg))
422            }
423        }
424    }
425
426    pub fn get_path_op_env(
427        &self,
428        sid: u64,
429        source: Option<&OpEnvSourceInfo>,
430    ) -> BuckyResult<ObjectMapPathOpEnvRef> {
431        let op_env = self.get_op_env(sid, source)?;
432        op_env.path_op_env(sid)
433    }
434
435    pub fn get_single_op_env(
436        &self,
437        sid: u64,
438        source: Option<&OpEnvSourceInfo>,
439    ) -> BuckyResult<ObjectMapSingleOpEnvRef> {
440        let op_env = self.get_op_env(sid, source)?;
441        op_env.single_op_env(sid)
442    }
443
444    pub fn get_isolate_path_op_env(
445        &self,
446        sid: u64,
447        source: Option<&OpEnvSourceInfo>,
448    ) -> BuckyResult<ObjectMapIsolatePathOpEnvRef> {
449        let op_env = self.get_op_env(sid, source)?;
450        op_env.isolate_path_op_env(sid)
451    }
452
453    pub async fn get_current_root(
454        &self,
455        sid: u64,
456        source: Option<&OpEnvSourceInfo>,
457    ) -> BuckyResult<ObjectId> {
458        let op_env = self.get_op_env(sid, source)?;
459
460        op_env.get_current_root().await
461    }
462
463    pub async fn update(
464        &self,
465        sid: u64,
466        source: Option<&OpEnvSourceInfo>,
467    ) -> BuckyResult<ObjectId> {
468        let op_env = self.get_op_env(sid, source)?;
469
470        op_env.update().await
471    }
472
473    pub async fn commit(
474        &self,
475        sid: u64,
476        source: Option<&OpEnvSourceInfo>,
477    ) -> BuckyResult<ObjectId> {
478        let item = self.remove(sid, source)?;
479
480        item.into_op_env().commit().await
481    }
482
483    pub fn abort(&self, sid: u64, source: Option<&OpEnvSourceInfo>) -> BuckyResult<()> {
484        let item = self.remove(sid, source)?;
485
486        item.into_op_env().abort()
487    }
488
489    fn remove(
490        &self,
491        sid: u64,
492        source: Option<&OpEnvSourceInfo>,
493    ) -> BuckyResult<ObjectMapOpEnvHolder> {
494        let mut all = self.all.lock().unwrap();
495        let ret = all.get(&sid);
496        if ret.is_none() {
497            let msg = format!("op_env not found! sid={}", sid);
498            error!("{}", msg);
499            return Err(BuckyError::new(BuckyErrorCode::NotFound, msg));
500        }
501        let value = ret.unwrap();
502        if !value.compare_source(source) {
503            let msg = format!(
504                "get op_env but source does not match! sid={}, source={:?}, current={:?}",
505                sid, source, value.source
506            );
507            error!("{}", msg);
508            return Err(BuckyError::new(BuckyErrorCode::PermissionDenied, msg));
509        }
510
511        drop(value);
512
513        let ret = all.remove(&sid);
514        Ok(ret.unwrap())
515    }
516}
517
// Manager for a single root: hands out op_envs bound to that root and keeps
// track of the managed ones.
pub struct ObjectMapRootManager {
    // Core attributes of the ObjectMap
    owner: Option<ObjectId>,
    dec_id: Option<ObjectId>,

    // Counter used to allocate a unique sid for every op_env
    next_sid: AtomicU64,

    // The root this manager belongs to
    root: ObjectMapRootHolder,

    // All envs of one root share a single lock manager
    lock: ObjectMapPathLock,

    // Root-level cache
    cache: ObjectMapRootCacheRef,

    // All managed envs
    all_envs: ObjectMapOpEnvContainer,
}
539
540impl ObjectMapRootManager {
541    pub fn new(
542        owner: Option<ObjectId>,
543        dec_id: Option<ObjectId>,
544        noc: ObjectMapNOCCacheRef,
545        root: ObjectMapRootHolder,
546    ) -> Self {
547        use rand::Rng;
548        let mut rng = rand::thread_rng();
549        let sid1 = rng.gen::<u32>();
550        let sid2 = rng.gen::<u16>();
551        let begin_sid = sid1 as u64 * sid2 as u64;
552
553        let lock = ObjectMapPathLock::new();
554        let cache = ObjectMapRootMemoryCache::new_ref(dec_id.clone(), noc, 60 * 5, 1024);
555        Self {
556            owner,
557            dec_id,
558            next_sid: AtomicU64::new(begin_sid),
559            root,
560            lock,
561            cache,
562            all_envs: ObjectMapOpEnvContainer::new(),
563        }
564    }
565
566    fn next_sid(&self, op_env_type: ObjectMapOpEnvType) -> u64 {
567        let sid = self
568            .next_sid
569            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
570        // 设置类型
571        OpEnvSessionIDHelper::set_type(sid, op_env_type)
572    }
573
574    pub fn owner(&self) -> &Option<ObjectId> {
575        &self.owner
576    }
577
578    pub fn dec_id(&self) -> &Option<ObjectId> {
579        &self.dec_id
580    }
581
582    pub fn get_current_root(&self) -> ObjectId {
583        self.root.get_current_root()
584    }
585
586    pub fn root_holder(&self) -> &ObjectMapRootHolder {
587        &self.root
588    }
589
590    pub fn root_cache(&self) -> &ObjectMapRootCacheRef {
591        &self.cache
592    }
593
594    pub fn managed_envs(&self) -> &ObjectMapOpEnvContainer {
595        &self.all_envs
596    }
597
598    pub fn create_op_env(
599        &self,
600        access: Option<OpEnvPathAccess>,
601    ) -> BuckyResult<ObjectMapPathOpEnvRef> {
602        let sid = self.next_sid(ObjectMapOpEnvType::Path);
603        let env = ObjectMapPathOpEnv::new(sid, &self.root, &self.lock, &self.cache, access);
604        let env = ObjectMapPathOpEnvRef::new(env);
605
606        Ok(env)
607    }
608
609    pub fn create_managed_op_env(
610        &self,
611        access: Option<OpEnvPathAccess>,
612        source: Option<OpEnvSourceInfo>,
613    ) -> BuckyResult<ObjectMapPathOpEnvRef> {
614        let env = self.create_op_env(access)?;
615
616        self.all_envs
617            .add_env(ObjectMapOpEnv::Path(env.clone()), source);
618
619        Ok(env)
620    }
621
622    pub fn create_single_op_env(
623        &self,
624        access: Option<OpEnvPathAccess>,
625    ) -> BuckyResult<ObjectMapSingleOpEnvRef> {
626        let sid = self.next_sid(ObjectMapOpEnvType::Single);
627        let env = ObjectMapSingleOpEnv::new(
628            sid,
629            &self.root,
630            &self.cache,
631            access,
632        );
633        let env = ObjectMapSingleOpEnvRef::new(env);
634
635        Ok(env)
636    }
637
638    pub fn create_managed_single_op_env(
639        &self,
640        access: Option<OpEnvPathAccess>,
641        source: Option<OpEnvSourceInfo>,
642    ) -> BuckyResult<ObjectMapSingleOpEnvRef> {
643        let env = self.create_single_op_env(access)?;
644        self.all_envs
645            .add_env(ObjectMapOpEnv::Single(env.clone()), source);
646
647        Ok(env)
648    }
649
650    pub fn create_isolate_path_op_env(
651        &self,
652        access: Option<OpEnvPathAccess>,
653    ) -> BuckyResult<ObjectMapIsolatePathOpEnvRef> {
654        let sid = self.next_sid(ObjectMapOpEnvType::IsolatePath);
655        let env = ObjectMapIsolatePathOpEnv::new(
656            sid,
657            &self.root,
658            &self.cache,
659            access,
660        );
661        let env = ObjectMapIsolatePathOpEnvRef::new(env);
662
663        Ok(env)
664    }
665
666    pub fn create_managed_isolate_path_op_env(
667        &self,
668        access: Option<OpEnvPathAccess>,
669        source: Option<OpEnvSourceInfo>,
670    ) -> BuckyResult<ObjectMapIsolatePathOpEnvRef> {
671        let env = self.create_isolate_path_op_env(access)?;
672        self.all_envs
673            .add_env(ObjectMapOpEnv::IsolatePath(env.clone()), source);
674
675        Ok(env)
676    }
677}
678
/// Shared, reference-counted handle to an `ObjectMapRootManager`.
pub type ObjectMapRootManagerRef = Arc<ObjectMapRootManager>;
680
// Test-only module: gated with `cfg(test)` (like `test_sid`) so the helper
// below is not compiled into regular builds as dead code.
#[cfg(test)]
mod test_root {
    use crate::*;
    use std::future::Future;

    /// Calls `update_root_fn(1, 2)`, awaits the returned future and asserts
    /// that it resolves to `3`. Errors from the future are propagated.
    async fn update_root<F, Fut>(update_root_fn: F) -> BuckyResult<()>
    where
        F: FnOnce(i32, i32) -> Fut,
        Fut: Future<Output = BuckyResult<i32>>,
    {
        info!("begin exec update fn...");
        let result = update_root_fn(1, 2).await?;
        info!("end exec update fn: {}", result);

        assert_eq!(result, 3);
        Ok(())
    }

    /// Checks that a closure returning an async block can be passed as the
    /// update callback.
    #[test]
    fn test_fn() {
        crate::init_simple_log("test-root-fn", Some("debug"));

        let update = |first: i32, second: i32| async move {
            info!("will exec add: {} + {}", first, second);
            Ok(first + second)
        };

        async_std::task::block_on(async move {
            update_root(update).await.unwrap();
        });
    }
}