Skip to main content

rio_rs/object_placement/
local.rs

1//! In-memory implementation of the trait [ObjectPlacement]
2
3use std::collections::HashMap;
4use std::sync::{Arc, RwLock};
5
6use async_trait::async_trait;
7
8use crate::object_placement::{ObjectPlacement, ObjectPlacementItem};
9use crate::ObjectId;
10
11type PlacementMap = Arc<RwLock<HashMap<String, String>>>;
12
13/// In-memory implementation of the trait [ObjectPlacement]
14#[derive(Default, Clone, Debug)]
15pub struct LocalObjectPlacement {
16    placement: PlacementMap,
17}
18
19#[async_trait]
20impl ObjectPlacement for LocalObjectPlacement {
21    async fn update(&self, object_placement: ObjectPlacementItem) {
22        let object_id = format!(
23            "{}.{}",
24            object_placement.object_id.0, object_placement.object_id.1
25        );
26        let mut placement_guard = self
27            .placement
28            .write()
29            .expect("Poisoned lock: ObjectPlacement map");
30        if let Some(address) = object_placement.server_address {
31            *placement_guard.entry(object_id).or_default() = address;
32        } else {
33            placement_guard.remove(&object_id);
34        }
35    }
36
37    async fn lookup(&self, object_id: &ObjectId) -> Option<String> {
38        let object_id = format!("{}.{}", object_id.0, object_id.1);
39        let placement_guard = self
40            .placement
41            .read()
42            .expect("Poisoned lock: ObjectPlacement map");
43        placement_guard.get(&object_id).cloned()
44    }
45
46    async fn clean_server(&self, address: String) {
47        let mut placement_guard = self
48            .placement
49            .write()
50            .expect("Poisoned lock: ObjectPlacement map");
51        placement_guard.retain(|_, v| *v != address);
52    }
53
54    async fn remove(&self, object_id: &ObjectId) {
55        let object_id = format!("{}.{}", object_id.0, object_id.1);
56        let mut placement_guard = self
57            .placement
58            .write()
59            .expect("Poisoned lock: ObjectPlacement map");
60        placement_guard.remove(&object_id);
61    }
62}
63
64#[cfg(test)]
65mod test {
66    use super::*;
67
68    #[tokio::test]
69    async fn local_object_placement_provider_is_clonable() {
70        let provider = LocalObjectPlacement::default();
71        let cloned_provider = provider.clone();
72
73        provider
74            .update(ObjectPlacementItem::new(
75                ObjectId("test".to_string(), "1".to_string()),
76                Some("0.0.0.0:80".to_string()),
77            ))
78            .await;
79
80        assert!(provider
81            .lookup(&ObjectId("test".to_string(), "1".to_string()))
82            .await
83            .is_some());
84        assert!(cloned_provider
85            .lookup(&ObjectId("test".to_string(), "1".to_string()))
86            .await
87            .is_some());
88
89        cloned_provider.clean_server("0.0.0.0:80".to_string()).await;
90
91        assert!(provider
92            .lookup(&ObjectId("test".to_string(), "1".to_string()))
93            .await
94            .is_none());
95        assert!(cloned_provider
96            .lookup(&ObjectId("test".to_string(), "1".to_string()))
97            .await
98            .is_none());
99    }
100}