use crate::model::{Literal, Term, Triple, TriplePattern};
use crate::OxirsError;
use chrono::{DateTime, Duration, Utc};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::ops::Bound;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::RwLock;
/// Configuration for [`TemporalStorage`].
#[derive(Debug, Clone)]
pub struct TemporalConfig {
    /// Storage directory; created (with missing parents) by `TemporalStorage::new`.
    pub path: PathBuf,
    /// Width of each time bucket that triples are partitioned into by `valid_from`.
    pub bucket_duration: Duration,
    /// Policy enforced by `TemporalStorage::apply_retention`.
    pub retention: RetentionPolicy,
    /// Which secondary indexes are maintained when triples are stored.
    pub indexing: TemporalIndexing,
    /// Compression toggle. NOTE(review): not consulted by any code visible here.
    pub compression: bool,
}

impl Default for TemporalConfig {
    /// Hourly buckets, 365-day retention, default indexing and compression enabled,
    /// rooted at `/var/oxirs/temporal`.
    fn default() -> Self {
        Self {
            compression: true,
            indexing: TemporalIndexing::default(),
            retention: RetentionPolicy::Days(365),
            bucket_duration: Duration::hours(1),
            path: PathBuf::from("/var/oxirs/temporal"),
        }
    }
}
/// How long [`TemporalStorage`] keeps data; enforced by `TemporalStorage::apply_retention`.
#[derive(Clone)]
pub enum RetentionPolicy {
    /// Never expire anything.
    Forever,
    /// Expire buckets that start more than this many days ago.
    Days(u32),
    /// Expire buckets that start more than this many 30-day months ago.
    Months(u32),
    /// Version-count policy. NOTE(review): not enforced by any code visible here.
    Versions(u32),
    /// Caller-supplied predicate over a stored triple. NOTE(review): not enforced by any
    /// code visible here, and whether `true` means "keep" or "drop" is undefined — confirm.
    Custom(Arc<dyn Fn(&TemporalTriple) -> bool + Send + Sync>),
}

impl std::fmt::Debug for RetentionPolicy {
    /// Prints the variant name and its count; closures render as `Custom(<function>)`
    /// because `dyn Fn` has no `Debug` impl.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Forever => f.write_str("Forever"),
            Self::Days(n) => write!(f, "Days({n})"),
            Self::Months(n) => write!(f, "Months({n})"),
            Self::Versions(n) => write!(f, "Versions({n})"),
            Self::Custom(_) => f.write_str("Custom(<function>)"),
        }
    }
}
/// Selects which secondary indexes [`TemporalStorage`] maintains on insert.
#[derive(Debug, Clone)]
pub struct TemporalIndexing {
    /// Interval-index toggle. NOTE(review): not consulted by any code visible here.
    pub interval_index: bool,
    /// Maintain per-subject state histories.
    pub entity_index: bool,
    /// Record change events (recent queue and per-predicate lists).
    pub change_index: bool,
    /// Allen-relation indexing toggle. NOTE(review): not consulted by any code visible here.
    pub allen_relations: bool,
}

