Skip to main content

moltendb_core/engine/
mod.rs

1// ─── engine/mod.rs ────────────────────────────────────────────────────────────
2// This is the root module of the database engine. It defines the `Db` struct —
3// the central object that the rest of the application interacts with.
4//
5// The Db struct is a thin, cloneable handle to the shared database state.
6// Cloning a Db is cheap — it just increments reference counts on the Arcs
7// inside. All clones share the same underlying data, so any write made through
8// one clone is immediately visible through all others. This is how Axum handler
9// functions can each receive their own Db clone via State<> extraction while
10// all operating on the same in-memory database.
11//
12// Internal structure:
13//   state        — the actual document data: collection → (key → JSON value)
14//   storage      — the persistence layer (disk, encrypted, or OPFS)
15//   tx           — broadcast channel for real-time WebSocket notifications
16//   indexes      — field indexes for fast WHERE queries
17//   query_heatmap — tracks query frequency for auto-indexing
18//
19// The Db struct has two constructors:
20//   open()      — native (server) build, opens a disk file
21//   open_wasm() — WASM (browser) build, opens an OPFS file
22// Both are conditionally compiled with #[cfg(...)] attributes.
23// ─────────────────────────────────────────────────────────────────────────────
24
25// Declare the sub-modules of the engine.
26mod types;      // LogEntry, DbError
27mod indexing;   // index_doc, unindex_doc, track_query, create_index
28mod storage;    // StorageBackend trait + concrete implementations
29mod config;     // DbConfig struct
30#[cfg(feature = "schema")]
31mod schema;     // JSON Schema validation
32mod operations; // get, get_all, insert_batch, update, delete, etc.
33
34// Re-export LogEntry so it can be used by tests and other crates.
35pub use types::{DbError, LogEntry};
36// Re-export DbConfig
37pub use config::DbConfig;
38// Re-export the StorageBackend trait so callers can use it without knowing
39// the internal module structure.
40pub use storage::{StorageBackend, EncryptedStorage};
41#[cfg(not(target_arch = "wasm32"))]
42pub use storage::{AsyncDiskStorage, SyncDiskStorage};
43
44// DashMap = concurrent hash map. DashSet = concurrent hash set.
45use dashmap::{DashMap, DashSet};
46use tracing::{info};
47// Value = dynamically-typed JSON value.
48use serde_json::Value;
49// Standard HashMap — used for return values from get operations.
50use std::collections::HashMap;
51// Arc = thread-safe reference-counted pointer.
52// Wrapping fields in Arc allows Db to be cheaply cloned — all clones share
53// the same underlying data.
54use std::ops::ControlFlow;
55use std::sync::Arc;
56// Tokio's broadcast channel: one sender, many receivers.
57// Used to push real-time change notifications to WebSocket subscribers.
58use tokio::sync::broadcast;
59
60/// The central database handle. Cheap to clone — all clones share the same state.
61///
62/// This struct is the public API of the engine. All database operations go
63/// through methods on this struct, which delegate to the operations module.
64#[derive(Clone)]
65pub struct Db {
66    /// The main document store.
67    /// Outer map: collection name (e.g. "users") → inner map.
68    /// Inner map: document key (e.g. "u1") → Hybrid Hot/Cold document state.
69    /// DashMap allows concurrent reads and writes from multiple threads.
70    state: Arc<DashMap<String, DashMap<String, crate::engine::types::DocumentState>>>,
71
72    /// The storage backend — handles persistence to disk or OPFS.
73    /// `pub` so handlers can access it directly if needed (e.g. for compaction).
74    /// `Arc<dyn StorageBackend>` = shared pointer to any type implementing the trait.
75    pub storage: Arc<dyn StorageBackend>,
76
77    /// Broadcast channel sender for real-time change notifications.
78    /// When a document is inserted, updated, or deleted, a JSON event is sent
79    /// on this channel. WebSocket handlers subscribe to receive these events.
80    /// `pub` so the WebSocket handler in main.rs can call subscribe().
81    pub tx: broadcast::Sender<String>,
82
83    /// The index store.
84    /// Key format: "collection:field" (e.g. "users:role").
85    /// Value: field_value → set of document keys with that value.
86    /// e.g. "users:role" → { "admin" → {"u1"}, "user" → {"u2", "u3"} }
87    /// `pub` so handlers.rs can check for index existence directly.
88    pub indexes: Arc<DashMap<String, DashMap<String, DashSet<String>>>>,
89
90    /// Query frequency counter for auto-indexing.
91    /// Key: "collection:field". Value: number of times queried.
92    /// When a field reaches 3 queries, an index is auto-created.
93    pub query_heatmap: Arc<DashMap<String, u32>>,
94
95    /// The maximum number of documents per collection to keep in RAM (Hot).
96    /// If a collection exceeds this, older documents are paged out to disk (Cold).
97    /// Default is 50,000.
98    pub hot_threshold: usize,
99
100    /// Max requests per window.
101    pub rate_limit_requests: u32,
102
103    /// Window size in seconds.
104    pub rate_limit_window: u64,
105
106    /// Maximum request body size in bytes.
107    pub max_body_size: usize,
108
109    /// Maximum keys allowed per request.
110    pub max_keys_per_request: usize,
111
112    /// Registered JSON schemas per collection.
113    /// Key: collection name → Value: (Original JSON, Compiled Validator).
114    #[cfg(feature = "schema")]
115    pub schemas: Arc<DashMap<String, Arc<(Value, jsonschema::Validator)>>>,
116
117    /// Optional shell command to execute after a successful backup.
118    /// Supports the {SNAPSHOT_PATH} placeholder.
119    pub post_backup_script: Option<String>,
120
121    /// Whether tiered (hot+cold) storage mode is active.
122    pub tiered_mode: bool,
123
124    /// Timestamp of when this Db instance was opened, used for uptime calculation.
125    pub started_at: std::time::Instant,
126}
127
128impl Db {
129    /// Open (or create) a database at the given file path.
130    /// Only available on native (non-WASM) builds.
131    ///
132    /// `sync_mode`      — if true, use SyncDiskStorage (flush on every write).
133    ///                    if false, use AsyncDiskStorage (flush every 50ms).
134    ///                    Ignored when `tiered_mode` is true.
135    /// `tiered_mode`    — if true, use TieredStorage (hot + cold two-tier backend).
136    ///                    Hot writes go to the active log; cold data is archived and
137    ///                    read via mmap on startup. Best for large datasets (100k+ docs).
138    ///                    Enable with STORAGE_MODE=tiered environment variable.
139    /// `encryption_key` — if Some, wrap the storage in EncryptedStorage.
140    ///                    if None, data is stored in plaintext (not recommended).
141    #[cfg(not(target_arch = "wasm32"))]
142    pub fn open(config: DbConfig) -> Result<Self, DbError> {
143        let path = &config.path;
144        let sync_mode = config.sync_mode;
145        let tiered_mode = config.tiered_mode;
146        let hot_threshold = config.hot_threshold;
147        let rate_limit_requests = config.rate_limit_requests;
148        let rate_limit_window = config.rate_limit_window;
149        let max_body_size = config.max_body_size;
150        let max_keys_per_request = config.max_keys_per_request;
151        let encryption_key = config.encryption_key;
152        let post_backup_script = config.post_backup_script;
153
154        // Create the shared in-memory state containers.
155        let state = Arc::new(DashMap::new());
156        // Create the broadcast channel with a buffer of 100 messages.
157        // If the buffer fills up (no subscribers reading), old messages are dropped.
158        let (tx, _rx) = broadcast::channel(100);
159        let indexes: Arc<DashMap<String, DashMap<String, DashSet<String>>>> =
160            Arc::new(Default::default());
161        let query_heatmap = Arc::new(Default::default());
162        #[cfg(feature = "schema")]
163        let schemas = Arc::new(DashMap::new());
164
165        // Ensure the parent directory exists.
166        if let Some(parent) = std::path::Path::new(path).parent() {
167            std::fs::create_dir_all(parent)?;
168        }
169
170        // Choose the base storage backend based on the configured mode.
171        //
172        //   tiered_mode = true  → TieredStorage: hot log (async writes) + cold log
173        //                         (mmap reads). Best for large datasets. The cold log
174        //                         accumulates promoted hot data and is paged by the OS.
175        //
176        //   sync_mode = true    → SyncDiskStorage: every write is flushed to disk
177        //                         immediately. Zero data loss, lower throughput.
178        //
179        //   default             → AsyncDiskStorage: writes buffered in memory, flushed
180        //                         every 50ms. Highest throughput, up to 50ms data loss.
181        let base_storage: Arc<dyn StorageBackend> = if tiered_mode {
182            Arc::new(storage::TieredStorage::new(path)?)
183        } else if sync_mode {
184            Arc::new(storage::SyncDiskStorage::new(path)?)
185        } else {
186            Arc::new(storage::AsyncDiskStorage::new(path)?)
187        };
188
189        // Optionally wrap the base storage in EncryptedStorage.
190        // EncryptedStorage is transparent — it encrypts on write and decrypts
191        // on read, so the rest of the engine doesn't know encryption is happening.
192        let storage: Arc<dyn StorageBackend> = if let Some(key) = encryption_key {
193            Arc::new(storage::EncryptedStorage::new(base_storage, &key))
194        } else {
195            base_storage
196        };
197
198        // Replay the log (or snapshot + delta) into the in-memory state.
199        // After this call, `state` and `indexes` reflect the persisted data.
200        storage::stream_into_state(
201            &*storage,
202            &state,
203            &indexes,
204            #[cfg(feature = "schema")] &schemas,
205        )?;
206
207        Ok(Self {
208            state,
209            storage,
210            tx,
211            indexes,
212            query_heatmap,
213            hot_threshold,
214            rate_limit_requests,
215            rate_limit_window,
216            max_body_size,
217            max_keys_per_request,
218            #[cfg(feature = "schema")]
219            schemas,
220            post_backup_script,
221            tiered_mode,
222            started_at: std::time::Instant::now(),
223        })
224    }
225
226    /// Open (or create) a database in the browser using OPFS.
227    /// Only available on WASM builds. Async because OPFS APIs return Promises.
228    ///
229    /// `db_name` — the filename in the OPFS root directory (e.g. "analytics_db").
230    #[cfg(target_arch = "wasm32")]
231    pub async fn open_wasm(config: DbConfig) -> Result<Self, DbError> {
232        let db_name = &config.path;
233        let hot_threshold = config.hot_threshold;
234        let rate_limit_requests = config.rate_limit_requests;
235        let rate_limit_window = config.rate_limit_window;
236        let max_body_size = config.max_body_size;
237        let max_keys_per_request = config.max_keys_per_request;
238        let encryption_key = config.encryption_key;
239        let sync_mode = config.sync_mode;
240        let post_backup_script = config.post_backup_script;
241
242        let state = Arc::new(DashMap::new());
243        let (tx, _rx) = broadcast::channel(100);
244        let indexes: Arc<DashMap<String, DashMap<String, DashSet<String>>>> =
245            Arc::new(Default::default());
246        let query_heatmap = Arc::new(Default::default());
247        #[cfg(feature = "schema")]
248        let schemas = Arc::new(DashMap::new());
249
250        // Open the OPFS file. This is async because the browser's OPFS API
251        // uses Promises which we must await.
252        let mut storage: Arc<dyn StorageBackend> =
253            Arc::new(storage::OpfsStorage::new(db_name, sync_mode).await?);
254
255        // Apply encryption wrapper if a key is provided.
256        if let Some(key) = encryption_key {
257            storage = Arc::new(storage::EncryptedStorage::new(storage, &key));
258        }
259
260        // Replay the log into the in-memory state.
261        storage::stream_into_state(
262            &*storage,
263            &state,
264            &indexes,
265            #[cfg(feature = "schema")] &schemas,
266        )?;
267
268        Ok(Self {
269            state,
270            storage,
271            tx,
272            indexes,
273            query_heatmap,
274            hot_threshold,
275            rate_limit_requests,
276            rate_limit_window,
277            max_body_size,
278            max_keys_per_request,
279            #[cfg(feature = "schema")]
280            schemas,
281            post_backup_script,
282            tiered_mode: config.tiered_mode,
283            started_at: std::time::Instant::now(),
284        })
285    }
286
287    /// Returns the total number of hot (in-memory) keys across all collections.
288    pub fn hot_keys_count(&self) -> usize {
289        self.state.iter().map(|c| c.value().len()).sum()
290    }
291
292    /// Create a new broadcast receiver for real-time change notifications.
293    /// Each call returns an independent receiver — multiple WebSocket handlers
294    /// can each subscribe and receive all events independently.
295    pub fn subscribe(&self) -> broadcast::Receiver<String> {
296        self.tx.subscribe()
297    }
298
299    /// Retrieve a single document by key. Returns None if not found.
300    pub fn get(&self, collection: &str, key: &str) -> Option<Value> {
301        operations::get(&self.state, &self.storage, collection, key)
302    }
303
304    /// Retrieve all documents in a collection as a HashMap.
305    pub fn get_all(&self, collection: &str) -> HashMap<String, Value> {
306        operations::get_all(&self.state, &self.storage, collection)
307    }
308
309    /// Retrieve a specific set of documents by their keys.
310    pub fn get_batch(&self, collection: &str, keys: Vec<String>) -> HashMap<String, Value> {
311        operations::get_batch(&self.state, &self.storage, collection, keys)
312    }
313
314    /// Insert or overwrite multiple documents in one call.
315    /// Each item is a (key, value) pair. Writes are persisted to storage.
316    pub fn insert_batch(&self, collection: &str, items: Vec<(String, Value)>) -> Result<(), DbError> {
317        operations::insert_batch(
318            &self.state,
319            &self.indexes,
320            &self.storage,
321            &self.tx,
322            #[cfg(feature = "schema")] &self.schemas,
323            collection,
324            items,
325        )?;
326
327        // Auto-evict if the collection exceeds the threshold.
328        let _ = self.evict_collection(collection, self.hot_threshold);
329        Ok(())
330    }
331
332    /// Partially update a document — merges `updates` into the existing document.
333    /// Returns true if the document was found and updated, false if not found.
334    pub fn update(&self, collection: &str, key: &str, updates: Value) -> Result<bool, DbError> {
335        let updated = operations::update(
336            &self.state,
337            &self.indexes,
338            &self.storage,
339            &self.tx,
340            #[cfg(feature = "schema")] &self.schemas,
341            collection,
342            key,
343            updates,
344        )?;
345
346        if updated {
347            // Auto-evict if the collection exceeds the threshold.
348            let _ = self.evict_collection(collection, self.hot_threshold);
349        }
350        Ok(updated)
351    }
352
353    /// Delete a single document by key.
354    pub fn delete(&self, collection: &str, key: &str) -> Result<(), DbError> {
355        operations::delete(
356            &self.state,
357            &self.indexes,
358            &self.storage,
359            &self.tx,
360            collection,
361            key,
362        )
363    }
364
365    /// Delete multiple documents by key in one call.
366    pub fn delete_batch(&self, collection: &str, keys: Vec<String>) -> Result<(), DbError> {
367        operations::delete_batch(
368            &self.state,
369            &self.indexes,
370            &self.storage,
371            &self.tx,
372            collection,
373            keys,
374        )
375    }
376
377    /// Drop an entire collection — removes all documents and its indexes.
378    pub fn delete_collection(&self, collection: &str) -> Result<(), DbError> {
379        operations::delete_collection(
380            &self.state,
381            &self.indexes,
382            &self.storage,
383            &self.tx,
384            collection,
385        )
386    }
387
388    /// Track that `field` was queried in `collection` and auto-create an index
389    /// if this field has been queried 3 or more times.
390    /// Errors are silently ignored — auto-indexing is best-effort.
391    pub fn track_query(&self, collection: &str, field: &str) {
392        // The `let _ =` discards the Result — a failed auto-index is not fatal.
393        let _ = indexing::track_query(
394            &self.indexes,
395            &self.query_heatmap,
396            collection,
397            field,
398            &self.storage,
399            &self.state,
400        );
401    }
402
403    /// Register a JSON schema for a collection.
404    /// All subsequent writes to this collection must conform to this schema.
405    #[cfg(feature = "schema")]
406    pub fn set_schema(&self, collection: &str, schema: Value) -> Result<(), DbError> {
407        schema::set_schema(
408            &self.schemas,
409            &self.storage,
410            &self.tx,
411            collection,
412            schema
413        )
414    }
415    
416    /// Compact the log file — rewrite it to contain only the current state.
417    ///
418    /// This removes all dead entries (superseded INSERTs, DELETE tombstones)
419    /// and writes a binary snapshot for fast next startup.
420    ///
421    /// The compacted log contains:
422    ///   - One INSERT entry per live document (current value only).
423    ///   - One INDEX entry per registered index (index data is rebuilt on replay).
424    pub fn compact(&self) -> Result<(), DbError> {
425        info!("🔨 Starting Log Compaction...");
426
427        // Build the minimal set of entries representing the current state.
428        let mut entries = Vec::new();
429
430        // One INSERT per live document across all collections.
431        for col_ref in self.state.iter() {
432            let col_name = col_ref.key();
433            for item_ref in col_ref.value().iter() {
434                // To compact, we need the full Value. If it's Cold, we fetch it from storage.
435                let entry = match item_ref.value() {
436                    crate::engine::types::DocumentState::Hot(v) => {
437                        types::LogEntry::new(
438                            "INSERT".to_string(),
439                            col_name.clone(),
440                            item_ref.key().clone(),
441                            v.clone(),
442                        )
443                    }
444                    crate::engine::types::DocumentState::Cold(ptr) => {
445                        let bytes = self.storage.read_at(ptr.offset, ptr.length)?;
446                        serde_json::from_slice(&bytes)?
447                    }
448                };
449                entries.push(entry);
450            }
451        }
452
453        // One SCHEMA entry per collection.
454        #[cfg(feature = "schema")]
455        for schema_ref in self.schemas.iter() {
456            let col_name = schema_ref.key();
457            let (schema_json, _) = &**schema_ref.value();
458            entries.push(types::LogEntry::new(
459                "SCHEMA".to_string(),
460                col_name.clone(),
461                "".to_string(),
462                schema_json.clone(),
463            ));
464        }
465
466        // One INDEX entry per registered index.
467        // The index name format is "collection:field" — we split it to get both parts.
468        for index_ref in self.indexes.iter() {
469            let parts: Vec<&str> = index_ref.key().split(':').collect();
470            if parts.len() == 2 {
471                entries.push(types::LogEntry::new(
472                    "INDEX".to_string(),
473                    parts[0].to_string(),
474                    parts[1].to_string(),       // field name
475                    serde_json::json!(null),
476                ));
477            }
478        }
479
480        // Delegate the actual file rewrite (and snapshot write) to the storage backend.
481        self.storage.compact_with_hook(entries.clone(), self.post_backup_script.clone())?;
482
483        // After compaction the log is rewritten and all old RecordPointers are invalid.
484        // Promote every Cold entry in the in-memory state to Hot so subsequent reads
485        // don't try to seek to stale byte offsets in the now-truncated log file.
486        for entry in &entries {
487            if entry.cmd == "INSERT" {
488                if let Some(col) = self.state.get(&entry.collection) {
489                    if let Some(mut doc) = col.get_mut(&entry.key) {
490                        if matches!(*doc, crate::engine::types::DocumentState::Cold(_)) {
491                            *doc = crate::engine::types::DocumentState::Hot(entry.value.clone());
492                        }
493                    }
494                }
495            }
496        }
497
498        info!("✅ Log Compaction Finished!");
499        Ok(())
500    }
501
502    /// Evict documents from RAM to disk for a collection if it exceeds the threshold.
503    ///
504    /// This converts `Hot(Value)` entries into `Cold(RecordPointer)` entries.
505    /// In this v1, it re-scans the log to find the exact byte offsets for the documents.
506    pub fn evict_collection(&self, collection: &str, limit: usize) -> Result<usize, DbError> {
507        let col_len = if let Some(col) = self.state.get(collection) {
508            col.len()
509        } else {
510            return Err(DbError::CollectionNotFound);
511        };
512
513        if col_len <= limit {
514            return Ok(0);
515        }
516
517        let mut evicted_count = 0;
518        let mut offset = 0u64;
519        let to_evict = col_len - limit;
520
521        // To evict properly, we need the pointers. Since we don't store them for
522        // Hot documents, we re-scan the log to find them.
523        self.storage.stream_log_into(&mut |entry, length| {
524            if entry.collection == collection {
525                if evicted_count < to_evict {
526                    if let Some(col) = self.state.get(collection) {
527                        if let Some(mut doc_state) = col.get_mut(&entry.key) {
528                            if let crate::engine::types::DocumentState::Hot(_) = *doc_state {
529                                *doc_state = crate::engine::types::DocumentState::Cold(crate::engine::types::RecordPointer {
530                                    offset,
531                                    length,
532                                });
533                                evicted_count += 1;
534                            }
535                        }
536                    }
537                }
538            }
539            offset += (length + 1) as u64;
540            ControlFlow::Continue(())
541        })?;
542
543        Ok(evicted_count)
544    }
545
546    /// Recover the database state to a specific point in time or sequence number.
547    /// Returns the recovered state as a Vec of LogEntries that can be written to a snapshot.
548    ///
549    /// This is a utility function used by the CLI for PITR.
550    #[cfg(not(target_arch = "wasm32"))]
551    pub fn recover_to(
552        storage: &dyn StorageBackend,
553        to_time: Option<u64>,
554        to_seq: Option<u64>,
555    ) -> Result<Vec<LogEntry>, DbError> {
556        let state: DashMap<String, DashMap<String, crate::engine::types::DocumentState>> = DashMap::new();
557        let indexes: DashMap<String, DashMap<String, DashSet<String>>> = DashMap::new();
558        #[cfg(feature = "schema")]
559        let schemas: DashMap<String, Arc<(serde_json::Value, jsonschema::Validator)>> = DashMap::new();
560
561            let mut offset = 0u64;
562            let mut count = 0u64;
563            let mut current_tx_entries = Vec::new();
564            let mut current_tx_id = None;
565            
566            storage.stream_log_into(&mut |entry, length| {
567                // Condition 1: Check Timestamp
568                if let Some(t) = to_time {
569                    if entry._t > t {
570                        return ControlFlow::Break(());
571                    }
572                }
573    
574                // Condition 2: Check Sequence
575                if let Some(s) = to_seq {
576                    if count >= s {
577                        return ControlFlow::Break(());
578                    }
579                }
580
581            let pointer = crate::engine::types::RecordPointer {
582                offset,
583                length,
584            };
585
586            match entry.cmd.as_str() {
587                "TX_BEGIN" => {
588                    current_tx_id = Some(entry.key.clone());
589                    current_tx_entries.clear();
590                }
591                "TX_COMMIT" => {
592                    if current_tx_id.as_ref() == Some(&entry.key) {
593                        for (e, p) in current_tx_entries.drain(..) {
594                            crate::engine::storage::apply_entry(
595                                &e,
596                                &state,
597                                &indexes,
598                                #[cfg(feature = "schema")] &schemas,
599                                Some(p),
600                            );
601                        }
602                        current_tx_id = None;
603                    }
604                }
605                _ => {
606                    if current_tx_id.is_some() {
607                        current_tx_entries.push((entry, pointer));
608                    } else {
609                        crate::engine::storage::apply_entry(
610                            &entry,
611                            &state,
612                            &indexes,
613                            #[cfg(feature = "schema")] &schemas,
614                            Some(pointer),
615                        );
616                    }
617                }
618            }
619
620            count += 1;
621            offset += (length + 1) as u64;
622            ControlFlow::Continue(())
623        })?;
624
625        // Convert the recovered state into LogEntries (similar to compact logic)
626        let mut entries = Vec::new();
627        for col_ref in state.iter() {
628            let col_name = col_ref.key();
629            for item_ref in col_ref.value().iter() {
630                let entry = match item_ref.value() {
631                    crate::engine::types::DocumentState::Hot(v) => {
632                        LogEntry::new(
633                            "INSERT".to_string(),
634                            col_name.clone(),
635                            item_ref.key().clone(),
636                            v.clone(),
637                        )
638                    }
639                    crate::engine::types::DocumentState::Cold(ptr) => {
640                        let bytes = storage.read_at(ptr.offset, ptr.length).unwrap_or_default();
641                        serde_json::from_slice(&bytes).unwrap_or_else(|_| {
642                            LogEntry::new("INSERT".to_string(), col_name.clone(), item_ref.key().clone(), serde_json::Value::Null)
643                        })
644                    }
645                };
646                entries.push(entry);
647            }
648        }
649
650        #[cfg(feature = "schema")]
651        for schema_ref in schemas.iter() {
652            let col_name = schema_ref.key();
653            let (schema_json, _) = &**schema_ref.value();
654            entries.push(LogEntry::new(
655                "SCHEMA".to_string(),
656                col_name.clone(),
657                "".to_string(),
658                schema_json.clone(),
659            ));
660        }
661
662        for index_ref in indexes.iter() {
663            let parts: Vec<&str> = index_ref.key().split(':').collect();
664            if parts.len() == 2 {
665                entries.push(LogEntry::new(
666                    "INDEX".to_string(),
667                    parts[0].to_string(),
668                    parts[1].to_string(),
669                    serde_json::json!(null),
670                ));
671            }
672        }
673
674        Ok(entries)
675    }
676}