use std::collections::HashMap;
use std::sync::{RwLock, RwLockWriteGuard};
use uuid::Uuid;
use crate::memory_stream_store::index::LogPositionIndex;
use crate::store::{
Message, MessagePosition, ReadDirection, ReadFromCategory, ReadFromStream, Stream,
StreamMessage, StreamVersion, WriteResult, WriteToStream,
};
/// In-memory message store: a single global log of messages plus lookup
/// indexes keyed by stream name and by category.
///
/// Each piece of state sits behind its own `RwLock`.
pub struct MemoryStreamStore {
// Every message written, in write order. A message's index in this vector
// is the global position recorded in its `MessagePosition`.
log: RwLock<Vec<StreamMessage>>,
// Stream name -> positions in `log` of that stream's messages.
streams: RwLock<LogPositionIndex>,
// Category (the part of the stream name before the first '-') -> positions
// in `log` of the messages written to streams in that category.
categories: RwLock<LogPositionIndex>,
// Stream name -> revision of the most recent message written to that stream.
// A stream with no entry here is reported as `StreamVersion::NoStream`.
stream_revisions: RwLock<HashMap<String, usize>>,
}
impl MemoryStreamStore {
    /// Creates an empty store: an empty log, empty stream and category
    /// indexes, and no recorded stream revisions.
    pub fn new() -> Self {
        let log = RwLock::new(Vec::new());
        let streams = RwLock::new(LogPositionIndex::new());
        let categories = RwLock::new(LogPositionIndex::new());
        let stream_revisions = RwLock::new(HashMap::new());
        Self {
            log,
            streams,
            categories,
            stream_revisions,
        }
    }

