cyfs_meta_lib/
helper.rs

1use super::client::*;
2use cyfs_base::*;
3
4use async_std::sync::Mutex as AsyncMutex;
5use std::sync::Arc;
6
7pub struct MetaClientHelper;
8
9impl MetaClientHelper {
10    pub async fn get_object(
11        meta_client: &MetaClient,
12        object_id: &ObjectId,
13    ) -> BuckyResult<Option<(AnyNamedObject, Vec<u8>)>> {
14        let object_raw = match meta_client.get_raw_data(object_id).await {
15            Ok(v) => v,
16            Err(e) => {
17                if e.code() == BuckyErrorCode::NotFound {
18                    warn!(
19                        "get object from meta chain but not found! obj={} err={}",
20                        object_id, e
21                    );
22
23                    return Ok(None);
24                } else {
25                    let msg = format!(
26                        "load object from meta chain failed! obj={} err={}",
27                        object_id, e
28                    );
29                    error!("{}", msg);
30                    return Err(BuckyError::new(e.code(), msg));
31                }
32            }
33        };
34
35        info!("get object from meta success: {}", object_id);
36        let (object, _) = AnyNamedObject::raw_decode(&object_raw).map_err(|e| {
37            let msg = format!("invalid object format! obj={} err={}", object_id, e);
38            error!("{}", msg);
39
40            BuckyError::new(BuckyErrorCode::InvalidFormat, msg)
41        })?;
42
43        // 校验一下对象id,看是否匹配
44        let id = object.calculate_id();
45        if id != *object_id {
46            let msg = format!(
47                "get object from meta but got unmatch object id! expected={}, got={}",
48                object_id, id
49            );
50            error!("{}", msg);
51
52            return Err(BuckyError::new(BuckyErrorCode::Unmatch, msg));
53        }
54
55        let resp = (object, object_raw);
56
57        Ok(Some(resp))
58    }
59}
60
/// A single cache entry: the raw bytes of an object, or `None` when no bytes
/// were recorded for it.
struct MetaClientObjectItem {
    object_raw: Option<Vec<u8>>,
}

impl MetaClientObjectItem {
    /// Returns an owned copy of the stored raw bytes, if any.
    ///
    /// Despite the `into_` prefix this only borrows `self`; the entry keeps its
    /// own copy of the bytes.
    pub fn into_object_raw(&self) -> Option<Vec<u8>> {
        self.object_raw.clone()
    }
}
72
73#[derive(Clone)]
74pub struct MetaClientHelperWithObjectCache {
75    objects: Arc<AsyncMutex<lru_time_cache::LruCache<ObjectId, MetaClientObjectItem>>>,
76}
77
78impl MetaClientHelperWithObjectCache {
79    pub fn new(timeout: std::time::Duration, capacity: usize) -> Self {
80        Self {
81            objects: Arc::new(AsyncMutex::new(
82                lru_time_cache::LruCache::with_expiry_duration_and_capacity(timeout, capacity),
83            )),
84        }
85    }
86
87    pub async fn clear_cache(&self) {
88        self.objects.lock().await.clear();
89    }
90
91    pub async fn get_object(
92        &self,
93        meta_client: &MetaClient,
94        object_id: &ObjectId,
95    ) -> BuckyResult<Option<(AnyNamedObject, Vec<u8>)>> {
96        Ok(self
97            .get_object_raw(meta_client, object_id)
98            .await?
99            .map(|object_raw| {
100                let (object, _) = AnyNamedObject::raw_decode(&object_raw).unwrap();
101                (object, object_raw)
102            }))
103    }
104
105    pub async fn get_object_raw(
106        &self,
107        meta_client: &MetaClient,
108        object_id: &ObjectId,
109    ) -> BuckyResult<Option<Vec<u8>>> {
110        let mut list = self.objects.lock().await;
111        // force remove expired items 
112        list.iter();
113
114        if let Some(item) = list.peek(object_id) {
115            return Ok(item.into_object_raw());
116        }
117
118        let ret = MetaClientHelper::get_object(meta_client, object_id).await?;
119        let ret = ret.map(|(_, object_raw)| object_raw);
120        let item = MetaClientObjectItem {
121            object_raw: ret.as_ref().map(|object_raw| object_raw.clone()),
122        };
123
124        list.insert(object_id.to_owned(), item);
125
126        Ok(ret)
127    }
128}