impl Default for TemporalIndexing {
    /// Enables every index except Allen-relation indexing.
    fn default() -> Self {
        Self {
            allen_relations: false,
            change_index: true,
            entity_index: true,
            interval_index: true,
        }
    }
}
/// A triple together with its temporal annotations.
///
/// `valid_from`/`valid_to` bound the period in which the fact holds (valid time); a
/// `valid_to` of `None` means no end has been recorded, and such triples are counted as
/// active by `TemporalStorage::store_temporal`. `transaction_time` is the instant the
/// triple was stored (`Utc::now()` at insert).
///
/// Field order is significant for the derived `Debug` output and for order-sensitive
/// serde formats.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TemporalTriple {
    /// The underlying RDF triple.
    pub triple: Triple,
    /// Start of the validity period.
    pub valid_from: DateTime<Utc>,
    /// End of the validity period; `None` = still valid (never filtered out by `query_at_time`).
    pub valid_to: Option<DateTime<Utc>>,
    /// When the triple was stored.
    pub transaction_time: DateTime<Utc>,
    /// Extra annotations.
    pub metadata: TemporalMetadata,
}
/// Optional annotations attached to a [`TemporalTriple`].
///
/// When no metadata is supplied, `TemporalStorage::store_temporal` uses no certainty, no
/// provenance, `predicted: false` and `TemporalGranularity::Second`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TemporalMetadata {
    /// Confidence in the fact; presumably in `[0.0, 1.0]` — not validated here.
    pub certainty: Option<f64>,
    /// Presumably a free-form description of where the fact came from.
    pub provenance: Option<String>,
    /// NOTE(review): presumably marks forecast rather than observed facts — confirm.
    pub predicted: bool,
    /// Precision at which the timestamps are meaningful.
    pub granularity: TemporalGranularity,
}
/// Timestamp precision, declared from finest (`Nanosecond`) to coarsest (`Year`).
///
/// NOTE(review): stored only; nothing visible here rounds timestamps to this granularity.
/// Variant order is significant for serde formats that encode variants by index.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TemporalGranularity {
    Nanosecond,
    Microsecond,
    Millisecond,
    Second,
    Minute,
    Hour,
    Day,
    Month,
    Year,
}
/// Bitemporal triple store that partitions triples into fixed-width time buckets.
///
/// Each triple is placed in the bucket whose key is its `valid_from` timestamp truncated to
/// a whole multiple of `config.bucket_duration` seconds (see `get_bucket_time`). The entity
/// and change indexes are updated on insert when enabled in `config.indexing`. All state
/// sits behind `tokio::sync::RwLock`s. `new` creates `config.path`, but nothing visible
/// here writes to it.
pub struct TemporalStorage {
    /// Settings fixed at construction.
    config: TemporalConfig,
    /// Bucket key -> triples whose `valid_from` maps to that bucket.
    buckets: Arc<RwLock<BTreeMap<DateTime<Utc>, Bucket>>>,
    /// NOTE(review): initialised in `new` but never populated or queried; the tree is a stub.
    #[allow(dead_code)]
    interval_index: Arc<RwLock<IntervalIndex>>,
    /// Per-subject state snapshots, keyed by subject IRI.
    entity_index: Arc<RwLock<EntityIndex>>,
    /// Recent change events plus per-predicate change lists.
    change_index: Arc<RwLock<ChangeIndex>>,
    /// Running counters updated on insert and by retention.
    stats: Arc<RwLock<TemporalStats>>,
}
/// One time partition of the store.
struct Bucket {
    /// Bucket key (duplicated from the map key); currently unread, hence the allow.
    #[allow(dead_code)]
    start_time: DateTime<Utc>,
    /// Triples in insertion order.
    triples: Vec<TemporalTriple>,
    /// Per-bucket bookkeeping.
    stats: BucketStats,
}
/// Bookkeeping for a single [`Bucket`].
#[derive(Debug, Default)]
struct BucketStats {
    /// Incremented alongside every push into `Bucket::triples` by `store_temporal`.
    triple_count: usize,
    /// NOTE(review): never set; compression is not implemented in visible code.
    #[allow(dead_code)]
    compressed_size: Option<usize>,
    /// Time of the most recent insert into the bucket (the query methods do not touch it).
    last_access: DateTime<Utc>,
}
/// Wrapper around the interval tree, which is currently a stub.
struct IntervalIndex {
    /// NOTE(review): placeholder; `IntervalTree` stores nothing yet.
    #[allow(dead_code)]
    intervals: IntervalTree<DateTime<Utc>, TemporalTriple>,
}
/// Per-entity histories. Only triples whose subject is a named node (IRI) are indexed.
struct EntityIndex {
    /// Subject IRI -> that subject's history.
    entity_history: HashMap<String, EntityHistory>,
}
/// Snapshot history for one subject.
///
/// Each insert adds its object to the state keyed by the triple's `valid_from`. States are
/// not carried forward, so an entry holds only the facts that *start* at that instant.
/// NOTE(review): the type is public but its fields are private, so callers outside this
/// module cannot inspect a returned history.
#[derive(Debug, Clone)]
pub struct EntityHistory {
    /// `valid_from` -> facts starting at that instant.
    states: BTreeMap<DateTime<Utc>, EntityState>,
    /// NOTE(review): never written; change events are recorded in `ChangeIndex` instead.
    #[allow(dead_code)]
    changes: Vec<ChangeEvent>,
}
/// Facts recorded for an entity at one instant.
#[derive(Debug, Clone)]
struct EntityState {
    /// Predicate string -> literal objects.
    properties: HashMap<String, Vec<Literal>>,
    /// Predicate string -> named-node object IRIs (other object kinds are not recorded).
    relationships: HashMap<String, Vec<String>>,
}
/// A single change recorded when a triple is stored.
///
/// NOTE(review): the type is public but its fields are private, so callers outside this
/// module can only clone or `Debug`-print it.
#[derive(Debug, Clone)]
pub struct ChangeEvent {
    /// The triple's `valid_from` (not the wall-clock insert time).
    #[allow(dead_code)]
    timestamp: DateTime<Utc>,
    /// Kind of change.
    #[allow(dead_code)]
    change_type: ChangeType,
    /// Predicate string of the affected triple.
    property: String,
    /// Previous object; `None` for inserts.
    #[allow(dead_code)]
    old_value: Option<Term>,
    /// New object.
    #[allow(dead_code)]
    new_value: Option<Term>,
}
/// Kind of change. Only `Insert` is produced by visible code.
#[derive(Debug, Clone)]
enum ChangeType {
    Insert,
    #[allow(dead_code)]
    Update,
    #[allow(dead_code)]
    Delete,
}
/// Change log: a bounded newest-first queue plus per-predicate lists.
struct ChangeIndex {
    /// Newest first; capped at 10 000 entries by `update_change_index`.
    recent_changes: VecDeque<ChangeEvent>,
    /// Predicate string -> every change for it in insertion order (never trimmed).
    property_changes: HashMap<String, Vec<ChangeEvent>>,
}
/// Store-wide counters.
#[derive(Debug, Default)]
struct TemporalStats {
    /// Stored triples: incremented on insert, reduced by retention.
    total_triples: u64,
    /// Stored triples whose `valid_to` was `None` at insert.
    active_triples: u64,
    /// Stored triples inserted with an explicit `valid_to`.
    historical_triples: u64,
    /// NOTE(review): this and the following fields are never updated by visible code.
    #[allow(dead_code)]
    total_buckets: u64,
    #[allow(dead_code)]
    compression_ratio: f64,
    #[allow(dead_code)]
    avg_query_time_ms: f64,
}
/// Placeholder for an interval tree, presumably keyed by `K` endpoints and storing `V`s.
///
/// It holds no data yet; the `PhantomData` markers only keep both type parameters in use.
struct IntervalTree<K, V> {
    _key: std::marker::PhantomData<K>,
    _value: std::marker::PhantomData<V>,
}

