use chrono::{DateTime, Utc};
use std::collections::HashMap;
use uuid::Uuid;
use ucm_graph_core::event::{EventPayload, UcmEvent};
/// In-memory log of [`UcmEvent`]s together with lookup indexes over it.
///
/// Events live in a single vector in insertion order; the indexes and the
/// checkpoint hold positions into that vector.
pub struct EventStore {
    /// Every appended event, in insertion order.
    events: Vec<UcmEvent>,
    /// Maps an event's `event_id` to its position in `events`.
    id_index: HashMap<Uuid, usize>,
    /// Maps a stream key to the positions in `events` of the events filed
    /// under that key, in insertion order.
    stream_index: HashMap<String, Vec<usize>>,
    /// Position in `events` at which the not-yet-checkpointed tail begins.
    checkpoint: usize,
}
impl EventStore {
    /// Creates an empty store: no events, empty indexes, checkpoint at position 0.
    pub fn new() -> Self {
        Self {
            events: Vec::new(),
            id_index: HashMap::new(),
            stream_index: HashMap::new(),
            checkpoint: 0,
        }
    }

    /// Appends `event` to the end of the log and records it in the indexes.
    ///
    /// The event's position is stored under its `event_id`. If an event with
    /// the same id was appended earlier, the id index is re-pointed at the new
    /// event while the older one stays in the log. When the payload yields a
    /// stream key (see `extract_stream_key`), the position is also added to
    /// that stream.
    pub fn append(&mut self, event: UcmEvent) {
        // The event is pushed last, so it will land at the current length.
        let pos = self.events.len();
        self.id_index.insert(event.event_id, pos);
        if let Some(stream_key) = Self::extract_stream_key(&event.payload) {
            self.stream_index.entry(stream_key).or_default().push(pos);
        }
        self.events.push(event);
    }

    /// Appends every event of `events`, in order, exactly as repeated calls to
    /// [`EventStore::append`] would.
    pub fn append_batch(&mut self, events: Vec<UcmEvent>) {
        // Grow the backing storage once up front instead of while pushing.
        self.events.reserve(events.len());
        self.id_index.reserve(events.len());
        for event in events {
            self.append(event);
        }
    }

    /// Looks up an event by its id, returning `None` when the id is unknown.
    pub fn get_by_id(&self, id: &Uuid) -> Option<&UcmEvent> {
        self.id_index.get(id).and_then(|&pos| self.events.get(pos))
    }

    /// Returns events in insertion order.
    ///
    /// With `from = Some(t)` only events whose `timestamp` is at or after `t`
    /// are returned; with `None` every event is returned.
    pub fn replay(&self, from: Option<DateTime<Utc>>) -> Vec<&UcmEvent> {
        match from {
            Some(timestamp) => self
                .events
                .iter()
                .filter(|e| e.timestamp >= timestamp)
                .collect(),
            None => self.events.iter().collect(),
        }
    }

    /// Returns the events appended since the last call to
    /// [`EventStore::advance_checkpoint`] (or all events if it was never called).
    pub fn events_since_checkpoint(&self) -> &[UcmEvent] {
        // `checkpoint` is only ever set to `events.len()` and nothing in this
        // block removes events, so the range start never exceeds the length.
        &self.events[self.checkpoint..]
    }

    /// Moves the checkpoint to the current end of the log, so that
    /// [`EventStore::events_since_checkpoint`] is empty until more events arrive.
    pub fn advance_checkpoint(&mut self) {
        self.checkpoint = self.events.len();
    }

    /// Returns the events filed under `stream_key`, in insertion order.
    ///
    /// An unknown key yields an empty vector.
    pub fn stream(&self, stream_key: &str) -> Vec<&UcmEvent> {
        self.stream_index
            .get(stream_key)
            .map(|positions| {
                positions
                    .iter()
                    .filter_map(|&pos| self.events.get(pos))
                    .collect()
            })
            .unwrap_or_default()
    }

    /// Number of events in the store.
    pub fn len(&self) -> usize {
        self.events.len()
    }

    /// `true` when no event has been appended.
    pub fn is_empty(&self) -> bool {
        self.events.is_empty()
    }

