// reddb_server/log/store.rs
1//! Append-only log collection backed by UnifiedStore.
2
3use std::collections::HashMap;
4use std::sync::Arc;
5
6use super::id::{LogId, LogIdGenerator};
7use crate::storage::schema::Value;
8use crate::storage::unified::entity::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
9use crate::storage::unified::store::UnifiedStore;
10
/// Retention policy for log collections.
///
/// Applied by `LogCollection::apply_retention`; entries are always
/// evicted oldest-first (smallest timestamp-ordered log ID).
#[derive(Debug, Clone, Default)]
pub enum LogRetention {
    /// Keep entries for N days, then auto-delete.
    Days(u64),
    /// Keep at most N entries (oldest evicted first).
    MaxEntries(u64),
    /// Keep total size under N bytes (oldest evicted first).
    /// Size is estimated, not an exact serialized size.
    MaxBytes(u64),
    /// Keep forever (no automatic cleanup). This is the default.
    #[default]
    Forever,
}
24
/// Configuration for a log collection.
#[derive(Debug, Clone)]
pub struct LogCollectionConfig {
    /// Collection name, used as the backing table name in `UnifiedStore`.
    pub name: String,
    /// Column hints. NOTE(review): not read anywhere in this file — verify
    /// whether callers elsewhere rely on it before removing.
    pub columns: Vec<String>,
    /// Retention policy enforced by `LogCollection::apply_retention`.
    pub retention: LogRetention,
    /// Number of entries buffered in memory before a flush to storage.
    pub batch_size: usize,
}
33
34impl LogCollectionConfig {
35    pub fn new(name: &str) -> Self {
36        Self {
37            name: name.to_string(),
38            columns: Vec::new(),
39            retention: LogRetention::Forever,
40            batch_size: 64,
41        }
42    }
43}
44
/// A single log entry with timestamp-based ID and user fields.
#[derive(Debug, Clone)]
pub struct LogEntry {
    /// Timestamp-ordered identifier assigned at append time.
    pub id: LogId,
    /// User-supplied fields. The reserved `"id"` field is stored internally
    /// but excluded when entries are read back (see `recent` / `range`).
    pub fields: HashMap<String, Value>,
}
51
/// Append-only log collection.
///
/// Writes are staged in an in-memory buffer and flushed to the backing
/// `UnifiedStore` once `config.batch_size` entries accumulate; readers
/// (`recent`, `range`, `len`) and `Drop` flush first.
pub struct LogCollection {
    config: LogCollectionConfig,
    // Generates monotonically increasing timestamp-based log IDs;
    // restored from the highest persisted ID in `new`.
    id_gen: LogIdGenerator,
    store: Arc<UnifiedStore>,
    // Pending entities not yet persisted. Locked poison-tolerantly
    // (`unwrap_or_else(|e| e.into_inner())`) at every use site.
    write_buffer: std::sync::Mutex<Vec<UnifiedEntity>>,
}
59
60impl LogCollection {
61    pub fn new(store: Arc<UnifiedStore>, config: LogCollectionConfig) -> Self {
62        let _ = store.get_or_create_collection(&config.name);
63
64        // Restore ID generator from highest existing entry
65        let id_gen = LogIdGenerator::new();
66        if let Some(manager) = store.get_collection(&config.name) {
67            let mut max_id = 0u64;
68            manager.for_each_entity(|entity| {
69                if let Some(row) = entity.data.as_row() {
70                    if let Some(Value::UnsignedInteger(id)) = row.get_field("id") {
71                        if *id > max_id {
72                            max_id = *id;
73                        }
74                    }
75                }
76                true
77            });
78            if max_id > 0 {
79                id_gen.restore(max_id);
80            }
81        }
82
83        Self {
84            config,
85            id_gen,
86            store,
87            write_buffer: std::sync::Mutex::new(Vec::new()),
88        }
89    }
90
91    /// Append a single log entry. Returns the assigned ID.
92    pub fn append(&self, fields: HashMap<String, Value>) -> LogId {
93        let id = self.id_gen.next();
94
95        let mut named = HashMap::with_capacity(fields.len() + 1);
96        named.insert("id".to_string(), Value::UnsignedInteger(id.raw()));
97        for (k, v) in fields {
98            named.insert(k, v);
99        }
100
101        let entity = UnifiedEntity::new(
102            EntityId::new(0),
103            EntityKind::TableRow {
104                table: Arc::from(self.config.name.as_str()),
105                row_id: 0,
106            },
107            EntityData::Row(RowData {
108                columns: Vec::new(),
109                named: Some(named),
110                schema: None,
111            }),
112        );
113
114        let batch_size = self.config.batch_size;
115        let should_flush = {
116            let mut buf = self.write_buffer.lock().unwrap_or_else(|e| e.into_inner());
117            buf.push(entity);
118            buf.len() >= batch_size
119        };
120
121        if should_flush {
122            self.flush_buffer();
123        }
124
125        id
126    }
127
128    /// Append a log entry from (key, value) pairs.
129    pub fn append_fields(&self, fields: Vec<(&str, Value)>) -> LogId {
130        let map: HashMap<String, Value> = fields
131            .into_iter()
132            .map(|(k, v)| (k.to_string(), v))
133            .collect();
134        self.append(map)
135    }
136
137    /// Flush the write buffer to storage.
138    pub fn flush_buffer(&self) {
139        let entities = {
140            let mut buf = self.write_buffer.lock().unwrap_or_else(|e| e.into_inner());
141            std::mem::take(&mut *buf)
142        };
143
144        if entities.is_empty() {
145            return;
146        }
147
148        for entity in entities {
149            let _ = self.store.insert_auto(&self.config.name, entity);
150        }
151    }
152
153    /// Query recent entries (newest first).
154    pub fn recent(&self, limit: usize) -> Vec<LogEntry> {
155        self.flush_buffer();
156
157        let manager = match self.store.get_collection(&self.config.name) {
158            Some(m) => m,
159            None => return Vec::new(),
160        };
161
162        // Phase 1: collect top-k log IDs + entity IDs using a bounded min-heap.
163        // Only stores (log_id, entity_id) — no field cloning until phase 2.
164        use std::cmp::Reverse;
165        use std::collections::BinaryHeap;
166
167        let mut heap: BinaryHeap<Reverse<(u64, crate::storage::unified::entity::EntityId)>> =
168            BinaryHeap::with_capacity(limit + 1);
169
170        manager.for_each_entity(|entity| {
171            if let Some(row) = entity.data.as_row() {
172                let id_val = row
173                    .get_field("id")
174                    .and_then(|v| match v {
175                        Value::UnsignedInteger(n) => Some(*n),
176                        _ => None,
177                    })
178                    .unwrap_or(0);
179
180                if heap.len() < limit {
181                    heap.push(Reverse((id_val, entity.id)));
182                } else if let Some(&Reverse((min_id, _))) = heap.peek() {
183                    if id_val > min_id {
184                        heap.pop();
185                        heap.push(Reverse((id_val, entity.id)));
186                    }
187                }
188            }
189            true
190        });
191
192        // Phase 2: fetch full entities only for top-k (avoids cloning all fields)
193        let mut top_ids: Vec<(u64, crate::storage::unified::entity::EntityId)> = heap
194            .into_vec()
195            .into_iter()
196            .map(|Reverse(pair)| pair)
197            .collect();
198        top_ids.sort_by_key(|b| std::cmp::Reverse(b.0)); // newest first
199
200        top_ids
201            .into_iter()
202            .filter_map(|(log_id, entity_id)| {
203                let entity = manager.get(entity_id)?;
204                let row = entity.data.as_row()?;
205                let mut fields = HashMap::new();
206                for (key, value) in row.iter_fields() {
207                    if key != "id" {
208                        fields.insert(key.to_string(), value.clone());
209                    }
210                }
211                Some(LogEntry {
212                    id: LogId(log_id),
213                    fields,
214                })
215            })
216            .collect()
217    }
218
219    /// Query entries within a time range (by ID boundaries).
220    pub fn range(&self, from_id: LogId, to_id: LogId, limit: usize) -> Vec<LogEntry> {
221        self.flush_buffer();
222
223        let manager = match self.store.get_collection(&self.config.name) {
224            Some(m) => m,
225            None => return Vec::new(),
226        };
227
228        let mut entries = Vec::new();
229        manager.for_each_entity(|entity| {
230            if let Some(row) = entity.data.as_row() {
231                let id_val = row
232                    .get_field("id")
233                    .and_then(|v| match v {
234                        Value::UnsignedInteger(n) => Some(*n),
235                        _ => None,
236                    })
237                    .unwrap_or(0);
238
239                if id_val >= from_id.raw() && id_val <= to_id.raw() {
240                    let mut fields = HashMap::new();
241                    for (key, value) in row.iter_fields() {
242                        if key != "id" {
243                            fields.insert(key.to_string(), value.clone());
244                        }
245                    }
246                    entries.push(LogEntry {
247                        id: LogId(id_val),
248                        fields,
249                    });
250                }
251            }
252            true
253        });
254
255        entries.sort_by_key(|a| a.id);
256        entries.truncate(limit);
257        entries
258    }
259
260    /// Apply retention policy: delete entries older than the threshold.
261    pub fn apply_retention(&self) -> u64 {
262        match &self.config.retention {
263            LogRetention::Forever => 0,
264            LogRetention::Days(days) => {
265                let cutoff_ms = std::time::SystemTime::now()
266                    .duration_since(std::time::UNIX_EPOCH)
267                    .unwrap_or_default()
268                    .as_millis() as u64
269                    - days * 86_400_000;
270                let cutoff_id = LogId::from_ms(cutoff_ms);
271                self.delete_before(cutoff_id)
272            }
273            LogRetention::MaxEntries(max) => {
274                let manager = match self.store.get_collection(&self.config.name) {
275                    Some(m) => m,
276                    None => return 0,
277                };
278
279                let mut ids: Vec<(u64, EntityId)> = Vec::new();
280                manager.for_each_entity(|entity| {
281                    if let Some(row) = entity.data.as_row() {
282                        if let Some(Value::UnsignedInteger(log_id)) = row.get_field("id") {
283                            ids.push((*log_id, entity.id));
284                        }
285                    }
286                    true
287                });
288
289                if ids.len() as u64 <= *max {
290                    return 0;
291                }
292
293                ids.sort_by_key(|(log_id, _)| *log_id);
294                let to_delete = ids.len() as u64 - max;
295                let mut deleted = 0u64;
296                for (_, entity_id) in ids.iter().take(to_delete as usize) {
297                    if self
298                        .store
299                        .delete(&self.config.name, *entity_id)
300                        .unwrap_or(false)
301                    {
302                        deleted += 1;
303                    }
304                }
305                deleted
306            }
307            LogRetention::MaxBytes(max_bytes) => {
308                let manager = match self.store.get_collection(&self.config.name) {
309                    Some(m) => m,
310                    None => return 0,
311                };
312
313                // Collect (log_id, entity_id, approx_size) sorted by time
314                let mut entries: Vec<(u64, EntityId, u64)> = Vec::new();
315                manager.for_each_entity(|entity| {
316                    if let Some(row) = entity.data.as_row() {
317                        let log_id = row
318                            .get_field("id")
319                            .and_then(|v| match v {
320                                Value::UnsignedInteger(n) => Some(*n),
321                                _ => None,
322                            })
323                            .unwrap_or(0);
324
325                        // Approximate entry size: 8 bytes per field + value sizes
326                        let mut size = 8u64; // id field
327                        for (key, value) in row.iter_fields() {
328                            size += key.len() as u64 + estimate_value_size(value);
329                        }
330                        entries.push((log_id, entity.id, size));
331                    }
332                    true
333                });
334
335                entries.sort_by_key(|(log_id, _, _)| *log_id);
336
337                let total_size: u64 = entries.iter().map(|(_, _, s)| s).sum();
338                if total_size <= *max_bytes {
339                    return 0;
340                }
341
342                // Delete oldest entries until under budget
343                let mut to_free = total_size - max_bytes;
344                let mut deleted = 0u64;
345                for (_, entity_id, size) in &entries {
346                    if to_free == 0 {
347                        break;
348                    }
349                    if self
350                        .store
351                        .delete(&self.config.name, *entity_id)
352                        .unwrap_or(false)
353                    {
354                        deleted += 1;
355                        to_free = to_free.saturating_sub(*size);
356                    }
357                }
358                deleted
359            }
360        }
361    }
362
363    fn delete_before(&self, cutoff: LogId) -> u64 {
364        let manager = match self.store.get_collection(&self.config.name) {
365            Some(m) => m,
366            None => return 0,
367        };
368
369        let mut to_delete = Vec::new();
370        manager.for_each_entity(|entity| {
371            if let Some(row) = entity.data.as_row() {
372                if let Some(Value::UnsignedInteger(log_id)) = row.get_field("id") {
373                    if *log_id < cutoff.raw() {
374                        to_delete.push(entity.id);
375                    }
376                }
377            }
378            true
379        });
380
381        let mut deleted = 0u64;
382        for entity_id in to_delete {
383            if self
384                .store
385                .delete(&self.config.name, entity_id)
386                .unwrap_or(false)
387            {
388                deleted += 1;
389            }
390        }
391        deleted
392    }
393
394    /// Total number of entries.
395    pub fn len(&self) -> usize {
396        self.flush_buffer();
397        self.store
398            .get_collection(&self.config.name)
399            .map(|m| m.stats().total_entities)
400            .unwrap_or(0)
401    }
402
    /// Borrow this collection's configuration (name, retention, batch size).
    pub fn config(&self) -> &LogCollectionConfig {
        &self.config
    }
407}
408
409fn estimate_value_size(value: &Value) -> u64 {
410    match value {
411        Value::Null => 1,
412        Value::Boolean(_) => 1,
413        Value::Integer(_) | Value::UnsignedInteger(_) | Value::Float(_) => 8,
414        Value::Text(s) => s.len() as u64,
415        Value::Blob(b) => b.len() as u64,
416        Value::Vector(v) => v.len() as u64 * 4,
417        Value::Array(a) => a.iter().map(estimate_value_size).sum::<u64>() + 8,
418        _ => 16, // conservative default for other types
419    }
420}
421
// Flush any buffered entries on drop so appends are not silently lost.
// Note: `flush_buffer` discards `insert_auto` errors, so a failing store
// can still lose data on shutdown — this is best-effort.
impl Drop for LogCollection {
    fn drop(&mut self) {
        self.flush_buffer();
    }
}
427
#[cfg(test)]
mod tests {
    use super::*;

