use crate::error::Result;
use crate::storage::Storage;
use serde::{Deserialize, Serialize};
use std::io::{Read, Write};
use std::sync::{Arc, Mutex};
use crate::data::Document;
use std::sync::atomic::{AtomicU64, Ordering};
/// Sequence number carried by every [`WalRecord`]; `WalManager::append` hands
/// these out from an internal counter, one per appended entry.
pub type SeqNumber = u64;
/// A single operation recorded in the write-ahead log.
///
/// Entries are serialized with serde as part of a [`WalRecord`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum WalEntry {
/// Upsert of a document, carrying everything needed to replay it.
Upsert {
/// Numeric id of the document.
doc_id: u64,
/// String id of the document as supplied from outside the index
/// (presumably the caller-facing key; confirm against callers).
external_id: String,
/// The document payload itself.
document: Document,
},
/// Deletion of the document with the given numeric id.
Delete { doc_id: u64 },
}
/// A [`WalEntry`] together with the sequence number it was appended under.
///
/// This is the unit that `WalManager::append` serializes to JSON and that
/// `WalManager::read_all` deserializes back.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalRecord {
/// Sequence number assigned when the record was appended.
pub seq: SeqNumber,
/// The logged operation.
pub entry: WalEntry,
}
/// Appends [`WalEntry`] operations to a log file held in a [`Storage`] backend
/// and reads them back.
#[derive(Debug)]
pub struct WalManager {
/// Backend used to open the log for appending, reading and re-creation.
storage: Arc<dyn Storage>,
/// Path of the log file inside `storage`.
path: String,
/// Append handle for the log. `None` until the first append opens it, and
/// reset to `None` again by `truncate`.
writer: Mutex<Option<Box<dyn crate::storage::StorageOutput>>>,
/// Sequence number that the next appended record will receive; starts at 1.
next_seq: AtomicU64,
}
impl WalManager {
pub fn new(storage: Arc<dyn Storage>, path: &str) -> Result<Self> {
let manager = Self {
storage,
path: path.to_string(),
writer: Mutex::new(None),
next_seq: AtomicU64::new(1),
};
Ok(manager)
}
fn ensure_writer(&self) -> Result<()> {
let mut writer_guard = self.writer.lock().unwrap();
if writer_guard.is_none() {
let writer = self.storage.create_output_append(&self.path)?;
*writer_guard = Some(writer);
}
Ok(())
}
pub fn set_next_seq(&self, seq: SeqNumber) {
self.next_seq.store(seq, Ordering::SeqCst);
}
pub fn last_seq(&self) -> SeqNumber {
self.next_seq.load(Ordering::SeqCst).saturating_sub(1)
}
pub fn append(&self, entry: &WalEntry) -> Result<SeqNumber> {
self.ensure_writer()?;
let seq = self.next_seq.fetch_add(1, Ordering::SeqCst);
let record = WalRecord {
seq,
entry: entry.clone(),
};
let bytes = serde_json::to_vec(&record)?;
let len = bytes.len() as u32;
let mut writer_guard = self.writer.lock().unwrap();
if let Some(writer) = writer_guard.as_mut() {
writer.write_all(&len.to_le_bytes())?;
writer.write_all(&bytes)?;
writer.flush_and_sync()?;
}
Ok(seq)
}
pub fn read_all(&self) -> Result<Vec<WalRecord>> {
if !self.storage.file_exists(&self.path) {
return Ok(Vec::new());
}
let mut reader = self.storage.open_input(&self.path)?;
let mut records = Vec::new();
let size = reader.size()?;
let mut position = 0;
let mut max_seq = 0;
while position < size {
let mut len_bytes = [0u8; 4];
if position + 4 > size {
break; }
reader.read_exact(&mut len_bytes)?;
let len = u32::from_le_bytes(len_bytes) as u64;
position += 4;
if position + len > size {
break; }
let mut buffer = vec![0u8; len as usize];
reader.read_exact(&mut buffer)?;
position += len;
let record: WalRecord = serde_json::from_slice(&buffer)?;
if record.seq > max_seq {
max_seq = record.seq;
}
records.push(record);
}
let current_next = self.next_seq.load(Ordering::SeqCst);
if max_seq >= current_next {
self.next_seq.store(max_seq + 1, Ordering::SeqCst);
}
Ok(records)
}
pub fn truncate(&self) -> Result<()> {
{
let mut writer_guard = self.writer.lock().unwrap();
*writer_guard = None;
}
let mut writer = self.storage.create_output(&self.path)?;
writer.flush_and_sync()?;
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::data::{DataValue, Document};
    use crate::storage::memory::{MemoryStorage, MemoryStorageConfig};

    /// Appends an upsert and a delete, reads both back, truncates the log and
    /// checks that sequence numbering continues after the truncation.
    #[test]
    fn test_wal_append_read_truncate() {
        let storage = Arc::new(MemoryStorage::new(MemoryStorageConfig::default()));
        let wal = WalManager::new(storage.clone(), "test.wal").unwrap();

        let doc = Document::builder()
            .add_field("body", DataValue::Vector(vec![1.0, 2.0, 3.0]))
            .build();

        // Two appends receive consecutive sequence numbers starting at 1.
        let upsert = WalEntry::Upsert {
            doc_id: 1,
            external_id: "ext_1".to_string(),
            document: doc.clone(),
        };
        let first_seq = wal.append(&upsert).unwrap();
        assert_eq!(first_seq, 1);

        let delete = WalEntry::Delete { doc_id: 2 };
        let second_seq = wal.append(&delete).unwrap();
        assert_eq!(second_seq, 2);

        // Both records come back in append order with their payloads intact.
        let replayed = wal.read_all().unwrap();
        assert_eq!(replayed.len(), 2);

        let first = &replayed[0];
        assert_eq!(first.seq, 1);
        if let WalEntry::Upsert {
            doc_id,
            external_id,
            document,
        } = &first.entry
        {
            assert_eq!(*doc_id, 1);
            assert_eq!(external_id, "ext_1");
            assert_eq!(document.fields.len(), doc.fields.len());
        } else {
            panic!("Expected Upsert");
        }

        let second = &replayed[1];
        assert_eq!(second.seq, 2);
        if let WalEntry::Delete { doc_id } = &second.entry {
            assert_eq!(*doc_id, 2);
        } else {
            panic!("Expected Delete");
        }

        // Truncation empties the log but does not reset the sequence counter.
        wal.truncate().unwrap();
        assert!(wal.read_all().unwrap().is_empty());

        let third_seq = wal.append(&WalEntry::Delete { doc_id: 3 }).unwrap();
        assert_eq!(third_seq, 3);
    }
}