use crate::{Event, Offset, StorageError, StorageResult};
use derive_more::{Deref, DerefMut, From, Into};
use memmap2::{MmapMut, MmapOptions};
use serde::{Deserialize, Serialize};
use std::{
    collections::HashMap,
    fs::{File, OpenOptions},
    path::{Path, PathBuf},
    sync::Arc,
};
use tokio::sync::RwLock;
use uuid::Uuid;

/// Number of bytes reserved at the start of every segment file for the
/// serialized, CRC-protected `SegmentHeader`. The rest of the file holds
/// event entries laid out as `[u32 length][payload][u32 CRC32]`.
const SEGMENT_HEADER_SIZE: usize = 256;

/// Unique identifier for a segment, backed by a UUID.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Deref, DerefMut, From, Into)]
pub struct SegmentId(pub Uuid);

impl SegmentId {
    /// Creates a new random segment id.
    pub fn new() -> Self {
        Self(Uuid::new_v4())
    }

    /// Wraps an existing UUID as a segment id.
    pub fn from_uuid(uuid: Uuid) -> Self {
        Self(uuid)
    }
}

impl Default for SegmentId {
    fn default() -> Self {
        Self::new()
    }
}

impl std::fmt::Display for SegmentId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

/// Metadata stored at the beginning of each segment file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SegmentHeader {
    /// Identifier of the segment this header belongs to.
    pub segment_id: SegmentId,
    /// Creation time as seconds since the Unix epoch.
    pub created_at: u64,
    /// Total size of the segment file in bytes.
    pub size: u64,
    /// Byte offset at which the next entry will be written.
    pub write_offset: u64,
    /// Number of entries currently stored in the segment.
    pub entry_count: u64,
    /// CRC32 over the header computed with this field zeroed.
    pub header_crc: u32,
}

impl SegmentHeader {
    /// Creates a header for a new, empty segment of the given size.
    pub fn new(segment_id: SegmentId, size: u64) -> Self {
        let created_at = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();

        let mut header = Self {
            segment_id,
            created_at,
            size,
            // Entries start right after the reserved header region.
            write_offset: SEGMENT_HEADER_SIZE as u64,
            entry_count: 0,
            header_crc: 0,
        };

        header.header_crc = header.calculate_crc();
        header
    }

    /// Computes the CRC32 of the header with `header_crc` zeroed out.
    fn calculate_crc(&self) -> u32 {
        let mut temp_header = self.clone();
        temp_header.header_crc = 0;

        let serialized = bincode::serialize(&temp_header).unwrap_or_default();
        crc32fast::hash(&serialized)
    }

    /// Verifies that the stored CRC matches the header contents.
    pub fn verify_integrity(&self) -> StorageResult<()> {
        let expected_crc = self.calculate_crc();
        if self.header_crc != expected_crc {
            return Err(StorageError::CrcMismatch {
                offset: 0,
                expected: expected_crc,
                actual: self.header_crc,
            });
        }
        Ok(())
    }
}

/// A single append-only, memory-mapped segment file.
#[derive(Debug)]
pub struct Segment {
    /// Identifier taken from the segment header.
    id: SegmentId,
    /// Path of the backing file on disk.
    file_path: PathBuf,
    /// Memory map over the whole segment file.
    mmap: MmapMut,
    /// In-memory copy of the segment header.
    header: SegmentHeader,
    /// When true, all write operations are rejected.
    read_only: bool,
}

impl Segment {
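    /// Creates a new segment file of `size` bytes at `file_path` and writes a
    /// fresh header into its reserved region.
    ///
    /// A minimal usage sketch (marked `ignore`; the path and segment size are
    /// illustrative assumptions, not values prescribed by this crate):
    ///
    /// ```ignore
    /// // Create a 64 MiB segment and flush it to disk.
    /// let mut segment = Segment::create("./data/00000001.seg", 64 * 1024 * 1024)?;
    /// segment.flush()?;
    ///
    /// // Later, reopen the same file read-only for queries.
    /// let reopened = Segment::open("./data/00000001.seg", true)?;
    /// assert_eq!(reopened.stats().entry_count, 0);
    /// ```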
    pub fn create<P: AsRef<Path>>(file_path: P, size: u64) -> StorageResult<Self> {
        let file_path = file_path.as_ref().to_path_buf();
        let segment_id = SegmentId::new();

        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .open(&file_path)?;

        file.set_len(size)?;

        let mut mmap = unsafe { MmapOptions::new().map_mut(&file)? };

        let header = SegmentHeader::new(segment_id, size);
        let header_bytes = bincode::serialize(&header)?;

        if header_bytes.len() > mmap.len() {
            return Err(StorageError::configuration("Header size exceeds segment size"));
        }

        if header_bytes.len() > SEGMENT_HEADER_SIZE {
            return Err(StorageError::configuration("Header size exceeds reserved space"));
        }

        // Write the header and zero the rest of the reserved region.
        mmap[..header_bytes.len()].copy_from_slice(&header_bytes);
        mmap[header_bytes.len()..SEGMENT_HEADER_SIZE].fill(0);

        Ok(Self {
            id: segment_id,
            file_path,
            mmap,
            header,
            read_only: false,
        })
    }

    /// Opens an existing segment file, optionally read-only.
    pub fn open<P: AsRef<Path>>(file_path: P, read_only: bool) -> StorageResult<Self> {
        let file_path = file_path.as_ref().to_path_buf();

        let file = if read_only {
            File::open(&file_path)?
        } else {
            OpenOptions::new()
                .read(true)
                .write(true)
                .open(&file_path)?
        };

        // A read-only file handle has no write permission, so a private
        // copy-on-write mapping is used to obtain an `MmapMut` view; actual
        // writes are rejected separately via the `read_only` flag.
        let mmap = if read_only {
            unsafe { MmapOptions::new().map_copy(&file)? }
        } else {
            unsafe { MmapOptions::new().map_mut(&file)? }
        };

        if mmap.len() < SEGMENT_HEADER_SIZE {
            return Err(StorageError::invalid_format("Segment file too small for header"));
        }

        // Find the end of the serialized header: the reserved region is
        // zero-padded, so the last non-zero byte marks the end of header data.
        let mut actual_header_end = 0;
        for i in 0..SEGMENT_HEADER_SIZE {
            if mmap[i] != 0 {
                actual_header_end = i + 1;
            }
        }
        if actual_header_end == 0 {
            return Err(StorageError::invalid_format("No header data found"));
        }

        // Try progressively shorter prefixes of the reserved region until one
        // deserializes into a header that passes its CRC check.
        for end in (actual_header_end..=SEGMENT_HEADER_SIZE).rev() {
            if let Ok(header) = bincode::deserialize::<SegmentHeader>(&mmap[..end]) {
                if header.verify_integrity().is_ok() {
                    return Ok(Self {
                        id: header.segment_id,
                        file_path,
                        mmap,
                        header,
                        read_only,
                    });
                }
            }
        }

        Err(StorageError::invalid_format("Failed to deserialize segment header"))
    }
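
    /// Appends a single event to the segment and returns its logical offset
    /// within this segment.
    ///
    /// A minimal usage sketch (marked `ignore`; the path is an illustrative
    /// assumption and `event` must be a value of this crate's `Event` type):
    ///
    /// ```ignore
    /// let mut segment = Segment::create("./data/00000001.seg", 64 * 1024 * 1024)?;
    /// let offset = segment.append_event(&event)?;
    /// let read_back = segment.read_event(offset)?;
    /// assert!(read_back.is_some());
    /// segment.flush()?;
    /// ```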
    pub fn append_event(&mut self, event: &Event) -> StorageResult<Offset> {
        if self.read_only {
            return Err(StorageError::internal("Cannot write to read-only segment"));
        }

        let serialized_event = bincode::serialize(event)?;
        let data_size = serialized_event.len() as u32;

        // Each entry is framed as [u32 length][payload][u32 CRC32].
        let entry_size = std::mem::size_of::<u32>() + serialized_event.len() + std::mem::size_of::<u32>();
        if self.header.write_offset + entry_size as u64 > self.header.size {
            return Err(StorageError::InsufficientSpace {
                required: entry_size as u64,
                available: self.header.size - self.header.write_offset,
            });
        }

        let write_offset = self.header.write_offset as usize;
        let offset = Offset::new(self.header.entry_count);

        let crc = crc32fast::hash(&serialized_event);

        // Length prefix.
        let size_bytes = data_size.to_le_bytes();
        let size_end = write_offset + size_bytes.len();
        self.mmap[write_offset..size_end].copy_from_slice(&size_bytes);

        // Payload.
        let data_end = size_end + serialized_event.len();
        self.mmap[size_end..data_end].copy_from_slice(&serialized_event);

        // Payload checksum.
        let crc_bytes = crc.to_le_bytes();
        let crc_end = data_end + crc_bytes.len();
        self.mmap[data_end..crc_end].copy_from_slice(&crc_bytes);

        self.header.write_offset += entry_size as u64;
        self.header.entry_count += 1;

        // Persist the updated header (with a fresh CRC) into the reserved region.
        self.header.header_crc = self.header.calculate_crc();
        let header_bytes = bincode::serialize(&self.header)?;
        if header_bytes.len() > SEGMENT_HEADER_SIZE {
            return Err(StorageError::configuration("Header size exceeds reserved space"));
        }
        self.mmap[..header_bytes.len()].copy_from_slice(&header_bytes);
        self.mmap[header_bytes.len()..SEGMENT_HEADER_SIZE].fill(0);

        Ok(offset)
    }

    /// Reads the event at `offset`, verifying its CRC, or returns `None` if the
    /// offset is past the end of the segment.
    pub fn read_event(&self, offset: Offset) -> StorageResult<Option<Event>> {
        if offset.0 >= self.header.entry_count {
            return Ok(None);
        }

        // Entries are not indexed, so walk the data region from the start.
        let mut current_position = SEGMENT_HEADER_SIZE;
        let mut current_entry_index = 0u64;

        while current_entry_index <= offset.0 && current_position < self.header.write_offset as usize {
            if current_position + std::mem::size_of::<u32>() > self.mmap.len() {
                break;
            }

            let size_bytes = &self.mmap[current_position..current_position + std::mem::size_of::<u32>()];
            let data_size = u32::from_le_bytes([size_bytes[0], size_bytes[1], size_bytes[2], size_bytes[3]]) as usize;

            let data_start = current_position + std::mem::size_of::<u32>();
            let data_end = data_start + data_size;
            let crc_end = data_end + std::mem::size_of::<u32>();

            if crc_end > self.mmap.len() {
                return Err(StorageError::invalid_format("Entry extends beyond segment"));
            }

            if current_entry_index == offset.0 {
                let event_data = &self.mmap[data_start..data_end];
                let stored_crc_bytes = &self.mmap[data_end..crc_end];
                let stored_crc = u32::from_le_bytes([
                    stored_crc_bytes[0],
                    stored_crc_bytes[1],
                    stored_crc_bytes[2],
                    stored_crc_bytes[3],
                ]);

                let calculated_crc = crc32fast::hash(event_data);
                if stored_crc != calculated_crc {
                    return Err(StorageError::CrcMismatch {
                        offset: current_position as u64,
                        expected: calculated_crc,
                        actual: stored_crc,
                    });
                }

                return match bincode::deserialize::<Event>(event_data) {
                    Ok(event) => Ok(Some(event)),
                    Err(_) => Err(StorageError::invalid_format("Failed to deserialize event data")),
                };
            }

            current_position = crc_end;
            current_entry_index += 1;
        }

        Ok(None)
    }
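
    /// Reads up to `max_events` events starting at `start_offset`, in order.
    ///
    /// A minimal usage sketch (marked `ignore`; the path and batch size are
    /// illustrative assumptions):
    ///
    /// ```ignore
    /// let segment = Segment::open("./data/00000001.seg", true)?;
    /// // Read at most 100 events starting from the beginning of the segment.
    /// let events = segment.read_events_range(Offset::new(0), 100)?;
    /// println!("read {} events", events.len());
    /// ```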
    pub fn read_events_range(&self, start_offset: Offset, max_events: usize) -> StorageResult<Vec<Event>> {
        let mut events = Vec::with_capacity(max_events.min(self.header.entry_count as usize));

        let mut current_position = SEGMENT_HEADER_SIZE;
        let mut current_entry_index = 0u64;
        let end_offset = (start_offset.0 + max_events as u64).min(self.header.entry_count);

        // Skip entries that precede the requested start offset.
        while current_entry_index < start_offset.0 && current_position < self.header.write_offset as usize {
            if current_position + std::mem::size_of::<u32>() > self.mmap.len() {
                break;
            }

            let size_bytes = &self.mmap[current_position..current_position + std::mem::size_of::<u32>()];
            let data_size = u32::from_le_bytes([size_bytes[0], size_bytes[1], size_bytes[2], size_bytes[3]]) as usize;

            let entry_size = std::mem::size_of::<u32>() + data_size + std::mem::size_of::<u32>();
            current_position += entry_size;
            current_entry_index += 1;
        }

        // Collect events until the requested range or the written region is exhausted.
        while current_entry_index < end_offset
            && current_position < self.header.write_offset as usize
            && events.len() < max_events
        {
            if current_position + std::mem::size_of::<u32>() > self.mmap.len() {
                break;
            }

            let size_bytes = &self.mmap[current_position..current_position + std::mem::size_of::<u32>()];
            let data_size = u32::from_le_bytes([size_bytes[0], size_bytes[1], size_bytes[2], size_bytes[3]]) as usize;

            let data_start = current_position + std::mem::size_of::<u32>();
            let data_end = data_start + data_size;
            let crc_end = data_end + std::mem::size_of::<u32>();

            if crc_end > self.mmap.len() {
                return Err(StorageError::invalid_format("Entry extends beyond segment"));
            }

            let event_data = &self.mmap[data_start..data_end];
            let stored_crc_bytes = &self.mmap[data_end..crc_end];
            let stored_crc = u32::from_le_bytes([
                stored_crc_bytes[0],
                stored_crc_bytes[1],
                stored_crc_bytes[2],
                stored_crc_bytes[3],
            ]);

            let calculated_crc = crc32fast::hash(event_data);
            if stored_crc != calculated_crc {
                return Err(StorageError::CrcMismatch {
                    offset: current_position as u64,
                    expected: calculated_crc,
                    actual: stored_crc,
                });
            }

            match bincode::deserialize::<Event>(event_data) {
                Ok(event) => {
                    events.push(event);
                    current_position = crc_end;
                    current_entry_index += 1;
                }
                Err(_) => {
                    return Err(StorageError::invalid_format("Failed to deserialize event data during read"));
                }
            }
        }

        Ok(events)
    }

    /// Flushes the memory map to disk (no-op for read-only segments).
    pub fn flush(&mut self) -> StorageResult<()> {
        if !self.read_only {
            self.mmap.flush()?;
        }
        Ok(())
    }

    /// Returns a snapshot of this segment's statistics.
    pub fn stats(&self) -> SegmentStats {
        SegmentStats {
            id: self.id,
            file_path: self.file_path.clone(),
            total_size: self.header.size,
            used_size: self.header.write_offset,
            entry_count: self.header.entry_count,
            read_only: self.read_only,
        }
    }
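
    /// Appends a batch of already-serialized events in a single pass.
    ///
    /// `batch_data` contains the serialized events back to back, and
    /// `event_sizes[i]` is the byte length of the i-th event inside `batch_data`.
    /// Returns the logical offset of the first event in the batch.
    ///
    /// A minimal usage sketch (marked `ignore`; `events`, the path, and the
    /// segment size are illustrative assumptions):
    ///
    /// ```ignore
    /// // Serialize the events with the same codec used by `append_event`.
    /// let mut batch_data = Vec::new();
    /// let mut event_sizes = Vec::new();
    /// for event in &events {
    ///     let bytes = bincode::serialize(event)?;
    ///     event_sizes.push(bytes.len());
    ///     batch_data.extend_from_slice(&bytes);
    /// }
    ///
    /// let mut segment = Segment::create("./data/00000002.seg", 64 * 1024 * 1024)?;
    /// let first_offset = segment.append_batch_data(&batch_data, &event_sizes)?;
    /// ```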
    pub fn append_batch_data(&mut self, batch_data: &[u8], event_sizes: &[usize]) -> StorageResult<Offset> {
        if self.read_only {
            return Err(StorageError::internal("Cannot write to read-only segment"));
        }

        let start_entry_index = self.header.entry_count;
        let current_position = self.header.write_offset as usize;

        // Each entry adds 8 bytes of framing: a u32 length prefix and a u32 CRC.
        let required_space = batch_data.len() + (event_sizes.len() * 8);
        if current_position + required_space > self.mmap.len() {
            return Err(StorageError::InsufficientSpace {
                required: required_space as u64,
                available: (self.mmap.len() - current_position) as u64,
            });
        }

        let mut write_position = current_position;
        let mut data_offset = 0;

        for &event_size in event_sizes {
            // Length prefix.
            let size_bytes = (event_size as u32).to_le_bytes();
            self.mmap[write_position..write_position + 4].copy_from_slice(&size_bytes);
            write_position += 4;

            // Payload.
            let event_data = &batch_data[data_offset..data_offset + event_size];
            self.mmap[write_position..write_position + event_size].copy_from_slice(event_data);
            write_position += event_size;

            // Payload checksum.
            let crc = crc32fast::hash(event_data);
            let crc_bytes = crc.to_le_bytes();
            self.mmap[write_position..write_position + 4].copy_from_slice(&crc_bytes);
            write_position += 4;

            data_offset += event_size;
        }

        self.header.write_offset = write_position as u64;
        self.header.entry_count += event_sizes.len() as u64;

        // Recompute the header CRC before persisting it so the segment passes
        // integrity checks when it is reopened.
        self.header.header_crc = self.header.calculate_crc();
        let header_bytes = bincode::serialize(&self.header)?;
        if header_bytes.len() > SEGMENT_HEADER_SIZE {
            return Err(StorageError::configuration("Header size exceeds reserved space"));
        }
        self.mmap[..header_bytes.len()].copy_from_slice(&header_bytes);
        self.mmap[header_bytes.len()..SEGMENT_HEADER_SIZE].fill(0);

        tracing::debug!(
            "Batch written: {} events, {} bytes, start_offset={}",
            event_sizes.len(),
            batch_data.len(),
            start_entry_index
        );

        Ok(Offset::new(start_entry_index))
    }
}

/// Point-in-time statistics for a single segment.
#[derive(Debug, Clone)]
pub struct SegmentStats {
    /// Identifier of the segment.
    pub id: SegmentId,
    /// Path of the backing file.
    pub file_path: PathBuf,
    /// Total capacity of the segment in bytes.
    pub total_size: u64,
    /// Bytes already written, including the reserved header region.
    pub used_size: u64,
    /// Number of entries stored in the segment.
    pub entry_count: u64,
    /// Whether the segment was opened read-only.
    pub read_only: bool,
}

/// Manages all segment files under a single data directory.
#[derive(Debug)]
pub struct SegmentManager {
    /// Directory that holds the `.seg` files.
    data_dir: PathBuf,
    /// Loaded segments, keyed by segment id.
    segments: RwLock<HashMap<SegmentId, Arc<RwLock<Segment>>>>,
    /// Segment currently designated for new appends, if any.
    current_segment: RwLock<Option<SegmentId>>,
    /// Size used for newly created segments, in bytes.
    segment_size: u64,
}

impl SegmentManager {
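    /// Creates a manager rooted at `data_dir`, creating the directory if needed
    /// and loading any existing `.seg` files found there.
    ///
    /// A minimal usage sketch (marked `ignore`; the directory path is an
    /// illustrative assumption and `event` is a value of this crate's `Event` type):
    ///
    /// ```ignore
    /// let manager = SegmentManager::new("./data").await?;
    /// let segment_id = manager.create_segment().await?;
    ///
    /// if let Some(segment) = manager.get_segment(segment_id).await {
    ///     let mut segment = segment.write().await;
    ///     segment.append_event(&event)?;
    /// }
    /// ```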
    pub async fn new<P: AsRef<Path>>(data_dir: P) -> StorageResult<Self> {
        let data_dir = data_dir.as_ref().to_path_buf();
        let segment_size = 1024 * 1024 * 1024; // 1 GiB per segment

        let manager = Self {
            data_dir,
            segments: RwLock::new(HashMap::new()),
            current_segment: RwLock::new(None),
            segment_size,
        };

        manager.load_existing_segments().await?;

        Ok(manager)
    }

    /// Scans the data directory and opens every `.seg` file it contains.
    async fn load_existing_segments(&self) -> StorageResult<()> {
        if !self.data_dir.exists() {
            tokio::fs::create_dir_all(&self.data_dir).await?;
            return Ok(());
        }

        let mut segments = self.segments.write().await;
        let mut entries = tokio::fs::read_dir(&self.data_dir).await?;

        while let Some(entry) = entries.next_entry().await? {
            let path = entry.path();
            if path.extension().and_then(|s| s.to_str()) == Some("seg") {
                match Segment::open(&path, false) {
                    Ok(segment) => {
                        let segment_id = segment.id;
                        segments.insert(segment_id, Arc::new(RwLock::new(segment)));
                    }
                    Err(e) => {
                        tracing::warn!("Failed to load segment {:?}: {}", path, e);
                    }
                }
            }
        }

        Ok(())
    }

    /// Creates a new segment file in the data directory and registers it.
    pub async fn create_segment(&self) -> StorageResult<SegmentId> {
        let segment_id = SegmentId::new();
        let file_path = self.data_dir.join(format!("{}.seg", segment_id));

        // `Segment::create` assigns the segment its own id; that header id (not
        // the UUID embedded in the file name) is what is used as the map key.
        let segment = Segment::create(file_path, self.segment_size)?;
        let segment_id = segment.id;

        let mut segments = self.segments.write().await;
        segments.insert(segment_id, Arc::new(RwLock::new(segment)));

        Ok(segment_id)
    }

    /// Returns a handle to the segment with the given id, if it is loaded.
    pub async fn get_segment(&self, segment_id: SegmentId) -> Option<Arc<RwLock<Segment>>> {
        let segments = self.segments.read().await;
        segments.get(&segment_id).cloned()
    }

    /// Collects statistics for every segment that is not currently write-locked.
    pub async fn get_all_stats(&self) -> Vec<SegmentStats> {
        let segments = self.segments.read().await;
        let mut stats = Vec::new();

        for segment_arc in segments.values() {
            // `try_read` skips busy segments instead of blocking on them.
            if let Ok(segment) = segment_arc.try_read() {
                stats.push(segment.stats());
            }
        }

        stats
    }

    /// Lists the ids of all loaded segments.
    pub async fn list_segments(&self) -> Vec<SegmentId> {
        let segments = self.segments.read().await;
        segments.keys().copied().collect()
    }

    /// Collects statistics for every segment, waiting for each read lock.
    pub async fn get_segments_stats(&self) -> Vec<SegmentStats> {
        let segments = self.segments.read().await;
        let mut stats = Vec::new();

        for segment_arc in segments.values() {
            let segment = segment_arc.read().await;
            stats.push(segment.stats());
        }

        stats
    }
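
    /// Runs `processor` concurrently over every loaded segment (one task per
    /// segment) and collects the results.
    ///
    /// A minimal usage sketch (marked `ignore`) that counts entries per segment,
    /// using only methods defined in this module:
    ///
    /// ```ignore
    /// let counts = manager
    ///     .process_segments_parallel(|segment_id, segment| async move {
    ///         let segment = segment.read().await;
    ///         Ok((segment_id, segment.stats().entry_count))
    ///     })
    ///     .await?;
    /// ```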
    pub async fn process_segments_parallel<T, F, Fut>(
        &self,
        processor: F,
    ) -> StorageResult<Vec<T>>
    where
        T: Send + 'static,
        F: Fn(SegmentId, Arc<RwLock<Segment>>) -> Fut + Send + Sync + 'static,
        // The produced future is moved into `tokio::spawn`, so it must be Send + 'static.
        Fut: std::future::Future<Output = StorageResult<T>> + Send + 'static,
    {
        let segments = self.segments.read().await;
        let processor = Arc::new(processor);

        let mut handles = Vec::new();

        for (&segment_id, segment_arc) in segments.iter() {
            let processor = Arc::clone(&processor);
            let segment_arc = Arc::clone(segment_arc);

            let handle = tokio::spawn(async move {
                processor(segment_id, segment_arc).await
            });

            handles.push(handle);
        }

        let mut results = Vec::new();
        for handle in handles {
            match handle.await {
                Ok(result) => results.push(result?),
                Err(e) => return Err(StorageError::internal(format!("Segment processing failed: {}", e))),
            }
        }

        Ok(results)
    }
}