use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::sync::Arc;
use lru::LruCache;
use serde::{Deserialize, Serialize};
use crate::data::Document;
use crate::error::{LaurusError, Result};
use crate::storage::Storage;
use crate::storage::structured::{StructReader, StructWriter};
/// Default number of documents held in the in-memory LRU document cache.
const DEFAULT_DOC_CACHE_CAPACITY: usize = 1024;
/// Metadata describing one immutable on-disk document segment file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DocumentSegment {
    /// Monotonically increasing segment identifier (embedded in the file name).
    pub id: u32,
    /// Smallest internal doc id stored in this segment (inclusive).
    pub start_doc_id: u64,
    /// Largest internal doc id stored in this segment (inclusive).
    pub end_doc_id: u64,
    /// Number of documents serialized into the segment file.
    pub doc_count: usize,
}
impl DocumentSegment {
    /// Name of the backing file for this segment, e.g. `doc_segment_000042.docs`.
    pub fn file_name(&self) -> String {
        format!("doc_segment_{:06}.docs", self.id)
    }

    /// Whether `doc_id` falls inside this segment's inclusive id range.
    pub fn contains(&self, doc_id: u64) -> bool {
        (self.start_doc_id..=self.end_doc_id).contains(&doc_id)
    }
}
/// Writes a batch of documents out as a single immutable segment file.
#[derive(Debug)]
pub struct DocumentSegmentWriter {
    // Backing storage abstraction the segment file is created in.
    storage: Arc<dyn Storage>,
}
impl DocumentSegmentWriter {
    /// Creates a writer that emits segment files into `storage`.
    pub fn new(storage: Arc<dyn Storage>) -> Self {
        Self { storage }
    }

