#![cfg(feature = "columnar")]
use crate::model::{BlankNode, Literal, NamedNode, Triple};
use crate::OxirsError;
#[cfg(all(feature = "columnar", feature = "arrow"))]
use arrow::{
array::{ArrayBuilder, StringArray, StringBuilder, UInt64Array, UInt64Builder},
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
};
#[cfg(all(feature = "columnar", feature = "datafusion"))]
use datafusion::prelude::*;
#[cfg(all(feature = "columnar", feature = "datafusion"))]
use datafusion::execution::context::SessionContext;
#[cfg(all(feature = "columnar", feature = "parquet"))]
use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};
#[cfg(all(feature = "columnar", feature = "parquet"))]
use parquet::file::reader::ParquetReadOptions;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::RwLock;
/// Configuration for the columnar (Arrow/Parquet) triple store.
#[derive(Debug, Clone)]
pub struct ColumnarConfig {
    /// Directory where partition files (`*.parquet`) are written and reloaded from.
    pub path: PathBuf,
    /// Number of triples buffered in memory before a partition is flushed to disk.
    pub batch_size: usize,
    /// Enable dictionary encoding of URIs.
    /// NOTE(review): this flag is not read anywhere in this file — confirm it
    /// is honored elsewhere or remove it.
    pub dictionary_encoding: bool,
    /// Parquet compression codec applied when a batch is flushed.
    pub compression: CompressionType,
    /// How flushed batches are named/partitioned on disk.
    pub partition_strategy: PartitionStrategy,
}
impl Default for ColumnarConfig {
    /// Defaults: store under `/var/oxirs/columnar`, flush every 10 000 rows,
    /// dictionary encoding on, Snappy compression, predicate-based partitioning.
    fn default() -> Self {
        Self {
            path: PathBuf::from("/var/oxirs/columnar"),
            batch_size: 10_000,
            dictionary_encoding: true,
            compression: CompressionType::Snappy,
            partition_strategy: PartitionStrategy::ByPredicate,
        }
    }
}
/// Parquet compression codec selection; mapped to the parquet crate's enum in
/// `ColumnarStorage::get_parquet_compression`.
#[derive(Debug, Clone)]
pub enum CompressionType {
    /// No compression.
    None,
    /// Snappy — fast with a moderate ratio (the default).
    Snappy,
    /// Gzip at the parquet crate's default level.
    Gzip,
    /// LZ4.
    Lz4,
    /// Zstandard at the parquet crate's default level.
    Zstd,
}
/// Naming/partitioning policy for flushed parquet files
/// (see `ColumnarStorage::get_partition_name`).
#[derive(Debug, Clone)]
pub enum PartitionStrategy {
    /// Single fixed partition name ("triples"); successive flushes reuse it.
    None,
    /// One uniquely named partition per flush.
    /// NOTE(review): despite the name, `get_partition_name` never inspects a
    /// predicate — it only generates a timestamped unique name; confirm the
    /// intended semantics.
    ByPredicate,
    /// One partition per graph per day (currently a fixed "graph_default" prefix).
    ByGraph,
    /// Time-bucketed partitions; `bucket_hours` sets the bucket width.
    ByTimeRange { bucket_hours: u32 },
    /// Caller-supplied fixed partition name.
    Custom(String),
}
/// Columnar triple store backed by Parquet partition files and queried through
/// an embedded DataFusion session. All shared state sits behind
/// `Arc<RwLock<..>>` so it can be shared across async tasks.
pub struct ColumnarStorage {
    /// Storage location, batching, compression, and partitioning settings.
    config: ColumnarConfig,
    /// DataFusion session holding the registered partition tables and the
    /// unified `triples` view.
    ctx: Arc<RwLock<SessionContext>>,
    /// Fixed 8-column Arrow schema shared by every partition file.
    schema: Arc<Schema>,
    /// In-memory URI <-> u64 ID interning table (not persisted in this file).
    uri_dictionary: Arc<RwLock<UriDictionary>>,
    /// Current in-memory batch; `None` until the first insert after a flush.
    writer: Arc<RwLock<Option<BatchWriter>>>,
    /// Running ingest/query counters.
    stats: Arc<RwLock<ColumnarStats>>,
}
/// Bidirectional interning table mapping URI/term strings to dense u64 IDs.
struct UriDictionary {
    /// Forward map: interned string -> assigned ID.
    uri_to_id: HashMap<String, u64>,
    /// Reverse map: assigned ID -> interned string.
    id_to_uri: HashMap<u64, String>,
    /// Next ID to hand out; starts at 1, leaving 0 unassigned.
    next_id: u64,
}
/// Accumulates column values for one in-flight record batch. There is one
/// builder per schema field; `store_triple` appends to all of them in lockstep
/// (exactly one value or null per builder per row) to keep columns aligned.
struct BatchWriter {
    /// Dictionary ID of the subject term.
    subject_builder: UInt64Builder,
    /// Dictionary ID of the predicate term.
    predicate_builder: UInt64Builder,
    /// Object kind tag: "uri", "bnode", "literal", or "other".
    object_type_builder: StringBuilder,
    /// Lexical form of the object (URI string, bnode label, or literal value).
    object_value_builder: StringBuilder,
    /// Dictionary ID of the literal datatype IRI; null for non-literals.
    object_datatype_builder: UInt64Builder,
    /// Language tag for language-tagged literals; null otherwise.
    object_lang_builder: StringBuilder,
    /// Graph ID; currently always 0 (default graph) in `store_triple`.
    graph_builder: UInt64Builder,
    /// Ingestion time in whole seconds since the UNIX epoch.
    timestamp_builder: UInt64Builder,
    /// Rows appended so far; compared against `ColumnarConfig::batch_size`.
    current_size: usize,
}
/// Running operational counters for the store.
#[derive(Debug, Default)]
struct ColumnarStats {
    /// Triples accepted via `store_triple`.
    total_triples: u64,
    /// Partition files written by `flush_batch`.
    total_partitions: u64,
    /// Sum of on-disk partition file sizes measured at flush time.
    total_bytes: u64,
    /// Number of completed `query_sql` calls.
    query_count: u64,
    /// Running mean of `query_sql` wall time in milliseconds.
    avg_query_time_ms: f64,
}
impl ColumnarStorage {
/// Opens (or initializes) columnar storage under `config.path`.
///
/// Creates the storage directory, defines the fixed 8-column triple schema,
/// and registers every pre-existing `*.parquet` file in the directory as a
/// DataFusion table named after its file stem.
///
/// NOTE(review): the URI dictionary starts empty even when old partitions are
/// re-registered, so IDs from a previous run cannot be resolved back to URIs —
/// confirm the dictionary is persisted or rebuilt elsewhere.
/// NOTE(review): the unified `triples` view is only (re)created on flush, so
/// immediately after a restart queries against `triples` may fail until the
/// first flush — confirm this is acceptable.
///
/// # Errors
/// Fails if the directory cannot be created or a partition fails to register.
pub async fn new(config: ColumnarConfig) -> Result<Self, OxirsError> {
    std::fs::create_dir_all(&config.path)?;
    // Column layout shared by every partition file; nullable columns are the
    // literal datatype/language and the graph ID.
    let schema = Arc::new(Schema::new(vec![
        Field::new("subject_id", DataType::UInt64, false),
        Field::new("predicate_id", DataType::UInt64, false),
        Field::new("object_type", DataType::Utf8, false),
        Field::new("object_value", DataType::Utf8, false),
        Field::new("object_datatype_id", DataType::UInt64, true),
        Field::new("object_lang", DataType::Utf8, true),
        Field::new("graph_id", DataType::UInt64, true),
        Field::new("timestamp", DataType::UInt64, false),
    ]));
    let ctx = SessionContext::new();
    // Re-attach any partitions left over from a previous run.
    let pattern = config.path.join("*.parquet");
    if let Ok(paths) = glob::glob(pattern.to_str().expect("path should be valid UTF-8")) {
        for path in paths.flatten() {
            let table_name = path
                .file_stem()
                .and_then(|s| s.to_str())
                .unwrap_or("triples");
            ctx.register_parquet(
                table_name,
                path.to_str().expect("path should be valid UTF-8"),
                ParquetReadOptions::default(),
            )
            .await?;
        }
    }
    Ok(ColumnarStorage {
        config,
        ctx: Arc::new(RwLock::new(ctx)),
        schema,
        uri_dictionary: Arc::new(RwLock::new(UriDictionary::new())),
        writer: Arc::new(RwLock::new(None)),
        stats: Arc::new(RwLock::new(ColumnarStats::default())),
    })
}
/// Buffers one triple into the in-memory batch writer, flushing a parquet
/// partition once `config.batch_size` rows have accumulated.
///
/// Subjects and predicates are dictionary-encoded to u64 IDs; the object is
/// stored as a (type, value, datatype, lang) column group.
///
/// # Errors
/// Returns an error only if a full batch fails to flush to disk.
pub async fn store_triple(&self, triple: &Triple) -> Result<(), OxirsError> {
    // Lazily create the writer on first insert (and after each flush, which
    // takes it out of the Option).
    let mut writer_guard = self.writer.write().await;
    if writer_guard.is_none() {
        *writer_guard = Some(BatchWriter::new());
    }
    let writer = writer_guard.as_mut().expect("writer should be initialized after is_none check");
    // NOTE(review): this dictionary write lock is held across the flush await
    // below; tokio's RwLock permits that, but it serializes all concurrent
    // inserts — confirm this is acceptable for throughput.
    let mut dict = self.uri_dictionary.write().await;
    // Serialize the subject to a string key. Quoted (RDF-star) triples are
    // collapsed to a "_:quoted" placeholder, which is lossy; variables are
    // stored by name.
    let subject_str = match triple.subject() {
        crate::model::Subject::NamedNode(nn) => nn.as_str(),
        crate::model::Subject::BlankNode(bn) => bn.as_str(),
        crate::model::Subject::Variable(v) => v.as_str(),
        crate::model::Subject::QuotedTriple(_) => "_:quoted",
    };
    let subject_id = dict.get_or_create_uri(subject_str);
    let predicate_str = match triple.predicate() {
        crate::model::Predicate::NamedNode(nn) => nn.as_str(),
        crate::model::Predicate::Variable(v) => v.as_str(),
    };
    let predicate_id = dict.get_or_create_uri(predicate_str);
    // The appends below must stay in schema column order and in lockstep:
    // each row appends exactly one value (or null) to every builder.
    writer.subject_builder.append_value(subject_id);
    writer.predicate_builder.append_value(predicate_id);
    match triple.object() {
        crate::model::Object::NamedNode(nn) => {
            writer.object_type_builder.append_value("uri");
            writer.object_value_builder.append_value(nn.as_str());
            writer.object_datatype_builder.append_null();
            writer.object_lang_builder.append_null();
        }
        crate::model::Object::BlankNode(bn) => {
            writer.object_type_builder.append_value("bnode");
            writer.object_value_builder.append_value(bn.as_str());
            writer.object_datatype_builder.append_null();
            writer.object_lang_builder.append_null();
        }
        crate::model::Object::Literal(lit) => {
            writer.object_type_builder.append_value("literal");
            writer.object_value_builder.append_value(lit.value());
            // Datatype IRIs share the URI dictionary with subjects/predicates.
            let dt = lit.datatype();
            let dt_id = dict.get_or_create_uri(dt.as_str());
            writer.object_datatype_builder.append_value(dt_id);
            if let Some(lang) = lit.language() {
                writer.object_lang_builder.append_value(lang);
            } else {
                writer.object_lang_builder.append_null();
            }
        }
        _ => {
            // Any other object kind is stored as an empty "other" row.
            writer.object_type_builder.append_value("other");
            writer.object_value_builder.append_value("");
            writer.object_datatype_builder.append_null();
            writer.object_lang_builder.append_null();
        }
    }
    // Default graph (ID 0) and an ingestion timestamp in whole seconds.
    writer.graph_builder.append_value(0);
    writer.timestamp_builder.append_value(
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("system time should be after UNIX epoch")
            .as_secs(),
    );
    writer.current_size += 1;
    if writer.current_size >= self.config.batch_size {
        self.flush_batch(&mut writer_guard).await?;
    }
    let mut stats = self.stats.write().await;
    stats.total_triples += 1;
    Ok(())
}
/// Runs an arbitrary SQL query against the registered partitions and maps each
/// result row back to a `Triple`, resolving dictionary IDs to URIs.
///
/// Columns are read positionally, so the result set must start with the full
/// schema column order (e.g. `SELECT * FROM triples`): subject_id,
/// predicate_id, object_type, object_value, ...
///
/// NOTE(review): literal objects are rebuilt with `Literal::new(value)` only;
/// the datatype and language columns (indices 4 and 5) are never read back, so
/// typed/tagged literals lose that information on round-trip — confirm whether
/// this is intended.
///
/// # Errors
/// Fails on SQL errors, unexpected column types, unknown dictionary IDs, or
/// invalid term syntax.
pub async fn query_sql(&self, sql: &str) -> Result<Vec<Triple>, OxirsError> {
    let start = std::time::Instant::now();
    let ctx = self.ctx.read().await;
    let df = ctx.sql(sql).await?;
    let batches = df.collect().await?;
    let mut results = Vec::new();
    let dict = self.uri_dictionary.read().await;
    for batch in batches {
        let subject_ids = batch
            .column(0)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .ok_or_else(|| OxirsError::Query("Invalid subject column".to_string()))?;
        let predicate_ids = batch
            .column(1)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .ok_or_else(|| OxirsError::Query("Invalid predicate column".to_string()))?;
        // DataFusion may materialize string columns as Utf8 (StringArray) or
        // Utf8View (StringViewArray); accept both.
        let object_types_str: Vec<&str> =
            if let Some(arr) = batch.column(2).as_any().downcast_ref::<StringArray>() {
                (0..batch.num_rows()).map(|i| arr.value(i)).collect()
            } else if let Some(arr) = batch
                .column(2)
                .as_any()
                .downcast_ref::<arrow::array::StringViewArray>()
            {
                (0..batch.num_rows()).map(|i| arr.value(i)).collect()
            } else {
                let col_type = batch.column(2).data_type();
                return Err(OxirsError::Query(format!(
                    "Invalid object type column type: {:?}, expected Utf8 or Utf8View",
                    col_type
                )));
            };
        let object_values_str: Vec<&str> =
            if let Some(arr) = batch.column(3).as_any().downcast_ref::<StringArray>() {
                (0..batch.num_rows()).map(|i| arr.value(i)).collect()
            } else if let Some(arr) = batch
                .column(3)
                .as_any()
                .downcast_ref::<arrow::array::StringViewArray>()
            {
                (0..batch.num_rows()).map(|i| arr.value(i)).collect()
            } else {
                let col_type = batch.column(3).data_type();
                return Err(OxirsError::Query(format!(
                    "Invalid object value column type: {:?}, expected Utf8 or Utf8View",
                    col_type
                )));
            };
        for i in 0..batch.num_rows() {
            let subject_id = subject_ids.value(i);
            let predicate_id = predicate_ids.value(i);
            let object_type = object_types_str[i];
            let object_value = object_values_str[i];
            let subject_uri = dict.get_uri(subject_id).ok_or_else(|| {
                OxirsError::Query(format!("Unknown subject ID: {subject_id}"))
            })?;
            let predicate_uri = dict.get_uri(predicate_id).ok_or_else(|| {
                OxirsError::Query(format!("Unknown predicate ID: {predicate_id}"))
            })?;
            // A "_:" prefix marks a blank-node label (see store_triple).
            let subject = if subject_uri.starts_with("_:") {
                crate::model::Subject::BlankNode(BlankNode::new(subject_uri)?)
            } else {
                crate::model::Subject::NamedNode(NamedNode::new(subject_uri)?)
            };
            let predicate = crate::model::Predicate::NamedNode(NamedNode::new(predicate_uri)?);
            let object = match object_type {
                "uri" => crate::model::Object::NamedNode(NamedNode::new(object_value)?),
                // Fix: blank-node objects previously fell through to `continue`
                // and were silently dropped even though store_triple writes
                // them; rebuild them the same way blank-node subjects are.
                "bnode" => crate::model::Object::BlankNode(BlankNode::new(object_value)?),
                "literal" => crate::model::Object::Literal(Literal::new(object_value)),
                // Unknown tags (e.g. "other") are skipped.
                _ => continue,
            };
            results.push(Triple::new(subject, predicate, object));
        }
    }
    // Fold this query's latency into the running mean.
    let elapsed = start.elapsed();
    let mut stats = self.stats.write().await;
    stats.query_count += 1;
    stats.avg_query_time_ms = (stats.avg_query_time_ms * (stats.query_count - 1) as f64
        + elapsed.as_millis() as f64)
        / stats.query_count as f64;
    Ok(results)
}
/// Executes a pre-canned analytical query against the unified `triples` view
/// and resolves dictionary IDs back to URIs in the result.
///
/// IDs missing from the in-memory dictionary are silently skipped.
///
/// # Errors
/// Returns an error when SQL execution fails or a result column does not have
/// the expected Arrow type (all column-type mismatches now surface as
/// `OxirsError::Query` instead of panicking, matching the count-column
/// handling).
pub async fn analyze(&self, query: AnalyticalQuery) -> Result<AnalysisResult, OxirsError> {
    match query {
        AnalyticalQuery::CountByPredicate => {
            let sql = "SELECT predicate_id, COUNT(*) as count
FROM triples
GROUP BY predicate_id
ORDER BY count DESC";
            let ctx = self.ctx.read().await;
            let df = ctx.sql(sql).await?;
            let batches = df.collect().await?;
            let mut counts = HashMap::new();
            let dict = self.uri_dictionary.read().await;
            for batch in batches {
                let predicate_ids = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<UInt64Array>()
                    .ok_or_else(|| {
                        OxirsError::Query("Invalid predicate_id column type".to_string())
                    })?;
                // COUNT(*) comes back as Int64 from DataFusion.
                let count_values = batch
                    .column(1)
                    .as_any()
                    .downcast_ref::<arrow::array::Int64Array>()
                    .ok_or_else(|| {
                        OxirsError::Query("Invalid count column type".to_string())
                    })?;
                for i in 0..batch.num_rows() {
                    let pred_id = predicate_ids.value(i);
                    let count = count_values.value(i) as u64;
                    if let Some(pred_uri) = dict.get_uri(pred_id) {
                        counts.insert(pred_uri.to_string(), count);
                    }
                }
            }
            Ok(AnalysisResult::PredicateCounts(counts))
        }
        AnalyticalQuery::TopSubjects { limit } => {
            let sql = format!(
                "SELECT subject_id, COUNT(*) as count
FROM triples
GROUP BY subject_id
ORDER BY count DESC
LIMIT {limit}"
            );
            let ctx = self.ctx.read().await;
            let df = ctx.sql(&sql).await?;
            let batches = df.collect().await?;
            let mut subjects = Vec::new();
            let dict = self.uri_dictionary.read().await;
            for batch in batches {
                let subject_ids = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<UInt64Array>()
                    .ok_or_else(|| {
                        OxirsError::Query("Invalid subject_id column type".to_string())
                    })?;
                let count_values = batch
                    .column(1)
                    .as_any()
                    .downcast_ref::<arrow::array::Int64Array>()
                    .ok_or_else(|| {
                        OxirsError::Query("Invalid count column type".to_string())
                    })?;
                for i in 0..batch.num_rows() {
                    let subj_id = subject_ids.value(i);
                    let count = count_values.value(i) as u64;
                    if let Some(subj_uri) = dict.get_uri(subj_id) {
                        subjects.push((subj_uri.to_string(), count));
                    }
                }
            }
            Ok(AnalysisResult::TopSubjects(subjects))
        }
        // Bind with `..` instead of unused `predicate`/`interval` names to
        // avoid dead-binding warnings until the variant is implemented.
        AnalyticalQuery::TimeSeriesAnalysis { .. } => {
            // TODO: time-series bucketing is not implemented; callers
            // currently receive an empty series rather than an error.
            Ok(AnalysisResult::TimeSeries(Vec::new()))
        }
    }
}
/// Drains the current batch writer (if any) into a new parquet partition file,
/// registers the file as a DataFusion table, and rebuilds the unified
/// `triples` view.
///
/// Takes the `BatchWriter` out of the guard, so the next insert re-creates a
/// fresh one. A no-op when no writer exists.
///
/// NOTE(review): `std::fs::File` I/O here blocks the async executor; consider
/// `spawn_blocking` if flushes become large.
/// NOTE(review): with `PartitionStrategy::None` (or a fixed `Custom` name) the
/// partition name repeats, so each flush overwrites the previous file and
/// re-registers the same table name — confirm DataFusion tolerates that.
async fn flush_batch(&self, writer_guard: &mut Option<BatchWriter>) -> Result<(), OxirsError> {
    if let Some(mut writer) = writer_guard.take() {
        // finish() consumes the accumulated data from each column builder;
        // the vector order must match the schema field order.
        let batch = RecordBatch::try_new(
            self.schema.clone(),
            vec![
                Arc::new(writer.subject_builder.finish()),
                Arc::new(writer.predicate_builder.finish()),
                Arc::new(writer.object_type_builder.finish()),
                Arc::new(writer.object_value_builder.finish()),
                Arc::new(writer.object_datatype_builder.finish()),
                Arc::new(writer.object_lang_builder.finish()),
                Arc::new(writer.graph_builder.finish()),
                Arc::new(writer.timestamp_builder.finish()),
            ],
        )?;
        let partition = self.get_partition_name();
        let path = self.config.path.join(format!("{}.parquet", partition));
        let file = std::fs::File::create(&path)?;
        let props = WriterProperties::builder()
            .set_compression(self.get_parquet_compression())
            .build();
        // Note: `writer` is shadowed here — from this point on it is the
        // parquet ArrowWriter, not the drained BatchWriter.
        let mut writer = ArrowWriter::try_new(file, self.schema.clone(), Some(props))?;
        writer.write(&batch)?;
        writer.close()?;
        let ctx = self.ctx.write().await;
        ctx.register_parquet(
            &partition,
            path.to_str().expect("path should be valid UTF-8"),
            ParquetReadOptions::default(),
        )
        .await?;
        // Keep the `triples` view in sync with the new partition set.
        self.create_triples_view(&ctx).await?;
        let mut stats = self.stats.write().await;
        stats.total_partitions += 1;
        stats.total_bytes += std::fs::metadata(&path)?.len();
    }
    Ok(())
}
/// Derives the file/table name for the next flushed partition from the
/// configured strategy.
///
/// `ByPredicate` does not inspect any predicate; it only produces a unique
/// timestamped name so successive flushes never collide.
fn get_partition_name(&self) -> String {
    match &self.config.partition_strategy {
        PartitionStrategy::Custom(name) => name.clone(),
        PartitionStrategy::None => "triples".to_string(),
        PartitionStrategy::ByPredicate => {
            // Second-resolution timestamp plus a nanosecond suffix for uniqueness.
            let stamp = chrono::Utc::now().format("%Y%m%d_%H%M%S");
            let nanos = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .expect("system time should be after UNIX epoch")
                .as_nanos();
            format!("triples_{stamp}_{nanos}")
        }
        PartitionStrategy::ByGraph => {
            let day = chrono::Utc::now().format("%Y%m%d");
            format!("graph_default_{day}")
        }
        PartitionStrategy::ByTimeRange { bucket_hours } => {
            let seconds_per_bucket = i64::from(*bucket_hours) * 3600;
            let bucket = chrono::Utc::now().timestamp() / seconds_per_bucket;
            format!("time_bucket_{bucket}")
        }
    }
}
/// Translates the configured codec into the parquet writer's compression enum,
/// using default levels for the leveled codecs (Gzip, Zstd).
fn get_parquet_compression(&self) -> parquet::basic::Compression {
    use parquet::basic::Compression;
    match self.config.compression {
        CompressionType::None => Compression::UNCOMPRESSED,
        CompressionType::Snappy => Compression::SNAPPY,
        CompressionType::Lz4 => Compression::LZ4,
        CompressionType::Gzip => Compression::GZIP(parquet::basic::GzipLevel::default()),
        CompressionType::Zstd => Compression::ZSTD(parquet::basic::ZstdLevel::default()),
    }
}
/// (Re)builds the `triples` view as a `UNION ALL` over every registered
/// partition table, so queries can address all partitions uniformly.
///
/// Partition tables are recognized by name prefix/substring; the `triples`
/// view itself is excluded so it never references itself.
///
/// # Errors
/// Fails only if the `CREATE OR REPLACE VIEW` statement fails.
async fn create_triples_view(&self, ctx: &SessionContext) -> Result<(), OxirsError> {
    let mut triple_tables = Vec::new();
    for catalog_name in ctx.catalog_names() {
        // Look each catalog/schema up once (the original re-fetched the
        // catalog for every schema), and skip gracefully instead of
        // panicking if one disappeared between enumeration and lookup.
        let Some(catalog) = ctx.catalog(&catalog_name) else {
            continue;
        };
        for schema_name in catalog.schema_names() {
            let Some(schema) = catalog.schema(&schema_name) else {
                continue;
            };
            for table in schema.table_names() {
                // Never include the view itself in its own definition.
                if table == "triples" {
                    continue;
                }
                if table.starts_with("triples")
                    || table.contains("time_bucket")
                    || table.contains("graph_")
                {
                    triple_tables.push(format!("{catalog_name}.{schema_name}.{table}"));
                }
            }
        }
    }
    if !triple_tables.is_empty() {
        let union_query = triple_tables
            .iter()
            .map(|t| format!("SELECT * FROM {t}"))
            .collect::<Vec<_>>()
            .join(" UNION ALL ");
        let create_view_sql = format!("CREATE OR REPLACE VIEW triples AS {union_query}");
        ctx.sql(&create_view_sql).await?;
    }
    Ok(())
}
}
impl UriDictionary {
    /// An empty dictionary. ID allocation starts at 1, leaving 0 unassigned.
    fn new() -> Self {
        Self {
            uri_to_id: HashMap::new(),
            id_to_uri: HashMap::new(),
            next_id: 1,
        }
    }
    /// Returns the ID already assigned to `uri`, allocating the next free ID
    /// and recording it in both maps if the string is new.
    fn get_or_create_uri(&mut self, uri: &str) -> u64 {
        match self.uri_to_id.get(uri) {
            Some(&existing) => existing,
            None => {
                let assigned = self.next_id;
                self.next_id += 1;
                self.uri_to_id.insert(uri.to_owned(), assigned);
                self.id_to_uri.insert(assigned, uri.to_owned());
                assigned
            }
        }
    }
    /// Reverse lookup: resolves an ID back to its interned string.
    fn get_uri(&self, id: u64) -> Option<&str> {
        self.id_to_uri.get(&id).map(String::as_str)
    }
    /// Materializes the stored string as an RDF subject term: a "_:" prefix
    /// yields a blank node, anything else a named node. Returns `None` for
    /// unknown IDs or strings that fail term construction.
    fn get_term(&self, id: u64) -> Option<crate::model::Subject> {
        let uri = self.get_uri(id)?;
        if uri.starts_with("_:") {
            BlankNode::new(uri)
                .ok()
                .map(crate::model::Subject::BlankNode)
        } else {
            NamedNode::new(uri)
                .ok()
                .map(crate::model::Subject::NamedNode)
        }
    }
}
impl BatchWriter {
    /// Fresh, empty column builders (one per schema field) and a zeroed row
    /// counter, ready to accumulate the next batch.
    fn new() -> Self {
        Self {
            current_size: 0,
            subject_builder: UInt64Builder::new(),
            predicate_builder: UInt64Builder::new(),
            object_type_builder: StringBuilder::new(),
            object_value_builder: StringBuilder::new(),
            object_datatype_builder: UInt64Builder::new(),
            object_lang_builder: StringBuilder::new(),
            graph_builder: UInt64Builder::new(),
            timestamp_builder: UInt64Builder::new(),
        }
    }
}
/// Pre-canned analytical queries accepted by `ColumnarStorage::analyze`.
#[derive(Debug, Clone)]
pub enum AnalyticalQuery {
    /// Triple count grouped by predicate, descending.
    CountByPredicate,
    /// The `limit` subjects with the most triples.
    TopSubjects { limit: usize },
    /// Time-bucketed aggregation for one predicate.
    /// NOTE(review): not implemented yet — `analyze` returns an empty series
    /// for this variant.
    TimeSeriesAnalysis {
        predicate: String,
        interval: TimeInterval,
    },
}
/// Bucket width for `AnalyticalQuery::TimeSeriesAnalysis`.
#[derive(Debug, Clone)]
pub enum TimeInterval {
    Hour,
    Day,
    Week,
    Month,
}
/// Result payload returned by `ColumnarStorage::analyze`, one variant per
/// `AnalyticalQuery` kind.
#[derive(Debug)]
pub enum AnalysisResult {
    /// Predicate URI -> triple count.
    PredicateCounts(HashMap<String, u64>),
    /// (subject URI, triple count) pairs, most frequent first.
    TopSubjects(Vec<(String, u64)>),
    /// (bucket timestamp, aggregated value) pairs.
    TimeSeries(Vec<(chrono::DateTime<chrono::Utc>, f64)>),
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a unique scratch directory path for one test run.
    ///
    /// Uses `std::env::temp_dir()` instead of a hard-coded `/tmp` so the tests
    /// are portable, and a nanosecond timestamp so concurrent runs don't
    /// collide.
    fn unique_test_dir(prefix: &str) -> PathBuf {
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("system time should be after UNIX epoch")
            .as_nanos();
        std::env::temp_dir().join(format!("{prefix}_{nanos}"))
    }