impl<K, V> IntervalTree<K, V> {
    /// Creates an empty (stub) tree.
    fn new() -> Self {
        use std::marker::PhantomData;
        Self {
            _value: PhantomData,
            _key: PhantomData,
        }
    }
}
impl TemporalStorage {
    /// Upper bound on the number of events kept in the recent-changes queue.
    const MAX_RECENT_CHANGES: usize = 10_000;

    /// Creates an empty temporal store, first creating `config.path` and any missing parents.
    ///
    /// # Errors
    ///
    /// Returns an error if the storage directory cannot be created.
    pub async fn new(config: TemporalConfig) -> Result<Self, OxirsError> {
        // NOTE(review): blocking filesystem call inside an async fn; consider `tokio::fs`
        // if the runtime is built with the `fs` feature.
        std::fs::create_dir_all(&config.path)?;
        Ok(TemporalStorage {
            config,
            buckets: Arc::new(RwLock::new(BTreeMap::new())),
            interval_index: Arc::new(RwLock::new(IntervalIndex {
                intervals: IntervalTree::new(),
            })),
            entity_index: Arc::new(RwLock::new(EntityIndex {
                entity_history: HashMap::new(),
            })),
            change_index: Arc::new(RwLock::new(ChangeIndex {
                recent_changes: VecDeque::with_capacity(Self::MAX_RECENT_CHANGES),
                property_changes: HashMap::new(),
            })),
            stats: Arc::new(RwLock::new(TemporalStats::default())),
        })
    }

