use crate::{DataResult, DataStore, Equivalent};
use chrono::Utc;
use serde_json::Value;
use std::any::Any;
use std::fs::{self, File, OpenOptions};
use std::io::{self, BufWriter, Result, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU32, Ordering};
/// Lets a `PathBuf` be used as a type-erased `Equivalent` handle, as the store does for the
/// files it returns from `fetch`.
impl Equivalent for PathBuf {
    /// Returns `true` only when `other` is itself a `PathBuf` that compares equal to `self`.
    fn equals(&self, other: &dyn Equivalent) -> bool {
        matches!(
            other.as_any().downcast_ref::<PathBuf>(),
            Some(candidate) if self == candidate
        )
    }

    /// Exposes `self` as `Any` so callers can downcast back to `PathBuf`.
    fn as_any(&self) -> &dyn Any {
        self
    }
}
/// Settings for a `DirectoryStore`.
#[derive(Clone)]
pub struct DirectoryConfig {
/// Written as the `writeKey` value in the footer appended when a batch file is finalized.
pub write_key: String,
/// Directory that holds the batch files; `DirectoryStore::new` creates it if it is missing.
pub storage_location: PathBuf,
/// Name suffix for new batch files, which are created as `{index}-{base_filename}`.
pub base_filename: String,
/// Size threshold, in bytes, at which `append` rotates to a new file.
/// `DirectoryStore::new` panics when this is below 100.
pub max_file_size: usize,
}
/// Callback that inspects a batch file by path; returning `Err` makes finalization of that
/// file fail with the same error.
pub type FileValidator = Box<dyn Fn(&Path) -> Result<()> + Send + Sync>;
/// File-backed store that batches JSON values into numbered files inside one directory.
pub struct DirectoryStore {
/// Settings supplied to `new`.
config: DirectoryConfig,
/// Buffered writer for the batch file currently being filled; `None` when no file is open.
writer: Option<BufWriter<File>>,
/// Running size counter for the open file, compared against `config.max_file_size` to
/// decide when to rotate.
current_size: usize,
/// Path of the batch file currently being filled, if any.
current_path: Option<PathBuf>,
/// Optional callback run on each file while it is being finalized.
file_validator: Option<FileValidator>,
/// Index handed to the next batch file that gets created.
next_index: AtomicU32,
}
impl DirectoryStore {
/// Extension a batch file is renamed to once it has been finalized. Despite the name, files
/// carrying this extension are the completed ones: `sorted_files(false)` returns only these.
const TEMP_EXTENSION: &'static str = "temp";
/// Creates a store rooted at `config.storage_location`.
///
/// The directory is created if needed and then scanned: files left unfinished by an earlier
/// run are finalized, and the index counter is seeded just past the highest index found so
/// new files do not reuse existing names.
///
/// No file validator is installed yet at this point, so files recovered during construction
/// are finalized without validation.
///
/// # Panics
///
/// Panics when `config.max_file_size` is below 100 bytes.
///
/// # Errors
///
/// Returns an error when the directory cannot be created or listed.
pub fn new(config: DirectoryConfig) -> Result<Self> {
    if config.max_file_size < 100 {
        panic!("Seriously? max_file_size < 100 bytes? What exactly do you expect to store in there?");
    }
    fs::create_dir_all(&config.storage_location)?;

    let store = DirectoryStore {
        config,
        writer: None,
        current_size: 0,
        current_path: None,
        file_validator: None,
        next_index: AtomicU32::new(0),
    };

    let max_index = store.initialize_directory()?;
    // `wrapping_add` keeps debug and release builds consistent if a file with index
    // `u32::MAX` is present: a plain `+ 1` would panic in debug and wrap in release.
    store
        .next_index
        .store(max_index.wrapping_add(1), Ordering::SeqCst);
    Ok(store)
}
pub fn set_file_validator<F>(&mut self, validator: F)
where
F: Fn(&Path) -> Result<()> + 'static + Send + Sync,
{
self.file_validator = Some(Box::new(validator));
}
/// Claims a file index: returns the counter's current value and advances it by one.
fn next_index(&self) -> u32 {
self.next_index.fetch_add(1, Ordering::SeqCst)
}
/// Opens a fresh batch file when no writer is currently active.
///
/// Returns `Ok(false)` if a writer already exists. Otherwise a new
/// `{index}-{base_filename}` file is created, the batch header is written to it, the size
/// counter is reset to the header length, and `Ok(true)` is returned.
///
/// # Errors
///
/// Returns an error if the file cannot be created, if the header cannot be written, or if
/// no unused index is found within the attempt limit.
fn start_file_if_needed(&mut self) -> Result<bool> {
    const MAX_ATTEMPTS: u32 = 1000;
    const BATCH_HEADER: &[u8] = b"{ \"batch\": [";

    if self.writer.is_some() {
        return Ok(false);
    }

    for _ in 0..MAX_ATTEMPTS {
        let index = self.next_index();
        let file_path = self
            .config
            .storage_location
            .join(format!("{}-{}", index, self.config.base_filename));

        // `create_new` fails with `AlreadyExists` rather than truncating, so a name that is
        // already taken simply moves on to the next index.
        match OpenOptions::new()
            .write(true)
            .create_new(true)
            .open(&file_path)
        {
            Ok(file) => {
                let mut writer = BufWriter::new(file);
                // A brand-new file always needs the header, whatever the size counter says:
                // the counter can be stale if the previous file failed to finalize, and
                // skipping the header would leave this file as invalid JSON.
                writer.write_all(BATCH_HEADER)?;
                self.current_size = BATCH_HEADER.len();
                self.current_path = Some(file_path);
                self.writer = Some(writer);
                return Ok(true);
            }
            Err(e) if e.kind() == io::ErrorKind::AlreadyExists => continue,
            Err(e) => return Err(e),
        }
    }

    Err(io::Error::other(
        "Failed to find available index after maximum attempts",
    ))
}
/// Scans the storage directory, finalizes leftover files, and reports the highest index seen.
///
/// Every entry whose name starts with an integer (the text before the first `-`) is taken
/// into account for the maximum. Those that do not yet carry the finalized extension are
/// finalized now; a failure to finalize one is logged to stderr and does not stop the scan.
///
/// Returns `0` when no indexed entry exists.
///
/// # Errors
///
/// Returns an error if the directory cannot be listed or an entry cannot be read.
fn initialize_directory(&self) -> Result<u32> {
    let mut highest: u32 = 0;

    for entry in fs::read_dir(&self.config.storage_location)? {
        let path = entry?.path();

        let Some(index) = path
            .file_name()
            .and_then(|name| name.to_str())
            .and_then(|name| name.split('-').next())
            .and_then(|prefix| prefix.parse::<u32>().ok())
        else {
            continue;
        };
        highest = highest.max(index);

        let already_finalized =
            path.extension().and_then(|ext| ext.to_str()) == Some(Self::TEMP_EXTENSION);
        if already_finalized {
            continue;
        }
        if let Err(e) = self.finalize_file(&path) {
            eprintln!("Failed to finalize file {:?}: {}", path, e);
        }
    }

    Ok(highest)
}
fn finalize_file(&self, path: &Path) -> Result<()> {
{
let mut file = OpenOptions::new().append(true).open(path)?;
write!(
file,
"],\"sentAt\":\"{}\",\"writeKey\":\"{}\"}}",
Utc::now().format("%Y-%m-%dT%H:%M:%S.%3fZ"),
self.config.write_key
)?;
file.flush()?;
}
if let Some(validator) = &self.file_validator {
validator(path)?;
}
let new_path = path.with_extension(Self::TEMP_EXTENSION);
fs::rename(path, new_path)?;
Ok(())
}
fn finish_file(&mut self) -> Result<()> {
let writer = match self.writer.take() {
Some(mut writer) => {
writer.flush()?;
writer
}
None => return Ok(()),
};
drop(writer);
if let Some(current_path) = self.current_path.take() {
self.finalize_file(¤t_path)?;
}
self.current_size = 0;
Ok(())
}
fn sorted_files(&self, include_unfinished: bool) -> Result<Vec<PathBuf>> {
let mut files: Vec<PathBuf> = fs::read_dir(&self.config.storage_location)?
.filter_map(Result::ok)
.map(|e| e.path())
.filter(|p| {
if include_unfinished {
true
} else {
p.extension().and_then(|ext| ext.to_str()) == Some(Self::TEMP_EXTENSION)
}
})
.collect();
files.sort_by(|a, b| {
a.file_name()
.unwrap_or_default()
.cmp(b.file_name().unwrap_or_default())
});
Ok(files)
}
/// Returns the longest leading run of `files` whose combined on-disk size fits in `max_bytes`.
///
/// Files are taken in order; selection stops at the first file that would push the total
/// over the limit. A file whose metadata cannot be read is skipped without ending the scan.
fn up_to_size(&self, max_bytes: usize, files: &[PathBuf]) -> Result<Vec<PathBuf>> {
    let budget = max_bytes as u64;
    let mut used: u64 = 0;
    let mut selected = Vec::new();

    for candidate in files {
        let Ok(metadata) = fs::metadata(candidate) else {
            continue;
        };
        let next_total = used + metadata.len();
        if next_total > budget {
            break;
        }
        used = next_total;
        selected.push(candidate.clone());
    }

    Ok(selected)
}
}
impl DataStore for DirectoryStore {
/// `fetch` hands back the paths of finalized batch files rather than their contents.
type Output = Vec<PathBuf>;
/// Reports whether the store holds anything: either a file is currently open, or the
/// directory contains an entry whose name starts with a numeric index and contains the
/// configured base filename.
///
/// Returns `false` when the directory cannot be listed.
fn has_data(&self) -> bool {
    if self.writer.is_some() {
        return true;
    }

    let Ok(entries) = fs::read_dir(&self.config.storage_location) else {
        return false;
    };

    for entry in entries.filter_map(Result::ok) {
        let path = entry.path();
        let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
            continue;
        };
        let has_index = name
            .split('-')
            .next()
            .and_then(|prefix| prefix.parse::<u32>().ok())
            .is_some();
        if has_index && name.contains(&self.config.base_filename) {
            return true;
        }
    }
    false
}
/// Discards everything in the store: the in-progress file (if any) and every file in the
/// storage directory.
///
/// Best effort: a failure to list the directory is ignored, and failures to delete
/// individual files are only logged by `remove`.
fn reset(&mut self) {
    // Drop the open writer and its bookkeeping first. Otherwise the store would keep
    // appending to a file that is about to be deleted, and the next `fetch` would fail
    // trying to finalize a path that no longer exists.
    self.writer = None;
    self.current_path = None;
    self.current_size = 0;

    if let Ok(files) = self.sorted_files(true) {
        let removable: Vec<Box<dyn Equivalent>> = files
            .into_iter()
            .map(|path| Box::new(path) as Box<dyn Equivalent>)
            .collect();
        let _ = self.remove(&removable);
    }
}
/// Appends one JSON value to the current batch file, rotating to a new file first when the
/// current one has reached `max_file_size`.
///
/// The size check happens before the write, so a file may exceed the limit by the last
/// value written to it. The writer is flushed after every value.
///
/// # Errors
///
/// Returns an error if the value cannot be serialized, if rotating or opening a file fails,
/// or if the write fails.
fn append(&mut self, data: Value) -> Result<()> {
    // Serialize exactly once, before any file is touched: the same bytes are written and
    // counted, and a serialization failure cannot leave a header-only file behind.
    let payload = serde_json::to_vec(&data)?;

    if self.writer.is_none() {
        // No file is open, so any leftover count belongs to a file that is already gone
        // (e.g. one whose finalization failed).
        self.current_size = 0;
    } else if self.current_size >= self.config.max_file_size {
        self.finish_file()?;
    }

    let started = self.start_file_if_needed()?;
    let writer = self
        .writer
        .as_mut()
        .ok_or_else(|| io::Error::other("No active writer"))?;

    let mut written = payload.len();
    if !started {
        // Every value after the first in a file is preceded by a separator.
        writer.write_all(b",")?;
        written += 1;
    }
    writer.write_all(&payload)?;
    writer.flush()?;

    self.current_size += written;
    Ok(())
}
/// Returns finalized batch files, optionally limited by total size and by count.
///
/// Any file currently being written is finalized first so it can be included. The byte
/// limit is applied before the count limit. Returns `Ok(None)` when no file qualifies.
/// The returned `removable` handles are the same paths, boxed for `remove`.
///
/// # Errors
///
/// Returns an error if finalizing the open file or listing the directory fails.
fn fetch(
    &mut self,
    count: Option<usize>,
    max_bytes: Option<usize>,
) -> Result<Option<DataResult<Self::Output>>> {
    if self.writer.is_some() {
        self.finish_file()?;
    }

    let finalized = self.sorted_files(false)?;
    let mut selected = match max_bytes {
        Some(limit) => self.up_to_size(limit, &finalized)?,
        None => finalized,
    };
    if let Some(limit) = count {
        selected.truncate(limit);
    }
    if selected.is_empty() {
        return Ok(None);
    }

    let removable: Vec<Box<dyn Equivalent>> = selected
        .iter()
        .cloned()
        .map(|path| Box::new(path) as Box<dyn Equivalent>)
        .collect();

    Ok(Some(DataResult {
        data: Some(selected),
        removable: Some(removable),
    }))
}
/// Deletes the files named by `data`.
///
/// Items that are not `PathBuf`s are ignored. A failed deletion is logged to stderr and does
/// not stop the remaining deletions; the method itself always returns `Ok(())`.
fn remove(&mut self, data: &[Box<dyn Equivalent>]) -> Result<()> {
    let paths = data
        .iter()
        .filter_map(|item| item.as_any().downcast_ref::<PathBuf>());

    for path in paths {
        if let Err(e) = fs::remove_file(path) {
            eprintln!("Failed to remove file {:?}: {}", path, e);
        }
    }
    Ok(())
}
}
#[cfg(test)]
mod tests {
use super::{DirectoryConfig, DirectoryStore};
use crate::DataStore;
use serde_json::json;
use serde_json::Value;
use std::fs;
use std::io;
use std::io::Result;
use tempfile::TempDir;
#[test]
fn test_directory_store_basic_operations() -> Result<()> {
    // Two appends under a generous size limit must end up in a single finalized file.
    let scratch = TempDir::new()?;
    let mut store = DirectoryStore::new(DirectoryConfig {
        write_key: "test-key".to_string(),
        storage_location: scratch.path().to_owned(),
        base_filename: "events".to_string(),
        max_file_size: 1024,
    })?;

    let event = serde_json::json!({
        "event": "test",
        "properties": {
            "value": 123
        }
    });
    for _ in 0..2 {
        store.append(event.clone())?;
    }

    let Some(result) = store.fetch(None, None)? else {
        panic!("Expected data but got none");
    };
    assert!(result.data.is_some());
    let files = result.data.unwrap();
    assert_eq!(files.len(), 1);

    let content = fs::read_to_string(&files[0])?;
    assert!(content.contains("\"event\":\"test\""));
    assert!(content.contains("\"writeKey\":\"test-key\""));
    Ok(())
}
#[test]
fn test_recovers_unfinished_files() -> Result<()> {
    let scratch = TempDir::new()?;
    let config = DirectoryConfig {
        write_key: "test-key".to_string(),
        storage_location: scratch.path().to_owned(),
        base_filename: "events".to_string(),
        max_file_size: 1024,
    };

    // First session: dropped while its batch file is still open, i.e. never finalized.
    {
        let mut abandoned = DirectoryStore::new(config.clone())?;
        abandoned.append(json!({"event": "test1", "value": 123}))?;
        abandoned.append(json!({"event": "test2", "value": 456}))?;
    }

    // Second session: finalizes its own file explicitly.
    {
        let mut completed = DirectoryStore::new(config.clone())?;
        completed.append(json!({"event": "finished", "value": 789}))?;
        completed.finish_file()?;
    }

    // Third session: the checks below are skipped if `fetch` yields nothing.
    let mut store = DirectoryStore::new(config)?;
    if let Some(result) = store.fetch(None, None)? {
        let files = result.data.unwrap();
        assert!(!files.is_empty(), "Should have recovered files");

        for path in files {
            let content = fs::read_to_string(&path)?;
            let batch: Value = serde_json::from_str(&content)?;
            assert!(batch.get("batch").is_some(), "Should have batch field");
            assert!(batch.get("sentAt").is_some(), "Should have sentAt field");
            assert_eq!(
                batch.get("writeKey").and_then(Value::as_str),
                Some("test-key"),
                "Should have correct writeKey"
            );
            assert_eq!(
                path.extension().and_then(|ext| ext.to_str()),
                Some("temp"),
                "All files should be finalized"
            );
        }
    }

    // The store must remain usable after recovery.
    store.append(json!({"event": "new", "value": 999}))?;
    store.finish_file()?;
    Ok(())
}
#[test]
fn test_file_rotation() -> Result<()> {
    // The smallest allowed limit forces the five events below across several files.
    let scratch = TempDir::new()?;
    let mut store = DirectoryStore::new(DirectoryConfig {
        write_key: "test-key".to_string(),
        storage_location: scratch.path().to_owned(),
        base_filename: "events".to_string(),
        max_file_size: 100,
    })?;

    for i in 0..5 {
        store.append(serde_json::json!({
            "event": "test",
            "index": i,
            "data": "some longer data to help hit size limit...."
        }))?;
    }

    let Some(result) = store.fetch(None, None)? else {
        panic!("Expected data but got none");
    };
    let files = result.data.unwrap();
    assert!(files.len() > 1, "Expected multiple files due to size limit");

    for file in &files {
        let raw = fs::read_to_string(file)?;
        let content = raw.trim();
        assert!(content.starts_with("{ \"batch\": ["));
        assert!(content.ends_with("}"));
        assert!(content.contains("\"writeKey\":\"test-key\""));

        // Every rotated file must be a complete JSON document on its own.
        let parsed: serde_json::Value = serde_json::from_str(content)?;
        assert!(parsed.get("batch").is_some(), "Missing 'batch' field");
        assert!(parsed.get("writeKey").is_some(), "Missing 'writeKey' field");
        assert!(parsed.get("sentAt").is_some(), "Missing 'sentAt' field");
    }
    Ok(())
}
#[test]
fn test_fetch_with_size_limit() -> Result<()> {
    let scratch = TempDir::new()?;
    let mut store = DirectoryStore::new(DirectoryConfig {
        write_key: "test-key".to_string(),
        storage_location: scratch.path().to_owned(),
        base_filename: "events".to_string(),
        max_file_size: 200,
    })?;

    for i in 0..5 {
        store.append(serde_json::json!({
            "event": "test",
            "index": i,
            "data": "padding data...."
        }))?;
    }

    // Only checked when the fetch returns something; a `None` result passes silently.
    if let Some(result) = store.fetch(None, Some(250))? {
        let files = result.data.unwrap();
        let mut total_size: u64 = 0;
        for file in &files {
            total_size += fs::metadata(file).unwrap().len();
        }
        assert!(total_size <= 250, "Fetch exceeded size limit");
    }
    Ok(())
}
#[test]
fn test_file_cleanup() -> Result<()> {
    let scratch = TempDir::new()?;
    let mut store = DirectoryStore::new(DirectoryConfig {
        write_key: "test-key".to_string(),
        storage_location: scratch.path().to_owned(),
        base_filename: "events".to_string(),
        max_file_size: 200,
    })?;

    store.append(serde_json::json!({"event": "test"}))?;

    // Only checked when the fetch returns something; a `None` result passes silently.
    if let Some(result) = store.fetch(None, None)? {
        let files = result.data.unwrap();
        let removable = result.removable.unwrap();

        // Fetched files exist until they are explicitly removed...
        assert!(files.iter().all(|file| file.exists()));
        store.remove(&removable)?;
        // ...and are gone afterwards.
        assert!(files.iter().all(|file| !file.exists()));
    }
    Ok(())
}
#[test]
fn test_file_validator() -> Result<()> {
    let temp_dir = TempDir::new()?;
    let config = DirectoryConfig {
        write_key: "test-key".to_string(),
        storage_location: temp_dir.path().to_owned(),
        base_filename: "events".to_string(),
        max_file_size: 1024,
    };
    let mut store = DirectoryStore::new(config)?;

    // Reject implausibly small files; a finalized batch is always larger than this.
    store.set_file_validator(|path| {
        let metadata = fs::metadata(path)?;
        if metadata.len() < 10 {
            // `io::Error::other` matches how the store itself builds ad-hoc errors.
            return Err(io::Error::other("File too small"));
        }
        Ok(())
    });

    let event = serde_json::json!({"event": "test"});
    store.append(event)?;

    // `fetch` finalizes the open file, which runs the validator.
    let result = store.fetch(None, None)?;
    assert!(result.is_some());
    Ok(())
}
#[test]
fn test_fetch_limits() -> Result<()> {
    let scratch = TempDir::new()?;
    let mut store = DirectoryStore::new(DirectoryConfig {
        write_key: "test-key".to_string(),
        storage_location: scratch.path().to_owned(),
        base_filename: "events".to_string(),
        max_file_size: 100,
    })?;

    for i in 0..10 {
        store.append(serde_json::json!({
            "event": "test",
            "index": i,
            "data": "padding data to make files bigger..."
        }))?;
    }

    // Each section below is only checked when its fetch returns something.

    // Count limit alone.
    if let Some(result) = store.fetch(Some(3), None)? {
        let files = result.data.unwrap();
        assert_eq!(files.len(), 3, "Count limit not respected");
    }

    // Byte limit alone.
    if let Some(result) = store.fetch(None, Some(250))? {
        let files = result.data.unwrap();
        let mut total_size: u64 = 0;
        for file in &files {
            total_size += fs::metadata(file).unwrap().len();
        }
        assert!(total_size <= 250, "Byte limit not respected");
    }

    // Both limits together.
    if let Some(result) = store.fetch(Some(5), Some(200))? {
        let files = result.data.unwrap();
        assert!(
            files.len() <= 5,
            "Count limit not respected in combined test"
        );
        let mut total_size: u64 = 0;
        for file in &files {
            total_size += fs::metadata(file).unwrap().len();
        }
        assert!(
            total_size <= 200,
            "Byte limit not respected in combined test"
        );
    }
    Ok(())
}
#[test]
#[should_panic(
    expected = "Seriously? max_file_size < 100 bytes? What exactly do you expect to store in there?"
)]
fn test_rejects_tiny_max_file_size() {
    // 50 bytes is below the 100-byte minimum, so construction must panic.
    let scratch = TempDir::new().unwrap();
    let undersized = DirectoryConfig {
        write_key: "test-key".to_string(),
        storage_location: scratch.path().to_owned(),
        base_filename: "events".to_string(),
        max_file_size: 50,
    };
    let _store = DirectoryStore::new(undersized).unwrap();
}
#[test]
#[should_panic(
    expected = "Seriously? max_file_size < 100 bytes? What exactly do you expect to store in there?"
)]
fn test_rejects_zero_max_file_size() {
    // Zero is the extreme case of an undersized limit and must be rejected the same way.
    let scratch = TempDir::new().unwrap();
    let zero_sized = DirectoryConfig {
        write_key: "test-key".to_string(),
        storage_location: scratch.path().to_owned(),
        base_filename: "events".to_string(),
        max_file_size: 0,
    };
    let _store = DirectoryStore::new(zero_sized).unwrap();
}
}