1use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use std::collections::HashMap;
9use std::path::Path;
10use std::sync::Arc;
11use tokio::sync::RwLock;
12
13use crate::Result;
14
15#[derive(Clone)]
17pub struct DataStore {
18 inner: Arc<RwLock<StoreInner>>,
20 persistence_path: Option<String>,
22}
23
24#[derive(Default, Serialize, Deserialize)]
25struct StoreInner {
26 collections: HashMap<String, Vec<Value>>,
28 values: HashMap<String, Value>,
30}
31
32impl DataStore {
33 pub fn new() -> Self {
35 Self {
36 inner: Arc::new(RwLock::new(StoreInner::default())),
37 persistence_path: None,
38 }
39 }
40
41 pub fn with_persistence(path: impl Into<String>) -> Result<Self> {
43 let path = path.into();
44 let inner = if Path::new(&path).exists() {
45 let content = std::fs::read_to_string(&path)?;
46 serde_json::from_str(&content)?
47 } else {
48 StoreInner::default()
49 };
50
51 Ok(Self {
52 inner: Arc::new(RwLock::new(inner)),
53 persistence_path: Some(path),
54 })
55 }
56
57 pub async fn load_collection(&self, name: &str, path: impl AsRef<Path>) -> Result<()> {
59 let content = std::fs::read_to_string(path)?;
60 let data: Vec<Value> = serde_json::from_str(&content)?;
61
62 let mut inner = self.inner.write().await;
63 inner.collections.insert(name.to_string(), data);
64
65 Ok(())
66 }
67
68 pub async fn get_collection(&self, name: &str) -> Option<Vec<Value>> {
70 let inner = self.inner.read().await;
71 inner.collections.get(name).cloned()
72 }
73
74 pub async fn get_item(&self, collection: &str, index: usize) -> Option<Value> {
76 let inner = self.inner.read().await;
77 inner.collections.get(collection)?.get(index).cloned()
78 }
79
80 pub async fn find_by(&self, collection: &str, field: &str, value: &Value) -> Vec<Value> {
82 let inner = self.inner.read().await;
83 inner
84 .collections
85 .get(collection)
86 .map(|items| {
87 items
88 .iter()
89 .filter(|item| item.get(field).map(|v| v == value).unwrap_or(false))
90 .cloned()
91 .collect()
92 })
93 .unwrap_or_default()
94 }
95
96 pub async fn find_one_by(&self, collection: &str, field: &str, value: &Value) -> Option<Value> {
98 self.find_by(collection, field, value)
99 .await
100 .into_iter()
101 .next()
102 }
103
104 pub async fn create(&self, collection: &str, mut item: Value) -> Result<Value> {
106 let mut inner = self.inner.write().await;
107
108 let items = inner
109 .collections
110 .entry(collection.to_string())
111 .or_insert_with(Vec::new);
112
113 if item.get("id").is_none() {
115 let max_id = items
116 .iter()
117 .filter_map(|i| i.get("id").and_then(|v| v.as_u64()))
118 .max()
119 .unwrap_or(0);
120 let id = max_id + 1;
121 if let Value::Object(ref mut map) = item {
122 map.insert("id".to_string(), Value::Number(id.into()));
123 }
124 }
125
126 if let Value::Object(ref mut map) = item {
129 map.entry("created_at".to_string()).or_insert_with(|| {
130 Value::String(
131 chrono::Local::now()
132 .format("%Y-%m-%dT%H:%M:%S%.3f")
133 .to_string(),
134 )
135 });
136 }
137
138 items.push(item.clone());
139 drop(inner);
140
141 self.persist().await?;
142 Ok(item)
143 }
144
145 pub async fn update(
147 &self,
148 collection: &str,
149 id: &Value,
150 updates: Value,
151 ) -> Result<Option<Value>> {
152 let mut inner = self.inner.write().await;
153
154 if let Some(items) = inner.collections.get_mut(collection) {
155 for item in items.iter_mut() {
156 if item.get("id") == Some(id) {
157 if let (Value::Object(existing), Value::Object(new)) = (item, &updates) {
159 for (key, value) in new {
160 existing.insert(key.clone(), value.clone());
161 }
162 }
163 let updated = items.iter().find(|i| i.get("id") == Some(id)).cloned();
164 drop(inner);
165 self.persist().await?;
166 return Ok(updated);
167 }
168 }
169 }
170
171 Ok(None)
172 }
173
174 pub async fn delete(&self, collection: &str, id: &Value) -> Result<bool> {
176 let mut inner = self.inner.write().await;
177
178 if let Some(items) = inner.collections.get_mut(collection) {
179 let original_len = items.len();
180 items.retain(|item| item.get("id") != Some(id));
181 let deleted = items.len() < original_len;
182
183 drop(inner);
184 if deleted {
185 self.persist().await?;
186 }
187 return Ok(deleted);
188 }
189
190 Ok(false)
191 }
192
193 pub async fn set(&self, key: &str, value: Value) -> Result<()> {
195 let mut inner = self.inner.write().await;
196 inner.values.insert(key.to_string(), value);
197 drop(inner);
198 self.persist().await
199 }
200
201 pub async fn get(&self, key: &str) -> Option<Value> {
203 let inner = self.inner.read().await;
204 inner.values.get(key).cloned()
205 }
206
207 pub async fn atomic_modify<F>(&self, key: &str, f: F) -> Result<Value>
211 where
212 F: FnOnce(Option<&Value>) -> Value,
213 {
214 let mut inner = self.inner.write().await;
215 let new_value = f(inner.values.get(key));
216 inner.values.insert(key.to_string(), new_value.clone());
217 drop(inner);
218 self.persist().await?;
219 Ok(new_value)
220 }
221
222 pub async fn remove(&self, key: &str) -> Result<Option<Value>> {
224 let mut inner = self.inner.write().await;
225 let removed = inner.values.remove(key);
226 drop(inner);
227 self.persist().await?;
228 Ok(removed)
229 }
230
231 pub async fn as_context(&self) -> HashMap<String, Value> {
233 let inner = self.inner.read().await;
234 let mut context = HashMap::new();
235
236 for (name, items) in &inner.collections {
238 context.insert(name.clone(), Value::Array(items.clone()));
239 }
240
241 for (key, value) in &inner.values {
243 context.insert(key.clone(), value.clone());
244 }
245
246 context
247 }
248
249 pub async fn set_collection(&self, name: &str, items: Vec<Value>) -> Result<()> {
251 let mut inner = self.inner.write().await;
252 inner.collections.insert(name.to_string(), items);
253 drop(inner);
254 self.persist().await
255 }
256
257 async fn persist(&self) -> Result<()> {
259 if let Some(ref path) = self.persistence_path {
260 let inner = self.inner.read().await;
261 let content = serde_json::to_string_pretty(&*inner)?;
262 let tmp_path = format!("{}.tmp", path);
264 tokio::fs::write(&tmp_path, &content).await?;
265 tokio::fs::rename(&tmp_path, path).await?;
266 }
267 Ok(())
268 }
269}
270
271impl Default for DataStore {
272 fn default() -> Self {
273 Self::new()
274 }
275}
276
277#[cfg(test)]
278mod tests {
279 use super::*;
280 use serde_json::json;
281
282 #[tokio::test]
283 async fn test_create_and_get() {
284 let store = DataStore::new();
285
286 let post = json!({
287 "title": "Hello World",
288 "content": "My first post"
289 });
290
291 let created = store.create("posts", post).await.unwrap();
292 assert_eq!(created.get("id"), Some(&json!(1)));
293
294 let posts = store.get_collection("posts").await.unwrap();
295 assert_eq!(posts.len(), 1);
296 }
297
298 #[tokio::test]
299 async fn test_find_by() {
300 let store = DataStore::new();
301
302 store
303 .create("users", json!({"name": "Alice", "role": "admin"}))
304 .await
305 .unwrap();
306 store
307 .create("users", json!({"name": "Bob", "role": "user"}))
308 .await
309 .unwrap();
310 store
311 .create("users", json!({"name": "Charlie", "role": "admin"}))
312 .await
313 .unwrap();
314
315 let admins = store.find_by("users", "role", &json!("admin")).await;
316 assert_eq!(admins.len(), 2);
317 }
318
319 #[tokio::test]
320 async fn test_update() {
321 let store = DataStore::new();
322
323 store
324 .create("posts", json!({"title": "Draft"}))
325 .await
326 .unwrap();
327
328 store
329 .update(
330 "posts",
331 &json!(1),
332 json!({"title": "Published", "status": "live"}),
333 )
334 .await
335 .unwrap();
336
337 let post = store.find_one_by("posts", "id", &json!(1)).await.unwrap();
338 assert_eq!(post.get("title"), Some(&json!("Published")));
339 assert_eq!(post.get("status"), Some(&json!("live")));
340 }
341
342 #[tokio::test]
343 async fn test_delete() {
344 let store = DataStore::new();
345
346 store
347 .create("posts", json!({"title": "To Delete"}))
348 .await
349 .unwrap();
350
351 let deleted = store.delete("posts", &json!(1)).await.unwrap();
352 assert!(deleted);
353
354 let posts = store.get_collection("posts").await.unwrap();
355 assert!(posts.is_empty());
356 }
357}