    /// Stores one triple, flushes it to parquet, and reads it back via SQL.
    #[tokio::test]
    async fn test_columnar_storage() {
        let test_dir = unique_test_dir("oxirs_columnar_test");
        let config = ColumnarConfig {
            path: test_dir.clone(),
            ..Default::default()
        };
        let storage = ColumnarStorage::new(config)
            .await
            .expect("storage should initialize");
        let triple = Triple::new(
            NamedNode::new("http://example.org/s").expect("valid IRI"),
            NamedNode::new("http://example.org/p").expect("valid IRI"),
            crate::model::Object::Literal(Literal::new("test")),
        );
        storage
            .store_triple(&triple)
            .await
            .expect("store should succeed");
        {
            let mut writer_guard = storage.writer.write().await;
            storage
                .flush_batch(&mut writer_guard)
                .await
                .expect("flush should succeed");
        }
        let results = storage
            .query_sql("SELECT * FROM triples WHERE object_value = 'test'")
            .await
            .expect("query should succeed");
        assert_eq!(results.len(), 1);
        // Best-effort cleanup so repeated runs don't accumulate scratch dirs.
        let _ = std::fs::remove_dir_all(&test_dir);
    }

    /// Verifies CountByPredicate aggregates correctly across multiple flushed
    /// batches (batch_size = 2 forces intermediate flushes during insert).
    #[tokio::test]
    async fn test_analytical_queries() {
        let test_dir = unique_test_dir("oxirs_columnar_analytics");
        let config = ColumnarConfig {
            path: test_dir.clone(),
            batch_size: 2,
            ..Default::default()
        };
        let storage = ColumnarStorage::new(config)
            .await
            .expect("storage should initialize");
        let predicates = ["p1", "p1", "p2", "p1", "p3"];
        for (i, pred) in predicates.iter().enumerate() {
            let triple = Triple::new(
                NamedNode::new(format!("http://example.org/s{i}")).expect("valid IRI from format"),
                NamedNode::new(format!("http://example.org/{pred}")).expect("valid IRI from format"),
                crate::model::Object::Literal(Literal::new(format!("value{i}"))),
            );
            storage
                .store_triple(&triple)
                .await
                .expect("store should succeed");
        }
        {
            let mut writer_guard = storage.writer.write().await;
            storage
                .flush_batch(&mut writer_guard)
                .await
                .expect("flush should succeed");
        }
        let result = storage
            .analyze(AnalyticalQuery::CountByPredicate)
            .await
            .expect("analyze should succeed");
        if let AnalysisResult::PredicateCounts(counts) = result {
            assert_eq!(counts.get("http://example.org/p1"), Some(&3));
            assert_eq!(counts.get("http://example.org/p2"), Some(&1));
            assert_eq!(counts.get("http://example.org/p3"), Some(&1));
        } else {
            panic!("Unexpected result type");
        }
        let _ = std::fs::remove_dir_all(&test_dir);
    }
}