use std::io::{Read, Write};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use parking_lot::{Mutex, RwLock};
use serde::{Deserialize, Serialize};
use crate::data::Document;
use crate::error::Result;
use crate::storage::Storage;
use crate::store::document::UnifiedDocumentStore;
/// Sequence number assigned to each WAL record by `DocumentLog`, taken from a
/// counter that starts at 1 and is incremented once per appended record.
pub type SeqNumber = u64;
/// A single document mutation recorded in the write-ahead log.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LogEntry {
/// A document written under internal id `doc_id`; produced by
/// `DocumentLog::append`, which allocates a fresh `doc_id` on every call.
Upsert {
/// Internal document id.
doc_id: u64,
/// Caller-supplied external identifier of the document.
external_id: String,
/// The full document contents.
document: Document,
},
/// Deletion of the document with internal id `doc_id`; produced by
/// `DocumentLog::append_delete`.
Delete {
/// Internal id of the document being deleted.
doc_id: u64,
/// External identifier of the document being deleted. `#[serde(default)]`
/// makes a record that lacks this field deserialize with an empty string —
/// presumably so WAL files written before the field existed still load;
/// confirm.
#[serde(default)]
external_id: String,
},
}
/// One write-ahead-log record: a [`LogEntry`] tagged with its sequence number.
///
/// `DocumentLog` serializes each record as JSON and writes it prefixed with its
/// byte length encoded as a little-endian `u32`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogRecord {
/// Sequence number allocated by `DocumentLog` when the record was appended.
pub seq: SeqNumber,
/// The recorded mutation.
pub entry: LogEntry,
}
/// Write-ahead log of document mutations, paired with the document store that
/// holds document contents.
///
/// Every method takes `&self`; mutable state lives in atomics and
/// `parking_lot` locks.
#[derive(Debug)]
pub struct DocumentLog {
/// Storage backend that holds the WAL file.
wal_storage: Arc<dyn Storage>,
/// Path of the WAL file within `wal_storage`.
wal_path: String,
/// Next internal document id handed out by `append`.
next_doc_id: AtomicU64,
/// Append-mode WAL writer, opened lazily by the first append and dropped by
/// `truncate`; `None` while no writer is open.
wal_writer: Mutex<Option<Box<dyn crate::storage::StorageOutput>>>,
/// Next sequence number to assign to a WAL record.
next_seq: AtomicU64,
/// Document store used by `store_document`, the lookup methods and
/// `commit_documents`.
doc_store: RwLock<UnifiedDocumentStore>,
}
impl DocumentLog {
    /// Creates a document log whose WAL lives at `wal_path` in `wal_storage` and
    /// whose documents are kept in a [`UnifiedDocumentStore`] opened on
    /// `doc_store_storage`.
    ///
    /// The WAL file is not opened here. It is opened in append mode by the first
    /// call to [`DocumentLog::append`] or [`DocumentLog::append_delete`]. Both the
    /// document-id and sequence counters start at 1. Call [`DocumentLog::read_all`]
    /// to advance them past existing WAL contents.
    ///
    /// # Errors
    ///
    /// Returns any error produced by [`UnifiedDocumentStore::open`].
    pub fn new(
        wal_storage: Arc<dyn Storage>,
        wal_path: &str,
        doc_store_storage: Arc<dyn Storage>,
    ) -> Result<Self> {
        let doc_store = UnifiedDocumentStore::open(doc_store_storage)?;
        Ok(Self {
            wal_storage,
            wal_path: wal_path.to_string(),
            next_doc_id: AtomicU64::new(1),
            wal_writer: Mutex::new(None),
            next_seq: AtomicU64::new(1),
            doc_store: RwLock::new(doc_store),
        })
    }

