1use derive_more::{Deref, DerefMut, From, Into};
13use serde::{Deserialize, Serialize};
14use std::path::PathBuf;
15use tokio::sync::RwLock;
16use std::sync::Arc;
17
18pub mod segment;
19pub mod wal;
20pub mod event;
21pub mod error;
22pub mod memory;
23
24pub mod ultra_performance;
26
27pub mod gpu_acceleration;
29
30pub mod security;
32
33pub use error::{StorageError, StorageResult};
34pub use event::{Event, EventId, EventData};
35pub use segment::{Segment, SegmentId, SegmentManager};
36pub use wal::{WriteAheadLog, WalConfig};
37pub use memory::{MemoryPool, PooledBuffer};
38
39pub use ultra_performance::{
41 UltraPerformanceEngine,
42 LockFreeRingBuffer,
43 SIMDBatchProcessor,
44 PerformanceMetrics,
45 NUMATopology,
46};
47
48pub use gpu_acceleration::{
50 GPUAcceleratedEngine,
51 GPUAccelerationType,
52 GPUContext,
53 GPUMetrics,
54 HybridConfig,
55 DistributionStrategy,
56};
57
58pub use security::{
60 SecurityManager,
61 SecurityConfig,
62 SecurityMetrics,
63 SecurityError,
64 RateLimiter,
65 ResourceMonitor,
66 GPUTimeoutManager,
67};
68
/// Event storage engine combining a write-ahead log, a segment manager,
/// and a shared pool of reusable buffers for batched writes.
#[derive(Debug)]
pub struct StorageEngine {
    // Write-ahead log; readers take a shared lock, appends take exclusive.
    wal: RwLock<WriteAheadLog>,
    // On-disk segment management — only initialized in `new` within this
    // file; presumably consumed by methods outside this view — TODO confirm.
    segment_manager: RwLock<SegmentManager>,
    // Shared buffer pool used by `append_events_batch` for serialization.
    memory_pool: Arc<MemoryPool>,
    // Configuration captured at construction time.
    config: StorageConfig,
}
95
/// Configuration for [`StorageEngine`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StorageConfig {
    /// Directory where WAL and segment files live (created on startup).
    pub data_dir: PathBuf,
    /// Maximum segment size in bytes (default suggests 1 GiB).
    pub segment_size: u64,
    /// WAL sync interval in milliseconds.
    pub sync_interval_ms: u64,
    /// Compression toggle — NOTE(review): not referenced in this view; confirm consumer.
    pub enable_compression: bool,
    /// Total memory-pool size in bytes (passed to `MemoryPool::new`).
    pub memory_pool_size: usize,
    /// Batch sizing hint; also forwarded as the second `MemoryPool::new` argument.
    pub batch_size: usize,
    /// Optional worker count; `None` presumably means auto-detect — TODO confirm.
    pub worker_count: Option<usize>,
}
121
122impl Default for StorageConfig {
123 fn default() -> Self {
124 Self {
125 data_dir: PathBuf::from("./data"),
126 segment_size: 1024 * 1024 * 1024, sync_interval_ms: 1000,
128 enable_compression: false,
129 memory_pool_size: 128 * 1024 * 1024, batch_size: 1000,
131 worker_count: None, }
133 }
134}
135
136#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, Deref, DerefMut, From, Into)]
138pub struct Offset(pub u64);
139
140impl Offset {
141 pub fn new(value: u64) -> Self {
143 Self(value)
144 }
145
146 pub fn next(self) -> Self {
148 Self(self.0 + 1)
149 }
150}
151
152#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Deref, DerefMut, From, Into)]
154pub struct Partition(pub u32);
155
156impl Partition {
157 pub fn new(value: u32) -> Self {
159 Self(value)
160 }
161}
162
163#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Deref, DerefMut, From, Into)]
165pub struct Topic(pub String);
166
167impl Topic {
168 pub fn new(name: impl Into<String>) -> Self {
170 Self(name.into())
171 }
172}
173
174impl StorageEngine {
175 pub async fn new(config: StorageConfig) -> StorageResult<Self> {
188 tokio::fs::create_dir_all(&config.data_dir).await?;
190
191 let memory_pool = Arc::new(MemoryPool::new(
193 config.memory_pool_size,
194 config.batch_size,
195 )?);
196
197 let wal_config = WalConfig {
198 data_dir: config.data_dir.clone(),
199 segment_size: config.segment_size,
200 sync_interval_ms: config.sync_interval_ms,
201 };
202
203 let wal = WriteAheadLog::new(wal_config).await?;
204 let segment_manager = SegmentManager::new(config.data_dir.clone()).await?;
205
206 Ok(Self {
207 wal: RwLock::new(wal),
208 segment_manager: RwLock::new(segment_manager),
209 memory_pool,
210 config,
211 })
212 }
213
214 pub async fn append_event(
229 &self,
230 topic: &Topic,
231 partition: Partition,
232 event_data: EventData,
233 ) -> StorageResult<(EventId, Offset)> {
234 let event_id = EventId::new();
235 let event = Event::new(event_id, topic.clone(), partition, event_data);
236
237 let mut wal = self.wal.write().await;
238 let offset = wal.append(&event).await?;
239
240 Ok((event_id, offset))
241 }
242
243 pub async fn read_events(
259 &self,
260 topic: &Topic,
261 partition: Partition,
262 start_offset: Offset,
263 max_events: usize,
264 ) -> StorageResult<Vec<Event>> {
265 let wal = self.wal.read().await;
266 wal.read_events(topic, partition, start_offset, max_events).await
267 }
268
269 pub async fn get_latest_offset(
278 &self,
279 topic: &Topic,
280 partition: Partition,
281 ) -> StorageResult<Option<Offset>> {
282 let wal = self.wal.read().await;
283 wal.get_latest_offset(topic, partition).await
284 }
285
286 pub async fn append_events_batch(
297 &self,
298 events: Vec<(Topic, Partition, EventData)>,
299 ) -> StorageResult<Vec<(EventId, Offset)>> {
300 if events.is_empty() {
301 return Ok(Vec::new());
302 }
303
304 let mut pooled_buffer = self.memory_pool.get_buffer().await?;
306
307 let mut batch_events = Vec::with_capacity(events.len());
309 let mut result_ids = Vec::with_capacity(events.len());
310
311 for (topic, partition, event_data) in events {
312 let event_id = EventId::new();
313 let event = Event::new(event_id, topic, partition, event_data);
314
315 batch_events.push(event);
316 result_ids.push(event_id);
317 }
318
319 let mut wal = self.wal.write().await;
321 let offsets = wal.append_batch(&batch_events, &mut pooled_buffer).await?;
322
323 let results: Vec<(EventId, Offset)> = result_ids.into_iter()
325 .zip(offsets.into_iter())
326 .collect();
327
328 Ok(results)
329 }
330
331 pub fn memory_pool_stats(&self) -> memory::PoolStats {
333 self.memory_pool.stats()
334 }
335
336 pub async fn shutdown(&self) -> StorageResult<()> {
343 let wal = self.wal.write().await;
344 wal.sync().await?;
345
346 tracing::info!("Storage engine shutdown completed");
347 Ok(())
348 }
349}