// danube_core/metadata/memory_store.rs
use std::time::Duration;
2
3use super::{
4 errors::Result,
5 store::{KeyValueVersion, MetaOptions, MetadataStore},
6 watch::{WatchEvent, WatchStream},
7 MetadataError,
8};
9
10use async_trait::async_trait;
11use dashmap::{mapref::one::RefMut, DashMap};
12use serde_json::Value;
13use std::collections::BTreeMap;
14use std::sync::Arc;
15use tokio::sync::broadcast;
16
17#[derive(Debug, Clone)]
20pub struct MemoryStore {
21 inner: Arc<DashMap<String, BTreeMap<String, Value>>>,
22 watchers: Arc<DashMap<String, broadcast::Sender<WatchEvent>>>,
23}
24
25impl MemoryStore {
26 #[allow(dead_code)]
27 pub async fn new() -> Result<Self> {
28 Ok(MemoryStore {
29 inner: Arc::new(DashMap::new()),
30 watchers: Arc::new(DashMap::new()),
31 })
32 }
33
34 fn notify_watchers(&self, event: WatchEvent) {
35 let key_str = match &event {
36 WatchEvent::Put { key, .. } => String::from_utf8_lossy(key).to_string(),
37 WatchEvent::Delete { key, .. } => String::from_utf8_lossy(key).to_string(),
38 };
39 for entry in self.watchers.iter() {
40 if key_str.starts_with(entry.key()) {
41 let _ = entry.value().send(event.clone());
42 }
43 }
44 }
45
46 fn get_map(&self, path: &str) -> Result<RefMut<'_, String, BTreeMap<String, Value>>> {
47 let parts: Vec<&str> = path.split('/').take(3).collect();
48
49 if parts.len() < 3 {
51 return Err(MetadataError::InvalidArguments(format!(
52 "Path must have at least 3 segments: {}",
53 path
54 )));
55 }
56
57 let key = parts.join("/");
58 let bmap = self.inner.entry(key.to_owned()).or_insert(BTreeMap::new());
59
60 Ok(bmap)
61 }
62}
63
64#[async_trait]
65impl MetadataStore for MemoryStore {
66 async fn get(&self, path: &str, _get_options: MetaOptions) -> Result<Option<Value>> {
68 let bmap = self.get_map(path)?;
69
70 let parts: Vec<&str> = path.split('/').skip(3).collect();
71 let key = parts.join("/");
72
73 match bmap.get(&key) {
74 Some(value) => Ok(Some(value.clone())),
75 None => Ok(None),
76 }
77 }
78
79 async fn get_childrens(&self, path: &str) -> Result<Vec<String>> {
82 let parts: Vec<&str> = path.split('/').skip(3).collect();
83 let minimum_path = parts.join("/");
84
85 let path_parts: Vec<&str> = path.split('/').take(3).collect();
87 let map_prefix = path_parts.join("/");
88 let map_key = map_prefix.clone();
89
90 let mut child_paths = Vec::new();
91
92 if let Some(bmap_ref) = self.inner.get(&map_key) {
94 for key in bmap_ref.keys() {
95 if key.starts_with(&minimum_path)
96 && key.len() > minimum_path.len()
97 && key.chars().nth(minimum_path.len()).unwrap() == '/'
98 {
99 let full_path = format!("{}/{}", map_prefix, key);
101 child_paths.push(full_path);
102 }
103 }
104 }
105 Ok(child_paths)
106 }
107
108 async fn put(&self, path: &str, value: Value, _put_options: MetaOptions) -> Result<()> {
110 let mut bmap = self.get_map(path)?;
111
112 let parts: Vec<&str> = path.split('/').skip(3).collect();
113 let key = parts.join("/");
114
115 if key.is_empty() {
117 return Err(MetadataError::InvalidArguments(format!(
118 "Path must have a key component: {}",
119 path
120 )));
121 }
122
123 let value_bytes = serde_json::to_vec(&value)?;
124 bmap.insert(key, value);
125
126 self.notify_watchers(WatchEvent::Put {
127 key: path.as_bytes().to_vec(),
128 value: value_bytes,
129 mod_revision: None,
130 version: None,
131 });
132
133 Ok(())
134 }
135
136 async fn delete(&self, path: &str) -> Result<()> {
138 let mut bmap = self.get_map(path)?;
139
140 let parts: Vec<&str> = path.split('/').skip(3).collect();
141 let key = parts.join("/");
142
143 if key.is_empty() {
144 return Err(MetadataError::Unknown("wrong path".to_string()).into());
145 }
146
147 let _value = bmap.remove(&key);
148
149 self.notify_watchers(WatchEvent::Delete {
150 key: path.as_bytes().to_vec(),
151 mod_revision: None,
152 version: None,
153 });
154
155 Ok(())
156 }
157
158 async fn watch(&self, prefix: &str) -> Result<WatchStream> {
159 let (tx, rx) = broadcast::channel(256);
160 self.watchers.insert(prefix.to_string(), tx);
161 Ok(WatchStream::from_broadcast(rx))
162 }
163
164 async fn put_with_ttl(&self, key: &str, value: Value, _ttl: Duration) -> Result<()> {
165 self.put(key, value, MetaOptions::None).await
167 }
168
169 async fn allocate_monotonic_id(&self, counter_key: &str) -> Result<u64> {
170 let current = self.get(counter_key, MetaOptions::None).await?;
172 let next = current.and_then(|v| v.as_u64()).unwrap_or(0) + 1;
173 self.put(counter_key, serde_json::json!(next), MetaOptions::None)
174 .await?;
175 Ok(next)
176 }
177
178 async fn get_bulk(&self, prefix: &str) -> Result<Vec<KeyValueVersion>> {
179 let map_parts: Vec<&str> = prefix.split('/').take(3).collect();
180 if map_parts.len() < 3 {
181 return Err(MetadataError::InvalidArguments(format!(
182 "Prefix must have at least 3 segments: {}",
183 prefix
184 )));
185 }
186 let map_prefix = map_parts.join("/");
187 let suffix_parts: Vec<&str> = prefix.split('/').skip(3).collect();
188 let suffix = suffix_parts.join("/");
189
190 let mut out: Vec<KeyValueVersion> = Vec::new();
191 if let Some(bmap_ref) = self.inner.get(&map_prefix) {
192 for (k, v) in bmap_ref.iter() {
193 if k.starts_with(&suffix) {
194 let full_key = format!("{}/{}", map_prefix, k);
195 let value_bytes = serde_json::to_vec(v)?;
196 out.push(KeyValueVersion {
197 key: full_key,
198 value: value_bytes,
199 version: 0,
200 });
201 }
202 }
203 }
204 Ok(out)
205 }
206}