    /// Appends one already-positioned message to the log and records it in
    /// the stream index, the category index and the revision map.
    ///
    /// The caller holds all four write guards and has already assigned
    /// `event.position`; this function performs no version check.
    /// Always returns `WriteResult::Ok` carrying the position of `event`.
    fn do_write(
        log: &mut RwLockWriteGuard<Vec<StreamMessage>>,
        stream_index: &mut RwLockWriteGuard<LogPositionIndex>,
        category_index: &mut RwLockWriteGuard<LogPositionIndex>,
        stream_metadata: &mut RwLockWriteGuard<HashMap<String, usize>>,
        stream_name: &str,
        event: StreamMessage,
    ) -> WriteResult {
        // The category is everything before the first '-'; a name without
        // a '-' is its own category.
        let category = stream_name
            .split('-')
            .next()
            .expect("No category can be inferred from stream");

        // Copy the position out before the message is moved into the log.
        let written_at = event.position;
        log.push(event);

        stream_index.write_position(stream_name, written_at.position);
        category_index.write_position(category, written_at.position);
        stream_metadata.insert(stream_name.to_owned(), written_at.revision);

        WriteResult::Ok(written_at)
    }
}
impl Default for MemoryStreamStore {
fn default() -> Self {
Self::new()
}
}
impl ReadFromStream for MemoryStreamStore {
    /// Reads every message of `stream_name`.
    ///
    /// Returns the stream's current version together with its messages.
    /// A stream that has never been written to yields
    /// `(StreamVersion::NoStream, vec![])`. With `ReadDirection::Forwards`
    /// the messages come back in the order the stream index lists them;
    /// any other direction returns them in the reverse order.
    ///
    /// # Panics
    ///
    /// Panics if a lock is poisoned, or if the stream index refers to a log
    /// position that does not exist.
    fn read_from_stream(
        &self,
        stream_name: &str,
        direction: ReadDirection,
    ) -> (StreamVersion, Stream) {
        // Locks are taken in the same order as the write path takes them.
        let log = self.log.read().unwrap();
        let index = self.streams.read().unwrap();
        let meta = self.stream_revisions.read().unwrap();

        let revision = match meta.get(stream_name) {
            Some(&n) => StreamVersion::Revision(n),
            None => return (StreamVersion::NoStream, Vec::new()),
        };

        let log_positions = index.get_positions(stream_name);
        // Reserve exactly one slot per indexed position. The previous
        // `len() - 1` was always one short and underflowed on an empty list.
        let mut messages = Vec::with_capacity(log_positions.len());
        for position in log_positions {
            let message = log
                .get(*position)
                .expect("stream index points at a position missing from the log");
            messages.push(message.clone());
        }

        // Reverse in place rather than collecting into a second vector.
        if direction != ReadDirection::Forwards {
            messages.reverse();
        }
        (revision, messages)
    }
}
impl ReadFromCategory for MemoryStreamStore {
    /// Reads messages belonging to `category_name`.
    ///
    /// `offset` is forwarded unchanged to the category index's
    /// `get_positions_from`; the visible tests pass a global log position
    /// here (NOTE(review): confirm exact inclusive/exclusive semantics
    /// against `LogPositionIndex`). At most `max_messages` messages are
    /// returned; `None` means every position the index yields.
    ///
    /// # Panics
    ///
    /// Panics if a lock is poisoned, or if the category index refers to a
    /// log position that does not exist.
    fn read_from_category(
        &self,
        category_name: &str,
        offset: usize,
        max_messages: Option<usize>,
    ) -> Stream {
        let log = self.log.read().unwrap();
        let index = self.categories.read().unwrap();
        let log_positions = index.get_positions_from(category_name, offset);

        // Never reserve more than can actually be returned: an unclamped
        // caller-supplied limit (e.g. `Some(usize::MAX)`) would make
        // `Vec::with_capacity` panic or over-allocate.
        let available = log_positions.len();
        let limit = max_messages.map_or(available, |max| max.min(available));

        let mut messages = Vec::with_capacity(limit);
        for position in log_positions.iter().take(limit) {
            let message = log
                .get(*position)
                .expect("category index points at a position missing from the log");
            messages.push(message.clone());
        }
        messages
    }
}
impl WriteToStream for MemoryStreamStore {
    /// Appends `messages` to `stream_name` if the stream is currently at
    /// `expected_version`.
    ///
    /// Returns `WriteResult::WrongExpectedVersion`, writing nothing, when the
    /// stream's current version differs from `expected_version`. Otherwise
    /// each message gets a fresh UUID and consecutive revisions and log
    /// positions, and the result is `WriteResult::Ok` with the position of
    /// the last message written. If `messages` is empty nothing is written
    /// and the `Ok` carries the position the next message would receive.
    ///
    /// # Panics
    ///
    /// Panics if a lock is poisoned.
    fn write_to_stream(
        &mut self,
        stream_name: &str,
        expected_version: StreamVersion,
        messages: &[Message],
    ) -> WriteResult {
        let mut log = self.log.write().unwrap();
        let mut streams = self.streams.write().unwrap();
        let mut categories = self.categories.write().unwrap();
        let mut stream_metadata = self.stream_revisions.write().unwrap();

        let current_version = match stream_metadata.get(stream_name) {
            Some(&revision) => StreamVersion::Revision(revision),
            None => StreamVersion::NoStream,
        };
        if current_version != expected_version {
            return WriteResult::WrongExpectedVersion;
        }

        // Revision and global position of the first message in this batch.
        let first_revision = match current_version {
            StreamVersion::NoStream => 0,
            StreamVersion::Revision(n) => n + 1,
        };
        let first_position = log.len();

        let mut outcome = WriteResult::Ok(MessagePosition {
            revision: first_revision,
            position: first_position,
        });
        for (offset, message) in messages.iter().enumerate() {
            // Each write pushes exactly one log entry, so the n-th message
            // of the batch lands `n` slots after the first.
            let stored = StreamMessage {
                id: Uuid::new_v4().to_string(),
                message_type: message.message_type.clone(),
                data: message.data.clone(),
                metadata: message.metadata.clone(),
                position: MessagePosition {
                    revision: first_revision + offset,
                    position: first_position + offset,
                },
            };
            outcome = Self::do_write(
                &mut log,
                &mut streams,
                &mut categories,
                &mut stream_metadata,
                stream_name,
                stored,
            );
        }
        outcome
    }
}
#[cfg(test)]
mod test {
    use super::MemoryStreamStore;
    use crate::store::{
        Message, ReadDirection, ReadFromCategory, ReadFromStream, StreamVersion, WriteResult,
        WriteToStream,
    };

    /// Builds a message of the given type whose data is the bytes of
    /// `payload` and whose metadata is empty.
    fn message(message_type: &str, payload: &str) -> Message {
        Message {
            message_type: message_type.to_owned(),
            metadata: vec![],
            data: payload.as_bytes().to_vec(),
        }
    }

    /// Writes `messages` as test setup and returns the global log position
    /// reported by the store. Panics if the write is rejected, so a broken
    /// fixture fails at the write rather than at a later read assertion.
    fn write_ok(
        store: &mut MemoryStreamStore,
        stream_name: &str,
        expected_version: StreamVersion,
        messages: &[Message],
    ) -> usize {
        match store.write_to_stream(stream_name, expected_version, messages) {
            WriteResult::Ok(position) => position.position,
            other => panic!("write to {} was rejected: {:?}", stream_name, other),
        }
    }

    #[test]
    fn it_handles_empty_reads() {
        let store = MemoryStreamStore::new();

        let (version, messages) = store.read_from_stream("TestStream-1", ReadDirection::Forwards);

        assert_eq!(version, StreamVersion::NoStream);
        assert_eq!(messages.len(), 0);
    }

