1use std::collections::HashMap;
4use std::sync::Arc;
5
6use super::id::{LogId, LogIdGenerator};
7use crate::storage::schema::Value;
8use crate::storage::unified::entity::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
9use crate::storage::unified::store::UnifiedStore;
10
/// Policy deciding which entries `LogCollection::apply_retention` deletes.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum LogRetention {
    /// Delete entries whose log ID is below `LogId::from_ms(now - days)`,
    /// i.e. entries created more than this many days ago.
    Days(u64),
    /// Keep at most this many entries; the lowest log IDs are deleted first.
    MaxEntries(u64),
    /// Keep the *estimated* total row size at or below this many bytes; the
    /// lowest log IDs are deleted first. Sizes are rough estimates, not
    /// on-disk byte counts.
    MaxBytes(u64),
    /// Never delete anything. This is the default policy.
    #[default]
    Forever,
}
24
25#[derive(Debug, Clone)]
27pub struct LogCollectionConfig {
28 pub name: String,
29 pub columns: Vec<String>,
30 pub retention: LogRetention,
31 pub batch_size: usize,
32}
33
34impl LogCollectionConfig {
35 pub fn new(name: &str) -> Self {
36 Self {
37 name: name.to_string(),
38 columns: Vec::new(),
39 retention: LogRetention::Forever,
40 batch_size: 64,
41 }
42 }
43}
44
45#[derive(Debug, Clone)]
47pub struct LogEntry {
48 pub id: LogId,
49 pub fields: HashMap<String, Value>,
50}
51
52pub struct LogCollection {
54 config: LogCollectionConfig,
55 id_gen: LogIdGenerator,
56 store: Arc<UnifiedStore>,
57 write_buffer: std::sync::Mutex<Vec<UnifiedEntity>>,
58}
59
60impl LogCollection {
61 pub fn new(store: Arc<UnifiedStore>, config: LogCollectionConfig) -> Self {
62 let _ = store.get_or_create_collection(&config.name);
63
64 let id_gen = LogIdGenerator::new();
66 if let Some(manager) = store.get_collection(&config.name) {
67 let mut max_id = 0u64;
68 manager.for_each_entity(|entity| {
69 if let Some(row) = entity.data.as_row() {
70 if let Some(Value::UnsignedInteger(id)) = row.get_field("id") {
71 if *id > max_id {
72 max_id = *id;
73 }
74 }
75 }
76 true
77 });
78 if max_id > 0 {
79 id_gen.restore(max_id);
80 }
81 }
82
83 Self {
84 config,
85 id_gen,
86 store,
87 write_buffer: std::sync::Mutex::new(Vec::new()),
88 }
89 }
90
91 pub fn append(&self, fields: HashMap<String, Value>) -> LogId {
93 let id = self.id_gen.next();
94
95 let mut named = HashMap::with_capacity(fields.len() + 1);
96 named.insert("id".to_string(), Value::UnsignedInteger(id.raw()));
97 for (k, v) in fields {
98 named.insert(k, v);
99 }
100
101 let entity = UnifiedEntity::new(
102 EntityId::new(0),
103 EntityKind::TableRow {
104 table: Arc::from(self.config.name.as_str()),
105 row_id: 0,
106 },
107 EntityData::Row(RowData {
108 columns: Vec::new(),
109 named: Some(named),
110 schema: None,
111 }),
112 );
113
114 let batch_size = self.config.batch_size;
115 let should_flush = {
116 let mut buf = self.write_buffer.lock().unwrap_or_else(|e| e.into_inner());
117 buf.push(entity);
118 buf.len() >= batch_size
119 };
120
121 if should_flush {
122 self.flush_buffer();
123 }
124
125 id
126 }
127
128 pub fn append_fields(&self, fields: Vec<(&str, Value)>) -> LogId {
130 let map: HashMap<String, Value> = fields
131 .into_iter()
132 .map(|(k, v)| (k.to_string(), v))
133 .collect();
134 self.append(map)
135 }
136
137 pub fn flush_buffer(&self) {
139 let entities = {
140 let mut buf = self.write_buffer.lock().unwrap_or_else(|e| e.into_inner());
141 std::mem::take(&mut *buf)
142 };
143
144 if entities.is_empty() {
145 return;
146 }
147
148 for entity in entities {
149 let _ = self.store.insert_auto(&self.config.name, entity);
150 }
151 }
152
153 pub fn recent(&self, limit: usize) -> Vec<LogEntry> {
155 self.flush_buffer();
156
157 let manager = match self.store.get_collection(&self.config.name) {
158 Some(m) => m,
159 None => return Vec::new(),
160 };
161
162 use std::cmp::Reverse;
165 use std::collections::BinaryHeap;
166
167 let mut heap: BinaryHeap<Reverse<(u64, crate::storage::unified::entity::EntityId)>> =
168 BinaryHeap::with_capacity(limit + 1);
169
170 manager.for_each_entity(|entity| {
171 if let Some(row) = entity.data.as_row() {
172 let id_val = row
173 .get_field("id")
174 .and_then(|v| match v {
175 Value::UnsignedInteger(n) => Some(*n),
176 _ => None,
177 })
178 .unwrap_or(0);
179
180 if heap.len() < limit {
181 heap.push(Reverse((id_val, entity.id)));
182 } else if let Some(&Reverse((min_id, _))) = heap.peek() {
183 if id_val > min_id {
184 heap.pop();
185 heap.push(Reverse((id_val, entity.id)));
186 }
187 }
188 }
189 true
190 });
191
192 let mut top_ids: Vec<(u64, crate::storage::unified::entity::EntityId)> = heap
194 .into_vec()
195 .into_iter()
196 .map(|Reverse(pair)| pair)
197 .collect();
198 top_ids.sort_by_key(|b| std::cmp::Reverse(b.0)); top_ids
201 .into_iter()
202 .filter_map(|(log_id, entity_id)| {
203 let entity = manager.get(entity_id)?;
204 let row = entity.data.as_row()?;
205 let mut fields = HashMap::new();
206 for (key, value) in row.iter_fields() {
207 if key != "id" {
208 fields.insert(key.to_string(), value.clone());
209 }
210 }
211 Some(LogEntry {
212 id: LogId(log_id),
213 fields,
214 })
215 })
216 .collect()
217 }
218
219 pub fn range(&self, from_id: LogId, to_id: LogId, limit: usize) -> Vec<LogEntry> {
221 self.flush_buffer();
222
223 let manager = match self.store.get_collection(&self.config.name) {
224 Some(m) => m,
225 None => return Vec::new(),
226 };
227
228 let mut entries = Vec::new();
229 manager.for_each_entity(|entity| {
230 if let Some(row) = entity.data.as_row() {
231 let id_val = row
232 .get_field("id")
233 .and_then(|v| match v {
234 Value::UnsignedInteger(n) => Some(*n),
235 _ => None,
236 })
237 .unwrap_or(0);
238
239 if id_val >= from_id.raw() && id_val <= to_id.raw() {
240 let mut fields = HashMap::new();
241 for (key, value) in row.iter_fields() {
242 if key != "id" {
243 fields.insert(key.to_string(), value.clone());
244 }
245 }
246 entries.push(LogEntry {
247 id: LogId(id_val),
248 fields,
249 });
250 }
251 }
252 true
253 });
254
255 entries.sort_by_key(|a| a.id);
256 entries.truncate(limit);
257 entries
258 }
259
260 pub fn apply_retention(&self) -> u64 {
262 match &self.config.retention {
263 LogRetention::Forever => 0,
264 LogRetention::Days(days) => {
265 let cutoff_ms = std::time::SystemTime::now()
266 .duration_since(std::time::UNIX_EPOCH)
267 .unwrap_or_default()
268 .as_millis() as u64
269 - days * 86_400_000;
270 let cutoff_id = LogId::from_ms(cutoff_ms);
271 self.delete_before(cutoff_id)
272 }
273 LogRetention::MaxEntries(max) => {
274 let manager = match self.store.get_collection(&self.config.name) {
275 Some(m) => m,
276 None => return 0,
277 };
278
279 let mut ids: Vec<(u64, EntityId)> = Vec::new();
280 manager.for_each_entity(|entity| {
281 if let Some(row) = entity.data.as_row() {
282 if let Some(Value::UnsignedInteger(log_id)) = row.get_field("id") {
283 ids.push((*log_id, entity.id));
284 }
285 }
286 true
287 });
288
289 if ids.len() as u64 <= *max {
290 return 0;
291 }
292
293 ids.sort_by_key(|(log_id, _)| *log_id);
294 let to_delete = ids.len() as u64 - max;
295 let mut deleted = 0u64;
296 for (_, entity_id) in ids.iter().take(to_delete as usize) {
297 if self
298 .store
299 .delete(&self.config.name, *entity_id)
300 .unwrap_or(false)
301 {
302 deleted += 1;
303 }
304 }
305 deleted
306 }
307 LogRetention::MaxBytes(max_bytes) => {
308 let manager = match self.store.get_collection(&self.config.name) {
309 Some(m) => m,
310 None => return 0,
311 };
312
313 let mut entries: Vec<(u64, EntityId, u64)> = Vec::new();
315 manager.for_each_entity(|entity| {
316 if let Some(row) = entity.data.as_row() {
317 let log_id = row
318 .get_field("id")
319 .and_then(|v| match v {
320 Value::UnsignedInteger(n) => Some(*n),
321 _ => None,
322 })
323 .unwrap_or(0);
324
325 let mut size = 8u64; for (key, value) in row.iter_fields() {
328 size += key.len() as u64 + estimate_value_size(value);
329 }
330 entries.push((log_id, entity.id, size));
331 }
332 true
333 });
334
335 entries.sort_by_key(|(log_id, _, _)| *log_id);
336
337 let total_size: u64 = entries.iter().map(|(_, _, s)| s).sum();
338 if total_size <= *max_bytes {
339 return 0;
340 }
341
342 let mut to_free = total_size - max_bytes;
344 let mut deleted = 0u64;
345 for (_, entity_id, size) in &entries {
346 if to_free == 0 {
347 break;
348 }
349 if self
350 .store
351 .delete(&self.config.name, *entity_id)
352 .unwrap_or(false)
353 {
354 deleted += 1;
355 to_free = to_free.saturating_sub(*size);
356 }
357 }
358 deleted
359 }
360 }
361 }
362
363 fn delete_before(&self, cutoff: LogId) -> u64 {
364 let manager = match self.store.get_collection(&self.config.name) {
365 Some(m) => m,
366 None => return 0,
367 };
368
369 let mut to_delete = Vec::new();
370 manager.for_each_entity(|entity| {
371 if let Some(row) = entity.data.as_row() {
372 if let Some(Value::UnsignedInteger(log_id)) = row.get_field("id") {
373 if *log_id < cutoff.raw() {
374 to_delete.push(entity.id);
375 }
376 }
377 }
378 true
379 });
380
381 let mut deleted = 0u64;
382 for entity_id in to_delete {
383 if self
384 .store
385 .delete(&self.config.name, entity_id)
386 .unwrap_or(false)
387 {
388 deleted += 1;
389 }
390 }
391 deleted
392 }
393
394 pub fn len(&self) -> usize {
396 self.flush_buffer();
397 self.store
398 .get_collection(&self.config.name)
399 .map(|m| m.stats().total_entities)
400 .unwrap_or(0)
401 }
402
403 pub fn config(&self) -> &LogCollectionConfig {
405 &self.config
406 }
407}
408
409fn estimate_value_size(value: &Value) -> u64 {
410 match value {
411 Value::Null => 1,
412 Value::Boolean(_) => 1,
413 Value::Integer(_) | Value::UnsignedInteger(_) | Value::Float(_) => 8,
414 Value::Text(s) => s.len() as u64,
415 Value::Blob(b) => b.len() as u64,
416 Value::Vector(v) => v.len() as u64 * 4,
417 Value::Array(a) => a.iter().map(estimate_value_size).sum::<u64>() + 8,
418 _ => 16, }
420}
421
422impl Drop for LogCollection {
423 fn drop(&mut self) {
424 self.flush_buffer();
425 }
426}
427
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh, empty in-memory store for each test.
    fn test_store() -> Arc<UnifiedStore> {
        Arc::new(UnifiedStore::new())
    }

    /// Config for `name` with the given retention policy and batch size.
    fn config_with(name: &str, retention: LogRetention, batch_size: usize) -> LogCollectionConfig {
        LogCollectionConfig {
            retention,
            batch_size,
            ..LogCollectionConfig::new(name)
        }
    }

    #[test]
    fn test_append_and_query() {
        let log = LogCollection::new(test_store(), LogCollectionConfig::new("test_log"));

        let older = log.append_fields(vec![
            ("level", Value::text("info")),
            ("message", Value::text("hello")),
        ]);
        let newer = log.append_fields(vec![
            ("level", Value::text("error")),
            ("message", Value::text("oops")),
        ]);
        assert!(newer.raw() > older.raw());

        // `recent` lists the newest entry first.
        let ids: Vec<LogId> = log.recent(10).into_iter().map(|entry| entry.id).collect();
        assert_eq!(ids, vec![newer, older]);
    }

    #[test]
    fn test_retention_max_entries() {
        let log = LogCollection::new(
            test_store(),
            config_with("retention_test", LogRetention::MaxEntries(3), 1),
        );

        for seq in 0..5 {
            log.append_fields(vec![("seq", Value::Integer(seq))]);
        }

        assert_eq!(log.len(), 5);
        assert_eq!(log.apply_retention(), 2);
        assert_eq!(log.len(), 3);
    }

    #[test]
    fn test_retention_max_bytes() {
        let log = LogCollection::new(
            test_store(),
            config_with("bytes_retention_test", LogRetention::MaxBytes(200), 1),
        );

        for i in 0..10 {
            log.append_fields(vec![("msg", Value::text(format!("entry-{}", i)))]);
        }

        assert_eq!(log.len(), 10);

        let removed = log.apply_retention();
        assert!(
            removed > 0,
            "should delete some entries to fit under 200 bytes"
        );
        assert!(log.len() < 10, "should have fewer entries after retention");
    }

    #[test]
    fn test_batch_buffering() {
        let store = test_store();
        let log = LogCollection::new(
            Arc::clone(&store),
            config_with("batch_test", LogRetention::Forever, 4),
        );

        // Three appends stay below the batch size of 4; `recent` must flush them.
        for _ in 0..3 {
            log.append_fields(vec![("msg", Value::text("buffered"))]);
        }

        assert_eq!(log.recent(10).len(), 3);
    }

    #[test]
    fn test_id_is_time_ordered() {
        let log = LogCollection::new(test_store(), LogCollectionConfig::new("time_test"));

        let ids: Vec<LogId> = (0..100)
            .map(|i| log.append_fields(vec![("i", Value::Integer(i))]))
            .collect();

        for pair in ids.windows(2) {
            let (prev, next) = (&pair[0], &pair[1]);
            assert!(next.raw() > prev.raw());
            assert!(next.timestamp_us() >= prev.timestamp_us());
        }
    }
}