    /// Serializes `docs` (keyed by internal doc id) into the segment file for
    /// `segment_id`, writing documents in ascending doc-id order.
    ///
    /// File layout: a `u32` doc count, then per document a `u64` doc id
    /// followed by a length-prefixed JSON blob.
    ///
    /// # Errors
    /// Returns an error when `docs` is empty, when the document count does
    /// not fit in `u32`, or on any storage/serialization failure.
    pub fn write_segment(
        &self,
        segment_id: u32,
        docs: &HashMap<u64, Document>,
    ) -> Result<DocumentSegment> {
        if docs.is_empty() {
            return Err(LaurusError::invalid_argument(
                "cannot write empty document segment",
            ));
        }
        // u64 is Copy, so `copied()` replaces the needless `cloned()`; keys
        // are unique, so the non-allocating `sort_unstable` is safe.
        let mut sorted_ids: Vec<u64> = docs.keys().copied().collect();
        sorted_ids.sort_unstable();
        // Safe: emptiness was rejected above.
        let start_doc_id = *sorted_ids.first().unwrap();
        let end_doc_id = *sorted_ids.last().unwrap();
        let doc_count = docs.len();
        let segment = DocumentSegment {
            id: segment_id,
            start_doc_id,
            end_doc_id,
            doc_count,
        };
        let file_name = segment.file_name();
        let output = self.storage.create_output(&file_name)?;
        let mut writer = StructWriter::new(output);
        let doc_count_u32: u32 = doc_count.try_into().map_err(|_| {
            LaurusError::InvalidOperation(format!("document count {doc_count} exceeds u32::MAX"))
        })?;
        writer.write_u32(doc_count_u32)?;
        for id in sorted_ids {
            // The key came from `docs` itself, so indexing cannot panic.
            let doc = &docs[&id];
            let json = serde_json::to_vec(doc)
                .map_err(|e| LaurusError::index(format!("failed to serialize document: {e}")))?;
            writer.write_u64(id)?;
            writer.write_bytes(&json)?;
        }
        writer.close()?;
        Ok(segment)
    }
}
/// Random-access reader over a single segment file, optionally backed by an
/// in-memory doc-id -> byte-offset index for direct seeks.
#[derive(Debug)]
pub struct DocumentSegmentReader {
    // Storage the segment file lives in.
    storage: Arc<dyn Storage>,
    // Metadata for the segment being read.
    segment: DocumentSegment,
    // doc id -> byte offset of its record; empty when built with `new()`.
    offsets: HashMap<u64, u64>,
}
impl DocumentSegmentReader {
/// Creates a reader without an offset index; lookups fall back to a
/// sequential scan of the segment file.
pub fn new(storage: Arc<dyn Storage>, segment: DocumentSegment) -> Self {
    let offsets = HashMap::new();
    Self { storage, segment, offsets }
}
/// Creates a reader and eagerly builds the doc-id -> offset index by
/// scanning the segment file once.
pub fn with_index(storage: Arc<dyn Storage>, segment: DocumentSegment) -> Result<Self> {
    let offsets = Self::build_index(storage.as_ref(), &segment)?;
    Ok(Self { storage, segment, offsets })
}
/// Scans the whole segment file once and records the byte offset of each
/// document record, keyed by doc id.
fn build_index(storage: &dyn Storage, segment: &DocumentSegment) -> Result<HashMap<u64, u64>> {
    let input = storage.open_input(&segment.file_name())?;
    let mut reader = StructReader::new(input)?;
    let doc_count = reader.read_u32()? as usize;
    let mut offsets = HashMap::with_capacity(doc_count);
    for _ in 0..doc_count {
        // Record the position *before* the id so a later seek re-reads it.
        let offset = reader.stream_position()?;
        let doc_id = reader.read_u64()?;
        // Skip over the payload; only the offset matters here.
        let _ = reader.read_bytes()?;
        offsets.insert(doc_id, offset);
    }
    Ok(offsets)
}
/// Fetches a single document by internal id.
///
/// With an offset index (built via [`Self::with_index`]) this is a single
/// seek; without one it degrades to a sequential scan of the file.
///
/// # Errors
/// Propagates storage and deserialization failures.
pub fn get_document(&self, doc_id: u64) -> Result<Option<Document>> {
    if !self.segment.contains(doc_id) {
        return Ok(None);
    }
    if !self.offsets.is_empty() {
        // `build_index` records every record in the file, so a non-empty
        // index is complete: a miss means the id is absent. The original
        // fell through to a redundant full-file scan here, which could
        // never find anything the index did not already know about.
        let Some(&offset) = self.offsets.get(&doc_id) else {
            return Ok(None);
        };
        let input = self.storage.open_input(&self.segment.file_name())?;
        let mut reader = StructReader::new(input)?;
        reader.seek(std::io::SeekFrom::Start(offset))?;
        let current_id = reader.read_u64()?;
        debug_assert_eq!(current_id, doc_id);
        let json = reader.read_bytes()?;
        let doc: Document = serde_json::from_slice(&json)
            .map_err(|e| LaurusError::index(format!("failed to deserialize document: {e}")))?;
        return Ok(Some(doc));
    }
    // No index available: scan records until the id matches.
    let input = self.storage.open_input(&self.segment.file_name())?;
    let mut reader = StructReader::new(input)?;
    let doc_count = reader.read_u32()?;
    for _ in 0..doc_count {
        let current_id = reader.read_u64()?;
        let json = reader.read_bytes()?;
        if current_id == doc_id {
            let doc: Document = serde_json::from_slice(&json)
                .map_err(|e| LaurusError::index(format!("failed to deserialize document: {e}")))?;
            return Ok(Some(doc));
        }
    }
    Ok(None)
}
/// Fetches multiple documents in one pass.
///
/// With an offset index, hits are read via seeks sorted by file offset so
/// access stays sequential; otherwise a single full scan collects matches,
/// stopping early once every requested id has been found.
///
/// # Errors
/// Propagates storage and deserialization failures.
pub fn get_documents_batch(
    &self,
    doc_ids: &std::collections::HashSet<u64>,
) -> Result<HashMap<u64, Document>> {
    let mut results = HashMap::with_capacity(doc_ids.len());
    if doc_ids.is_empty() {
        return Ok(results);
    }
    if !doc_ids.iter().any(|id| self.segment.contains(*id)) {
        return Ok(results);
    }
    if !self.offsets.is_empty() {
        let input = self.storage.open_input(&self.segment.file_name())?;
        let mut reader = StructReader::new(input)?;
        let mut indexed: Vec<(u64, u64)> = doc_ids
            .iter()
            .filter_map(|id| self.offsets.get(id).map(|&off| (*id, off)))
            .collect();
        // Visit records in file order so seeks only ever move forward.
        indexed.sort_unstable_by_key(|&(_, off)| off);
        for (doc_id, offset) in indexed {
            reader.seek(std::io::SeekFrom::Start(offset))?;
            let current_id = reader.read_u64()?;
            debug_assert_eq!(current_id, doc_id);
            let json = reader.read_bytes()?;
            let doc: Document = serde_json::from_slice(&json).map_err(|e| {
                LaurusError::index(format!("failed to deserialize document: {e}"))
            })?;
            results.insert(doc_id, doc);
        }
        return Ok(results);
    }
    // No index: one sequential scan, stopping once all ids are resolved.
    let input = self.storage.open_input(&self.segment.file_name())?;
    let mut reader = StructReader::new(input)?;
    let doc_count = reader.read_u32()?;
    let mut remaining = doc_ids.len();
    for _ in 0..doc_count {
        if remaining == 0 {
            break;
        }
        let current_id = reader.read_u64()?;
        let json = reader.read_bytes()?;
        // BUG FIX: the original read `doc_ids.contains(¤t_id)` — a
        // mojibake'd `&current_id` (`&curren` rendered as the HTML entity
        // `¤`), which does not compile.
        if doc_ids.contains(&current_id) {
            let doc: Document = serde_json::from_slice(&json).map_err(|e| {
                LaurusError::index(format!("failed to deserialize document: {e}"))
            })?;
            results.insert(current_id, doc);
            remaining -= 1;
        }
    }
    Ok(results)
}
/// Scans the segment for the first document whose `_id` text field equals
/// `external_id`, returning its internal doc id.
pub fn find_by_external_id(&self, external_id: &str) -> Result<Option<u64>> {
    let input = self.storage.open_input(&self.segment.file_name())?;
    let mut reader = StructReader::new(input)?;
    let total = reader.read_u32()?;
    for _ in 0..total {
        let internal_id = reader.read_u64()?;
        let payload = reader.read_bytes()?;
        let doc: Document = serde_json::from_slice(&payload)
            .map_err(|e| LaurusError::index(format!("failed to deserialize document: {e}")))?;
        let stored_ext = doc.fields.get("_id").and_then(|v| v.as_text());
        if stored_ext == Some(external_id) {
            return Ok(Some(internal_id));
        }
    }
    Ok(None)
}
/// Scans the segment and returns the internal ids of *all* documents whose
/// `_id` text field equals `external_id`.
pub fn find_all_by_external_id(&self, external_id: &str) -> Result<Vec<u64>> {
    let mut matches = Vec::new();
    let input = self.storage.open_input(&self.segment.file_name())?;
    let mut reader = StructReader::new(input)?;
    let total = reader.read_u32()?;
    for _ in 0..total {
        let internal_id = reader.read_u64()?;
        let payload = reader.read_bytes()?;
        let doc: Document = serde_json::from_slice(&payload)
            .map_err(|e| LaurusError::index(format!("failed to deserialize document: {e}")))?;
        if doc.fields.get("_id").and_then(|v| v.as_text()) == Some(external_id) {
            matches.push(internal_id);
        }
    }
    Ok(matches)
}
}
/// File name of the serialized store manifest (segment list + next ids).
const MANIFEST_FILE: &str = "segments.json";
/// On-disk manifest: the authoritative record of committed segments.
#[derive(Debug, Serialize, Deserialize)]
struct StoreManifest {
    // Manifest format version (written as 1 by `commit`).
    version: u32,
    // All committed segments, in creation order.
    segments: Vec<DocumentSegment>,
    // Next segment id to allocate.
    next_segment_id: u32,
}
/// Document store combining committed on-disk segments with an in-memory
/// buffer of pending documents and an LRU cache of recently read documents.
#[derive(Debug)]
pub struct UnifiedDocumentStore {
    storage: Arc<dyn Storage>,
    // Committed segments, oldest first.
    segments: Vec<DocumentSegment>,
    // Id assigned to the next segment written by `add_segment`.
    next_segment_id: u32,
    // Documents added since the last `commit`, keyed by internal doc id.
    pending_docs: HashMap<u64, Document>,
    // Next internal doc id handed out by `add_document`.
    next_doc_id: u64,
    // Indexed readers for committed segments, populated during `commit`.
    reader_cache: HashMap<u32, DocumentSegmentReader>,
    // LRU of recently fetched documents; Mutex lets `get_document` take &self.
    doc_cache: parking_lot::Mutex<LruCache<u64, Document>>,
}
impl UnifiedDocumentStore {
/// Creates an empty store backed by `storage`.
pub fn new(storage: Arc<dyn Storage>) -> Self {
    let capacity = NonZeroUsize::new(DEFAULT_DOC_CACHE_CAPACITY)
        .expect("cache capacity constant is non-zero");
    Self {
        storage,
        segments: Vec::new(),
        next_segment_id: 0,
        pending_docs: HashMap::new(),
        next_doc_id: 1,
        reader_cache: HashMap::new(),
        doc_cache: parking_lot::Mutex::new(LruCache::new(capacity)),
    }
}
/// Opens an existing store from its manifest, or creates a fresh one when
/// no manifest file exists yet.
pub fn open(storage: Arc<dyn Storage>) -> Result<Self> {
    if !storage.file_exists(MANIFEST_FILE) {
        return Ok(Self::new(storage));
    }
    let input = storage.open_input(MANIFEST_FILE)?;
    let mut reader = StructReader::new(input)?;
    let json = reader.read_bytes()?;
    let manifest: StoreManifest = serde_json::from_slice(&json)
        .map_err(|e| LaurusError::index(format!("failed to deserialize manifest: {e}")))?;
    // Resume doc-id allocation just past the highest committed id.
    let next_doc_id = manifest
        .segments
        .iter()
        .map(|s| s.end_doc_id + 1)
        .max()
        .unwrap_or(1);
    let capacity = NonZeroUsize::new(DEFAULT_DOC_CACHE_CAPACITY)
        .expect("cache capacity constant is non-zero");
    Ok(Self {
        storage,
        segments: manifest.segments,
        next_segment_id: manifest.next_segment_id,
        pending_docs: HashMap::new(),
        next_doc_id,
        reader_cache: HashMap::new(),
        doc_cache: parking_lot::Mutex::new(LruCache::new(capacity)),
    })
}
/// Flushes pending documents into a new segment, builds indexed readers for
/// any segments missing one, then rewrites the manifest atomically (write a
/// temp file, rename it over the old manifest, sync).
///
/// # Errors
/// Propagates segment-write and manifest-write failures. Failures while
/// building a reader index are deliberately swallowed: an unindexed
/// segment remains readable via sequential scans.
pub fn commit(&mut self) -> Result<()> {
    if !self.pending_docs.is_empty() {
        let pending = std::mem::take(&mut self.pending_docs);
        self.add_segment(&pending)?;
    }
    for segment in &self.segments {
        if self.reader_cache.contains_key(&segment.id) {
            continue;
        }
        // Best-effort index construction (see Errors note above).
        let built = DocumentSegmentReader::with_index(self.storage.clone(), segment.clone());
        if let Ok(reader) = built {
            self.reader_cache.insert(segment.id, reader);
        }
    }
    let manifest = StoreManifest {
        version: 1,
        segments: self.segments.clone(),
        next_segment_id: self.next_segment_id,
    };
    let json = serde_json::to_vec(&manifest)
        .map_err(|e| LaurusError::index(format!("failed to serialize manifest: {e}")))?;
    // Write-then-rename keeps the previous manifest intact if we fail midway.
    let tmp_file = format!("{}.tmp", MANIFEST_FILE);
    let mut writer = StructWriter::new(self.storage.create_output(&tmp_file)?);
    writer.write_bytes(&json)?;
    writer.close()?;
    self.storage.rename_file(&tmp_file, MANIFEST_FILE)?;
    self.storage.sync()?;
    Ok(())
}
/// Buffers `doc` under a freshly allocated internal id and returns that id.
/// The document only becomes durable after a later `commit`.
pub fn add_document(&mut self, doc: Document) -> Result<u64> {
    let assigned = self.next_doc_id;
    self.next_doc_id = assigned + 1;
    self.pending_docs.insert(assigned, doc);
    Ok(assigned)
}

/// The id the next call to `add_document` will assign.
pub fn next_doc_id(&self) -> u64 {
    self.next_doc_id
}
/// Buffers `doc` under a caller-chosen id, evicting any stale cached copy
/// and bumping the id allocator past `doc_id` when necessary.
pub fn put_document_with_id(&mut self, doc_id: u64, doc: Document) {
    self.pending_docs.insert(doc_id, doc);
    // `&mut self` guarantees exclusive access, so skip the lock.
    self.doc_cache.get_mut().pop(&doc_id);
    self.next_doc_id = self.next_doc_id.max(doc_id + 1);
}
/// Writes `docs` as a brand-new segment, registers its metadata, and
/// returns it.
pub fn add_segment(&mut self, docs: &HashMap<u64, Document>) -> Result<DocumentSegment> {
    let segment_id = self.next_segment_id;
    let writer = DocumentSegmentWriter::new(self.storage.clone());
    let segment = writer.write_segment(segment_id, docs)?;
    self.next_segment_id = segment_id + 1;
    self.segments.push(segment.clone());
    Ok(segment)
}
/// Looks up a document by internal id: pending buffer first, then the LRU
/// cache, then committed segments from newest to oldest. Segment hits are
/// inserted into the cache before returning.
pub fn get_document(&self, doc_id: u64) -> Result<Option<Document>> {
    if let Some(pending) = self.pending_docs.get(&doc_id) {
        return Ok(Some(pending.clone()));
    }
    if let Some(cached) = self.doc_cache.lock().get(&doc_id) {
        return Ok(Some(cached.clone()));
    }
    for segment in self.segments.iter().rev() {
        if !segment.contains(doc_id) {
            continue;
        }
        // Prefer the indexed reader; otherwise fall back to a throwaway
        // scanning reader for this one lookup.
        let found = match self.reader_cache.get(&segment.id) {
            Some(reader) => reader.get_document(doc_id)?,
            None => DocumentSegmentReader::new(self.storage.clone(), segment.clone())
                .get_document(doc_id)?,
        };
        if let Some(doc) = found {
            self.doc_cache.lock().put(doc_id, doc.clone());
            return Ok(Some(doc));
        }
    }
    Ok(None)
}
/// Batch lookup by internal ids. Pending documents are served from memory;
/// the rest are grouped per segment and fetched with one batch read each.
/// Ids that exist nowhere are simply absent from the result map.
pub fn get_documents_batch(&self, doc_ids: &[u64]) -> Result<HashMap<u64, Document>> {
    let mut found = HashMap::with_capacity(doc_ids.len());
    if doc_ids.is_empty() {
        return Ok(found);
    }
    let id_set: std::collections::HashSet<u64> = doc_ids.iter().copied().collect();
    for &doc_id in doc_ids {
        if let Some(doc) = self.pending_docs.get(&doc_id) {
            found.insert(doc_id, doc.clone());
        }
    }
    let remaining: std::collections::HashSet<u64> = id_set
        .into_iter()
        .filter(|id| !found.contains_key(id))
        .collect();
    if remaining.is_empty() {
        return Ok(found);
    }
    for segment in &self.segments {
        let wanted: std::collections::HashSet<u64> = remaining
            .iter()
            .copied()
            .filter(|id| segment.contains(*id))
            .collect();
        if wanted.is_empty() {
            continue;
        }
        let batch = match self.reader_cache.get(&segment.id) {
            Some(reader) => reader.get_documents_batch(&wanted)?,
            None => DocumentSegmentReader::new(self.storage.clone(), segment.clone())
                .get_documents_batch(&wanted)?,
        };
        found.extend(batch);
    }
    Ok(found)
}
/// Resolves an external `_id` value to an internal doc id, checking pending
/// documents first, then committed segments from newest to oldest.
///
/// NOTE(review): pending docs live in a HashMap, so if several pending
/// documents share the same external id, which one wins is unspecified —
/// this matches the original behavior.
pub fn find_by_external_id(&self, external_id: &str) -> Result<Option<u64>> {
    for (doc_id, doc) in &self.pending_docs {
        if doc.fields.get("_id").and_then(|v| v.as_text()) == Some(external_id) {
            return Ok(Some(*doc_id));
        }
    }
    for segment in self.segments.iter().rev() {
        let reader = DocumentSegmentReader::new(self.storage.clone(), segment.clone());
        if let Some(doc_id) = reader.find_by_external_id(external_id)? {
            return Ok(Some(doc_id));
        }
    }
    Ok(None)
}
/// Collects the internal ids of every document — pending and committed —
/// whose external `_id` equals `external_id`.
pub fn find_all_by_external_id(&self, external_id: &str) -> Result<Vec<u64>> {
    let mut ids: Vec<u64> = self
        .pending_docs
        .iter()
        .filter(|(_, doc)| doc.fields.get("_id").and_then(|v| v.as_text()) == Some(external_id))
        .map(|(doc_id, _)| *doc_id)
        .collect();
    for segment in self.segments.iter() {
        let reader = DocumentSegmentReader::new(self.storage.clone(), segment.clone());
        ids.extend(reader.find_all_by_external_id(external_id)?);
    }
    Ok(ids)
}
/// Deletes a document by internal id.
///
/// NOTE(review): currently a no-op — nothing is removed and no tombstone is
/// recorded, so the document stays readable. Presumably deletion tracking
/// lives elsewhere or is not yet implemented; confirm before relying on it.
pub fn delete_document(&mut self, _doc_id: u64) -> Result<()> {
    Ok(())
}
/// All committed segments, oldest first.
pub fn segments(&self) -> &[DocumentSegment] {
    self.segments.as_slice()
}
/// Removes the on-disk file of the segment with `segment_id`, if the store
/// knows such a segment; unknown ids are silently ignored. The in-memory
/// segment list is not modified.
pub fn delete_segment_files(&self, segment_id: u32) -> Result<()> {
    let target = self.segments.iter().find(|s| s.id == segment_id);
    if let Some(segment) = target {
        self.storage.delete_file(&segment.file_name())?;
    }
    Ok(())
}
}