    // Fresh in-memory store per test; collections are namespaced by name.
    fn test_store() -> Arc<UnifiedStore> {
        Arc::new(UnifiedStore::new())
    }

    #[test]
    fn test_append_and_query() {
        let store = test_store();
        let log = LogCollection::new(store, LogCollectionConfig::new("test_log"));

        let id1 = log.append_fields(vec![
            ("level", Value::text("info")),
            ("message", Value::text("hello")),
        ]);
        let id2 = log.append_fields(vec![
            ("level", Value::text("error")),
            ("message", Value::text("oops")),
        ]);

        // IDs must be strictly monotonic.
        assert!(id2.raw() > id1.raw());

        let recent = log.recent(10);
        assert_eq!(recent.len(), 2);
        assert_eq!(recent[0].id, id2); // newest first
        assert_eq!(recent[1].id, id1);
    }

    #[test]
    fn test_retention_max_entries() {
        let store = test_store();
        let mut config = LogCollectionConfig::new("retention_test");
        config.retention = LogRetention::MaxEntries(3);
        // batch_size = 1 forces a flush per append so retention sees all rows.
        config.batch_size = 1;

        let log = LogCollection::new(store, config);

        for i in 0..5 {
            log.append_fields(vec![("seq", Value::Integer(i))]);
        }

        assert_eq!(log.len(), 5);
        let deleted = log.apply_retention();
        assert_eq!(deleted, 2);
        assert_eq!(log.len(), 3);
    }