    /// Stores `triple` as valid from `valid_from` until `valid_to` (`None` = still valid).
    ///
    /// The triple is appended to the bucket containing `valid_from`. The entity and change
    /// indexes are updated when enabled in `config.indexing`, and the store counters are
    /// bumped. Missing `metadata` defaults to no certainty or provenance, `predicted: false`
    /// and second granularity.
    ///
    /// # Errors
    ///
    /// Propagates failures from the index updates.
    pub async fn store_temporal(
        &self,
        triple: Triple,
        valid_from: DateTime<Utc>,
        valid_to: Option<DateTime<Utc>>,
        metadata: Option<TemporalMetadata>,
    ) -> Result<(), OxirsError> {
        let temporal_triple = TemporalTriple {
            triple,
            valid_from,
            valid_to,
            transaction_time: Utc::now(),
            metadata: metadata.unwrap_or_else(|| TemporalMetadata {
                certainty: None,
                provenance: None,
                predicted: false,
                granularity: TemporalGranularity::Second,
            }),
        };
        let bucket_time = self.get_bucket_time(valid_from);
        {
            let mut buckets = self.buckets.write().await;
            let bucket = buckets.entry(bucket_time).or_insert_with(|| Bucket {
                start_time: bucket_time,
                triples: Vec::new(),
                stats: BucketStats::default(),
            });
            bucket.triples.push(temporal_triple.clone());
            bucket.stats.triple_count += 1;
            bucket.stats.last_access = Utc::now();
        }
        if self.config.indexing.entity_index {
            self.update_entity_index(&temporal_triple).await?;
        }
        if self.config.indexing.change_index {
            self.update_change_index(&temporal_triple).await?;
        }
        let mut stats = self.stats.write().await;
        stats.total_triples += 1;
        if valid_to.is_none() {
            stats.active_triples += 1;
        } else {
            stats.historical_triples += 1;
        }
        Ok(())
    }

    /// Returns the triples matching `pattern` that are valid at instant `time`.
    ///
    /// A triple is valid when `valid_from <= time` and it has no `valid_to` or
    /// `valid_to >= time`. Both ends are inclusive.
    pub async fn query_at_time(
        &self,
        pattern: &TriplePattern,
        time: DateTime<Utc>,
    ) -> Result<Vec<Triple>, OxirsError> {
        // Buckets starting after `time` only hold triples with `valid_from > time`.
        let last_bucket = self.get_bucket_time(time);
        let buckets = self.buckets.read().await;
        let results: Vec<Triple> = buckets
            .range((Bound::Unbounded, Bound::Included(last_bucket)))
            .flat_map(|(_, bucket)| bucket.triples.iter())
            .filter(|temporal| {
                Self::overlaps_window(temporal, time, time) && pattern.matches(&temporal.triple)
            })
            .map(|temporal| temporal.triple.clone())
            .collect();
        Ok(results)
    }

    /// Returns the temporal triples matching `pattern` whose validity overlaps `[start, end]`.
    ///
    /// A reversed window (`start > end`) is empty.
    pub async fn query_time_range(
        &self,
        pattern: &TriplePattern,
        start: DateTime<Utc>,
        end: DateTime<Utc>,
    ) -> Result<Vec<TemporalTriple>, OxirsError> {
        if start > end {
            return Ok(Vec::new());
        }
        let last_bucket = self.get_bucket_time(end);
        let buckets = self.buckets.read().await;
        // Buckets are keyed by `valid_from`. A triple that began before `start` but is still
        // valid inside the window lives in an earlier bucket, so every bucket up to `end`
        // must be scanned; `overlaps_window` discards those that ended before `start`.
        let results: Vec<TemporalTriple> = buckets
            .range((Bound::Unbounded, Bound::Included(last_bucket)))
            .flat_map(|(_, bucket)| bucket.triples.iter())
            .filter(|temporal| {
                Self::overlaps_window(temporal, start, end) && pattern.matches(&temporal.triple)
            })
            .cloned()
            .collect();
        Ok(results)
    }

