use crate::{DataResult, DataStore, Equivalent};
use serde_json::json;
use serde_json::Value;
use std::any::Any;
use std::collections::VecDeque;
use std::io::Result;
impl Equivalent for Value {
fn equals(&self, other: &dyn Equivalent) -> bool {
if let Some(other_value) = other.as_any().downcast_ref::<Value>() {
self == other_value
} else {
false
}
}
fn as_any(&self) -> &dyn Any {
self
}
}
#[derive(Clone)]
pub struct MemoryConfig {
pub write_key: String,
pub max_items: usize,
pub max_fetch_size: usize,
}
pub struct MemoryStore {
config: MemoryConfig,
items: VecDeque<Value>,
}
impl MemoryStore {
pub fn new(config: MemoryConfig) -> Self {
if config.max_fetch_size < 100 {
panic!("max_fetch_size < 100 bytes? What are you even trying to fetch, empty arrays?");
}
if config.max_items == 0 {
panic!("max_items = 0? So... you want a store that stores nothing? That's what /dev/null is for.");
}
Self {
config,
items: VecDeque::new(),
}
}
fn create_batch(&self, items: &[Value]) -> Value {
json!({
"batch": items,
"sentAt": chrono::Utc::now().to_rfc3339(),
"writeKey": self.config.write_key
})
}
fn get_item_size(item: &Value) -> usize {
item.to_string().len()
}
}
impl DataStore for MemoryStore {
type Output = Value;
fn has_data(&self) -> bool {
!self.items.is_empty()
}
fn reset(&mut self) {
self.items.clear();
}
fn append(&mut self, data: Value) -> Result<()> {
self.items.push_back(data);
while self.items.len() > self.config.max_items {
self.items.pop_front();
}
Ok(())
}
fn fetch(
&mut self,
count: Option<usize>,
max_bytes: Option<usize>,
) -> Result<Option<DataResult<Self::Output>>> {
let max_bytes = max_bytes.unwrap_or(self.config.max_fetch_size);
let mut accumulated_size = 0;
let mut num_items = 0;
for item in self.items.iter() {
let item_size = Self::get_item_size(item);
if accumulated_size + item_size > max_bytes {
break;
}
if let Some(count) = count {
if num_items >= count {
break;
}
}
accumulated_size += item_size;
num_items += 1;
}
if num_items == 0 {
return Ok(None);
}
let items: Vec<Value> = self.items.iter().take(num_items).cloned().collect();
let removable: Vec<Box<dyn Equivalent>> = items
.iter()
.map(|item| Box::new(item.clone()) as Box<dyn Equivalent>)
.collect();
let batch = self.create_batch(&items);
Ok(Some(DataResult {
data: Some(batch),
removable: Some(removable),
}))
}
fn remove(&mut self, data: &[Box<dyn Equivalent>]) -> Result<()> {
self.items
.retain(|item| !data.iter().any(|removable| removable.equals(item)));
Ok(())
}
}
#[cfg(test)]
mod tests {
use crate::memory::{MemoryConfig, MemoryStore};
use crate::DataStore;
use serde_json::{json, Value};
use std::io::Result;
#[test]
fn test_basic_operations() -> Result<()> {
let config = MemoryConfig {
write_key: "test-key".to_string(),
max_items: 1000,
max_fetch_size: 1024,
};
let mut store = MemoryStore::new(config);
assert!(!store.has_data());
let event = json!({"event": "test", "value": 123});
store.append(event.clone())?;
assert!(store.has_data());
if let Some(result) = store.fetch(None, None)? {
let batch: Value = result.data.unwrap();
let items = batch["batch"].as_array().unwrap();
assert_eq!(items.len(), 1);
assert_eq!(items[0]["value"], 123);
assert!(store.has_data());
if let Some(removable) = result.removable {
store.remove(&removable)?;
assert!(!store.has_data());
} else {
panic!("Expected removable items but got none");
}
} else {
panic!("Expected data but got none");
}
Ok(())
}
#[test]
fn test_fifo_behavior() -> Result<()> {
let config = MemoryConfig {
write_key: "test-key".to_string(),
max_items: 3, max_fetch_size: 1024,
};
let mut store = MemoryStore::new(config);
for i in 0..5 {
store.append(json!({"index": i}))?;
}
assert!(store.has_data());
if let Some(result) = store.fetch(None, None)? {
let batch: Value = result.data.unwrap();
let items = batch["batch"].as_array().unwrap();
assert_eq!(items.len(), 3);
assert_eq!(items[0]["index"], 2);
assert_eq!(items[1]["index"], 3);
assert_eq!(items[2]["index"], 4);
}
Ok(())
}
#[test]
fn test_fetch_limits() -> Result<()> {
let config = MemoryConfig {
write_key: "test-key".to_string(),
max_items: 100,
max_fetch_size: 1000,
};
let mut store = MemoryStore::new(config);
for i in 0..10 {
let padding = "x".repeat(50); store.append(json!({
"index": i,
"padding": padding
}))?;
}
if let Some(result) = store.fetch(Some(3), None)? {
let batch: Value = result.data.unwrap();
let items = batch["batch"].as_array().unwrap();
assert_eq!(items.len(), 3, "Count limit not respected");
}
for i in 0..10 {
let padding = "x".repeat(50);
store.append(json!({
"index": i,
"padding": padding
}))?;
}
if let Some(result) = store.fetch(None, Some(200))? {
let items = result.data.unwrap();
let items = items["batch"].as_array().unwrap();
assert!(items.len() <= 3, "Too many items for byte limit");
let total_raw_size: usize = items
.iter()
.map(|item| MemoryStore::get_item_size(item))
.sum();
assert!(total_raw_size <= 200, "Raw items exceed byte limit");
}
Ok(())
}
#[test]
fn test_reset() -> Result<()> {
let config = MemoryConfig {
write_key: "test-key".to_string(),
max_items: 100,
max_fetch_size: 1000,
};
let mut store = MemoryStore::new(config);
for i in 0..5 {
store.append(json!({"index": i}))?;
}
assert!(store.has_data());
store.reset();
assert!(!store.has_data());
Ok(())
}
#[test]
fn test_memory_store_max_fetch_size_edge_cases() -> Result<()> {
let config = MemoryConfig {
write_key: "test-key".to_string(),
max_items: 100,
max_fetch_size: 300, };
let mut store = MemoryStore::new(config);
store.append(json!({"type": "small", "value": "tiny"}))?;
store.append(json!({
"type": "large",
"value": "x".repeat(400), }))?;
if let Some(result) = store.fetch(None, None)? {
let batch: Value = result.data.unwrap();
let items = batch["batch"].as_array().unwrap();
assert_eq!(items.len(), 1, "Should only fetch the small item");
assert_eq!(items[0]["type"], "small");
if let Some(removable) = result.removable {
store.remove(&removable)?;
}
}
if let Some(result) = store.fetch(None, None)? {
let batch: Value = result.data.unwrap();
let items = batch["batch"].as_array().unwrap();
assert_eq!(items.len(), 1, "Should fetch the large item");
assert_eq!(items[0]["type"], "large");
if let Some(removable) = result.removable {
store.remove(&removable)?;
}
}
Ok(())
}
#[test]
fn test_memory_store_json_types() -> Result<()> {
let config = MemoryConfig {
write_key: "test-key".to_string(),
max_items: 100,
max_fetch_size: 1024,
};
let mut store = MemoryStore::new(config);
store.append(json!(null))?;
store.append(json!(true))?;
store.append(json!(42))?;
store.append(json!(42.5))?;
store.append(json!("string"))?;
store.append(json!([1, 2, 3]))?;
store.append(json!({"key": "value"}))?;
if let Some(result) = store.fetch(None, None)? {
let batch: Value = result.data.unwrap();
let items = batch["batch"].as_array().unwrap();
assert_eq!(
items.len(),
7,
"All JSON types should be stored and retrieved"
);
}
Ok(())
}
#[test]
#[should_panic(
expected = "max_fetch_size < 100 bytes? What are you even trying to fetch, empty arrays?"
)]
fn test_rejects_tiny_max_fetch_size() {
let config = MemoryConfig {
write_key: "test-key".to_string(),
max_items: 1000,
max_fetch_size: 50, };
let _store = MemoryStore::new(config);
}
#[test]
#[should_panic(
expected = "max_items = 0? So... you want a store that stores nothing? That's what /dev/null is for."
)]
fn test_rejects_zero_max_items() {
let config = MemoryConfig {
write_key: "test-key".to_string(),
max_items: 0, max_fetch_size: 1024,
};
let _store = MemoryStore::new(config);
}
}