    #[test]
    fn test_retention_max_bytes() {
        let store = test_store();
        let mut config = LogCollectionConfig::new("bytes_retention_test");
        config.retention = LogRetention::MaxBytes(200);
        config.batch_size = 1;

        let log = LogCollection::new(store, config);

        // Insert entries with known sizes (~30-50 bytes each)
        for i in 0..10 {
            log.append_fields(vec![("msg", Value::text(format!("entry-{}", i)))]);
        }

        let before = log.len();
        assert_eq!(before, 10);

        let deleted = log.apply_retention();
        assert!(
            deleted > 0,
            "should delete some entries to fit under 200 bytes"
        );
        assert!(log.len() < 10, "should have fewer entries after retention");
    }

    #[test]
    fn test_batch_buffering() {
        let store = test_store();
        let mut config = LogCollectionConfig::new("batch_test");
        config.batch_size = 4;

        let log = LogCollection::new(store.clone(), config);

        // Insert 3 — should stay in buffer (batch_size = 4)
        for _ in 0..3 {
            log.append_fields(vec![("msg", Value::text("buffered"))]);
        }

        // Buffer not flushed yet — store might be empty
        // But recent() flushes first
        let entries = log.recent(10);
        assert_eq!(entries.len(), 3);
    }

    #[test]
    fn test_id_is_time_ordered() {
        let store = test_store();
        let log = LogCollection::new(store, LogCollectionConfig::new("time_test"));

        let ids: Vec<LogId> = (0..100)
            .map(|i| log.append_fields(vec![("i", Value::Integer(i))]))
            .collect();

        // Strictly increasing raw IDs; embedded timestamps never go backwards.
        for i in 1..ids.len() {
            assert!(ids[i].raw() > ids[i - 1].raw());
            assert!(ids[i].timestamp_us() >= ids[i - 1].timestamp_us());
        }
    }
}