    /// Returns the writer held in `slot`, first opening the WAL in append mode if
    /// `slot` is empty.
    ///
    /// Callers pass the contents of the *locked* `wal_writer`. Opening the writer
    /// and writing through it then happen under one lock acquisition, so a
    /// concurrent `truncate` cannot drop the writer in between. Previously that
    /// gap let a record be silently skipped while the append still reported
    /// success.
    ///
    /// If opening fails, `slot` is left empty.
    fn open_writer<'a>(
        &self,
        slot: &'a mut Option<Box<dyn crate::storage::StorageOutput>>,
    ) -> Result<&'a mut Box<dyn crate::storage::StorageOutput>> {
        let writer: Box<dyn crate::storage::StorageOutput> = match slot.take() {
            Some(writer) => writer,
            None => self.wal_storage.create_output_append(&self.wal_path)?,
        };
        Ok(slot.insert(writer))
    }

    /// Allocates a fresh document id and sequence number, and appends an
    /// [`LogEntry::Upsert`] record for `doc` under `external_id` to the WAL.
    ///
    /// Returns `(doc_id, seq)`. The record is written and `flush_and_sync` is
    /// called on the writer before this returns.
    ///
    /// Both numbers are allocated while the WAL writer lock is held, so records
    /// appear in the WAL in sequence-number order.
    ///
    /// # Errors
    ///
    /// Fails if the WAL cannot be opened, the record cannot be serialized, or
    /// writing/syncing fails. Ids allocated before a serialization or write
    /// failure are consumed, not reused.
    pub fn append(&self, external_id: &str, doc: Document) -> Result<(u64, SeqNumber)> {
        let mut writer_guard = self.wal_writer.lock();
        let writer = self.open_writer(&mut writer_guard)?;

        let doc_id = self.next_doc_id.fetch_add(1, Ordering::SeqCst);
        let seq = self.next_seq.fetch_add(1, Ordering::SeqCst);
        let record = LogRecord {
            seq,
            entry: LogEntry::Upsert {
                doc_id,
                external_id: external_id.to_string(),
                document: doc,
            },
        };
        Self::write_record(writer, &record)?;
        Ok((doc_id, seq))
    }

    /// Allocates a sequence number and appends a [`LogEntry::Delete`] record for
    /// `doc_id` / `external_id` to the WAL.
    ///
    /// Returns the allocated sequence number. `flush_and_sync` is called on the
    /// writer before this returns.
    ///
    /// # Errors
    ///
    /// Fails if the WAL cannot be opened, the record cannot be serialized, or
    /// writing/syncing fails.
    pub fn append_delete(&self, doc_id: u64, external_id: &str) -> Result<SeqNumber> {
        let mut writer_guard = self.wal_writer.lock();
        let writer = self.open_writer(&mut writer_guard)?;

        let seq = self.next_seq.fetch_add(1, Ordering::SeqCst);
        let record = LogRecord {
            seq,
            entry: LogEntry::Delete {
                doc_id,
                external_id: external_id.to_string(),
            },
        };
        Self::write_record(writer, &record)?;
        Ok(seq)
    }

    /// Writes `record` as a little-endian `u32` byte length followed by its JSON
    /// encoding, then calls `flush_and_sync` on `writer`.
    ///
    /// # Errors
    ///
    /// Fails with `InvalidOperation` if the encoded record is larger than
    /// `u32::MAX` bytes. Also fails if serialization or any write fails.
    fn write_record(
        writer: &mut Box<dyn crate::storage::StorageOutput>,
        record: &LogRecord,
    ) -> Result<()> {
        let bytes = serde_json::to_vec(record)?;
        let len: u32 = bytes.len().try_into().map_err(|_| {
            crate::error::LaurusError::InvalidOperation(format!(
                "WAL record size {} exceeds u32::MAX",
                bytes.len()
            ))
        })?;
        writer.write_all(&len.to_le_bytes())?;
        writer.write_all(&bytes)?;
        writer.flush_and_sync()?;
        Ok(())
    }

    /// Reads every complete record from the WAL and advances the sequence and
    /// document-id counters past the largest values seen.
    ///
    /// The document-id counter is also raised to at least the document store's
    /// `next_doc_id()`. Counters never move backwards.
    ///
    /// If the WAL file does not exist, an empty vector is returned; the
    /// document-store counter is still applied.
    ///
    /// # Errors
    ///
    /// Fails if the WAL cannot be opened or read, or if a complete record does
    /// not deserialize.
    pub fn read_all(&self) -> Result<Vec<LogRecord>> {
        let records = if self.wal_storage.file_exists(&self.wal_path) {
            self.read_wal_records()?
        } else {
            Vec::new()
        };

        let max_seq = records.iter().map(|record| record.seq).max().unwrap_or(0);
        let max_doc_id = records
            .iter()
            .filter_map(|record| match &record.entry {
                LogEntry::Upsert { doc_id, .. } => Some(*doc_id),
                LogEntry::Delete { .. } => None,
            })
            .max()
            .unwrap_or(0);

        // `fetch_max` rather than load-then-store: concurrent `append` calls
        // `fetch_add` on the same counters. A non-atomic update could move a
        // counter backwards and hand out duplicate ids.
        self.next_seq
            .fetch_max(max_seq.saturating_add(1), Ordering::SeqCst);
        self.next_doc_id
            .fetch_max(max_doc_id.saturating_add(1), Ordering::SeqCst);

        let store_next = self.doc_store.read().next_doc_id();
        self.set_next_doc_id(store_next);
        Ok(records)
    }

    /// Decodes the length-prefixed JSON records stored in the WAL file.
    ///
    /// A trailing partial record is ignored rather than reported as an error.
    /// This covers a length prefix, or a body, that runs past the end of the
    /// file — for example a write interrupted mid-record.
    fn read_wal_records(&self) -> Result<Vec<LogRecord>> {
        let mut reader = self.wal_storage.open_input(&self.wal_path)?;
        let size = reader.size()?;
        let mut position: u64 = 0;
        let mut records = Vec::new();

        while position + 4 <= size {
            let mut len_bytes = [0u8; 4];
            reader.read_exact(&mut len_bytes)?;
            position += 4;

            let len = u32::from_le_bytes(len_bytes);
            if position + u64::from(len) > size {
                break;
            }
            let mut buffer = vec![0u8; len as usize];
            reader.read_exact(&mut buffer)?;
            position += u64::from(len);

            records.push(serde_json::from_slice(&buffer)?);
        }
        Ok(records)
    }

    /// Empties the WAL file and drops the cached append writer.
    ///
    /// The sequence and document-id counters are left unchanged, so numbers
    /// handed out afterwards continue where they left off. The next append
    /// reopens the WAL.
    ///
    /// The writer lock is held for the whole operation. Without it, a concurrent
    /// append could open a new writer and sync a record before the file is
    /// recreated, and that record would be erased even though the append
    /// returned `Ok`.
    ///
    /// # Errors
    ///
    /// Fails if the WAL file cannot be recreated, synced or closed, or if the
    /// storage-level sync fails.
    pub fn truncate(&self) -> Result<()> {
        let mut writer_guard = self.wal_writer.lock();
        *writer_guard = None;

        let mut writer = self.wal_storage.create_output(&self.wal_path)?;
        writer.flush_and_sync()?;
        writer.close()?;
        self.wal_storage.sync()?;
        Ok(())
    }

    /// Returns the most recently allocated sequence number, or 0 if none has
    /// been allocated yet.
    pub fn last_seq(&self) -> SeqNumber {
        self.next_seq.load(Ordering::SeqCst).saturating_sub(1)
    }

    /// Returns the document id the next [`DocumentLog::append`] will allocate,
    /// unless another thread allocates or raises it first.
    pub fn next_doc_id(&self) -> u64 {
        self.next_doc_id.load(Ordering::SeqCst)
    }

    /// Raises the next document id to `id` if it is currently lower; never
    /// lowers it.
    ///
    /// Implemented with an atomic `fetch_max`. A separate load and store could
    /// race with `append`'s `fetch_add`, or with another caller, and move the
    /// counter backwards, causing duplicate ids.
    pub fn set_next_doc_id(&self, id: u64) {
        self.next_doc_id.fetch_max(id, Ordering::SeqCst);
    }

    /// Stores `doc` under `doc_id` in the document store.
    ///
    /// It is readable via [`DocumentLog::get_document`] immediately.
    /// [`DocumentLog::commit_documents`] presumably makes it durable — confirm
    /// against `UnifiedDocumentStore`.
    pub fn store_document(&self, doc_id: u64, doc: Document) {
        self.doc_store.write().put_document_with_id(doc_id, doc);
    }

    /// Looks up the document stored under `doc_id`; `Ok(None)` if absent.
    pub fn get_document(&self, doc_id: u64) -> Result<Option<Document>> {
        self.doc_store.read().get_document(doc_id)
    }

    /// Looks up several documents at once, keyed by document id.
    ///
    /// Presumably ids with no stored document are simply absent from the
    /// returned map — confirm against `UnifiedDocumentStore`.
    pub fn get_documents_batch(
        &self,
        doc_ids: &[u64],
    ) -> Result<std::collections::HashMap<u64, Document>> {
        self.doc_store.read().get_documents_batch(doc_ids)
    }

    /// Returns an internal document id associated with `external_id`, if any.
    ///
    /// Which id is returned when several match is decided by
    /// `UnifiedDocumentStore::find_by_external_id`.
    pub fn find_by_external_id(&self, external_id: &str) -> Result<Option<u64>> {
        self.doc_store.read().find_by_external_id(external_id)
    }

    /// Returns every internal document id associated with `external_id`.
    pub fn find_all_by_external_id(&self, external_id: &str) -> Result<Vec<u64>> {
        self.doc_store.read().find_all_by_external_id(external_id)
    }

    /// Commits pending changes in the document store.
    pub fn commit_documents(&self) -> Result<()> {
        self.doc_store.write().commit()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::data::{DataValue, Document};
    use crate::storage::memory::{MemoryStorage, MemoryStorageConfig};

    /// Creates a fresh, empty in-memory storage backend.
    fn make_storage() -> Arc<dyn Storage> {
        Arc::new(MemoryStorage::new(MemoryStorageConfig::default()))
    }

    /// Creates a `DocumentLog` with its WAL at `test.log`, using separate
    /// in-memory storages for the WAL and the document store.
    fn make_log() -> DocumentLog {
        let wal_storage = make_storage();
        let doc_storage = make_storage();
        DocumentLog::new(wal_storage, "test.log", doc_storage).unwrap()
    }

    /// Builds a document with a single `body` text field.
    fn text_doc(text: &str) -> Document {
        Document::builder()
            .add_field("body", DataValue::Text(text.to_string()))
            .build()
    }

    #[test]
    fn test_append_and_read() {
        let log = make_log();
        let (doc_id, seq1) = log.append("ext_1", text_doc("hello")).unwrap();
        assert_eq!(doc_id, 1);
        assert_eq!(seq1, 1);
        let seq2 = log.append_delete(doc_id, "ext_1").unwrap();
        assert_eq!(seq2, 2);

        let records = log.read_all().unwrap();
        assert_eq!(records.len(), 2);

        assert_eq!(records[0].seq, 1);
        match &records[0].entry {
            LogEntry::Upsert {
                doc_id,
                external_id,
                ..
            } => {
                assert_eq!(*doc_id, 1);
                assert_eq!(external_id, "ext_1");
            }
            LogEntry::Delete { .. } => panic!("Expected Upsert"),
        }

        assert_eq!(records[1].seq, 2);
        match &records[1].entry {
            LogEntry::Delete {
                doc_id,
                external_id,
            } => {
                assert_eq!(*doc_id, 1);
                assert_eq!(external_id, "ext_1");
            }
            LogEntry::Upsert { .. } => panic!("Expected Delete"),
        }
    }

    #[test]
    fn test_truncate() {
        let log = make_log();
        log.append("ext_1", text_doc("hello")).unwrap();
        log.truncate().unwrap();
        let records = log.read_all().unwrap();
        assert!(records.is_empty());

        // Counters are not reset by truncation.
        let (doc_id, seq) = log.append("ext_2", text_doc("world")).unwrap();
        assert_eq!(doc_id, 2);
        assert_eq!(seq, 2);
    }

    #[test]
    fn test_append_after_truncate_is_persisted() {
        let log = make_log();
        log.append("ext_1", text_doc("hello")).unwrap();
        log.truncate().unwrap();
        let (doc_id, seq) = log.append("ext_2", text_doc("world")).unwrap();

        let records = log.read_all().unwrap();
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].seq, seq);
        match &records[0].entry {
            LogEntry::Upsert {
                doc_id: logged_id,
                external_id,
                ..
            } => {
                assert_eq!(*logged_id, doc_id);
                assert_eq!(external_id, "ext_2");
            }
            LogEntry::Delete { .. } => panic!("Expected Upsert"),
        }
    }

    #[test]
    fn test_read_all_without_wal_file() {
        let log = make_log();
        assert!(log.read_all().unwrap().is_empty());
        assert_eq!(log.last_seq(), 0);
    }

    #[test]
    fn test_doc_id_recovery() {
        let wal_storage = make_storage();
        let doc_storage = make_storage();
        {
            let log =
                DocumentLog::new(wal_storage.clone(), "test.log", doc_storage.clone()).unwrap();
            log.append("ext_1", text_doc("hello")).unwrap();
            log.append("ext_2", text_doc("hello")).unwrap();
        }
        {
            let log =
                DocumentLog::new(wal_storage.clone(), "test.log", doc_storage.clone()).unwrap();
            let records = log.read_all().unwrap();
            assert_eq!(records.len(), 2);
            assert_eq!(log.next_doc_id(), 3);
            let (doc_id, seq) = log.append("ext_3", text_doc("world")).unwrap();
            assert_eq!(doc_id, 3);
            assert_eq!(seq, 3);
        }
    }

    #[test]
    fn test_set_next_doc_id() {
        let log = make_log();
        log.set_next_doc_id(100);
        assert_eq!(log.next_doc_id(), 100);
        // Lowering is ignored.
        log.set_next_doc_id(50);
        assert_eq!(log.next_doc_id(), 100);
        let (doc_id, _) = log.append("ext_1", text_doc("hello")).unwrap();
        assert_eq!(doc_id, 100);
    }

    #[test]
    fn test_store_and_get_document() {
        let log = make_log();
        let doc = text_doc("hello world");
        log.store_document(1, doc.clone());
        let retrieved = log.get_document(1).unwrap();
        assert!(retrieved.is_some());
        assert_eq!(
            retrieved.unwrap().fields.get("body"),
            doc.fields.get("body")
        );
        log.commit_documents().unwrap();
        let retrieved = log.get_document(1).unwrap();
        assert!(retrieved.is_some());
    }
}