1use std::sync::Arc;
16use std::sync::atomic::{AtomicU64, Ordering};
17
18use futures::TryStreamExt;
19
20use hirn_core::HirnResult;
21
22use hirn_storage::PhysicalStore;
23use hirn_storage::datasets::events::{self, DATASET_NAME, EventRow};
24use hirn_storage::store::{ScanOptions, ScanOrdering};
25
26use crate::event::{EventEnvelope, MemoryEvent};
27
/// Selection criteria for events, used for live subscriptions and stored reads.
///
/// Every field is optional. A `None` field places no constraint; all populated
/// fields must hold at the same time for an event to match.
#[derive(Clone, Debug, Default)]
pub struct EventFilter {
    /// Realm the event must carry (exact string match).
    pub realm: Option<String>,
    /// Namespace the event must carry (exact string match).
    pub namespace: Option<String>,
    /// Event type name the event must carry (exact string match).
    pub event_type: Option<String>,
    /// Agent id the event must carry (exact string match).
    pub agent_id: Option<String>,
    /// Inclusive lower bound on the event's `timestamp_us`.
    pub after_us: Option<i64>,
    /// Inclusive upper bound on the event's `timestamp_us`.
    pub before_us: Option<i64>,
}
44
45#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
47pub struct SnapshotMeta {
48 pub seq: u64,
50 pub timestamp_us: i64,
52 pub event_count: u64,
54}
55
/// Strategy that decides which stored events may be compacted away.
#[derive(Clone, Debug)]
pub enum RetentionPolicy {
    /// Compact events that precede the sequence number recorded in the most
    /// recent `snapshot_taken` event.
    SnapshotBased,
    /// Keep at most this many events; when the stored count exceeds it, the
    /// oldest surplus events are compacted.
    MaxEvents(u64),
    /// Maximum event age in seconds; events older than that are compacted.
    TimeBased(u64),
}
66
/// Outcome of a compaction or retention pass.
#[derive(Clone, Debug)]
pub struct CompactionResult {
    /// Number of event rows the storage layer reported as deleted.
    pub events_removed: u64,
    /// Exclusive upper bound of the compacted range: only events with a
    /// sequence number below this value were eligible for removal.
    pub compacted_before_seq: u64,
}
75
76pub struct EventLog {
81 storage: Arc<dyn PhysicalStore>,
82 next_seq: AtomicU64,
84 tx: tokio::sync::broadcast::Sender<EventEnvelope>,
86}
87
88impl EventLog {
89 pub async fn open(storage: Arc<dyn PhysicalStore>) -> HirnResult<Self> {
94 let (tx, _) = tokio::sync::broadcast::channel(4096);
95
96 let next_seq = Self::recover_next_seq(&*storage).await?;
98
99 Ok(Self {
100 storage,
101 next_seq: AtomicU64::new(next_seq),
102 tx,
103 })
104 }
105
106 async fn recover_next_seq(storage: &dyn PhysicalStore) -> HirnResult<u64> {
108 let exists = storage.exists(DATASET_NAME).await?;
109 if !exists {
110 return Ok(0);
111 }
112
113 let count = storage.count(DATASET_NAME, None).await?;
114 if count == 0 {
115 return Ok(0);
116 }
117
118 let mut batches = storage
121 .scan_stream(
122 DATASET_NAME,
123 ScanOptions {
124 columns: Some(vec!["seq".into()]),
125 filter: None,
126 exact_filter: None,
127 order_by: Some(vec![ScanOrdering::desc("seq")]),
128 limit: Some(1),
129 offset: None,
130 },
131 )
132 .await?;
133
134 let mut max_seq: u64 = 0;
135 while let Some(batch) = batches.try_next().await? {
136 if let Some(col) = batch.column_by_name("seq") {
137 let arr = col
138 .as_any()
139 .downcast_ref::<arrow_array::UInt64Array>()
140 .ok_or_else(|| {
141 hirn_core::HirnError::storage("event_log seq column is not UInt64")
142 })?;
143 for i in 0..arr.len() {
144 if arr.value(i) > max_seq {
145 max_seq = arr.value(i);
146 }
147 }
148 }
149 }
150
151 Ok(max_seq + 1)
152 }
153
154 pub fn subscribe(&self) -> tokio::sync::broadcast::Receiver<EventEnvelope> {
156 self.tx.subscribe()
157 }
158
159 pub fn subscribe_filtered(
166 &self,
167 filter: EventFilter,
168 ) -> tokio::sync::mpsc::Receiver<EventEnvelope> {
169 let mut rx = self.tx.subscribe();
170 let (tx, filtered_rx) = tokio::sync::mpsc::channel(256);
171
172 tokio::spawn(async move {
173 loop {
174 match rx.recv().await {
175 Ok(env) => {
176 if filter_matches(&filter, &env) {
177 if tx.send(env).await.is_err() {
178 break; }
180 }
181 }
182 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
183 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
184 tracing::warn!(skipped = n, "event subscriber lagged, lost events");
185 metrics::counter!("hirn_event_subscriber_lagged_total").increment(n);
186 continue;
187 }
188 }
189 }
190 });
191
192 filtered_rx
193 }
194
195 pub fn next_seq(&self) -> u64 {
198 self.next_seq.load(Ordering::Acquire)
199 }
200
201 pub async fn append(
208 &self,
209 realm: impl Into<String>,
210 namespace: impl Into<String>,
211 agent_id: impl Into<String>,
212 event: MemoryEvent,
213 ) -> HirnResult<EventEnvelope> {
214 let seq = self.next_seq.fetch_add(1, Ordering::AcqRel);
215 let envelope = EventEnvelope::new(seq, realm, namespace, agent_id, event);
216
217 let payload = bincode::serialize(&envelope.event)
218 .map_err(|e| hirn_core::HirnError::storage(format!("event serialize: {e}")))?;
219
220 let row = EventRow {
221 seq: envelope.seq,
222 timestamp_us: envelope.timestamp_us,
223 realm: envelope.realm.clone(),
224 namespace: envelope.namespace.clone(),
225 agent_id: envelope.agent_id.clone(),
226 event_type: envelope.event_type().to_string(),
227 payload,
228 hmac: envelope.hmac.clone(),
229 };
230
231 let batch = events::to_batch(std::slice::from_ref(&row))?;
232 self.storage.append(DATASET_NAME, batch).await?;
233
234 let _ = self.tx.send(envelope.clone());
236
237 Ok(envelope)
238 }
239
240 pub async fn append_signed(
246 &self,
247 event: MemoryEvent,
248 realm: impl Into<String>,
249 namespace: impl Into<String>,
250 agent_id: impl Into<String>,
251 secret: &[u8],
252 ) -> HirnResult<EventEnvelope> {
253 let seq = self.next_seq.fetch_add(1, Ordering::AcqRel);
254 let mut envelope = EventEnvelope::new(seq, realm, namespace, agent_id, event);
255 envelope.sign(secret);
256
257 let payload = bincode::serialize(&envelope.event)
258 .map_err(|e| hirn_core::HirnError::storage(format!("event serialize: {e}")))?;
259
260 let row = EventRow {
261 seq: envelope.seq,
262 timestamp_us: envelope.timestamp_us,
263 realm: envelope.realm.clone(),
264 namespace: envelope.namespace.clone(),
265 agent_id: envelope.agent_id.clone(),
266 event_type: envelope.event_type().to_string(),
267 payload,
268 hmac: envelope.hmac.clone(),
269 };
270
271 let batch = events::to_batch(std::slice::from_ref(&row))?;
272 self.storage.append(DATASET_NAME, batch).await?;
273
274 let _ = self.tx.send(envelope.clone());
275
276 Ok(envelope)
277 }
278
279 pub async fn append_batch(
283 &self,
284 realm: &str,
285 namespace: &str,
286 agent_id: &str,
287 events_in: Vec<MemoryEvent>,
288 ) -> HirnResult<Vec<EventEnvelope>> {
289 if events_in.is_empty() {
290 return Ok(vec![]);
291 }
292
293 let base_seq = self
294 .next_seq
295 .fetch_add(events_in.len() as u64, Ordering::AcqRel);
296
297 let mut envelopes = Vec::with_capacity(events_in.len());
298 let mut rows = Vec::with_capacity(events_in.len());
299
300 for (i, event) in events_in.into_iter().enumerate() {
301 let seq = base_seq + i as u64;
302 let envelope = EventEnvelope::new(seq, realm, namespace, agent_id, event);
303
304 let payload = bincode::serialize(&envelope.event)
305 .map_err(|e| hirn_core::HirnError::storage(format!("event serialize: {e}")))?;
306
307 rows.push(EventRow {
308 seq: envelope.seq,
309 timestamp_us: envelope.timestamp_us,
310 realm: envelope.realm.clone(),
311 namespace: envelope.namespace.clone(),
312 agent_id: envelope.agent_id.clone(),
313 event_type: envelope.event_type().to_string(),
314 payload,
315 hmac: envelope.hmac.clone(),
316 });
317
318 envelopes.push(envelope);
319 }
320
321 let batch = events::to_batch(&rows)?;
322 self.storage.append(DATASET_NAME, batch).await?;
323
324 for env in &envelopes {
326 let _ = self.tx.send(env.clone());
327 }
328
329 Ok(envelopes)
330 }
331
332 pub async fn read(&self, from_seq: u64, to_seq: u64) -> HirnResult<Vec<EventEnvelope>> {
336 let filter = format!("seq >= {from_seq} AND seq <= {to_seq}");
337 self.read_filtered(Some(&filter)).await
338 }
339
340 pub async fn tail(&self, from_seq: u64) -> HirnResult<Vec<EventEnvelope>> {
342 let filter = format!("seq >= {from_seq}");
343 self.read_filtered(Some(&filter)).await
344 }
345
346 pub async fn read_all(&self) -> HirnResult<Vec<EventEnvelope>> {
348 self.read_filtered(None).await
349 }
350
351 pub async fn read_with_filter(&self, filter: &EventFilter) -> HirnResult<Vec<EventEnvelope>> {
353 let mut predicates = Vec::new();
354
355 if let Some(ref realm) = filter.realm {
356 let escaped = realm.replace('\'', "''");
357 predicates.push(format!("realm = '{escaped}'"));
358 }
359 if let Some(ref ns) = filter.namespace {
360 let escaped = ns.replace('\'', "''");
361 predicates.push(format!("namespace = '{escaped}'"));
362 }
363 if let Some(ref et) = filter.event_type {
364 let escaped = et.replace('\'', "''");
365 predicates.push(format!("event_type = '{escaped}'"));
366 }
367 if let Some(ref aid) = filter.agent_id {
368 let escaped = aid.replace('\'', "''");
369 predicates.push(format!("agent_id = '{escaped}'"));
370 }
371 if let Some(after) = filter.after_us {
372 predicates.push(format!("timestamp_us >= {after}"));
373 }
374 if let Some(before) = filter.before_us {
375 predicates.push(format!("timestamp_us <= {before}"));
376 }
377
378 let combined = if predicates.is_empty() {
379 None
380 } else {
381 Some(predicates.join(" AND "))
382 };
383
384 self.read_filtered(combined.as_deref()).await
385 }
386
387 pub async fn replay<F>(&self, mut handler: F) -> HirnResult<u64>
391 where
392 F: FnMut(&EventEnvelope) -> HirnResult<()>,
393 {
394 let envelopes = self.read_all().await?;
395 let count = envelopes.len() as u64;
396 for env in &envelopes {
397 handler(env)?;
398 }
399 Ok(count)
400 }
401
402 pub async fn verify_integrity(&self, secret: &[u8]) -> HirnResult<Vec<u64>> {
408 let events = self.read_all().await?;
409 let failures: Vec<u64> = events
410 .iter()
411 .filter(|env| !env.verify_hmac(secret))
412 .map(|env| env.seq)
413 .collect();
414 Ok(failures)
415 }
416
417 pub async fn replay_from<F>(&self, from_seq: u64, mut handler: F) -> HirnResult<u64>
419 where
420 F: FnMut(&EventEnvelope) -> HirnResult<()>,
421 {
422 let envelopes = self.tail(from_seq).await?;
423 let count = envelopes.len() as u64;
424 for env in &envelopes {
425 handler(env)?;
426 }
427 Ok(count)
428 }
429
430 async fn read_filtered(&self, filter: Option<&str>) -> HirnResult<Vec<EventEnvelope>> {
432 self.read_filtered_limited(filter, None).await
433 }
434
435 async fn read_filtered_limited(
437 &self,
438 filter: Option<&str>,
439 limit: Option<usize>,
440 ) -> HirnResult<Vec<EventEnvelope>> {
441 self.read_filtered_limited_ordered(filter, limit, vec![ScanOrdering::asc("seq")])
442 .await
443 }
444
445 async fn read_filtered_limited_ordered(
446 &self,
447 filter: Option<&str>,
448 limit: Option<usize>,
449 order_by: Vec<ScanOrdering>,
450 ) -> HirnResult<Vec<EventEnvelope>> {
451 let exists = self.storage.exists(DATASET_NAME).await?;
452 if !exists {
453 return Ok(vec![]);
454 }
455
456 let mut batches = self
457 .storage
458 .scan_stream(
459 DATASET_NAME,
460 ScanOptions {
461 columns: None,
462 filter: filter.map(String::from),
463 exact_filter: None,
464 order_by: Some(order_by),
465 limit,
466 offset: None,
467 },
468 )
469 .await?;
470
471 let mut envelopes = Vec::new();
472 while let Some(batch) = batches.try_next().await? {
473 let rows = events::from_batch(&batch)?;
474 for row in rows {
475 let event: MemoryEvent = bincode::deserialize(&row.payload).map_err(|e| {
476 hirn_core::HirnError::storage(format!(
477 "event deserialize at seq {}: {e}",
478 row.seq
479 ))
480 })?;
481
482 envelopes.push(EventEnvelope {
483 seq: row.seq,
484 timestamp_us: row.timestamp_us,
485 realm: row.realm,
486 namespace: row.namespace,
487 agent_id: row.agent_id,
488 event: event,
489 hmac: row.hmac,
490 });
491 }
492 }
493 Ok(envelopes)
494 }
495
496 pub async fn snapshot(&self, materialized_tables: &[&str]) -> HirnResult<SnapshotMeta> {
503 let current_seq = self.next_seq.load(Ordering::Acquire).saturating_sub(1);
504 let tag = format!("snapshot-{current_seq}");
505
506 for table_name in materialized_tables {
508 if self.storage.exists(table_name).await? {
509 self.storage.tag(table_name, &tag).await?;
510 }
511 }
512
513 let _ = self
515 .append(
516 "system",
517 "system",
518 "system",
519 MemoryEvent::SnapshotTaken {
520 seq: current_seq,
521 tag: tag.clone(),
522 },
523 )
524 .await?;
525
526 let event_count = self.storage.count(DATASET_NAME, None).await.unwrap_or(0);
527
528 let meta = SnapshotMeta {
529 seq: current_seq,
530 timestamp_us: chrono::Utc::now().timestamp_micros(),
531 event_count,
532 };
533
534 Ok(meta)
535 }
536
537 pub async fn compact(&self, before_seq: u64) -> HirnResult<CompactionResult> {
542 let exists = self.storage.exists(DATASET_NAME).await?;
543 if !exists {
544 return Ok(CompactionResult {
545 events_removed: 0,
546 compacted_before_seq: before_seq,
547 });
548 }
549
550 let predicate = format!(
551 "seq < {before_seq} AND event_type NOT IN ('access_granted', 'access_denied', 'policy_changed')"
552 );
553 let deleted = self.storage.delete(DATASET_NAME, &predicate).await?;
554
555 self.storage
557 .compact(DATASET_NAME, Default::default())
558 .await?;
559 self.storage.optimize_indices(DATASET_NAME).await?;
560
561 let _ = self
563 .append(
564 "system",
565 "system",
566 "system",
567 MemoryEvent::CompactionCompleted {
568 before_seq,
569 events_removed: deleted,
570 },
571 )
572 .await?;
573
574 Ok(CompactionResult {
575 events_removed: deleted,
576 compacted_before_seq: before_seq,
577 })
578 }
579
580 pub async fn apply_retention(&self, policy: &RetentionPolicy) -> HirnResult<CompactionResult> {
582 match policy {
583 RetentionPolicy::SnapshotBased => {
584 let snapshots = self
585 .read_filtered_limited_ordered(
586 Some("event_type = 'snapshot_taken'"),
587 Some(1),
588 vec![ScanOrdering::desc("seq")],
589 )
590 .await?;
591 let last_snapshot_seq = snapshots.iter().find_map(|e| {
592 if let MemoryEvent::SnapshotTaken { seq, .. } = &e.event {
593 Some(*seq)
594 } else {
595 None
596 }
597 });
598
599 match last_snapshot_seq {
600 Some(seq) => self.compact(seq).await,
601 None => Ok(CompactionResult {
602 events_removed: 0,
603 compacted_before_seq: 0,
604 }),
605 }
606 }
607 RetentionPolicy::MaxEvents(max) => {
608 let count = self.storage.count(DATASET_NAME, None).await.unwrap_or(0);
609 if count <= *max {
610 return Ok(CompactionResult {
611 events_removed: 0,
612 compacted_before_seq: 0,
613 });
614 }
615 let to_remove = count - max;
616 let cutoff_events = self
619 .read_filtered_limited(None, Some((to_remove + 1) as usize))
620 .await?;
621 if let Some(env) = cutoff_events.get(to_remove as usize) {
622 self.compact(env.seq).await
623 } else {
624 Ok(CompactionResult {
625 events_removed: 0,
626 compacted_before_seq: 0,
627 })
628 }
629 }
630 RetentionPolicy::TimeBased(max_age_secs) => {
631 let cutoff_us =
632 chrono::Utc::now().timestamp_micros() - (*max_age_secs as i64 * 1_000_000);
633 let filter = format!("timestamp_us >= {cutoff_us}");
636 let after_cutoff = self.read_filtered_limited(Some(&filter), Some(1)).await?;
637 let compact_seq = after_cutoff.first().map(|e| e.seq);
638 match compact_seq {
639 Some(seq) => self.compact(seq).await,
640 None => Ok(CompactionResult {
641 events_removed: 0,
642 compacted_before_seq: 0,
643 }),
644 }
645 }
646 }
647 }
648}
649
650fn filter_matches(filter: &EventFilter, env: &EventEnvelope) -> bool {
652 if let Some(ref realm) = filter.realm {
653 if env.realm != *realm {
654 return false;
655 }
656 }
657 if let Some(ref ns) = filter.namespace {
658 if env.namespace != *ns {
659 return false;
660 }
661 }
662 if let Some(ref et) = filter.event_type {
663 if env.event_type() != et.as_str() {
664 return false;
665 }
666 }
667 if let Some(ref aid) = filter.agent_id {
668 if env.agent_id != *aid {
669 return false;
670 }
671 }
672 if let Some(after) = filter.after_us {
673 if env.timestamp_us < after {
674 return false;
675 }
676 }
677 if let Some(before) = filter.before_us {
678 if env.timestamp_us > before {
679 return false;
680 }
681 }
682 true
683}
684
#[cfg(test)]
mod tests {
    use super::*;
    use hirn_storage::memory_store::MemoryStore;

