use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use grafeo_common::memory::buffer::SpillError;
use grafeo_common::storage::page_fetcher::PageFetcher;
use grafeo_common::storage::section::{Section, SectionType};
use grafeo_common::utils::error::{Error, Result};
use crate::graph::rdf::RdfStore;
/// Format version reported for this section through `Section::version`.
const RING_SECTION_VERSION: u8 = 2;
/// Four-byte prefix (ASCII `GRFR`) that identifies a v2-encoded ring buffer.
/// Buffers without it are treated as the legacy v1 encoding.
const V2_MAGIC: &[u8; 4] = &[b'G', b'R', b'F', b'R'];
/// Persistence section that serializes and restores the triple ring of an [`RdfStore`].
///
/// The store is shared through an `Arc`. An atomic flag records whether the
/// section has been marked as having unsaved changes.
pub struct RdfRingSection {
    /// Store whose ring this section reads from and writes into.
    store: Arc<RdfStore>,
    /// Raised by [`RdfRingSection::mark_dirty`], lowered by `Section::mark_clean`.
    dirty: AtomicBool,
}

impl RdfRingSection {
    /// Builds a section over `store` that starts out clean (not dirty).
    #[must_use]
    pub fn new(store: Arc<RdfStore>) -> Self {
        let dirty = AtomicBool::new(false);
        Self { store, dirty }
    }

