// sqjson/db.rs

1// src/your_db.rs
2use std::collections::{HashMap, HashSet};
3use std::convert::TryInto;
4use std::fs;
5// no external cache; rely on pager directly
6use serde_json::Value;
7
8use crate::{error::DbError, pager::Pager};
9
10/// Small file-backed key/value JSON DB with optional secondary indexes.
11/// - Page 0 is reserved for the index (key -> page_id) serialized as JSON.
12/// - Pages start at 1 for records.
13/// - Each record page layout: [u32 little-endian length][json bytes][padding...]
14///
15/// Improvements vs original:
16/// - safer page length checks (no panics),
17/// - selective secondary indexing (only index fields present in `indexed_fields` set; empty set = index all),
18/// - helper methods: list_keys, count, compact, etc.
19pub struct YourDb {
20    pager: Pager,
21    index: HashMap<String, u32>, // maps key -> page_id
22    /// secondary_indexes: field -> (value -> set of keys)
23    secondary_indexes: HashMap<String, HashMap<Value, HashSet<String>>>,
24    next_page_id: u32, // next free page id
25    /// If non-empty, only fields present here will be indexed. If empty, index all fields.
26    indexed_fields: HashSet<String>,
27}
28
29impl YourDb {
30    /// Open existing DB or create a new one if path missing.
31    pub fn open(path: &str) -> Result<Self, DbError> {
32        let pager = Pager::new(path)?;
33
34        // Read index from page 0 (length-prefixed JSON). Empty or invalid = new DB.
35        let index: HashMap<String, u32> = match pager.get_page(0) {
36            Ok(page0) => match Self::read_json_from_page(page0) {
37                Ok(val) => serde_json::from_value(val).unwrap_or_default(),
38                Err(_) => HashMap::new(),
39            },
40            Err(_) => HashMap::new(),
41        };
42
43        // compute next_page_id safely
44        let next_page_id = index.values().copied().max().unwrap_or(0).saturating_add(1);
45
46        // create empty secondary index map
47        let mut secondary_indexes: HashMap<String, HashMap<Value, HashSet<String>>> = HashMap::new();
48
49        // build secondary indexes by scanning existing records (index all fields by default)
50        for (key, &page_id) in &index {
51            if let Ok(data) = pager.get_page(page_id) {
52                match Self::read_json_from_page(&data) {
53                    Ok(val) => {
54                        if let Some(obj) = val.as_object() {
55                            for (field, field_value) in obj {
56                                let entry = secondary_indexes
57                                    .entry(field.clone())
58                                    .or_default()
59                                    .entry(field_value.clone())
60                                    .or_default();
61                                entry.insert(key.clone());
62                            }
63                        }
64                    }
65                    Err(_) => {
66                        // skip corrupted record page
67                        continue;
68                    }
69                }
70            }
71        }
72
73        Ok(Self {
74            pager,
75            index,
76            secondary_indexes,
77            next_page_id,
78            indexed_fields: HashSet::new(),
79        })
80    }
81
82    /// Helper: read JSON Value from a page byte slice with safety checks.
83    fn read_json_from_page(page: &[u8]) -> Result<Value, DbError> {
84        if page.len() < 4 {
85            return Err(DbError::Other("Corrupted page: too short".into()));
86        }
87        let len_bytes: [u8; 4] = page[..4].try_into().map_err(|_| DbError::Other("Failed to read length".into()))?;
88        let len = u32::from_le_bytes(len_bytes) as usize;
89        if page.len() < 4 + len {
90            return Err(DbError::Other("Corrupted page: length out of bounds".into()));
91        }
92        let json = serde_json::from_slice(&page[4..4 + len]).map_err(|e| DbError::Other(format!("Invalid JSON on page: {}", e)))?;
93        Ok(json)
94    }
95
96    // cache removed — use pager directly
97
98    /// Put key -> JSON value (replaces existing if present).
99    pub fn put(&mut self, key: &str, value: &Value) -> Result<(), DbError> {
100        if key.is_empty() {
101            return Err(DbError::Other("Key must not be empty".into()));
102        }
103
104        let json_bytes = serde_json::to_vec(value).map_err(|e| DbError::Other(format!("Serialize error: {}", e)))?;
105        if (json_bytes.len() + 4) > crate::util::PAGE_SIZE {
106            return Err(DbError::Other("JSON too large for page".into()));
107        }
108
109        // Remove old secondary index entries if present
110        if let Some(existing_val) = self.get(key)? {
111            if let Some(obj) = existing_val.as_object() {
112                for (field, field_value) in obj {
113                    if !self.indexed_fields.is_empty() && !self.indexed_fields.contains(field) {
114                        continue;
115                    }
116                    if let Some(val_map) = self.secondary_indexes.get_mut(field) {
117                        if let Some(keys) = val_map.get_mut(field_value) {
118                            keys.remove(key);
119                            if keys.is_empty() {
120                                val_map.remove(field_value);
121                            }
122                        }
123                        if val_map.is_empty() {
124                            self.secondary_indexes.remove(field);
125                        }
126                    }
127                }
128            }
129        }
130
131        // Prepare page data
132        let mut page_data = vec![0u8; crate::util::PAGE_SIZE];
133        page_data[..4].copy_from_slice(&(json_bytes.len() as u32).to_le_bytes());
134        page_data[4..4 + json_bytes.len()].copy_from_slice(&json_bytes);
135
136        // choose page id: reuse existing if present (optional), otherwise next_page_id
137        let page_id = if let Some(&existing_page_id) = self.index.get(key) {
138            existing_page_id
139        } else {
140            let pid = self.next_page_id;
141            self.next_page_id = self.next_page_id.saturating_add(1);
142            pid
143        };
144
145        // write page and update index
146        self.pager.write_page(page_id, &page_data)?;
147        // no cache layer; pager writes directly
148
149        self.index.insert(key.to_string(), page_id);
150
151        // Update secondary indexes for the new value
152        if let Some(obj) = value.as_object() {
153            for (field, field_value) in obj {
154                if !self.indexed_fields.is_empty() && !self.indexed_fields.contains(field) {
155                    continue;
156                }
157                let entry = self.secondary_indexes
158                    .entry(field.clone())
159                    .or_default()
160                    .entry(field_value.clone())
161                    .or_default();
162                entry.insert(key.to_string());
163            }
164        }
165
166        Ok(())
167    }
168
169    /// Get full JSON value for a key.
170    pub fn get(&self, key: &str) -> Result<Option<Value>, DbError> {
171        if let Some(&page_id) = self.index.get(key) {
172            let data = self.pager.get_page(page_id)?;
173            match Self::read_json_from_page(&data) {
174                Ok(json) => Ok(Some(json)),
175                Err(_) => Ok(None), // treat corrupted page as missing
176            }
177        } else {
178            Ok(None)
179        }
180    }
181
182    /// Flush: write index to page 0 as length-prefixed JSON.
183    pub fn flush(&mut self) -> Result<(), DbError> {
184        let index_bytes = serde_json::to_vec(&self.index).map_err(|e| DbError::Other(format!("Index serialize error: {}", e)))?;
185        let mut page = vec![0u8; crate::util::PAGE_SIZE];
186        if index_bytes.len() + 4 > page.len() {
187            return Err(DbError::Other("Index too large for a single page".into()));
188        }
189        page[..4].copy_from_slice(&(index_bytes.len() as u32).to_le_bytes());
190        page[4..4 + index_bytes.len()].copy_from_slice(&index_bytes);
191        self.pager.write_page(0, &page)?;
192        self.pager.flush()
193    }
194
195    /// Delete a key and remove from secondary indexes.
196    pub fn delete(&mut self, key: &str) -> Result<(), DbError> {
197        // remove from secondary indexes
198        if let Some(existing_val) = self.get(key)? {
199            if let Some(obj) = existing_val.as_object() {
200                for (field, field_value) in obj {
201                    if !self.indexed_fields.is_empty() && !self.indexed_fields.contains(field) {
202                        continue;
203                    }
204                    if let Some(val_map) = self.secondary_indexes.get_mut(field) {
205                        if let Some(keys) = val_map.get_mut(field_value) {
206                            keys.remove(key);
207                            if keys.is_empty() {
208                                val_map.remove(field_value);
209                            }
210                        }
211                        if val_map.is_empty() {
212                            self.secondary_indexes.remove(field);
213                        }
214                    }
215                }
216            }
217        }
218
219        if self.index.remove(key).is_some() {
220            Ok(())
221        } else {
222            Err(DbError::Other("Key not found".into()))
223        }
224    }
225
226    /// Return a specific field from the JSON stored at `key`.
227    pub fn get_field(&self, key: &str, field: &str) -> Result<Option<Value>, DbError> {
228        if let Some(val) = self.get(key)? {
229            Ok(val.get(field).cloned())
230        } else {
231            Ok(None)
232        }
233    }
234
235    /// Filter all records with a predicate function. Be careful: this iterates records.
236    pub fn filter<F>(&self, predicate: F) -> Result<Vec<(String, Value)>, DbError>
237    where
238        F: Fn(&Value) -> bool,
239    {
240        let mut results = Vec::new();
241        for key in self.index.keys() {
242            if let Some(val) = self.get(key)? {
243                if predicate(&val) {
244                    results.push((key.clone(), val));
245                }
246            }
247        }
248        Ok(results)
249    }
250
251    /// Query by field = value (via secondary index)
252    pub fn query(&self, field: &str, value: impl Into<Value>) -> Result<Vec<String>, DbError> {
253        let val = value.into();
254        if let Some(val_map) = self.secondary_indexes.get(field) {
255            if let Some(keys) = val_map.get(&val) {
256                return Ok(keys.iter().cloned().collect());
257            }
258        }
259        Ok(vec![])
260    }
261
262    /// Query page (limit/offset applied)
263    pub fn query_page(&self, field: &str, value: impl Into<Value>, limit: usize, offset: usize) -> Result<Vec<String>, DbError> {
264        let keys = self.query(field, value)?;
265        Ok(keys.into_iter().skip(offset).take(limit).collect())
266    }
267
268    /// Export query results (key -> value) to a JSON file
269    pub fn export_query(&self, field: &str, value: impl Into<Value>, path: &str) -> Result<(), DbError> {
270        let keys = self.query(field, value)?;
271        let mut map = HashMap::new();
272        for k in keys {
273            if let Some(v) = self.get(&k)? {
274                map.insert(k, v);
275            }
276        }
277        fs::write(path, serde_json::to_string_pretty(&map).map_err(|e| DbError::Other(format!("{}", e)))?)?;
278        Ok(())
279    }
280
281    /// Export entire DB to a JSON file.
282    pub fn export_to_file(&self, path: &str) -> Result<(), DbError> {
283        let mut map = HashMap::new();
284        for (k, _) in &self.index {
285            if let Some(v) = self.get(k)? {
286                map.insert(k.clone(), v);
287            }
288        }
289        let json = serde_json::to_string_pretty(&map).map_err(|e| DbError::Other(format!("{}", e)))?;
290        fs::write(path, json)?;
291        Ok(())
292    }
293
294    /// Print all keys->json to stdout (for debugging)
295    pub fn show_all(&self) -> Result<(), DbError> {
296        for (key, &page_id) in &self.index {
297            let page = self.pager.get_page(page_id)?;
298            let json: serde_json::Value = match Self::read_json_from_page(&page) {
299                Ok(v) => v,
300                Err(_) => serde_json::json!(null),
301            };
302            println!("{} => {}", key, json);
303        }
304        Ok(())
305    }
306
307    /// Range query (numeric) for a field between min..=max
308    pub fn range_query(&self, field: &str, min: Value, max: Value) -> Result<Vec<String>, DbError> {
309        if let Some(val_map) = self.secondary_indexes.get(field) {
310            let mut results = Vec::new();
311            let min_n = min.as_f64();
312            let max_n = max.as_f64();
313            if min_n.is_none() || max_n.is_none() {
314                return Ok(vec![]);
315            }
316            let min_n = min_n.unwrap();
317            let max_n = max_n.unwrap();
318            for (val, keys) in val_map {
319                if let Some(n) = val.as_f64() {
320                    if n >= min_n && n <= max_n {
321                        results.extend(keys.iter().cloned());
322                    }
323                }
324            }
325            return Ok(results);
326        }
327        Ok(vec![])
328    }
329
330    /// Update a single field on a record (updates secondary indexes accordingly).
331    pub fn update_field(&mut self, key: &str, field: &str, new_value: Value) -> Result<(), DbError> {
332        if let Some(mut val) = self.get(key)? {
333            if let Some(obj) = val.as_object_mut() {
334                // remove old secondary index reference
335                if let Some(old_val) = obj.get(field) {
336                    if !self.indexed_fields.is_empty() && !self.indexed_fields.contains(field) {
337                        // not indexed -> just update JSON
338                    } else if let Some(val_map) = self.secondary_indexes.get_mut(field) {
339                        if let Some(keys) = val_map.get_mut(old_val) {
340                            keys.remove(key);
341                            if keys.is_empty() {
342                                val_map.remove(old_val);
343                            }
344                        }
345                        if val_map.is_empty() {
346                            self.secondary_indexes.remove(field);
347                        }
348                    }
349                }
350                // insert new value
351                obj.insert(field.to_string(), new_value.clone());
352            }
353            // call put to write page and update indexes (put will re-add secondary index entries)
354            self.put(key, &val)?;
355        } else {
356            return Err(DbError::Other("Key not found".into()));
357        }
358        Ok(())
359    }
360
361    /// Search text fields for substring (only works for string-valued indexed fields).
362    pub fn search_contains(&self, field: &str, substring: &str) -> Result<Vec<String>, DbError> {
363        let mut results = Vec::new();
364        if let Some(val_map) = self.secondary_indexes.get(field) {
365            for (val, keys) in val_map {
366                if let Some(s) = val.as_str() {
367                    if s.contains(substring) {
368                        results.extend(keys.iter().cloned());
369                    }
370                }
371            }
372        }
373        Ok(results)
374    }
375
376    /// Return all keys (lightweight)
377    pub fn list_keys(&self) -> Vec<String> {
378        self.index.keys().cloned().collect()
379    }
380
381    /// Count records
382    pub fn count(&self) -> usize {
383        self.index.len()
384    }
385
386    /// Compact DB: rewrite record pages sequentially, rebuild index and secondary indexes.
387    /// Note: this rewrites pages starting at 1 and will update page ids accordingly.
388    pub fn compact(&mut self) -> Result<(), DbError> {
389        let mut new_index = HashMap::new();
390        let mut new_secondary: HashMap<String, HashMap<Value, HashSet<String>>> = HashMap::new();
391        let mut new_page_id: u32 = 1;
392
393        for key in self.index.keys().cloned().collect::<Vec<_>>() {
394            if let Some(val) = self.get(&key)? {
395                let json_bytes = serde_json::to_vec(&val).map_err(|e| DbError::Other(format!("Serialize error: {}", e)))?;
396                if (json_bytes.len() + 4) > crate::util::PAGE_SIZE {
397                    return Err(DbError::Other(format!("Record too large during compact: key={}", key)));
398                }
399                let mut page_data = vec![0u8; crate::util::PAGE_SIZE];
400                page_data[..4].copy_from_slice(&(json_bytes.len() as u32).to_le_bytes());
401                page_data[4..4 + json_bytes.len()].copy_from_slice(&json_bytes);
402
403                self.pager.write_page(new_page_id, &page_data)?;
404
405                new_index.insert(key.clone(), new_page_id);
406
407                if let Some(obj) = val.as_object() {
408                    for (field, field_value) in obj {
409                        if !self.indexed_fields.is_empty() && !self.indexed_fields.contains(field) {
410                            continue;
411                        }
412                        new_secondary
413                            .entry(field.clone())
414                            .or_default()
415                            .entry(field_value.clone())
416                            .or_default()
417                            .insert(key.clone());
418                    }
419                }
420
421                new_page_id = new_page_id.saturating_add(1);
422            }
423        }
424
425        self.index = new_index;
426        self.secondary_indexes = new_secondary;
427        self.next_page_id = new_page_id;
428        self.flush()
429    }
430}