    /// Returns the causation chain ending at `event_id`, earliest cause first.
    ///
    /// Starting from `event_id`, the `causation_id` links are followed until
    /// an event has no `causation_id`, a referenced id is not in the store, or
    /// an id is met a second time. The last condition guards against cyclic
    /// links (for instance an event naming itself as its cause), which would
    /// otherwise make the walk loop forever. An unknown `event_id` yields an
    /// empty vector.
    pub fn causation_chain(&self, event_id: &Uuid) -> Vec<&UcmEvent> {
        let mut chain = Vec::new();
        let mut visited = std::collections::HashSet::new();
        let mut current_id = Some(*event_id);
        while let Some(id) = current_id {
            // `insert` returns false if the id was already walked: a cycle.
            if !visited.insert(id) {
                break;
            }
            match self.get_by_id(&id) {
                Some(event) => {
                    chain.push(event);
                    current_id = event.causation_id;
                }
                None => break,
            }
        }
        // The walk collected effect -> cause; callers get cause -> effect.
        chain.reverse();
        chain
    }

    /// Derives the stream key for a payload, if it has one.
    ///
    /// `EntityDiscovered` and `ChangeDetected` use their `file_path`;
    /// `EntityRemoved` uses the file path reported by its `entity_id`, which
    /// may be absent. Every other payload has no stream key.
    fn extract_stream_key(payload: &EventPayload) -> Option<String> {
        match payload {
            EventPayload::EntityDiscovered { file_path, .. } => Some(file_path.clone()),
            EventPayload::ChangeDetected { file_path, .. } => Some(file_path.clone()),
            EventPayload::EntityRemoved { entity_id, .. } => {
                entity_id.file_path().map(|s| s.to_string())
            }
            _ => None,
        }
    }
}
impl Default for EventStore {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use ucm_graph_core::entity::*;
    use ucm_graph_core::event::*;

    /// Builds an `EntityDiscovered` event for a synthetic function named
    /// `fn_{index}` located in `src/main.rs`, with no line range.
    fn discovered_fn(index: usize) -> UcmEvent {
        UcmEvent::new(EventPayload::EntityDiscovered {
            entity_id: EntityId::local("src/main.rs", &format!("fn_{index}")),
            kind: EntityKind::Function {
                is_async: false,
                parameter_count: 0,
                return_type: None,
            },
            name: format!("fn_{index}"),
            file_path: "src/main.rs".into(),
            language: "rust".into(),
            source: DiscoverySource::StaticAnalysis,
            line_range: None,
        })
    }

    /// A single appended event is counted and can be found by its id.
    #[test]
    fn test_append_and_retrieve() {
        let discovered = UcmEvent::new(EventPayload::EntityDiscovered {
            entity_id: EntityId::local("src/main.rs", "main"),
            kind: EntityKind::Function {
                is_async: false,
                parameter_count: 0,
                return_type: None,
            },
            name: "main".into(),
            file_path: "src/main.rs".into(),
            language: "rust".into(),
            source: DiscoverySource::StaticAnalysis,
            line_range: Some((1, 10)),
        });
        let wanted_id = discovered.event_id;

        let mut store = EventStore::new();
        store.append(discovered);

        assert_eq!(store.len(), 1);
        assert!(store.get_by_id(&wanted_id).is_some());
    }

    /// Replaying without a lower bound returns every appended event.
    #[test]
    fn test_replay() {
        let mut store = EventStore::new();
        for index in 0..5 {
            store.append(discovered_fn(index));
        }

        let all = store.replay(None);
        assert_eq!(all.len(), 5);
    }

    /// Only events appended after the checkpoint was advanced are reported.
    #[test]
    fn test_checkpoint() {
        let mut store = EventStore::new();
        for index in 0..3 {
            store.append(discovered_fn(index));
        }
        store.advance_checkpoint();
        for index in 3..5 {
            store.append(discovered_fn(index));
        }

        assert_eq!(store.events_since_checkpoint().len(), 2);
    }

    /// The chain for a caused event contains its cause first, then itself.
    #[test]
    fn test_causation_chain() {
        let mut store = EventStore::new();

        let cause = UcmEvent::new(EventPayload::ChangeDetected {
            file_path: "src/auth/service.ts".into(),
            change_type: ChangeType::SignatureChange,
            affected_entities: vec![],
            before_snapshot: None,
            after_snapshot: None,
        });
        let root_id = cause.event_id;
        store.append(cause);

        let effect = UcmEvent::caused_by(
            EventPayload::ConfidenceUpdated {
                source_entity: EntityId::local("src/auth/service.ts", "validateToken"),
                target_entity: EntityId::local("src/api/middleware.ts", "authMiddleware"),
                new_evidence_confidence: 0.9,
                source: DiscoverySource::StaticAnalysis,
                description: "re-analyzed".into(),
            },
            root_id,
        );
        let effect_id = effect.event_id;
        store.append(effect);

        let chain = store.causation_chain(&effect_id);
        assert_eq!(chain.len(), 2);
        assert_eq!(chain[0].event_id, root_id);
    }
}