use super::store::RdfStore;
use super::triple::Triple;
/// A consumer that receives RDF triples one at a time.
///
/// Implementors decide what happens to each triple (collect it, count it,
/// forward it to a store, ...). Failures are reported as a plain `String`
/// message.
pub trait TripleSink {
    /// Hands a single `triple` over to the sink.
    ///
    /// # Errors
    ///
    /// Returns `Err` with a descriptive message if the implementation could
    /// not accept the triple.
    fn emit(&mut self, triple: Triple) -> Result<(), String>;

    /// Gives the sink a chance to complete any outstanding work.
    ///
    /// The provided implementation does nothing and reports success; sinks
    /// that hold back triples (for example in a buffer) override it.
    ///
    /// # Errors
    ///
    /// Returns `Err` with a descriptive message if the outstanding work could
    /// not be completed.
    fn finish(&mut self) -> Result<(), String> {
        Ok(())
    }
}
/// A sink that keeps every triple it receives in memory.
///
/// The derived `Default` yields the same empty sink as [`VecSink::new`].
#[derive(Default)]
pub struct VecSink {
    /// Triples collected so far, in the order they were emitted.
    triples: Vec<Triple>,
}

impl VecSink {
    /// Creates a sink with no collected triples.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Consumes the sink and returns everything it collected.
    #[must_use]
    pub fn into_triples(self) -> Vec<Triple> {
        let Self { triples } = self;
        triples
    }
}
impl TripleSink for VecSink {
    /// Appends `triple` to the in-memory collection. This never fails.
    fn emit(&mut self, triple: Triple) -> Result<(), String> {
        let collected = &mut self.triples;
        collected.push(triple);
        Ok(())
    }
}
/// A sink that buffers triples and writes them to an [`RdfStore`] in batches.
///
/// Triples accumulate in an internal buffer. As soon as `batch_size` triples
/// are buffered they are handed to the store with a single `batch_insert`
/// call. Triples still sitting in the buffer are only written by an explicit
/// flush, so the sink must be finished after the last triple was emitted.
pub struct BatchInsertSink<'a> {
    /// Store that receives the batches.
    store: &'a RdfStore,
    /// Triples waiting for the next flush.
    buffer: Vec<Triple>,
    /// Number of buffered triples that triggers a flush; always at least 1.
    batch_size: usize,
    /// Running sum of the values returned by the store's `batch_insert`.
    total_inserted: usize,
}

impl<'a> BatchInsertSink<'a> {
    /// Upper bound on the number of buffer slots reserved up front.
    ///
    /// `batch_size` is caller-controlled. Reserving it verbatim would make a
    /// very large value (e.g. `usize::MAX` to mean "flush only on finish")
    /// panic with a capacity overflow or exhaust memory before a single
    /// triple is buffered. Larger batches still work: the buffer simply grows
    /// on demand beyond this reservation.
    const MAX_PREALLOCATED_TRIPLES: usize = 1 << 16;

    /// Creates a sink that writes to `store` in batches of `batch_size` triples.
    ///
    /// A `batch_size` of zero is clamped to 1 in release builds.
    ///
    /// # Panics
    ///
    /// Panics in builds with debug assertions enabled if `batch_size` is zero.
    #[must_use]
    pub fn new(store: &'a RdfStore, batch_size: usize) -> Self {
        debug_assert!(
            batch_size > 0,
            "batch_size must be > 0 to amortize flush overhead"
        );
        let batch_size = batch_size.max(1);
        Self {
            store,
            buffer: Self::fresh_buffer(batch_size),
            batch_size,
            total_inserted: 0,
        }
    }

    /// Returns the sum of the counts reported by the store for all batches
    /// flushed so far. Triples that are still buffered are not included.
    #[must_use]
    pub fn total_inserted(&self) -> usize {
        self.total_inserted
    }

    /// Allocates an empty buffer sized for `batch_size`, capped at
    /// [`Self::MAX_PREALLOCATED_TRIPLES`] slots.
    fn fresh_buffer(batch_size: usize) -> Vec<Triple> {
        Vec::with_capacity(batch_size.min(Self::MAX_PREALLOCATED_TRIPLES))
    }