    #[test]
    fn it_reads_events_forwards() {
        let mut store = MemoryStreamStore::new();
        write_ok(
            &mut store,
            "TestStream-1",
            StreamVersion::NoStream,
            &[
                message("TestMessage", r#"{"test": "data"}"#),
                message("AnotherMessage", r#"{"test2": "data2"}"#),
            ],
        );

        let (version, messages) = store.read_from_stream("TestStream-1", ReadDirection::Forwards);

        assert_eq!(version, StreamVersion::Revision(1));
        assert_eq!(messages.len(), 2);
        assert_eq!(messages[0].message_type, "TestMessage");
        assert_eq!(messages[1].message_type, "AnotherMessage");
    }

    #[test]
    fn it_reads_a_category() {
        let mut store = MemoryStreamStore::new();
        write_ok(
            &mut store,
            "TestStream-1",
            StreamVersion::NoStream,
            &[
                message("TestMessage", r#"{"test": "data"}"#),
                message("AnotherMessage", r#"{"test2": "data2"}"#),
            ],
        );
        write_ok(
            &mut store,
            "TestStream-2",
            StreamVersion::NoStream,
            &[message("A third message", r#"{"test3": "data3"}"#)],
        );
        // Written to a stream outside the category; must not be returned.
        write_ok(
            &mut store,
            "DifferentCategory",
            StreamVersion::NoStream,
            &[message("A fourth message", r#"{"test3": "data3"}"#)],
        );

        let messages = store.read_from_category("TestStream", 0, None);

        assert_eq!(messages.len(), 3);
        assert_eq!(messages[0].message_type, "TestMessage");
        assert_eq!(messages[1].message_type, "AnotherMessage");
        assert_eq!(messages[2].message_type, "A third message");
    }

    #[test]
    fn it_reads_a_category_max_messages() {
        let mut store = MemoryStreamStore::new();
        write_ok(
            &mut store,
            "TestStream-1",
            StreamVersion::NoStream,
            &[message("TestMessage", r#"{"test": "data"}"#)],
        );
        write_ok(
            &mut store,
            "TestStream-2",
            StreamVersion::NoStream,
            &[message("A second message", r#"{"test3": "data3"}"#)],
        );
        write_ok(
            &mut store,
            "TestStream-1",
            StreamVersion::Revision(0),
            &[message("A third message", r#"{"test3": "data3"}"#)],
        );

        let messages = store.read_from_category("TestStream", 0, Some(2));

        assert_eq!(messages.len(), 2);
        assert_eq!(messages[0].message_type, "TestMessage");
        assert_eq!(messages[1].message_type, "A second message");
    }

    #[test]
    fn it_reads_a_category_from_offset() {
        let mut store = MemoryStreamStore::new();
        write_ok(
            &mut store,
            "TestStream-1",
            StreamVersion::NoStream,
            &[message("TestMessage", r#"{"test": "data"}"#)],
        );
        // The category read below starts from this message's global position.
        let global_position = write_ok(
            &mut store,
            "TestStream-2",
            StreamVersion::NoStream,
            &[message("A second message", r#"{"test3": "data3"}"#)],
        );
        write_ok(
            &mut store,
            "TestStream-1",
            StreamVersion::Revision(0),
            &[message("A third message", r#"{"test3": "data3"}"#)],
        );

        let messages = store.read_from_category("TestStream", global_position, Some(2));

        assert_eq!(messages.len(), 2);
        assert_eq!(messages[0].message_type, "A second message");
        assert_eq!(messages[1].message_type, "A third message");
    }

    #[test]
    fn it_reads_events_backwards() {
        let mut store = MemoryStreamStore::new();
        write_ok(
            &mut store,
            "TestStream-1",
            StreamVersion::NoStream,
            &[
                message("TestMessage", r#"{"test": "data"}"#),
                message("AnotherMessage", r#"{"test2": "data2"}"#),
            ],
        );

        let (version, messages) = store.read_from_stream("TestStream-1", ReadDirection::Backwards);

        assert_eq!(version, StreamVersion::Revision(1));
        assert_eq!(messages.len(), 2);
        assert_eq!(messages[0].message_type, "AnotherMessage");
        assert_eq!(messages[1].message_type, "TestMessage");
    }

    #[test]
    fn it_handles_conflict() {
        let mut store = MemoryStreamStore::new();
        write_ok(
            &mut store,
            "TestStream-1",
            StreamVersion::NoStream,
            &[message("TestMessage", r#"{"test": "data"}"#)],
        );

        // The stream now exists, so expecting `NoStream` must be rejected.
        let append_result = store.write_to_stream(
            "TestStream-1",
            StreamVersion::NoStream,
            &[message("TestMessage", r#"{"test": "data"}"#)],
        );
        assert_eq!(append_result, WriteResult::WrongExpectedVersion);

        // The rejected write must leave the stream untouched.
        let (version, messages) = store.read_from_stream("TestStream-1", ReadDirection::Forwards);
        assert_eq!(messages.len(), 1);
        assert_eq!(version, StreamVersion::Revision(0));
    }
}