Skip to main content

danube_core/metadata/
memory_store.rs

1use std::time::Duration;
2
3use super::{
4    errors::Result,
5    store::{KeyValueVersion, MetaOptions, MetadataStore},
6    watch::{WatchEvent, WatchStream},
7    MetadataError,
8};
9
10use async_trait::async_trait;
11use dashmap::{mapref::one::RefMut, DashMap};
12use serde_json::Value;
13use std::collections::BTreeMap;
14use std::sync::Arc;
15use tokio::sync::broadcast;
16
17/// MemoryStore is a simple in-memory key-value store that implements the MetadataStore trait.
18/// SHOULD BE USED ONLY FOR TESTING PURPOSES
19#[derive(Debug, Clone)]
20pub struct MemoryStore {
21    inner: Arc<DashMap<String, BTreeMap<String, Value>>>,
22    watchers: Arc<DashMap<String, broadcast::Sender<WatchEvent>>>,
23}
24
25impl MemoryStore {
26    #[allow(dead_code)]
27    pub async fn new() -> Result<Self> {
28        Ok(MemoryStore {
29            inner: Arc::new(DashMap::new()),
30            watchers: Arc::new(DashMap::new()),
31        })
32    }
33
34    fn notify_watchers(&self, event: WatchEvent) {
35        let key_str = match &event {
36            WatchEvent::Put { key, .. } => String::from_utf8_lossy(key).to_string(),
37            WatchEvent::Delete { key, .. } => String::from_utf8_lossy(key).to_string(),
38        };
39        for entry in self.watchers.iter() {
40            if key_str.starts_with(entry.key()) {
41                let _ = entry.value().send(event.clone());
42            }
43        }
44    }
45
46    fn get_map(&self, path: &str) -> Result<RefMut<'_, String, BTreeMap<String, Value>>> {
47        let parts: Vec<&str> = path.split('/').take(3).collect();
48
49        // Validate that path has at least 3 parts (empty, namespace, category)
50        if parts.len() < 3 {
51            return Err(MetadataError::InvalidArguments(format!(
52                "Path must have at least 3 segments: {}",
53                path
54            )));
55        }
56
57        let key = parts.join("/");
58        let bmap = self.inner.entry(key.to_owned()).or_insert(BTreeMap::new());
59
60        Ok(bmap)
61    }
62}
63
64#[async_trait]
65impl MetadataStore for MemoryStore {
66    // Read the value of one key, identified by the path
67    async fn get(&self, path: &str, _get_options: MetaOptions) -> Result<Option<Value>> {
68        let bmap = self.get_map(path)?;
69
70        let parts: Vec<&str> = path.split('/').skip(3).collect();
71        let key = parts.join("/");
72
73        match bmap.get(&key) {
74            Some(value) => Ok(Some(value.clone())),
75            None => Ok(None),
76        }
77    }
78
79    // Return all the paths that are children to the specific path.
80    // Returns full paths to match ETCD behavior for production compatibility
81    async fn get_childrens(&self, path: &str) -> Result<Vec<String>> {
82        let parts: Vec<&str> = path.split('/').skip(3).collect();
83        let minimum_path = parts.join("/");
84
85        // Get the map prefix (first 3 parts of the path)
86        let path_parts: Vec<&str> = path.split('/').take(3).collect();
87        let map_prefix = path_parts.join("/");
88        let map_key = map_prefix.clone();
89
90        let mut child_paths = Vec::new();
91
92        // Access the map directly without creating a new one if it doesn't exist
93        if let Some(bmap_ref) = self.inner.get(&map_key) {
94            for key in bmap_ref.keys() {
95                if key.starts_with(&minimum_path)
96                    && key.len() > minimum_path.len()
97                    && key.chars().nth(minimum_path.len()).unwrap() == '/'
98                {
99                    // Return full path to match ETCD behavior
100                    let full_path = format!("{}/{}", map_prefix, key);
101                    child_paths.push(full_path);
102                }
103            }
104        }
105        Ok(child_paths)
106    }
107
108    // Put a new value for a given key
109    async fn put(&self, path: &str, value: Value, _put_options: MetaOptions) -> Result<()> {
110        let mut bmap = self.get_map(path)?;
111
112        let parts: Vec<&str> = path.split('/').skip(3).collect();
113        let key = parts.join("/");
114
115        // Validate that there's actually a key to store (path must have more than 3 parts)
116        if key.is_empty() {
117            return Err(MetadataError::InvalidArguments(format!(
118                "Path must have a key component: {}",
119                path
120            )));
121        }
122
123        let value_bytes = serde_json::to_vec(&value)?;
124        bmap.insert(key, value);
125
126        self.notify_watchers(WatchEvent::Put {
127            key: path.as_bytes().to_vec(),
128            value: value_bytes,
129            mod_revision: None,
130            version: None,
131        });
132
133        Ok(())
134    }
135
136    // Delete the key / value from the store
137    async fn delete(&self, path: &str) -> Result<()> {
138        let mut bmap = self.get_map(path)?;
139
140        let parts: Vec<&str> = path.split('/').skip(3).collect();
141        let key = parts.join("/");
142
143        if key.is_empty() {
144            return Err(MetadataError::Unknown("wrong path".to_string()).into());
145        }
146
147        let _value = bmap.remove(&key);
148
149        self.notify_watchers(WatchEvent::Delete {
150            key: path.as_bytes().to_vec(),
151            mod_revision: None,
152            version: None,
153        });
154
155        Ok(())
156    }
157
158    async fn watch(&self, prefix: &str) -> Result<WatchStream> {
159        let (tx, rx) = broadcast::channel(256);
160        self.watchers.insert(prefix.to_string(), tx);
161        Ok(WatchStream::from_broadcast(rx))
162    }
163
164    async fn put_with_ttl(&self, key: &str, value: Value, _ttl: Duration) -> Result<()> {
165        // MemoryStore ignores TTL — testing only. Real TTL comes with the Raft backend.
166        self.put(key, value, MetaOptions::None).await
167    }
168
169    async fn allocate_monotonic_id(&self, counter_key: &str) -> Result<u64> {
170        // Simple atomic-ish increment for testing. Real atomicity comes from Raft.
171        let current = self.get(counter_key, MetaOptions::None).await?;
172        let next = current.and_then(|v| v.as_u64()).unwrap_or(0) + 1;
173        self.put(counter_key, serde_json::json!(next), MetaOptions::None)
174            .await?;
175        Ok(next)
176    }
177
178    async fn get_bulk(&self, prefix: &str) -> Result<Vec<KeyValueVersion>> {
179        let map_parts: Vec<&str> = prefix.split('/').take(3).collect();
180        if map_parts.len() < 3 {
181            return Err(MetadataError::InvalidArguments(format!(
182                "Prefix must have at least 3 segments: {}",
183                prefix
184            )));
185        }
186        let map_prefix = map_parts.join("/");
187        let suffix_parts: Vec<&str> = prefix.split('/').skip(3).collect();
188        let suffix = suffix_parts.join("/");
189
190        let mut out: Vec<KeyValueVersion> = Vec::new();
191        if let Some(bmap_ref) = self.inner.get(&map_prefix) {
192            for (k, v) in bmap_ref.iter() {
193                if k.starts_with(&suffix) {
194                    let full_key = format!("{}/{}", map_prefix, k);
195                    let value_bytes = serde_json::to_vec(v)?;
196                    out.push(KeyValueVersion {
197                        key: full_key,
198                        value: value_bytes,
199                        version: 0,
200                    });
201                }
202            }
203        }
204        Ok(out)
205    }
206}