use async_trait::async_trait;
use dashmap::DashMap;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use tokio::sync::RwLock;
use crate::backend::StreamBackend;
use crate::consumer::ConsumerGroup;
use crate::error::{StreamError, StreamResult};
use crate::event::StreamEvent;
use crate::types::{Offset, PartitionId, StreamPosition, TopicName};
static MEMORY_STORAGE: std::sync::OnceLock<Arc<DashMap<TopicName, Arc<RwLock<TopicData>>>>> =
std::sync::OnceLock::new();
fn get_memory_storage() -> Arc<DashMap<TopicName, Arc<RwLock<TopicData>>>> {
MEMORY_STORAGE
.get_or_init(|| Arc::new(DashMap::new()))
.clone()
}
pub async fn clear_memory_storage() {
let storage = get_memory_storage();
storage.clear();
}
#[derive(Clone)]
struct TopicData {
events: VecDeque<(StreamEvent, Offset)>,
next_offset: u64,
consumer_offsets: HashMap<String, u64>,
}
pub struct MemoryBackend {
connected: bool,
}
impl MemoryBackend {
pub fn new() -> Self {
Self { connected: false }
}
}
impl Default for MemoryBackend {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl StreamBackend for MemoryBackend {
fn name(&self) -> &'static str {
"memory"
}
async fn connect(&mut self) -> StreamResult<()> {
self.connected = true;
Ok(())
}
async fn disconnect(&mut self) -> StreamResult<()> {
self.connected = false;
Ok(())
}
async fn create_topic(&self, topic: &TopicName, _partitions: u32) -> StreamResult<()> {
let storage = get_memory_storage();
storage.entry(topic.clone()).or_insert_with(|| {
Arc::new(RwLock::new(TopicData {
events: VecDeque::new(),
next_offset: 0,
consumer_offsets: HashMap::new(),
}))
});
Ok(())
}
async fn delete_topic(&self, topic: &TopicName) -> StreamResult<()> {
get_memory_storage().remove(topic);
Ok(())
}
async fn list_topics(&self) -> StreamResult<Vec<TopicName>> {
Ok(get_memory_storage()
.iter()
.map(|entry| entry.key().clone())
.collect())
}
async fn send_event(&self, topic: &TopicName, event: StreamEvent) -> StreamResult<Offset> {
let storage = get_memory_storage();
let topic_data = storage
.get(topic)
.ok_or_else(|| StreamError::TopicNotFound(topic.to_string()))?;
let mut data = topic_data.write().await;
let offset = Offset::new(data.next_offset);
data.next_offset += 1;
data.events.push_back((event, offset));
if data.events.len() > 10000 {
data.events.pop_front();
}
Ok(offset)
}
async fn send_batch(
&self,
topic: &TopicName,
events: Vec<StreamEvent>,
) -> StreamResult<Vec<Offset>> {
let mut offsets = Vec::new();
for event in events {
let offset = self.send_event(topic, event).await?;
offsets.push(offset);
}
Ok(offsets)
}
async fn receive_events(
&self,
topic: &TopicName,
consumer_group: Option<&ConsumerGroup>,
position: StreamPosition,
max_events: usize,
) -> StreamResult<Vec<(StreamEvent, Offset)>> {
let storage = get_memory_storage();
let topic_data = storage
.get(topic)
.ok_or_else(|| StreamError::TopicNotFound(topic.to_string()))?;
let mut data = topic_data.write().await;
let start_offset = if let Some(group) = consumer_group {
let group_name = group.name();
let current_offset = data.consumer_offsets.get(group_name).copied().unwrap_or(0);
match position {
StreamPosition::Beginning => current_offset, StreamPosition::End => data.next_offset,
StreamPosition::Offset(offset) => offset,
}
} else {
match position {
StreamPosition::Beginning => 0,
StreamPosition::End => data.next_offset,
StreamPosition::Offset(offset) => offset,
}
};
let mut events = Vec::new();
for (event, offset) in &data.events {
if offset.value() >= start_offset && events.len() < max_events {
events.push((event.clone(), *offset));
}
}
if let Some(group) = consumer_group {
if let Some((_, last_offset)) = events.last() {
data.consumer_offsets
.insert(group.name().to_string(), last_offset.value() + 1);
}
}
Ok(events)
}
async fn commit_offset(
&self,
topic: &TopicName,
consumer_group: &ConsumerGroup,
_partition: PartitionId,
offset: Offset,
) -> StreamResult<()> {
let storage = get_memory_storage();
let topic_data = storage
.get(topic)
.ok_or_else(|| StreamError::TopicNotFound(topic.to_string()))?;
let mut data = topic_data.write().await;
data.consumer_offsets
.insert(consumer_group.name().to_string(), offset.value() + 1);
Ok(())
}
async fn seek(
&self,
topic: &TopicName,
consumer_group: &ConsumerGroup,
_partition: PartitionId,
position: StreamPosition,
) -> StreamResult<()> {
let storage = get_memory_storage();
let topic_data = storage
.get(topic)
.ok_or_else(|| StreamError::TopicNotFound(topic.to_string()))?;
let mut data = topic_data.write().await;
let offset = match position {
StreamPosition::Beginning => 0,
StreamPosition::End => data.next_offset,
StreamPosition::Offset(offset) => offset,
};
data.consumer_offsets
.insert(consumer_group.name().to_string(), offset);
Ok(())
}
async fn get_consumer_lag(
&self,
topic: &TopicName,
consumer_group: &ConsumerGroup,
) -> StreamResult<HashMap<PartitionId, u64>> {
let storage = get_memory_storage();
let topic_data = storage
.get(topic)
.ok_or_else(|| StreamError::TopicNotFound(topic.to_string()))?;
let data = topic_data.read().await;
let current_offset = data
.consumer_offsets
.get(consumer_group.name())
.copied()
.unwrap_or(0);
let lag = data.next_offset.saturating_sub(current_offset);
let mut result = HashMap::new();
result.insert(PartitionId::new(0), lag);
Ok(result)
}
async fn get_topic_metadata(&self, topic: &TopicName) -> StreamResult<HashMap<String, String>> {
let storage = get_memory_storage();
let topic_data = storage
.get(topic)
.ok_or_else(|| StreamError::TopicNotFound(topic.to_string()))?;
let data = topic_data.read().await;
let mut metadata = HashMap::new();
metadata.insert("backend".to_string(), "memory".to_string());
metadata.insert("event_count".to_string(), data.events.len().to_string());
metadata.insert("next_offset".to_string(), data.next_offset.to_string());
metadata.insert(
"consumer_groups".to_string(),
data.consumer_offsets.len().to_string(),
);
Ok(metadata)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::event::StreamEvent;
#[tokio::test]
async fn test_memory_backend_basic_operations() {
clear_memory_storage().await;
let mut backend = MemoryBackend::new();
assert_eq!(backend.name(), "memory");
backend.connect().await.unwrap();
let topic = TopicName::new(format!("test-topic-basic-{}", uuid::Uuid::new_v4()));
backend.create_topic(&topic, 1).await.unwrap();
let topics = backend.list_topics().await.unwrap();
assert!(
topics.iter().any(|t| t.as_str() == topic.as_str()),
"Our topic should exist"
);
let event = StreamEvent::TripleAdded {
subject: "http://example.org/s".to_string(),
predicate: "http://example.org/p".to_string(),
object: "http://example.org/o".to_string(),
graph: None,
metadata: crate::event::EventMetadata::default(),
};
let offset = backend.send_event(&topic, event.clone()).await.unwrap();
assert_eq!(offset.value(), 0);
let events = backend
.receive_events(&topic, None, StreamPosition::Beginning, 10)
.await
.unwrap();
assert_eq!(events.len(), 1);
backend.delete_topic(&topic).await.unwrap();
let topics = backend.list_topics().await.unwrap();
assert!(
!topics.iter().any(|t| t.as_str() == topic.as_str()),
"Our topic should be deleted"
);
}
#[tokio::test]
async fn test_consumer_groups() {
clear_memory_storage().await;
let mut backend = MemoryBackend::new();
backend.connect().await.unwrap();
let topic = TopicName::new(format!("test-topic-groups-{}", uuid::Uuid::new_v4()));
backend.create_topic(&topic, 1).await.unwrap();
for i in 0..5 {
let event = StreamEvent::GraphCreated {
graph: format!("http://example.org/graph{i}"),
metadata: crate::event::EventMetadata::default(),
};
backend.send_event(&topic, event).await.unwrap();
}
let group = ConsumerGroup::new("test-group".to_string());
let events = backend
.receive_events(&topic, Some(&group), StreamPosition::Beginning, 3)
.await
.unwrap();
assert_eq!(events.len(), 3);
let events = backend
.receive_events(&topic, Some(&group), StreamPosition::Beginning, 10)
.await
.unwrap();
assert_eq!(events.len(), 2);
let lag = backend.get_consumer_lag(&topic, &group).await.unwrap();
assert_eq!(lag.get(&PartitionId::new(0)), Some(&0));
}
}