use allsource_core::{
QueryEventsRequest,
domain::entities::Event,
infrastructure::persistence::{CompactionConfig, SnapshotConfig, SnapshotType, WALConfig},
store::{EventStore, EventStoreConfig},
};
use chrono::Utc;
use serde_json::json;
use std::sync::Arc;
use tempfile::TempDir;
/// Build a validated test `Event` in the "default" tenant with no metadata.
///
/// Panics if validation fails — acceptable in tests, where a failure here
/// means the fixture itself is broken.
fn create_test_event(entity_id: &str, event_type: &str, payload: serde_json::Value) -> Event {
    let result = Event::from_strings(
        String::from(event_type),
        String::from(entity_id),
        String::from("default"),
        payload,
        None,
    );
    result.unwrap()
}
#[test]
fn test_full_lifecycle_in_memory() {
    // Full in-memory lifecycle: ingest 100 events, query them back,
    // reconstruct entity state, and verify store statistics.
    let store = EventStore::new();

    for n in 0..100 {
        let payload = json!({"score": n * 10, "timestamp": Utc::now()});
        let event = create_test_event("user-1", "score.updated", payload);
        store.ingest(&event).unwrap();
    }

    let query = QueryEventsRequest {
        entity_id: Some(String::from("user-1")),
        event_type: None,
        tenant_id: None,
        as_of: None,
        since: None,
        until: None,
        limit: None,
        ..Default::default()
    };
    assert_eq!(store.query(&query).unwrap().len(), 100);

    let state = store.reconstruct_state("user-1", None).unwrap();
    assert!(state["current_state"]["score"].is_number());
    let history = state["history"].as_array().unwrap();
    assert!(!history.is_empty(), "Should have events in history");

    let stats = store.stats();
    assert_eq!(stats.total_events, 100);
    assert_eq!(stats.total_entities, 1);
}
#[test]
fn test_parquet_persistence_and_recovery() {
    // Write events with persistence enabled, drop the store, then reopen
    // from the same directory and verify everything is recovered.
    let temp_dir = TempDir::new().unwrap();

    // First instance: ingest 50 events and flush them to Parquet.
    {
        let store =
            EventStore::with_config(EventStoreConfig::with_persistence(temp_dir.path()));
        for step in 0..50 {
            let payload = json!({"amount": step * 100, "status": "pending"});
            let event = create_test_event("order-1", "order.updated", payload);
            store.ingest(&event).unwrap();
        }
        store.flush_storage().unwrap();
    }

    // Second instance: recover from disk and check totals and state.
    {
        let store =
            EventStore::with_config(EventStoreConfig::with_persistence(temp_dir.path()));
        let stats = store.stats();
        assert_eq!(
            stats.total_events, 50,
            "Should recover all events from Parquet"
        );
        let state = store.reconstruct_state("order-1", None).unwrap();
        assert_eq!(state["event_count"], 50);
    }
}
#[test]
fn test_wal_durability_and_recovery() {
    // Simulate a crash by dropping the store without flushing, then verify
    // the WAL replays every ingested event on restart.
    let storage_dir = TempDir::new().unwrap();
    let wal_dir = TempDir::new().unwrap();

    // Both store instances must point at the same directories.
    let make_config = || EventStoreConfig {
        storage_dir: Some(storage_dir.path().to_path_buf()),
        wal_dir: Some(wal_dir.path().to_path_buf()),
        wal_config: WALConfig::default(),
        ..Default::default()
    };

    {
        let store = EventStore::with_config(make_config());
        for attempt in 0..30 {
            let payload = json!({"attempt": attempt, "success": attempt % 2 == 0});
            let event = create_test_event("user-2", "login.attempt", payload);
            store.ingest(&event).unwrap();
        }
        // Dropped here without an explicit flush — the simulated "crash".
    }

    {
        let store = EventStore::with_config(make_config());
        let stats = store.stats();
        assert!(
            stats.total_events >= 30,
            "Should recover at least 30 events from WAL after 'crash', got {}",
            stats.total_events
        );

        let query = QueryEventsRequest {
            entity_id: Some(String::from("user-2")),
            event_type: None,
            tenant_id: None,
            as_of: None,
            since: None,
            until: None,
            limit: None,
            ..Default::default()
        };
        let events = store.query(&query).unwrap();
        assert_eq!(events.len(), 30, "Should have exactly 30 events for user-2");
    }
}
#[test]
fn test_snapshot_optimization() {
    // With a low event threshold and auto-snapshotting enabled, a snapshot
    // must appear without any explicit create_snapshot call.
    let temp_dir = TempDir::new().unwrap();
    let config = EventStoreConfig {
        storage_dir: Some(temp_dir.path().to_path_buf()),
        snapshot_config: SnapshotConfig {
            event_threshold: 10,
            auto_snapshot: true,
            ..Default::default()
        },
        ..Default::default()
    };
    let store = EventStore::with_config(config);

    for step in 0..50 {
        let payload = json!({"amount": step * 50, "balance": 1000 + (step * 50)});
        let event = create_test_event("account-1", "transaction.processed", payload);
        store.ingest(&event).unwrap();
    }

    let snapshot_manager = store.snapshot_manager();
    let latest = snapshot_manager.get_latest_snapshot("account-1");
    assert!(
        latest.is_some(),
        "Snapshot should be created automatically after threshold"
    );

    let snapshot = latest.unwrap();
    assert!(snapshot.event_count >= 10);
    // Accept either trigger classification for the snapshot.
    assert!(matches!(
        snapshot.metadata.snapshot_type,
        SnapshotType::Automatic | SnapshotType::Manual
    ));

    assert!(snapshot_manager.stats().total_snapshots > 0);
}
#[test]
fn test_time_travel_queries() {
    // Record each event's timestamp at ingest time so we can reconstruct
    // state "as of" intermediate versions.
    let store = EventStore::new();
    let mut timestamps = Vec::with_capacity(10);

    for version in 0..10 {
        // Space events apart so the as-of boundaries are unambiguous.
        std::thread::sleep(std::time::Duration::from_millis(50));
        let event = create_test_event(
            "document-1",
            "content.updated",
            json!({"version": version, "content": format!("Version {}", version)}),
        );
        timestamps.push(event.timestamp());
        store.ingest(&event).unwrap();
    }

    // As of version 5: events 0..=5 should be included.
    let state_at_v5 = store
        .reconstruct_state("document-1", Some(timestamps[5]))
        .unwrap();
    assert_eq!(state_at_v5["event_count"], 6);

    // As of the last version: all 10 events.
    let state_at_v9 = store
        .reconstruct_state("document-1", Some(timestamps[9]))
        .unwrap();
    assert_eq!(state_at_v9["event_count"], 10);

    // No as-of bound: identical to the latest version.
    let current_state = store.reconstruct_state("document-1", None).unwrap();
    assert_eq!(current_state["event_count"], 10);
}
#[test]
fn test_multi_entity_queries() {
    // 5 users x 10 events each; filter by entity and then by event type.
    let store = EventStore::new();
    for user_id in 1..=5 {
        for event_num in 1..=10 {
            let payload = json!({"activity": event_num, "user_id": user_id});
            let event =
                create_test_event(&format!("user-{user_id}"), "activity.logged", payload);
            store.ingest(&event).unwrap();
        }
    }

    // Entity filter: only user-3's events.
    let by_entity = QueryEventsRequest {
        entity_id: Some(String::from("user-3")),
        event_type: None,
        tenant_id: None,
        as_of: None,
        since: None,
        until: None,
        limit: None,
        ..Default::default()
    };
    assert_eq!(store.query(&by_entity).unwrap().len(), 10);

    // Type filter: all activity.logged events across every user.
    let by_type = QueryEventsRequest {
        entity_id: None,
        event_type: Some(String::from("activity.logged")),
        tenant_id: None,
        as_of: None,
        since: None,
        until: None,
        limit: None,
        ..Default::default()
    };
    assert_eq!(store.query(&by_type).unwrap().len(), 50);

    let stats = store.stats();
    assert_eq!(stats.total_entities, 5);
    assert_eq!(stats.total_events, 50);
}
#[test]
fn test_compaction_reduces_files() {
    // Flush three small batches to produce several small files, then
    // trigger compaction manually and sanity-check its report.
    let temp_dir = TempDir::new().unwrap();
    let config = EventStoreConfig {
        storage_dir: Some(temp_dir.path().to_path_buf()),
        compaction_config: CompactionConfig {
            min_files_to_compact: 2,
            small_file_threshold: 1024,
            auto_compact: false,
            ..Default::default()
        },
        ..Default::default()
    };
    let store = EventStore::with_config(config);

    for batch in 0..3 {
        for item in 0..5 {
            let event = create_test_event(
                &format!("item-{batch}"),
                "item.created",
                json!({"batch": batch, "item": item}),
            );
            store.ingest(&event).unwrap();
        }
        // Each flush writes its batch out as a separate small file.
        store.flush_storage().unwrap();
    }

    if let Some(compaction_manager) = store.compaction_manager() {
        let result = compaction_manager.compact_now().unwrap();
        // Compaction may legitimately be a no-op; only check when it ran.
        if result.files_compacted > 0 {
            assert!(result.bytes_before > 0);
            assert!(result.events_compacted >= 15);
        }
    }
}
#[test]
fn test_projection_aggregations() {
    // Ingest two alternating event types across 20 distinct entities and
    // verify the store tracks the number of distinct event types.
    let store = EventStore::new();
    for i in 0..20 {
        let event_type = if i % 2 == 0 {
            "user.created"
        } else {
            "user.updated"
        };
        let event = create_test_event(&format!("user-{i}"), event_type, json!({"index": i}));
        store.ingest(&event).unwrap();
    }

    // BUG FIX: the original `assert!(snapshot.is_ok() || snapshot.is_err())`
    // was a tautology that could never fail. We still exercise the call —
    // a snapshot for "user-1" may or may not exist at this point — but the
    // only meaningful requirement is that it does not panic.
    let _snapshot = store.get_snapshot("user-1");

    let stats = store.stats();
    assert_eq!(stats.total_event_types, 2);
}
#[test]
fn test_concurrent_ingestion() {
    // 5 writer threads x 20 events each against one shared store.
    let store = Arc::new(EventStore::new());

    let handles: Vec<_> = (0..5)
        .map(|thread_id| {
            let store_clone = Arc::clone(&store);
            std::thread::spawn(move || {
                for i in 0..20 {
                    let event = create_test_event(
                        &format!("thread-{thread_id}-entity-{i}"),
                        "concurrent.write",
                        json!({"thread": thread_id, "index": i}),
                    );
                    store_clone.ingest(&event).unwrap();
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    // No events lost or duplicated under concurrent writes.
    assert_eq!(store.stats().total_events, 100);
}
#[test]
fn test_event_stream_ordering() {
    // Ingest 50 events for one entity and verify the query result comes
    // back in non-decreasing timestamp order.
    //
    // FIX: the original collected a `timestamps` Vec (via field access
    // `event.timestamp`, inconsistent with the `.timestamp()` accessor used
    // elsewhere in this file) that was never read — dead code, removed.
    let store = EventStore::new();
    for i in 0..50 {
        let event =
            create_test_event("ordered-entity", "sequence.event", json!({"sequence": i}));
        store.ingest(&event).unwrap();
    }

    let query = QueryEventsRequest {
        entity_id: Some("ordered-entity".to_string()),
        event_type: None,
        tenant_id: None,
        as_of: None,
        since: None,
        until: None,
        limit: None,
        ..Default::default()
    };
    let events = store.query(&query).unwrap();

    // windows(2) yields every adjacent pair; an empty/singleton result
    // vacuously passes, matching the original loop's behavior.
    assert!(
        events
            .windows(2)
            .all(|pair| pair[0].timestamp() <= pair[1].timestamp()),
        "Events should be ordered by timestamp"
    );
}
#[test]
fn test_full_production_config() {
    // Exercise the full production configuration: Parquet persistence,
    // WAL with sync-on-write, auto-snapshots, and manual compaction.
    let storage_dir = TempDir::new().unwrap();
    let wal_dir = TempDir::new().unwrap();

    let config = EventStoreConfig::production(
        storage_dir.path(),
        wal_dir.path(),
        SnapshotConfig {
            event_threshold: 10,
            auto_snapshot: true,
            ..Default::default()
        },
        WALConfig {
            sync_on_write: true,
            ..Default::default()
        },
        CompactionConfig {
            auto_compact: false,
            ..Default::default()
        },
    );
    let store = EventStore::with_config(config);

    for value in 0..100 {
        let event = create_test_event(
            "production-entity",
            "production.event",
            json!({"value": value}),
        );
        store.ingest(&event).unwrap();
    }

    assert_eq!(store.stats().total_events, 100);

    // Auto-snapshot should have fired well past the 10-event threshold.
    assert!(
        store
            .snapshot_manager()
            .get_latest_snapshot("production-entity")
            .is_some()
    );

    // Both explicit flush and manual snapshot must succeed.
    store.flush_storage().unwrap();
    store.create_snapshot("production-entity").unwrap();
}
#[test]
fn test_entity_not_found_error() {
    // Reconstructing an unknown entity is an error, but querying for it
    // simply yields an empty result set.
    let store = EventStore::new();

    assert!(store.reconstruct_state("non-existent", None).is_err());

    let query = QueryEventsRequest {
        entity_id: Some(String::from("non-existent")),
        event_type: None,
        tenant_id: None,
        as_of: None,
        since: None,
        until: None,
        limit: None,
        ..Default::default()
    };
    let events = store.query(&query).unwrap();
    assert!(events.is_empty());
}
#[test]
fn test_event_validation() {
    // Event::from_strings(event_type, entity_id, tenant_id, payload, metadata)
    // must reject empty identifiers.

    // Empty entity_id (second argument).
    let missing_entity = Event::from_strings(
        "test".to_string(),
        String::new(),
        "default".to_string(),
        json!({}),
        None,
    );
    assert!(
        missing_entity.is_err(),
        "Empty entity_id should fail validation"
    );

    // Empty event_type (first argument).
    let missing_type = Event::from_strings(
        String::new(),
        "entity-1".to_string(),
        "default".to_string(),
        json!({}),
        None,
    );
    assert!(
        missing_type.is_err(),
        "Empty event_type should fail validation"
    );
}
#[test]
fn test_snapshot_time_travel_optimization() {
    // After a manual snapshot at event 100, reconstruction should replay
    // only the events ingested after the snapshot.
    let store = EventStore::new();
    for i in 0..100 {
        let payload = json!({"value": i, "data": format!("Data {}", i)});
        let event = create_test_event("heavy-entity", "data.update", payload);
        store.ingest(&event).unwrap();
    }

    store.create_snapshot("heavy-entity").unwrap();
    let _state = store.reconstruct_state("heavy-entity", None).unwrap();

    let snapshot = store
        .snapshot_manager()
        .get_latest_snapshot("heavy-entity")
        .unwrap();
    assert_eq!(
        snapshot.event_count, 100,
        "Snapshot should contain 100 events"
    );

    // Ten more events on top of the snapshot.
    for i in 100..110 {
        let event = create_test_event("heavy-entity", "data.update", json!({"value": i}));
        store.ingest(&event).unwrap();
    }

    let state = store.reconstruct_state("heavy-entity", None).unwrap();
    let history_len = state["history"].as_array().unwrap().len();
    assert!(
        history_len <= 11,
        "With snapshot optimization, history should contain only events after snapshot, got {history_len}"
    );
    assert_eq!(
        state["current_state"]["value"], 109,
        "Final state should reflect all events"
    );
}
#[test]
fn test_metadata_preservation() {
    // Metadata attached at ingest must survive a query round trip intact.
    let store = EventStore::new();

    let metadata = json!({"source": "test", "trace_id": "12345"});
    let event = Event::from_strings(
        "metadata.test".to_string(),
        "meta-entity".to_string(),
        "default".to_string(),
        json!({"key": "value"}),
        Some(metadata),
    )
    .unwrap();
    store.ingest(&event).unwrap();

    let query = QueryEventsRequest {
        entity_id: Some(String::from("meta-entity")),
        event_type: None,
        tenant_id: None,
        as_of: None,
        since: None,
        until: None,
        limit: None,
        ..Default::default()
    };
    let events = store.query(&query).unwrap();
    assert_eq!(events.len(), 1);

    let stored = events[0].metadata();
    assert!(stored.is_some());
    assert_eq!(stored.unwrap()["source"], "test");
}