    /// Returns a snapshot of the history recorded for `entity_uri`, if any.
    pub async fn get_entity_history(
        &self,
        entity_uri: &str,
    ) -> Result<Option<EntityHistory>, OxirsError> {
        let entity_index = self.entity_index.read().await;
        Ok(entity_index.entity_history.get(entity_uri).cloned())
    }

    /// Returns up to `limit` of the most recent change events, newest first.
    pub async fn get_recent_changes(&self, limit: usize) -> Result<Vec<ChangeEvent>, OxirsError> {
        let change_index = self.change_index.read().await;
        Ok(change_index
            .recent_changes
            .iter()
            .take(limit)
            .cloned()
            .collect())
    }

    /// Answers a temporal reasoning query.
    ///
    /// Allen-relation queries are evaluated directly on the two triples' validity intervals,
    /// treating a missing `valid_to` as an unbounded end. The other modes are placeholders
    /// that return empty results.
    pub async fn temporal_reason(
        &self,
        query: TemporalQuery,
    ) -> Result<TemporalResult, OxirsError> {
        match query {
            TemporalQuery::AllenRelation {
                triple1,
                triple2,
                relation,
            } => Ok(TemporalResult::Boolean(Self::allen_relation_holds(
                &relation, &triple1, &triple2,
            ))),
            // NOTE(review): not implemented yet.
            TemporalQuery::TemporalPath { .. } => Ok(TemporalResult::Paths(Vec::new())),
            TemporalQuery::ChangeDetection { .. } => Ok(TemporalResult::Changes(Vec::new())),
            TemporalQuery::TrendAnalysis { .. } => Ok(TemporalResult::Trend(TrendData::default())),
        }
    }

    /// Drops every bucket that starts before the retention cutoff and returns the number of
    /// triples removed.
    ///
    /// Only `Days` and `Months` (approximated as 30 days each) are enforced. `Forever`,
    /// `Versions` and `Custom` remove nothing. NOTE(review): removed triples stay referenced
    /// by the entity and change indexes.
    pub async fn apply_retention(&self) -> Result<usize, OxirsError> {
        const SECONDS_PER_DAY: i64 = 86_400;
        let retention_days = match &self.config.retention {
            RetentionPolicy::Days(days) => i64::from(*days),
            RetentionPolicy::Months(months) => i64::from(*months) * 30,
            RetentionPolicy::Forever | RetentionPolicy::Versions(_) | RetentionPolicy::Custom(_) => {
                return Ok(0)
            }
        };
        let now = Utc::now();
        // Plain i64 arithmetic (at most ~1.1e16 s for `u32::MAX` months) instead of
        // `now - Duration::days(..)`, which panics when the cutoff leaves chrono's range.
        // An unrepresentable cutoff simply means nothing is old enough to expire.
        let cutoff = match now
            .timestamp()
            .checked_sub(retention_days * SECONDS_PER_DAY)
            .and_then(|secs| DateTime::from_timestamp(secs, now.timestamp_subsec_nanos()))
        {
            Some(cutoff) => cutoff,
            None => return Ok(0),
        };

        let mut buckets = self.buckets.write().await;
        // `split_off` returns the buckets starting at or after `cutoff`; those left behind expired.
        let retained = buckets.split_off(&cutoff);
        let expired = std::mem::replace(&mut *buckets, retained);
        drop(buckets);

        let mut removed = 0usize;
        let mut active = 0u64;
        let mut historical = 0u64;
        for temporal in expired.values().flat_map(|bucket| bucket.triples.iter()) {
            removed += 1;
            if temporal.valid_to.is_none() {
                active += 1;
            } else {
                historical += 1;
            }
        }

        let mut stats = self.stats.write().await;
        stats.total_triples = stats.total_triples.saturating_sub(removed as u64);
        stats.active_triples = stats.active_triples.saturating_sub(active);
        stats.historical_triples = stats.historical_triples.saturating_sub(historical);
        Ok(removed)
    }

