pub mod rdf_store_mod;
mod simple;
mod store;
#[cfg(test)]
mod simple_tests;
#[cfg(test)]
mod tests;
pub use rdf_store_mod as rdf_store;
pub use simple::{
EventStreamIter, ProjectionRunner, SimpleEvent, SimpleEventBus, SimpleEventHandler,
SimpleEventStore, SimpleSnapshot, SimpleSnapshotStore,
};
pub use store::{EventIndexes, EventMetadataAccessor, EventStore, PersistenceManager};
use crate::StreamEvent;
use chrono::{DateTime, Duration as ChronoDuration, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::atomic::AtomicU64;
use uuid::Uuid;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventStoreConfig {
pub max_memory_events: usize,
pub enable_persistence: bool,
pub persistence_backend: PersistenceBackend,
pub snapshot_config: SnapshotConfig,
pub retention_policy: RetentionPolicy,
pub indexing_config: IndexingConfig,
pub enable_compression: bool,
pub persistence_batch_size: usize,
}
impl Default for EventStoreConfig {
fn default() -> Self {
Self {
max_memory_events: 1_000_000,
enable_persistence: true,
persistence_backend: PersistenceBackend::FileSystem {
base_path: "/tmp/oxirs-event-store".to_string(),
},
snapshot_config: SnapshotConfig::default(),
retention_policy: RetentionPolicy::default(),
indexing_config: IndexingConfig::default(),
enable_compression: true,
persistence_batch_size: 1000,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PersistenceBackend {
FileSystem { base_path: String },
Database { connection_string: String },
ObjectStorage {
endpoint: String,
bucket: String,
access_key: String,
secret_key: String,
},
Memory,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SnapshotConfig {
pub enable_snapshots: bool,
pub snapshot_interval: usize,
pub max_snapshots: usize,
pub compress_snapshots: bool,
}
impl Default for SnapshotConfig {
fn default() -> Self {
Self {
enable_snapshots: true,
snapshot_interval: 10000,
max_snapshots: 10,
compress_snapshots: true,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetentionPolicy {
pub max_age: Option<ChronoDuration>,
pub max_events: Option<u64>,
pub enable_archiving: bool,
pub archive_backend: Option<PersistenceBackend>,
}
impl Default for RetentionPolicy {
fn default() -> Self {
Self {
max_age: Some(ChronoDuration::days(365)), max_events: Some(10_000_000), enable_archiving: true,
archive_backend: None,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexingConfig {
pub index_by_event_type: bool,
pub index_by_timestamp: bool,
pub index_by_source: bool,
pub custom_indexes: Vec<CustomIndex>,
}
impl Default for IndexingConfig {
fn default() -> Self {
Self {
index_by_event_type: true,
index_by_timestamp: true,
index_by_source: true,
custom_indexes: Vec::new(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CustomIndex {
pub name: String,
pub field_path: String,
pub index_type: IndexType,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum IndexType {
Hash,
BTree,
FullText,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StoredEvent {
pub event_id: Uuid,
pub sequence_number: u64,
pub stream_id: String,
pub stream_version: u64,
pub event_data: StreamEvent,
pub stored_at: DateTime<Utc>,
pub storage_metadata: StorageMetadata,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StorageMetadata {
pub checksum: String,
pub compressed_size: Option<usize>,
pub original_size: usize,
pub storage_location: String,
pub persistence_status: PersistenceStatus,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PersistenceStatus {
InMemory,
Persisted,
Archived,
Failed { error: String },
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventSnapshot {
pub snapshot_id: Uuid,
pub stream_id: String,
pub stream_version: u64,
pub sequence_number: u64,
pub created_at: DateTime<Utc>,
pub state_data: Vec<u8>,
pub metadata: SnapshotMetadata,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SnapshotMetadata {
pub compression: Option<String>,
pub original_size: usize,
pub compressed_size: usize,
pub checksum: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventQuery {
pub stream_id: Option<String>,
pub event_types: Option<Vec<String>>,
pub time_range: Option<TimeRange>,
pub sequence_range: Option<SequenceRange>,
pub source: Option<String>,
pub custom_filters: HashMap<String, String>,
pub limit: Option<usize>,
pub order: QueryOrder,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeRange {
pub start: DateTime<Utc>,
pub end: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SequenceRange {
pub start: u64,
pub end: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum QueryOrder {
SequenceAsc,
SequenceDesc,
TimestampAsc,
TimestampDesc,
}
#[derive(Debug, Default)]
pub struct EventSourcingStats {
pub total_events_stored: AtomicU64,
pub total_events_retrieved: AtomicU64,
pub snapshots_created: AtomicU64,
pub events_archived: AtomicU64,
pub persistence_operations: AtomicU64,
pub failed_operations: AtomicU64,
pub memory_usage_bytes: AtomicU64,
pub disk_usage_bytes: AtomicU64,
pub average_store_latency_ms: AtomicU64,
pub average_retrieve_latency_ms: AtomicU64,
}
#[async_trait::async_trait]
pub trait EventStoreTrait: Send + Sync {
async fn store_event(
&self,
stream_id: String,
event: StreamEvent,
) -> anyhow::Result<StoredEvent>;
async fn query_events(&self, query: EventQuery) -> anyhow::Result<Vec<StoredEvent>>;
async fn get_stream_events(
&self,
stream_id: &str,
from_version: Option<u64>,
) -> anyhow::Result<Vec<StoredEvent>>;
async fn replay_from_timestamp(
&self,
timestamp: DateTime<Utc>,
) -> anyhow::Result<Vec<StoredEvent>>;
async fn get_latest_snapshot(&self, stream_id: &str) -> anyhow::Result<Option<EventSnapshot>>;
async fn rebuild_stream_state(&self, stream_id: &str) -> anyhow::Result<Vec<u8>>;
async fn append_events(
&self,
aggregate_id: &str,
events: &[StreamEvent],
expected_version: Option<u64>,
) -> anyhow::Result<u64>;
}
#[async_trait::async_trait]
pub trait EventStream: Send + Sync {
async fn next_event(&mut self) -> Option<StoredEvent>;
async fn has_events(&self) -> bool;
async fn read_events_from_position(
&self,
position: u64,
max_events: usize,
) -> anyhow::Result<Vec<StoredEvent>>;
}
#[async_trait::async_trait]
pub trait SnapshotStore: Send + Sync {
async fn store_snapshot(&self, snapshot: EventSnapshot) -> anyhow::Result<()>;
async fn get_snapshot(
&self,
stream_id: &str,
version: Option<u64>,
) -> anyhow::Result<Option<EventSnapshot>>;
async fn list_snapshots(&self, stream_id: &str) -> anyhow::Result<Vec<EventSnapshot>>;
}
#[derive(Debug, Clone)]
pub enum PersistenceOperation {
StoreEvent(Box<StoredEvent>),
StoreSnapshot(EventSnapshot),
ArchiveEvents(Vec<StoredEvent>),
DeleteEvents(Vec<u64>),
}
#[derive(Debug, Default)]
pub struct PersistenceStats {
pub operations_queued: AtomicU64,
pub operations_completed: AtomicU64,
pub operations_failed: AtomicU64,
pub bytes_written: AtomicU64,
pub bytes_read: AtomicU64,
}