    /// Opens an event log backed by a fresh in-memory store.
    async fn fresh_log() -> EventLog {
        let storage: Arc<dyn PhysicalStore> = Arc::new(MemoryStore::new());
        EventLog::open(storage).await.unwrap()
    }

    #[tokio::test]
    async fn open_on_empty_storage() {
        let log = fresh_log().await;
        assert_eq!(log.next_seq(), 0);
    }

    #[tokio::test]
    async fn append_assigns_sequential_seqs() {
        let log = fresh_log().await;

        let pushed = MemoryEvent::WorkingPushed {
            id: hirn_core::id::MemoryId::new(),
        };
        let first = log.append("r", "ns", "a", pushed).await.unwrap();
        assert_eq!(first.seq, 0);

        let archived = MemoryEvent::Archived {
            id: hirn_core::id::MemoryId::new(),
        };
        let second = log.append("r", "ns", "a", archived).await.unwrap();
        assert_eq!(second.seq, 1);

        assert_eq!(log.next_seq(), 2);
    }

    #[tokio::test]
    async fn append_batch_consecutive_seqs() {
        let log = fresh_log().await;

        let batch = vec![
            MemoryEvent::WorkingPushed {
                id: hirn_core::id::MemoryId::new(),
            },
            MemoryEvent::Archived {
                id: hirn_core::id::MemoryId::new(),
            },
            MemoryEvent::Consolidated {
                records_processed: 5,
            },
        ];

        let stored = log.append_batch("r", "ns", "a", batch).await.unwrap();
        assert_eq!(stored.len(), 3);
        assert_eq!(stored[0].seq, 0);
        assert_eq!(stored[1].seq, 1);
        assert_eq!(stored[2].seq, 2);
        assert_eq!(log.next_seq(), 3);
    }

    #[tokio::test]
    async fn broadcast_subscriber_receives_events() {
        let log = fresh_log().await;
        let mut subscriber = log.subscribe();

        let id = hirn_core::id::MemoryId::new();
        log.append("r", "ns", "a", MemoryEvent::WorkingPushed { id })
            .await
            .unwrap();

        let delivered = subscriber.try_recv().unwrap();
        assert_eq!(delivered.seq, 0);
        assert_eq!(delivered.event_type(), "working_pushed");
    }
}