    /// Maps `time` to the start of its bucket: the Unix timestamp floored to a whole multiple
    /// of `bucket_duration` seconds.
    fn get_bucket_time(&self, time: DateTime<Utc>) -> DateTime<Utc> {
        // Sub-second or non-positive durations would divide by zero; use 1-second buckets.
        let bucket_seconds = self.config.bucket_duration.num_seconds().max(1);
        // Euclidean division floors toward negative infinity, so pre-1970 instants land in
        // the bucket starting at or before them (plain `/` rounds toward zero).
        let bucket_timestamp = time.timestamp().div_euclid(bucket_seconds) * bucket_seconds;
        // Only fails at the very edge of chrono's range; fall back to the instant itself.
        DateTime::from_timestamp(bucket_timestamp, 0).unwrap_or(time)
    }

    /// True when `temporal`'s validity overlaps the inclusive window `[start, end]`.
    /// A missing `valid_to` never expires.
    fn overlaps_window(temporal: &TemporalTriple, start: DateTime<Utc>, end: DateTime<Utc>) -> bool {
        temporal.valid_from <= end
            && !matches!(temporal.valid_to, Some(valid_to) if valid_to < start)
    }

    /// Orders two interval ends, treating `None` as the unbounded future.
    fn compare_interval_ends(
        a: Option<DateTime<Utc>>,
        b: Option<DateTime<Utc>>,
    ) -> std::cmp::Ordering {
        use std::cmp::Ordering;
        match (a, b) {
            (Some(a), Some(b)) => a.cmp(&b),
            (Some(_), None) => Ordering::Less,
            (None, Some(_)) => Ordering::Greater,
            (None, None) => Ordering::Equal,
        }
    }

    /// Evaluates Allen's relation "`first` REL `second`" on the triples' validity intervals.
    ///
    /// Two unbounded ends compare equal.
    fn allen_relation_holds(
        relation: &AllenRelation,
        first: &TemporalTriple,
        second: &TemporalTriple,
    ) -> bool {
        let (s1, e1) = (first.valid_from, first.valid_to);
        let (s2, e2) = (second.valid_from, second.valid_to);
        let ends = Self::compare_interval_ends;
        // Compares an (optional) end against a start instant.
        let end_vs_start = |end: Option<DateTime<Utc>>, start: DateTime<Utc>| ends(end, Some(start));
        match relation {
            AllenRelation::Before => end_vs_start(e1, s2).is_lt(),
            AllenRelation::After => end_vs_start(e2, s1).is_lt(),
            AllenRelation::Meets => e1 == Some(s2),
            AllenRelation::MetBy => e2 == Some(s1),
            AllenRelation::Overlaps => {
                s1 < s2 && end_vs_start(e1, s2).is_gt() && ends(e1, e2).is_lt()
            }
            AllenRelation::OverlappedBy => {
                s2 < s1 && end_vs_start(e2, s1).is_gt() && ends(e2, e1).is_lt()
            }
            AllenRelation::Starts => s1 == s2 && ends(e1, e2).is_lt(),
            AllenRelation::StartedBy => s1 == s2 && ends(e2, e1).is_lt(),
            AllenRelation::During => s2 < s1 && ends(e1, e2).is_lt(),
            AllenRelation::Contains => s1 < s2 && ends(e2, e1).is_lt(),
            AllenRelation::Finishes => s2 < s1 && e1 == e2,
            AllenRelation::FinishedBy => s1 < s2 && e1 == e2,
            AllenRelation::Equals => s1 == s2 && e1 == e2,
        }
    }

    /// String key for a triple's predicate (IRI or variable name).
    fn predicate_key(triple: &Triple) -> &str {
        match triple.predicate() {
            crate::model::Predicate::NamedNode(nn) => nn.as_str(),
            crate::model::Predicate::Variable(v) => v.as_str(),
        }
    }

