use allsource_core::{
QueryEventsRequest,
auth::{AuthManager, Permission, Role},
domain::{
entities::{Event, TenantQuotas},
repositories::TenantRepository,
value_objects::TenantId,
},
infrastructure::repositories::InMemoryTenantRepository,
rate_limit::{RateLimitConfig, RateLimiter},
store::EventStore,
};
use serde_json::json;
use std::{sync::Arc, thread};
/// A freshly constructed `EventStore` reports zero events and zero entities.
#[test]
fn test_embedded_eventstore_creation() {
    let event_store = EventStore::new();

    // Nothing has been ingested yet, so both counters must still be zero.
    let initial = event_store.stats();
    assert_eq!(initial.total_events, 0);
    assert_eq!(initial.total_entities, 0);

    println!("✅ Test 1: Embedded EventStore creation successful");
}
/// Ingesting one event (payload plus metadata) raises the store's event count to one.
#[test]
fn test_embedded_event_ingestion() {
    let store = EventStore::new();

    let payload = json!({
        "name": "Alice",
        "email": "alice@example.com",
        "role": "developer"
    });
    let metadata = json!({
        "source": "embedded_test",
        "version": "1.0"
    });

    let user_created = Event::from_strings(
        "user.created".to_owned(),
        "user-001".to_owned(),
        "default".to_owned(),
        payload,
        Some(metadata),
    )
    .expect("Failed to create event");

    store
        .ingest(&user_created)
        .expect("Failed to ingest event");

    assert_eq!(store.stats().total_events, 1);

    println!("✅ Test 2: Embedded event ingestion successful");
}
/// Queries the store by entity id and by event type and checks the result counts.
///
/// Four events are ingested: three for `order-001` and one for `order-002`;
/// two of the four have type `order.created`.
#[test]
fn test_embedded_event_querying() {
    let store = EventStore::new();

    // (event type, entity id, payload)
    let fixtures = [
        ("order.created", "order-001", json!({"total": 100.0})),
        ("order.updated", "order-001", json!({"status": "processing"})),
        ("order.shipped", "order-001", json!({"tracking": "TRACK123"})),
        ("order.created", "order-002", json!({"total": 250.0})),
    ];
    for (event_type, entity_id, payload) in fixtures {
        let event = Event::from_strings(
            event_type.to_owned(),
            entity_id.to_owned(),
            "default".to_owned(),
            payload,
            None,
        )
        .unwrap();
        store.ingest(&event).unwrap();
    }

    // Builds a request that filters on at most an entity id and/or an event type.
    // The remaining optional filters are spelled out as `None`, exactly as the
    // test has always done, instead of relying on whatever `Default` provides.
    let request = |entity_id: Option<&str>, event_type: Option<&str>| QueryEventsRequest {
        entity_id: entity_id.map(str::to_owned),
        event_type: event_type.map(str::to_owned),
        tenant_id: None,
        as_of: None,
        since: None,
        until: None,
        limit: None,
        ..Default::default()
    };

    // Filter by entity: all three order-001 events come back.
    let order1_events = store
        .query(&request(Some("order-001"), None))
        .expect("Failed to query events");
    assert_eq!(order1_events.len(), 3);

    // Filter by type: one `order.created` per order.
    let created_events = store
        .query(&request(None, Some("order.created")))
        .expect("Failed to query by event type");
    assert_eq!(created_events.len(), 2);

    println!("✅ Test 3: Embedded event querying successful");
}
/// Reconstructs an entity's state from three partial updates and checks that
/// the `current_state` object carries the fields of every payload.
#[test]
fn test_embedded_state_reconstruction() {
    let store = EventStore::new();
    let entity_id = "user-state-test";

    let updates = [
        json!({"name": "John", "email": "john@example.com"}),
        json!({"status": "active", "verified": true}),
        json!({"department": "engineering", "level": 5}),
    ];
    // Pair every payload with its position so each event gets a distinct type.
    for (index, payload) in (0..).zip(updates) {
        let update = Event::from_strings(
            format!("user.updated.{index}"),
            entity_id.to_owned(),
            "default".to_owned(),
            payload,
            None,
        )
        .unwrap();
        store.ingest(&update).unwrap();
    }

    let reconstructed = store
        .reconstruct_state(entity_id, None)
        .expect("Failed to reconstruct state");
    let current_state = &reconstructed["current_state"];

    // Fields from the first payload.
    assert_eq!(current_state["name"], "John");
    assert_eq!(current_state["email"], "john@example.com");
    // Fields from the second payload.
    assert_eq!(current_state["status"], "active");
    assert_eq!(current_state["verified"], true);
    // Fields from the third payload.
    assert_eq!(current_state["department"], "engineering");
    assert_eq!(current_state["level"], 5);

    println!("✅ Test 4: Embedded state reconstruction successful");
}
/// Creates a snapshot after ten events and verifies the latest snapshot
/// records the right entity id and event count.
#[test]
fn test_embedded_snapshot_management() {
    let store = EventStore::new();
    let entity_id = "counter-snapshot-test";

    for count in 0..10 {
        let increment = Event::from_strings(
            "counter.incremented".to_owned(),
            entity_id.to_owned(),
            "default".to_owned(),
            json!({"count": count, "timestamp": count * 1000}),
            None,
        )
        .unwrap();
        store.ingest(&increment).unwrap();
    }

    store
        .create_snapshot(entity_id)
        .expect("Failed to create snapshot");

    // The snapshot just created must be retrievable as the latest one.
    let latest = store
        .snapshot_manager()
        .get_latest_snapshot(entity_id)
        .expect("Snapshot should exist");

    assert_eq!(latest.entity_id, "counter-snapshot-test");
    assert_eq!(latest.event_count, 10);

    println!("✅ Test 5: Embedded snapshot management successful");
}
/// The schema registry is reachable from an embedded `EventStore`.
///
/// A live `Arc` always has a strong count of at least one, so this mainly
/// proves the accessor exists and hands back an `Arc`.
#[test]
fn test_embedded_schema_registry() {
    let store = EventStore::new();

    let registry = store.schema_registry();
    let live_refs = Arc::strong_count(&registry);
    assert!(live_refs >= 1);

    println!("✅ Test 6: Embedded schema registry access successful");
}
/// Two tenants write an event for the same entity id; each stored event keeps
/// its own tenant id.
///
/// The query uses no tenant filter, so it is expected to return both events.
/// The assertions then check that one event is attributed to each tenant.
#[tokio::test]
async fn test_embedded_multi_tenant_isolation() {
    let store = Arc::new(EventStore::new());
    let tenants = InMemoryTenantRepository::new();

    let tenant_a = tenants
        .create(
            TenantId::new("tenant-a".to_owned()).unwrap(),
            "Tenant A".to_owned(),
            TenantQuotas::professional(),
        )
        .await
        .expect("Failed to create tenant A");
    let tenant_b = tenants
        .create(
            TenantId::new("tenant-b".to_owned()).unwrap(),
            "Tenant B".to_owned(),
            TenantQuotas::free_tier(),
        )
        .await
        .expect("Failed to create tenant B");

    // Both tenants deliberately reuse the same entity id and event type.
    let data_created = |tenant_id: &str, value: &str| {
        Event::from_strings(
            "data.created".to_owned(),
            "entity-1".to_owned(),
            tenant_id.to_owned(),
            json!({"value": value}),
            None,
        )
        .unwrap()
    };
    let event_a = data_created(tenant_a.id().as_str(), "tenant_a_data");
    let event_b = data_created(tenant_b.id().as_str(), "tenant_b_data");

    store.ingest(&event_a).unwrap();
    store.ingest(&event_b).unwrap();

    let all_events = store
        .query(&QueryEventsRequest {
            entity_id: Some("entity-1".to_owned()),
            event_type: None,
            tenant_id: None,
            as_of: None,
            since: None,
            until: None,
            limit: None,
            ..Default::default()
        })
        .unwrap();
    assert_eq!(all_events.len(), 2);

    // Each tenant must own one of the returned events.
    let stored_for = |tenant: &str| {
        all_events
            .iter()
            .any(|event| event.tenant_id_str() == tenant)
    };
    assert!(stored_for("tenant-a"));
    assert!(stored_for("tenant-b"));

    println!("Test 7: Embedded multi-tenant isolation successful");
}
/// Four threads each ingest 25 events into one shared store; afterwards the
/// store must report all 100 events.
#[test]
fn test_embedded_concurrent_operations() {
    let store = Arc::new(EventStore::new());

    // Spawn every worker first; `collect` drives the iterator, so all four
    // threads are running before any of them is joined.
    let workers: Vec<_> = (0..4)
        .map(|thread_id| {
            let shared = Arc::clone(&store);
            thread::spawn(move || {
                for i in 0..25 {
                    let payload = json!({
                        "thread_id": thread_id,
                        "event_index": i
                    });
                    let event = Event::from_strings(
                        "concurrent.event".to_owned(),
                        format!("thread-{thread_id}-entity-{i}"),
                        "default".to_owned(),
                        payload,
                        None,
                    )
                    .unwrap();
                    shared.ingest(&event).expect("Concurrent ingest failed");
                }
            })
        })
        .collect();

    for worker in workers {
        worker.join().expect("Thread panicked");
    }

    assert_eq!(store.stats().total_events, 100);

    println!("✅ Test 8: Embedded concurrent operations successful");
}
/// After ingesting a few events, the projection manager lists at least two
/// projections (described as "built-in" by the assertion message).
#[test]
fn test_embedded_projection_manager() {
    let store = EventStore::new();

    for index in 0..5 {
        let item = format!("item-{}", index);
        let added = Event::from_strings(
            "item.added".to_owned(),
            "projection-test-entity".to_owned(),
            "default".to_owned(),
            json!({"item": item}),
            None,
        )
        .unwrap();
        store.ingest(&added).unwrap();
    }

    let registered_count = store.projection_manager().list_projections().len();
    assert!(
        registered_count >= 2,
        "Should have at least 2 built-in projections"
    );

    println!("✅ Test 9: Embedded projection manager access successful");
}
/// The pipeline manager is reachable from an embedded `EventStore`.
///
/// A live `Arc` always has a strong count of at least one, so this mainly
/// proves the accessor exists and hands back an `Arc`.
#[test]
fn test_embedded_pipeline_manager() {
    let store = EventStore::new();

    let pipelines = store.pipeline_manager();
    let live_refs = Arc::strong_count(&pipelines);
    assert!(live_refs >= 1);

    println!("✅ Test 10: Embedded pipeline manager access successful");
}
/// The replay manager is reachable from a store that already holds events.
///
/// Only the accessor is exercised; no replay is started here.
#[test]
fn test_embedded_replay_manager() {
    let store = EventStore::new();

    for sequence in 0..5 {
        let replayable = Event::from_strings(
            "replay.test".to_owned(),
            "replay-entity".to_owned(),
            "default".to_owned(),
            json!({"sequence": sequence}),
            None,
        )
        .unwrap();
        store.ingest(&replayable).unwrap();
    }

    let replays = store.replay_manager();
    let live_refs = Arc::strong_count(&replays);
    assert!(live_refs >= 1);

    println!("✅ Test 11: Embedded replay manager access successful");
}
/// Registers a developer, authenticates, validates the issued token and
/// checks the role's permissions (read and write, but not admin).
#[test]
fn test_embedded_auth_manager() {
    let username = "embedded_user";
    let password = "secure_password_123";

    let auth = Arc::new(AuthManager::default());

    let user = auth
        .register_user(
            username.to_owned(),
            "embedded@example.com".to_owned(),
            password,
            Role::Developer,
            "default".to_owned(),
        )
        .expect("Failed to register user");
    assert_eq!(user.username, username);

    // Authenticating with the same credentials must yield a non-empty token.
    let token = auth
        .authenticate(username, password)
        .expect("Failed to authenticate");
    assert!(!token.is_empty());

    // The token's subject claim must identify the user just registered.
    let claims = auth
        .validate_token(&token)
        .expect("Failed to validate token");
    assert_eq!(claims.sub, user.id.to_string());

    for granted in [Permission::Read, Permission::Write] {
        assert!(user.role.has_permission(granted));
    }
    assert!(!user.role.has_permission(Permission::Admin));

    println!("✅ Test 12: Embedded auth manager successful");
}
/// With `burst_size: 5`, the first five requests from a tenant are allowed
/// and the sixth is rejected.
#[test]
fn test_embedded_rate_limiter() {
    let tenant = "embedded_tenant";
    let config = RateLimitConfig {
        requests_per_minute: 10,
        burst_size: 5,
    };
    let limiter = Arc::new(RateLimiter::new(config));

    // Requests 1 through 5 fit within the burst allowance.
    for attempt in 1..=5 {
        let outcome = limiter.check_rate_limit(tenant);
        assert!(outcome.allowed, "Request {} should be allowed", attempt);
    }

    // One more request exceeds the burst allowance.
    let sixth = limiter.check_rate_limit(tenant);
    assert!(!sixth.allowed, "6th request should be rate limited");

    println!("✅ Test 13: Embedded rate limiter successful");
}
/// The metrics registry is reachable after an event has been ingested.
///
/// Only the accessor is exercised; no metric values are inspected.
#[test]
fn test_embedded_metrics_registry() {
    let store = EventStore::new();

    let sample = Event::from_strings(
        "metrics.test".to_owned(),
        "metrics-entity".to_owned(),
        "default".to_owned(),
        json!({"test": true}),
        None,
    )
    .unwrap();
    store.ingest(&sample).unwrap();

    let registry = store.metrics();
    let live_refs = Arc::strong_count(&registry);
    assert!(live_refs >= 1);

    println!("✅ Test 14: Embedded metrics registry access successful");
}
/// The projection state cache is reachable after an event has been ingested.
///
/// Only the accessor is exercised; cache contents are not inspected.
#[test]
fn test_embedded_projection_state_cache() {
    let store = EventStore::new();

    let sample = Event::from_strings(
        "cache.test".to_owned(),
        "cache-entity".to_owned(),
        "default".to_owned(),
        json!({"cached": true}),
        None,
    )
    .unwrap();
    store.ingest(&sample).unwrap();

    let state_cache = store.projection_state_cache();
    let live_refs = Arc::strong_count(&state_cache);
    assert!(live_refs >= 1);

    println!("✅ Test 15: Embedded projection state cache access successful");
}
#[tokio::test]
async fn test_embedded_full_workflow() {
let store = Arc::new(EventStore::new());
let repo = InMemoryTenantRepository::new();
let auth_manager = Arc::new(AuthManager::default());
let tenant = repo
.create(
TenantId::new("workflow-tenant".to_string()).unwrap(),
"Workflow Test Tenant".to_string(),
TenantQuotas::professional(),
)
.await
.unwrap();
let tenant_id_str = tenant.id().as_str().to_string();
let user = auth_manager
.register_user(
"workflow_user".to_string(),
"workflow@example.com".to_string(),
"password123",
Role::Developer,
tenant_id_str.clone(),
)
.unwrap();
let order_created = Event::from_strings(
"order.created".to_string(),
"order-workflow-1".to_string(),
tenant_id_str.clone(),
json!({
"customer_id": user.id.to_string(),
"items": [
{"sku": "ITEM-001", "qty": 2, "price": 29.99},
{"sku": "ITEM-002", "qty": 1, "price": 49.99}
],
"total": 109.97
}),
Some(json!({"created_by": user.username.clone()})),
)
.unwrap();
store.ingest(&order_created).unwrap();
let order_confirmed = Event::from_strings(
"order.confirmed".to_string(),
"order-workflow-1".to_string(),
tenant_id_str.clone(),
json!({
"status": "confirmed",
"payment_id": "PAY-12345"
}),
None,
)
.unwrap();
store.ingest(&order_confirmed).unwrap();
let order_shipped = Event::from_strings(
"order.shipped".to_string(),
"order-workflow-1".to_string(),
tenant_id_str.clone(),
json!({
"status": "shipped",
"tracking_number": "TRACK-67890",
"carrier": "FastShip"
}),
None,
)
.unwrap();
store.ingest(&order_shipped).unwrap();
let order_events = store
.query(&QueryEventsRequest {
entity_id: Some("order-workflow-1".to_string()),
event_type: None,
tenant_id: None,
as_of: None,
since: None,
until: None,
limit: None,
..Default::default()
})
.unwrap();
assert_eq!(order_events.len(), 3);
let order_state = store.reconstruct_state("order-workflow-1", None).unwrap();
let current = &order_state["current_state"];
assert_eq!(current["status"], "shipped");
assert_eq!(current["tracking_number"], "TRACK-67890");
store.create_snapshot("order-workflow-1").unwrap();
let stats = store.stats();
assert_eq!(stats.total_events, 3);
println!("Test 16: Full embedded workflow successful");
println!(" - Tenant created: {tenant_id_str}");
println!(" - User registered: {}", user.username);
println!(" - Order events: 3");
println!(" - Final status: shipped");
}
/// Prints a human-readable checklist of the capabilities covered by this file.
///
/// This test makes no assertions; it only writes the summary banner to stdout.
#[test]
fn test_embedded_mode_verification_summary() {
    // The same 70-character rule frames the banner in three places.
    let rule = "=".repeat(70);

    let checklist = [
        " ✓ EventStore creation and configuration",
        " ✓ Event ingestion using library API",
        " ✓ Event querying with filters",
        " ✓ State reconstruction from events",
        " ✓ Snapshot management",
        " ✓ Schema registry access",
        " ✓ Multi-tenant support",
        " ✓ Concurrent thread-safe operations",
        " ✓ Projection manager access",
        " ✓ Pipeline manager access",
        " ✓ Replay manager access",
        " ✓ Authentication and authorization",
        " ✓ Rate limiting",
        " ✓ Metrics registry access",
        " ✓ Projection state cache",
        " ✓ Full embedded workflow",
    ];

    println!("\n");
    println!("{rule}");
    println!(" US-004: EMBEDDED MODE VERIFICATION COMPLETE");
    println!("{rule}");
    println!("\n The following capabilities were verified WITHOUT HTTP server:\n");
    for line in checklist {
        println!("{line}");
    }
    println!("\n AllSource Core can be used as an embedded library.");
    println!("{rule}");
    println!("\n");
}