    /// Hands all buffered triples to the store and adds the store's returned
    /// count to `total_inserted`. Does nothing when the buffer is empty.
    fn flush(&mut self) {
        if self.buffer.is_empty() {
            return;
        }
        // Swap in a fresh buffer so the full one can be moved into the store.
        let batch = std::mem::replace(&mut self.buffer, Self::fresh_buffer(self.batch_size));
        self.total_inserted += self.store.batch_insert(batch);
    }
}
impl TripleSink for BatchInsertSink<'_> {
    /// Buffers `triple` and flushes to the store once the buffer holds at
    /// least `batch_size` triples. This never fails.
    fn emit(&mut self, triple: Triple) -> Result<(), String> {
        self.buffer.push(triple);
        if self.buffer.len() < self.batch_size {
            // Batch not full yet: keep accumulating.
            return Ok(());
        }
        self.flush();
        Ok(())
    }

    /// Writes whatever is still buffered to the store. This never fails.
    fn finish(&mut self) -> Result<(), String> {
        self.flush();
        Ok(())
    }
}
/// A sink that only keeps a tally of how many triples it was given.
///
/// `Debug` is derived so the sink can be logged and used in assertion
/// messages; `Default` is derived and yields the same zeroed sink as
/// [`CountSink::new`].
#[derive(Debug, Default)]
pub struct CountSink {
    /// Number of triples emitted into this sink so far.
    count: usize,
}

impl CountSink {
    /// Creates a sink whose count starts at zero.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the number of triples emitted into this sink so far.
    #[must_use]
    pub fn count(&self) -> usize {
        self.count
    }
}
impl TripleSink for CountSink {
    /// Drops the triple and increases the tally by one. This never fails.
    fn emit(&mut self, _discarded: Triple) -> Result<(), String> {
        self.count += 1;
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::rdf::term::Term;

    /// Builds the fixed triple shared by most tests.
    fn sample_triple() -> Triple {
        let subject = Term::iri("http://example.org/s");
        let predicate = Term::iri("http://example.org/p");
        let object = Term::literal("o");
        Triple::new(subject, predicate, object)
    }

    /// Every emitted triple ends up in the collected vector.
    #[test]
    fn vec_sink_collects() {
        let mut sink = VecSink::new();
        for _ in 0..2 {
            sink.emit(sample_triple()).unwrap();
        }
        let collected = sink.into_triples();
        assert_eq!(collected.len(), 2);
    }

    /// The counter goes up by one per emitted triple.
    #[test]
    fn count_sink_counts() {
        let mut sink = CountSink::new();
        for _ in 0..3 {
            sink.emit(sample_triple()).unwrap();
        }
        assert_eq!(sink.count(), 3);
    }

    /// A partially filled buffer reaches the store only on `finish`.
    #[test]
    fn batch_insert_sink_flushes_on_finish() {
        let store = RdfStore::new();
        let mut sink = BatchInsertSink::new(&store, 100);

        sink.emit(sample_triple()).unwrap();
        assert_eq!(store.len(), 0);

        sink.finish().unwrap();
        assert_eq!(store.len(), 1);
        assert_eq!(sink.total_inserted(), 1);
    }

    /// Reaching `batch_size` flushes immediately; the remainder waits for `finish`.
    #[test]
    fn batch_insert_sink_flushes_at_batch_size() {
        let store = RdfStore::new();
        let mut sink = BatchInsertSink::new(&store, 2);

        for i in 0..3 {
            let subject = Term::iri(format!("http://example.org/s{i}"));
            let predicate = Term::iri("http://example.org/p");
            let object = Term::literal("o");
            sink.emit(Triple::new(subject, predicate, object)).unwrap();
        }
        assert_eq!(store.len(), 2);

        sink.finish().unwrap();
        assert_eq!(store.len(), 3);
    }

    /// Emitting the same triple twice stores and counts it once.
    #[test]
    fn batch_insert_sink_deduplicates() {
        let store = RdfStore::new();
        let mut sink = BatchInsertSink::new(&store, 100);

        for _ in 0..2 {
            sink.emit(sample_triple()).unwrap();
        }
        sink.finish().unwrap();

        assert_eq!(store.len(), 1);
        assert_eq!(sink.total_inserted(), 1);
    }
}