    /// Records the triple's object in its subject's state at `valid_from`.
    ///
    /// Only named-node subjects are indexed. Literal objects go to `properties`, named-node
    /// objects to `relationships`, and other object kinds are ignored.
    async fn update_entity_index(&self, temporal: &TemporalTriple) -> Result<(), OxirsError> {
        let entity_uri = match temporal.triple.subject() {
            crate::model::Subject::NamedNode(nn) => nn.as_str().to_string(),
            _ => return Ok(()),
        };
        let predicate_uri = Self::predicate_key(&temporal.triple);
        let mut entity_index = self.entity_index.write().await;
        let history = entity_index
            .entity_history
            .entry(entity_uri)
            .or_insert_with(|| EntityHistory {
                states: BTreeMap::new(),
                changes: Vec::new(),
            });
        let state = history
            .states
            .entry(temporal.valid_from)
            .or_insert_with(|| EntityState {
                properties: HashMap::new(),
                relationships: HashMap::new(),
            });
        match temporal.triple.object() {
            crate::model::Object::Literal(lit) => {
                state
                    .properties
                    .entry(predicate_uri.to_string())
                    .or_default()
                    .push(lit.clone());
            }
            crate::model::Object::NamedNode(nn) => {
                state
                    .relationships
                    .entry(predicate_uri.to_string())
                    .or_default()
                    .push(nn.as_str().to_string());
            }
            _ => {}
        }
        Ok(())
    }

    /// Records an insert event in the bounded recent-changes queue and the per-predicate list.
    async fn update_change_index(&self, temporal: &TemporalTriple) -> Result<(), OxirsError> {
        let change = ChangeEvent {
            timestamp: temporal.valid_from,
            change_type: ChangeType::Insert,
            property: Self::predicate_key(&temporal.triple).to_string(),
            old_value: None,
            new_value: Some(Term::from_object(temporal.triple.object())),
        };
        let mut change_index = self.change_index.write().await;
        // Evict before pushing so the queue never outgrows its preallocated capacity.
        if change_index.recent_changes.len() >= Self::MAX_RECENT_CHANGES {
            change_index.recent_changes.pop_back();
        }
        change_index.recent_changes.push_front(change.clone());
        change_index
            .property_changes
            .entry(change.property.clone())
            .or_default()
            .push(change);
        Ok(())
    }
}
/// A reasoning request handled by `TemporalStorage::temporal_reason`.
#[derive(Debug, Clone)]
pub enum TemporalQuery {
    /// Does `relation` hold between the validity intervals of `triple1` and `triple2`?
    AllenRelation {
        triple1: Box<TemporalTriple>,
        triple2: Box<TemporalTriple>,
        relation: AllenRelation,
    },
    /// Presumably: paths from entity `start` to `end`, optionally restricted to `predicate`,
    /// with at most `max_hops` edges — semantics to be confirmed.
    TemporalPath {
        start: String,
        end: String,
        predicate: Option<String>,
        max_hops: usize,
    },
    /// Presumably: changes to `property` of `entity`, with `threshold` filtering small
    /// changes — TODO confirm.
    ChangeDetection {
        entity: String,
        property: String,
        threshold: f64,
    },
    /// Presumably: trend of `property` for `entity` over time windows of length `window`.
    TrendAnalysis {
        entity: String,
        property: String,
        window: Duration,
    },
}
/// The thirteen basic relations of Allen's interval algebra, read as "first REL second".
///
/// Derives `PartialEq`/`Eq`/`Hash` so relations can be compared and used as map/set keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AllenRelation {
    /// First ends strictly before second starts.
    Before,
    /// First starts strictly after second ends.
    After,
    /// First ends exactly when second starts.
    Meets,
    /// First starts exactly when second ends.
    MetBy,
    /// First starts before second, and ends inside it.
    Overlaps,
    /// Second starts before first, and ends inside it.
    OverlappedBy,
    /// Same start; first ends earlier.
    Starts,
    /// Same start; first ends later.
    StartedBy,
    /// First lies strictly inside second.
    During,
    /// Second lies strictly inside first.
    Contains,
    /// Same end; first starts later.
    Finishes,
    /// Same end; first starts earlier.
    FinishedBy,
    /// Same start and same end.
    Equals,
}
/// Answer produced by `TemporalStorage::temporal_reason`; one variant per query kind.
#[derive(Debug)]
pub enum TemporalResult {
    /// Answer to an Allen-relation query.
    Boolean(bool),
    /// Paths found, each as a sequence of temporal triples.
    Paths(Vec<Vec<TemporalTriple>>),
    /// Detected change events.
    Changes(Vec<ChangeEvent>),
    /// Trend summary.
    Trend(TrendData),
}
/// Trend summary for a property over time.
///
/// NOTE(review): the field names suggest a linear fit (`slope`, `intercept`, coefficient of
/// determination `r_squared`) plus forecast points — nothing visible here computes them yet.
#[derive(Debug, Default)]
pub struct TrendData {
    pub slope: f64,
    pub intercept: f64,
    pub r_squared: f64,
    /// Presumably (instant, predicted value) pairs.
    pub predictions: Vec<(DateTime<Utc>, f64)>,
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::NamedNode;

