use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use eventcore_types::{
CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError,
EventStreamReader, EventStreamSlice, Operation, ProjectorCoordinator, StreamId, StreamPosition,
StreamVersion, StreamWriteEntry, StreamWrites,
};
use uuid::Uuid;
type StreamData = (Vec<Box<dyn std::any::Any + Send>>, StreamVersion);
#[derive(Debug, Clone)]
struct GlobalLogEntry {
event_id: Uuid,
stream_id: String,
event_type: String,
event_data: serde_json::Value,
}
struct StoreData {
streams: HashMap<StreamId, StreamData>,
global_log: Vec<GlobalLogEntry>,
checkpoints: HashMap<String, StreamPosition>,
locks: Arc<RwLock<HashMap<String, ()>>>,
}
pub struct InMemoryEventStore {
data: std::sync::Mutex<StoreData>,
}
impl InMemoryEventStore {
pub fn new() -> Self {
Self {
data: std::sync::Mutex::new(StoreData {
streams: HashMap::new(),
global_log: Vec::new(),
checkpoints: HashMap::new(),
locks: Arc::new(RwLock::new(HashMap::new())),
}),
}
}
}
impl Default for InMemoryEventStore {
fn default() -> Self {
Self::new()
}
}
impl EventStore for InMemoryEventStore {
async fn read_stream<E: Event>(
&self,
stream_id: StreamId,
) -> Result<EventStreamReader<E>, EventStoreError> {
let data = self
.data
.lock()
.map_err(|_| EventStoreError::StoreFailure {
operation: Operation::ReadStream,
})?;
let events = match data.streams.get(&stream_id) {
None => Vec::new(),
Some((boxed_events, _version)) => {
let mut events = Vec::with_capacity(boxed_events.len());
for boxed in boxed_events {
match boxed.downcast_ref::<E>() {
Some(event) => events.push(event.clone()),
None => {
return Err(EventStoreError::DeserializationFailed {
stream_id,
detail: format!(
"event could not be downcast to {}",
std::any::type_name::<E>()
),
});
}
}
}
events
}
};
Ok(EventStreamReader::new(events))
}
async fn append_events(
&self,
writes: StreamWrites,
) -> Result<EventStreamSlice, EventStoreError> {
let mut data = self
.data
.lock()
.map_err(|_| EventStoreError::StoreFailure {
operation: Operation::AppendEvents,
})?;
let expected_versions = writes.expected_versions().clone();
for (stream_id, expected_version) in &expected_versions {
let current_version = data
.streams
.get(stream_id)
.map(|(_events, version)| *version)
.unwrap_or_else(|| StreamVersion::new(0));
if current_version != *expected_version {
return Err(EventStoreError::VersionConflict {
stream_id: stream_id.clone(),
expected: *expected_version,
actual: current_version,
});
}
}
for entry in writes.into_entries() {
let StreamWriteEntry {
stream_id,
event,
event_type,
event_data,
} = entry;
let event_id = Uuid::now_v7();
data.global_log.push(GlobalLogEntry {
event_id,
stream_id: stream_id.as_ref().to_string(),
event_type: event_type.to_string(),
event_data,
});
let (events, version) = data
.streams
.entry(stream_id)
.or_insert_with(|| (Vec::new(), StreamVersion::new(0)));
events.push(event);
*version = version.increment();
}
Ok(EventStreamSlice)
}
}
impl EventReader for InMemoryEventStore {
type Error = EventStoreError;
async fn read_events<E: Event>(
&self,
filter: EventFilter,
page: EventPage,
) -> Result<Vec<(E, StreamPosition)>, Self::Error> {
let data = self
.data
.lock()
.map_err(|_| EventStoreError::StoreFailure {
operation: Operation::ReadStream,
})?;
let after_event_id = page.after_position().map(|p| p.into_inner());
let events: Vec<(E, StreamPosition)> = data
.global_log
.iter()
.filter(|entry| {
match after_event_id {
None => true,
Some(after_id) => entry.event_id > after_id,
}
})
.filter(|entry| {
match filter.stream_prefix() {
None => true,
Some(prefix) => entry.stream_id.starts_with(prefix.as_ref()),
}
})
.filter(|entry| {
let type_filter = filter.event_type().unwrap_or_else(|| E::event_type_name());
entry.event_type == type_filter
})
.take(page.limit().into_inner())
.filter_map(|entry| {
serde_json::from_value::<E>(entry.event_data.clone())
.ok()
.map(|e| (e, StreamPosition::new(entry.event_id)))
})
.collect();
Ok(events)
}
}
impl CheckpointStore for InMemoryEventStore {
type Error = InMemoryCheckpointError;
async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
let data = self
.data
.lock()
.map_err(|e| InMemoryCheckpointError::LockFailed(e.to_string()))?;
Ok(data.checkpoints.get(name).copied())
}
async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
let mut data = self
.data
.lock()
.map_err(|e| InMemoryCheckpointError::LockFailed(e.to_string()))?;
let _ = data.checkpoints.insert(name.to_string(), position);
Ok(())
}
}
impl ProjectorCoordinator for InMemoryEventStore {
type Error = InMemoryCoordinationError;
type Guard = InMemoryCoordinationGuard;
async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
let data = self
.data
.lock()
.map_err(|e| InMemoryCoordinationError::LockPoisoned {
message: e.to_string(),
})?;
let mut guard =
data.locks
.write()
.map_err(|e| InMemoryCoordinationError::LockPoisoned {
message: e.to_string(),
})?;
if guard.contains_key(subscription_name) {
return Err(InMemoryCoordinationError::LeadershipNotAcquired {
subscription_name: subscription_name.to_string(),
});
}
let _ = guard.insert(subscription_name.to_string(), ());
Ok(InMemoryCoordinationGuard {
subscription_name: subscription_name.to_string(),
locks: Arc::clone(&data.locks),
})
}
}
#[derive(Debug, Clone, Default)]
pub struct InMemoryCheckpointStore {
checkpoints: Arc<RwLock<HashMap<String, StreamPosition>>>,
}
impl InMemoryCheckpointStore {
pub fn new() -> Self {
Self::default()
}
}
#[derive(Debug, Clone, thiserror::Error)]
pub enum InMemoryCheckpointError {
#[error("failed to acquire lock: {0}")]
LockFailed(String),
}
#[derive(Debug, Clone, thiserror::Error)]
pub enum InMemoryCoordinationError {
#[error(
"leadership not acquired for subscription '{subscription_name}': another instance holds the lock"
)]
LeadershipNotAcquired { subscription_name: String },
#[error("lock poisoned: {message}")]
LockPoisoned { message: String },
}
#[derive(Debug)]
pub struct InMemoryCoordinationGuard {
subscription_name: String,
locks: Arc<RwLock<HashMap<String, ()>>>,
}
impl Drop for InMemoryCoordinationGuard {
fn drop(&mut self) {
if let Ok(mut guard) = self.locks.write() {
let _ = guard.remove(&self.subscription_name);
} else {
tracing::error!(
subscription_name = %self.subscription_name,
"failed to release coordination lock: RwLock poisoned"
);
}
}
}
#[derive(Debug, Clone, Default)]
pub struct InMemoryProjectorCoordinator {
locks: Arc<RwLock<HashMap<String, ()>>>,
}
impl InMemoryProjectorCoordinator {
pub fn new() -> Self {
Self::default()
}
}
impl ProjectorCoordinator for InMemoryProjectorCoordinator {
type Error = InMemoryCoordinationError;
type Guard = InMemoryCoordinationGuard;
async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
let mut guard =
self.locks
.write()
.map_err(|e| InMemoryCoordinationError::LockPoisoned {
message: e.to_string(),
})?;
if guard.contains_key(subscription_name) {
return Err(InMemoryCoordinationError::LeadershipNotAcquired {
subscription_name: subscription_name.to_string(),
});
}
let _ = guard.insert(subscription_name.to_string(), ());
Ok(InMemoryCoordinationGuard {
subscription_name: subscription_name.to_string(),
locks: Arc::clone(&self.locks),
})
}
}
impl CheckpointStore for InMemoryCheckpointStore {
type Error = InMemoryCheckpointError;
async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
let guard = self
.checkpoints
.read()
.map_err(|e| InMemoryCheckpointError::LockFailed(e.to_string()))?;
Ok(guard.get(name).copied())
}
async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
let mut guard = self
.checkpoints
.write()
.map_err(|e| InMemoryCheckpointError::LockFailed(e.to_string()))?;
let _ = guard.insert(name.to_string(), position);
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use eventcore_types::{BatchSize, EventFilter, EventPage};
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct TestEvent {
stream_id: StreamId,
data: String,
}
impl Event for TestEvent {
fn stream_id(&self) -> &StreamId {
&self.stream_id
}
fn event_type_name() -> &'static str {
"TestEvent"
}
}
#[tokio::test]
async fn test_append_and_read_single_event() {
let store = InMemoryEventStore::new();
let stream_id = StreamId::try_new("test-stream-123".to_string()).expect("valid stream id");
let event = TestEvent {
stream_id: stream_id.clone(),
data: "test event data".to_string(),
};
let writes = StreamWrites::new()
.register_stream(stream_id.clone(), StreamVersion::new(0))
.and_then(|writes| writes.append(event.clone()))
.expect("append should succeed");
let _ = store
.append_events(writes)
.await
.expect("append to succeed");
let reader = store
.read_stream::<TestEvent>(stream_id)
.await
.expect("read to succeed");
let observed = (
reader.is_empty(),
reader.len(),
reader.iter().next().is_none(),
);
assert_eq!(observed, (false, 1usize, false));
}
#[tokio::test]
async fn event_stream_reader_is_empty_reflects_stream_population() {
let store = InMemoryEventStore::new();
let stream_id =
StreamId::try_new("is-empty-observation".to_string()).expect("valid stream id");
let initial_reader = store
.read_stream::<TestEvent>(stream_id.clone())
.await
.expect("initial read to succeed");
let event = TestEvent {
stream_id: stream_id.clone(),
data: "populated event".to_string(),
};
let writes = StreamWrites::new()
.register_stream(stream_id.clone(), StreamVersion::new(0))
.and_then(|writes| writes.append(event))
.expect("append should succeed");
let _ = store
.append_events(writes)
.await
.expect("append to succeed");
let populated_reader = store
.read_stream::<TestEvent>(stream_id)
.await
.expect("populated read to succeed");
let observed = (
initial_reader.is_empty(),
initial_reader.len(),
populated_reader.is_empty(),
populated_reader.len(),
);
assert_eq!(observed, (true, 0usize, false, 1usize));
}
#[tokio::test]
async fn read_stream_iterates_through_events_in_order() {
let store = InMemoryEventStore::new();
let stream_id = StreamId::try_new("ordered-stream".to_string()).expect("valid stream id");
let first_event = TestEvent {
stream_id: stream_id.clone(),
data: "first".to_string(),
};
let second_event = TestEvent {
stream_id: stream_id.clone(),
data: "second".to_string(),
};
let writes = StreamWrites::new()
.register_stream(stream_id.clone(), StreamVersion::new(0))
.and_then(|writes| writes.append(first_event))
.and_then(|writes| writes.append(second_event))
.expect("append chain should succeed");
let _ = store
.append_events(writes)
.await
.expect("append to succeed");
let reader = store
.read_stream::<TestEvent>(stream_id)
.await
.expect("read to succeed");
let collected: Vec<String> = reader.iter().map(|event| event.data.clone()).collect();
let observed = (reader.is_empty(), collected);
assert_eq!(
observed,
(false, vec!["first".to_string(), "second".to_string()])
);
}
#[test]
fn stream_writes_accepts_duplicate_stream_with_same_expected_version() {
let stream_id = StreamId::try_new("duplicate-stream-same-version".to_string())
.expect("valid stream id");
let first_event = TestEvent {
stream_id: stream_id.clone(),
data: "first-event".to_string(),
};
let second_event = TestEvent {
stream_id: stream_id.clone(),
data: "second-event".to_string(),
};
let writes_result = StreamWrites::new()
.register_stream(stream_id.clone(), StreamVersion::new(0))
.and_then(|writes| writes.append(first_event))
.and_then(|writes| writes.append(second_event));
assert!(writes_result.is_ok());
}
#[test]
fn stream_writes_rejects_duplicate_stream_with_conflicting_expected_versions() {
let stream_id =
StreamId::try_new("duplicate-stream-conflict".to_string()).expect("valid stream id");
let first_event = TestEvent {
stream_id: stream_id.clone(),
data: "first-event-conflict".to_string(),
};
let second_event = TestEvent {
stream_id: stream_id.clone(),
data: "second-event-conflict".to_string(),
};
let conflict = StreamWrites::new()
.register_stream(stream_id.clone(), StreamVersion::new(0))
.and_then(|writes| writes.append(first_event))
.and_then(|writes| writes.register_stream(stream_id.clone(), StreamVersion::new(1)))
.and_then(|writes| writes.append(second_event));
let message = conflict.unwrap_err().to_string();
assert_eq!(
message,
"conflicting expected versions for stream duplicate-stream-conflict: first=0, second=1"
);
}
#[tokio::test]
async fn stream_writes_registers_stream_before_appending_multiple_events() {
let store = InMemoryEventStore::new();
let stream_id =
StreamId::try_new("registered-stream".to_string()).expect("valid stream id");
let first_event = TestEvent {
stream_id: stream_id.clone(),
data: "first-registered-event".to_string(),
};
let second_event = TestEvent {
stream_id: stream_id.clone(),
data: "second-registered-event".to_string(),
};
let writes = StreamWrites::new()
.register_stream(stream_id.clone(), StreamVersion::new(0))
.and_then(|writes| writes.append(first_event))
.and_then(|writes| writes.append(second_event))
.expect("registered stream should accept events");
let result = store.append_events(writes).await;
assert!(
result.is_ok(),
"append should succeed when stream registered before events"
);
}
#[test]
fn stream_writes_rejects_appends_for_unregistered_streams() {
let stream_id =
StreamId::try_new("unregistered-stream".to_string()).expect("valid stream id");
let event = TestEvent {
stream_id: stream_id.clone(),
data: "unregistered-event".to_string(),
};
let error = StreamWrites::new()
.append(event)
.expect_err("append without prior registration should fail");
assert!(matches!(
error,
EventStoreError::UndeclaredStream { stream_id: ref actual } if *actual == stream_id
));
}
#[test]
fn expected_versions_returns_registered_streams_and_versions() {
let stream_a = StreamId::try_new("stream-a").expect("valid stream id");
let stream_b = StreamId::try_new("stream-b").expect("valid stream id");
let writes = StreamWrites::new()
.register_stream(stream_a.clone(), StreamVersion::new(0))
.and_then(|w| w.register_stream(stream_b.clone(), StreamVersion::new(5)))
.expect("registration should succeed");
let versions = writes.expected_versions();
assert_eq!(versions.len(), 2);
assert_eq!(versions.get(&stream_a), Some(&StreamVersion::new(0)));
assert_eq!(versions.get(&stream_b), Some(&StreamVersion::new(5)));
}
#[test]
fn stream_id_rejects_asterisk_metacharacter() {
let result = StreamId::try_new("account-*");
assert!(
result.is_err(),
"StreamId should reject asterisk glob metacharacter"
);
}
#[test]
fn stream_id_rejects_question_mark_metacharacter() {
let result = StreamId::try_new("account-?");
assert!(
result.is_err(),
"StreamId should reject question mark glob metacharacter"
);
}
#[test]
fn stream_id_rejects_open_bracket_metacharacter() {
let result = StreamId::try_new("account-[");
assert!(
result.is_err(),
"StreamId should reject open bracket glob metacharacter"
);
}
#[test]
fn stream_id_rejects_close_bracket_metacharacter() {
let result = StreamId::try_new("account-]");
assert!(
result.is_err(),
"StreamId should reject close bracket glob metacharacter"
);
}
#[tokio::test]
async fn event_reader_after_position_excludes_event_at_position() {
let store = InMemoryEventStore::new();
let stream_id = StreamId::try_new("reader-test").expect("valid stream id");
let event1 = TestEvent {
stream_id: stream_id.clone(),
data: "first".to_string(),
};
let event2 = TestEvent {
stream_id: stream_id.clone(),
data: "second".to_string(),
};
let event3 = TestEvent {
stream_id: stream_id.clone(),
data: "third".to_string(),
};
let writes = StreamWrites::new()
.register_stream(stream_id.clone(), StreamVersion::new(0))
.and_then(|w| w.append(event1))
.and_then(|w| w.append(event2))
.and_then(|w| w.append(event3))
.expect("append should succeed");
let _ = store
.append_events(writes)
.await
.expect("append to succeed");
let all_events = store
.read_events::<TestEvent>(EventFilter::all(), EventPage::first(BatchSize::new(100)))
.await
.expect("read all events to succeed");
assert_eq!(all_events.len(), 3, "Should have 3 events total");
let (first_event, first_position) = &all_events[0];
let page = EventPage::after(*first_position, BatchSize::new(100));
let filter = EventFilter::all();
let events = store
.read_events::<TestEvent>(filter, page)
.await
.expect("read to succeed");
assert_eq!(events.len(), 2, "Should get 2 events after first position");
assert_eq!(
events[0].0.data, "second",
"First returned event should be 'second'"
);
assert_eq!(
events[1].0.data, "third",
"Second returned event should be 'third'"
);
for (event, _pos) in &events {
assert_ne!(
event.data, first_event.data,
"First event should be excluded"
);
}
for (_event, pos) in &events {
assert!(
*pos > *first_position,
"Returned position {} should be > first position {}",
pos,
first_position
);
}
}
#[tokio::test]
async fn in_memory_event_store_implements_checkpoint_store() {
let store = InMemoryEventStore::new();
let position = StreamPosition::new(Uuid::now_v7());
CheckpointStore::save(&store, "test-projector", position)
.await
.expect("save should succeed");
let loaded = CheckpointStore::load(&store, "test-projector")
.await
.expect("load should succeed");
assert_eq!(loaded, Some(position));
}
#[tokio::test]
async fn in_memory_event_store_implements_projector_coordinator() {
let store = InMemoryEventStore::new();
let guard = ProjectorCoordinator::try_acquire(&store, "test-projector").await;
assert!(guard.is_ok(), "should acquire leadership");
}
}