    /// Records that the section has changes that have not been persisted yet.
    ///
    /// Stores with `Release` ordering; `is_dirty` reads the flag with `Acquire`.
    pub fn mark_dirty(&self) {
        let flag = &self.dirty;
        flag.store(true, Ordering::Release);
    }
}
impl Section for RdfRingSection {
fn section_type(&self) -> SectionType {
SectionType::RdfRing
}
fn version(&self) -> u8 {
RING_SECTION_VERSION
}
fn serialize(&self) -> Result<Vec<u8>> {
match self.store.ring() {
Some(ring) => Ok(super::serialize_triple_ring(&ring)),
None => Ok(Vec::new()),
}
}
fn deserialize(&mut self, data: &[u8]) -> Result<()> {
if data.is_empty() {
return Ok(());
}
let ring = if data.len() >= 4 && &data[0..4] == V2_MAGIC {
super::deserialize_triple_ring(bytes::Bytes::copy_from_slice(data))
.map_err(|e| Error::Serialization(e.to_string()))?
} else {
super::TripleRing::load_from_bytes(data)
.map_err(|e| Error::Serialization(e.to_string()))?
};
self.store.set_ring(ring);
Ok(())
}
fn is_dirty(&self) -> bool {
self.dirty.load(Ordering::Acquire)
}
fn mark_clean(&self) {
self.dirty.store(false, Ordering::Release);
}
fn memory_usage(&self) -> usize {
self.store.ring().map_or(0, |r| r.size_bytes())
}
fn swap_to_mmap(&self, fetcher: Arc<dyn PageFetcher>) -> std::result::Result<(), SpillError> {
let len = fetcher.len();
if len == 0 {
return Ok(());
}
let slice = fetcher
.fetch(0, len)
.map_err(|e| SpillError::IoError(e.to_string()))?;
let data = bytes::Bytes::copy_from_slice(slice);
let ring = if data.len() >= 4 && &data[0..4] == V2_MAGIC {
super::deserialize_triple_ring(data).map_err(|e| SpillError::IoError(e.to_string()))?
} else {
super::TripleRing::load_from_bytes(&data)
.map_err(|e| SpillError::IoError(e.to_string()))?
};
self.store.set_ring(ring);
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::rdf::{Term, Triple, TriplePattern};

    /// Predicate IRI of the two `name` triples built by [`test_store`].
    const FOAF_NAME: &str = "http://xmlns.com/foaf/0.1/name";

    /// Builds a store with three triples: two `foaf:name` literals and one `foaf:knows` link.
    fn test_store() -> Arc<RdfStore> {
        let store = Arc::new(RdfStore::new());
        store.bulk_load(vec![
            Triple::new(
                Term::iri("http://ex.org/alix"),
                Term::iri(FOAF_NAME),
                Term::literal("Alix"),
            ),
            Triple::new(
                Term::iri("http://ex.org/gus"),
                Term::iri(FOAF_NAME),
                Term::literal("Gus"),
            ),
            Triple::new(
                Term::iri("http://ex.org/alix"),
                Term::iri("http://xmlns.com/foaf/0.1/knows"),
                Term::iri("http://ex.org/gus"),
            ),
        ]);
        store
    }

    /// Pattern that matches every triple whose predicate is `foaf:name`.
    fn name_pattern() -> TriplePattern {
        TriplePattern {
            subject: None,
            predicate: Some(Term::iri(FOAF_NAME)),
            object: None,
        }
    }

    /// Encodes the [`test_store`] ring with the legacy v1 (bincode) writer.
    fn v1_bytes() -> Vec<u8> {
        let original = test_store();
        let ring = original.ring().expect("ring built").as_ref().clone();
        ring.save_to_bytes().unwrap()
    }

    #[test]
    fn section_type_is_rdf_ring() {
        let store = test_store();
        let section = RdfRingSection::new(store);
        assert_eq!(section.section_type(), SectionType::RdfRing);
        assert_eq!(section.version(), 2);
    }

    #[test]
    fn section_dirty_tracking() {
        let store = test_store();
        let section = RdfRingSection::new(store);
        assert!(!section.is_dirty());
        section.mark_dirty();
        assert!(section.is_dirty());
        section.mark_clean();
        assert!(!section.is_dirty());
    }

    #[test]
    fn section_serialize_empty() {
        let store = Arc::new(RdfStore::new());
        let section = RdfRingSection::new(store);
        let bytes = section.serialize().unwrap();
        assert!(bytes.is_empty());
    }

    #[test]
    fn section_deserialize_empty_is_noop() {
        // Fresh store: still no ring afterwards.
        let empty_store = Arc::new(RdfStore::new());
        let mut empty_section = RdfRingSection::new(Arc::clone(&empty_store));
        empty_section.deserialize(&[]).unwrap();
        assert!(empty_store.ring().is_none());

        // Populated store: the existing ring is left untouched.
        let store = test_store();
        let mut section = RdfRingSection::new(Arc::clone(&store));
        section.deserialize(&[]).unwrap();
        assert_eq!(store.ring().expect("ring untouched").len(), 3);
    }

    #[test]
    fn section_roundtrip() {
        let store = test_store();
        let section = RdfRingSection::new(Arc::clone(&store));
        let bytes = section.serialize().unwrap();
        assert!(!bytes.is_empty());
        let store2 = Arc::new(RdfStore::new());
        let mut section2 = RdfRingSection::new(Arc::clone(&store2));
        section2.deserialize(&bytes).unwrap();
        let ring = store2.ring().expect("ring should be loaded");
        assert_eq!(ring.len(), 3);
        assert_eq!(ring.count(&name_pattern()), 2);
    }

    #[test]
    fn section_memory_usage() {
        let store = test_store();
        let section = RdfRingSection::new(store);
        assert!(section.memory_usage() > 0);
    }

    #[test]
    fn alix_section_serialize_writes_v2_magic() {
        let store = test_store();
        let section = RdfRingSection::new(store);
        let bytes = section.serialize().unwrap();
        assert!(bytes.len() > 4);
        assert_eq!(&bytes[0..4], V2_MAGIC, "new writes must use v2 magic");
    }

    #[test]
    fn gus_section_v1_bincode_buffer_still_loads() {
        let v1_bytes = v1_bytes();
        assert_ne!(
            &v1_bytes[0..4],
            V2_MAGIC,
            "v1 bincode must not have GRFR magic"
        );
        let store2 = Arc::new(RdfStore::new());
        let mut section2 = RdfRingSection::new(Arc::clone(&store2));
        section2.deserialize(&v1_bytes).unwrap();
        let restored = store2.ring().expect("ring loaded");
        assert_eq!(restored.len(), 3);
    }

    /// In-memory `PageFetcher` backed by an owned byte vector.
    struct MemFetcher(Vec<u8>);

    impl PageFetcher for MemFetcher {
        fn fetch(&self, offset: usize, len: usize) -> std::io::Result<&[u8]> {
            let end = offset
                .checked_add(len)
                .ok_or_else(|| std::io::Error::other("overflow"))?;
            if end > self.0.len() {
                return Err(std::io::Error::from(std::io::ErrorKind::UnexpectedEof));
            }
            Ok(&self.0[offset..end])
        }

        fn len(&self) -> usize {
            self.0.len()
        }

        fn advise(
            &self,
            _offset: usize,
            _len: usize,
            _hint: grafeo_common::storage::page_fetcher::AccessHint,
        ) {
        }
    }

    #[test]
    fn shosanna_swap_to_mmap_serves_queries_from_bytes() {
        let original = test_store();
        let v2_bytes = {
            let section = RdfRingSection::new(Arc::clone(&original));
            section.serialize().unwrap()
        };
        let store = Arc::new(RdfStore::new());
        assert!(store.ring().is_none());
        let section = RdfRingSection::new(Arc::clone(&store));
        let fetcher: Arc<dyn PageFetcher> = Arc::new(MemFetcher(v2_bytes));
        section.swap_to_mmap(fetcher).expect("swap_to_mmap");
        let ring = store.ring().expect("ring loaded via swap_to_mmap");
        assert_eq!(ring.len(), 3);
        assert_eq!(ring.count(&name_pattern()), 2);
    }

    #[test]
    fn butch_swap_to_mmap_empty_fetcher_is_noop() {
        let store = Arc::new(RdfStore::new());
        let section = RdfRingSection::new(Arc::clone(&store));
        let fetcher: Arc<dyn PageFetcher> = Arc::new(MemFetcher(Vec::new()));
        section.swap_to_mmap(fetcher).expect("empty swap is ok");
        assert!(store.ring().is_none());
    }

    #[test]
    fn django_swap_to_mmap_v1_bincode_fetcher_still_loads() {
        let v1_bytes = v1_bytes();
        assert_ne!(&v1_bytes[0..4], V2_MAGIC);
        let store = Arc::new(RdfStore::new());
        let section = RdfRingSection::new(Arc::clone(&store));
        let fetcher: Arc<dyn PageFetcher> = Arc::new(MemFetcher(v1_bytes));
        section.swap_to_mmap(fetcher).expect("v1 fallback in swap");
        assert_eq!(store.ring().unwrap().len(), 3);
    }

    #[test]
    fn vincent_section_v1_then_reserialize_yields_v2() {
        let v1_bytes = v1_bytes();
        let store2 = Arc::new(RdfStore::new());
        let mut section2 = RdfRingSection::new(Arc::clone(&store2));
        section2.deserialize(&v1_bytes).unwrap();
        let v2_bytes = section2.serialize().unwrap();
        assert_eq!(&v2_bytes[0..4], V2_MAGIC, "post-migration write is v2");
        let store3 = Arc::new(RdfStore::new());
        let mut section3 = RdfRingSection::new(Arc::clone(&store3));
        section3.deserialize(&v2_bytes).unwrap();
        assert_eq!(store3.ring().unwrap().len(), 3);
    }
}