    /// Default config rooted in a per-test directory under the platform temp dir.
    /// This avoids a hard-coded `/tmp`, so the tests also run on Windows.
    fn test_config(dir_name: &str) -> TemporalConfig {
        TemporalConfig {
            path: std::env::temp_dir().join(dir_name),
            ..Default::default()
        }
    }

    #[tokio::test]
    async fn test_temporal_storage() {
        let storage = TemporalStorage::new(test_config("oxirs_temporal_test"))
            .await
            .expect("async operation should succeed");
        let triple = Triple::new(
            NamedNode::new("http://example.org/person1").expect("valid IRI"),
            NamedNode::new("http://example.org/age").expect("valid IRI"),
            crate::model::Object::Literal(Literal::new("25")),
        );
        let valid_from = Utc::now() - Duration::days(365);
        let valid_to = Some(Utc::now() - Duration::days(180));
        storage
            .store_temporal(triple.clone(), valid_from, valid_to, None)
            .await
            .expect("operation should succeed");
        let query_time = Utc::now() - Duration::days(270);
        let pattern = TriplePattern::new(
            Some(crate::model::SubjectPattern::NamedNode(
                NamedNode::new("http://example.org/person1").expect("valid IRI"),
            )),
            None,
            None,
        );
        let results = storage
            .query_at_time(&pattern, query_time)
            .await
            .expect("async operation should succeed");
        assert_eq!(results.len(), 1);
        assert_eq!(results[0], triple);
        let current_results = storage
            .query_at_time(&pattern, Utc::now())
            .await
            .expect("async operation should succeed");
        assert_eq!(current_results.len(), 0);
    }

    #[tokio::test]
    async fn test_entity_history() {
        let storage = TemporalStorage::new(test_config("oxirs_temporal_history"))
            .await
            .expect("async operation should succeed");
        let entity = "http://example.org/person1";
        for age in 20..=25 {
            let triple = Triple::new(
                NamedNode::new(entity).expect("valid IRI"),
                NamedNode::new("http://example.org/age").expect("valid IRI"),
                crate::model::Object::Literal(Literal::new(age.to_string())),
            );
            let valid_from = Utc::now() - Duration::days((26 - age) as i64 * 365);
            storage
                .store_temporal(triple, valid_from, None, None)
                .await
                .expect("operation should succeed");
        }
        let history = storage
            .get_entity_history(entity)
            .await
            .expect("async operation should succeed");
        assert!(history.is_some());
        let history = history.expect("history should be available");
        assert_eq!(history.states.len(), 6);
    }

    #[tokio::test]
    async fn test_recent_changes_respects_limit() {
        let storage = TemporalStorage::new(test_config("oxirs_temporal_changes"))
            .await
            .expect("async operation should succeed");
        for age in 0..3 {
            let triple = Triple::new(
                NamedNode::new("http://example.org/person1").expect("valid IRI"),
                NamedNode::new("http://example.org/age").expect("valid IRI"),
                crate::model::Object::Literal(Literal::new(age.to_string())),
            );
            storage
                .store_temporal(triple, Utc::now(), None, None)
                .await
                .expect("operation should succeed");
        }
        let changes = storage
            .get_recent_changes(2)
            .await
            .expect("async operation should succeed");
        assert_eq!(changes.len(), 2);
        assert!(changes
            .iter()
            .all(|change| change.property == "http://example.org/age"));
    }
}