1use crate::model::{Literal, Term, Triple, TriplePattern};
7use crate::OxirsError;
8use chrono::{DateTime, Duration, Utc};
9use serde::{Deserialize, Serialize};
10use std::collections::{BTreeMap, HashMap, VecDeque};
11use std::ops::Bound;
12use std::path::PathBuf;
13use std::sync::Arc;
14use tokio::sync::RwLock;
15
16#[derive(Debug, Clone)]
18pub struct TemporalConfig {
19 pub path: PathBuf,
21 pub bucket_duration: Duration,
23 pub retention: RetentionPolicy,
25 pub indexing: TemporalIndexing,
27 pub compression: bool,
29}
30
31impl Default for TemporalConfig {
32 fn default() -> Self {
33 TemporalConfig {
34 path: PathBuf::from("/var/oxirs/temporal"),
35 bucket_duration: Duration::hours(1),
36 retention: RetentionPolicy::Days(365),
37 indexing: TemporalIndexing::default(),
38 compression: true,
39 }
40 }
41}
42
43#[derive(Clone)]
45pub enum RetentionPolicy {
46 Forever,
48 Days(u32),
50 Months(u32),
52 Versions(u32),
54 Custom(Arc<dyn Fn(&TemporalTriple) -> bool + Send + Sync>),
56}
57
58impl std::fmt::Debug for RetentionPolicy {
59 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60 match self {
61 RetentionPolicy::Forever => write!(f, "Forever"),
62 RetentionPolicy::Days(n) => write!(f, "Days({n})"),
63 RetentionPolicy::Months(n) => write!(f, "Months({n})"),
64 RetentionPolicy::Versions(n) => write!(f, "Versions({n})"),
65 RetentionPolicy::Custom(_) => write!(f, "Custom(<function>)"),
66 }
67 }
68}
69
/// Switches selecting which secondary indexes are maintained.
#[derive(Debug, Clone)]
pub struct TemporalIndexing {
    /// Interval index switch.
    /// NOTE(review): not consulted by the insert path in this file.
    pub interval_index: bool,
    /// When set, `store_temporal` records each triple in the per-entity history index.
    pub entity_index: bool,
    /// When set, `store_temporal` records each insert in the change index.
    pub change_index: bool,
    /// Allen-relation support switch.
    /// NOTE(review): not consulted anywhere in this file.
    pub allen_relations: bool,
}

impl Default for TemporalIndexing {
    /// Enables the interval, entity and change indexes; Allen relations stay off.
    fn default() -> Self {
        Self {
            allen_relations: false,
            change_index: true,
            entity_index: true,
            interval_index: true,
        }
    }
}
93
94#[derive(Debug, Clone, Serialize, Deserialize)]
96pub struct TemporalTriple {
97 pub triple: Triple,
99 pub valid_from: DateTime<Utc>,
101 pub valid_to: Option<DateTime<Utc>>,
103 pub transaction_time: DateTime<Utc>,
105 pub metadata: TemporalMetadata,
107}
108
109#[derive(Debug, Clone, Serialize, Deserialize)]
111pub struct TemporalMetadata {
112 pub certainty: Option<f64>,
114 pub provenance: Option<String>,
116 pub predicted: bool,
118 pub granularity: TemporalGranularity,
120}
121
122#[derive(Debug, Clone, Serialize, Deserialize)]
124pub enum TemporalGranularity {
125 Nanosecond,
126 Microsecond,
127 Millisecond,
128 Second,
129 Minute,
130 Hour,
131 Day,
132 Month,
133 Year,
134}
135
136pub struct TemporalStorage {
138 config: TemporalConfig,
139 buckets: Arc<RwLock<BTreeMap<DateTime<Utc>, Bucket>>>,
141 #[allow(dead_code)]
143 interval_index: Arc<RwLock<IntervalIndex>>,
144 entity_index: Arc<RwLock<EntityIndex>>,
146 change_index: Arc<RwLock<ChangeIndex>>,
148 stats: Arc<RwLock<TemporalStats>>,
150}
151
152struct Bucket {
154 #[allow(dead_code)]
156 start_time: DateTime<Utc>,
157 triples: Vec<TemporalTriple>,
159 stats: BucketStats,
161}
162
163#[derive(Debug, Default)]
165struct BucketStats {
166 triple_count: usize,
167 #[allow(dead_code)]
168 compressed_size: Option<usize>,
169 last_access: DateTime<Utc>,
170}
171
172struct IntervalIndex {
174 #[allow(dead_code)]
176 intervals: IntervalTree<DateTime<Utc>, TemporalTriple>,
177}
178
179struct EntityIndex {
181 entity_history: HashMap<String, EntityHistory>,
183}
184
185#[derive(Debug, Clone)]
187pub struct EntityHistory {
188 states: BTreeMap<DateTime<Utc>, EntityState>,
190 #[allow(dead_code)]
192 changes: Vec<ChangeEvent>,
193}
194
195#[derive(Debug, Clone)]
197struct EntityState {
198 properties: HashMap<String, Vec<Literal>>,
200 relationships: HashMap<String, Vec<String>>,
202}
203
204#[derive(Debug, Clone)]
206pub struct ChangeEvent {
207 #[allow(dead_code)]
209 timestamp: DateTime<Utc>,
210 #[allow(dead_code)]
212 change_type: ChangeType,
213 property: String,
215 #[allow(dead_code)]
217 old_value: Option<Term>,
218 #[allow(dead_code)]
220 new_value: Option<Term>,
221}
222
/// Kind of change captured by a [`ChangeEvent`].
#[derive(Debug, Clone)]
enum ChangeType {
    /// A value was added.
    Insert,
    // Never constructed in this file.
    #[allow(dead_code)]
    Update,
    // Never constructed in this file.
    #[allow(dead_code)]
    Delete,
}
232
233struct ChangeIndex {
235 recent_changes: VecDeque<ChangeEvent>,
237 property_changes: HashMap<String, Vec<ChangeEvent>>,
239}
240
/// Running counters for a [`TemporalStorage`]; everything starts at zero.
#[derive(Debug, Default)]
struct TemporalStats {
    /// Number of triples currently accounted for.
    total_triples: u64,
    /// Triples stored without a `valid_to`.
    active_triples: u64,
    /// Triples stored with a `valid_to`.
    historical_triples: u64,
    // Never updated in this file.
    #[allow(dead_code)]
    total_buckets: u64,
    // Never updated in this file.
    #[allow(dead_code)]
    compression_ratio: f64,
    // Never updated in this file.
    #[allow(dead_code)]
    avg_query_time_ms: f64,
}
254
/// Placeholder interval tree: it stores nothing and only fixes the key/value types
/// through zero-sized markers.
struct IntervalTree<K, V> {
    _key: std::marker::PhantomData<K>,
    _value: std::marker::PhantomData<V>,
}

impl<K, V> IntervalTree<K, V> {
    /// Creates the (empty, zero-sized) placeholder tree.
    fn new() -> Self {
        Self {
            _key: std::marker::PhantomData,
            _value: std::marker::PhantomData,
        }
    }
}
269
270impl TemporalStorage {
271 pub async fn new(config: TemporalConfig) -> Result<Self, OxirsError> {
273 std::fs::create_dir_all(&config.path)?;
274
275 Ok(TemporalStorage {
276 config,
277 buckets: Arc::new(RwLock::new(BTreeMap::new())),
278 interval_index: Arc::new(RwLock::new(IntervalIndex {
279 intervals: IntervalTree::new(),
280 })),
281 entity_index: Arc::new(RwLock::new(EntityIndex {
282 entity_history: HashMap::new(),
283 })),
284 change_index: Arc::new(RwLock::new(ChangeIndex {
285 recent_changes: VecDeque::with_capacity(10000),
286 property_changes: HashMap::new(),
287 })),
288 stats: Arc::new(RwLock::new(TemporalStats::default())),
289 })
290 }
291
292 pub async fn store_temporal(
294 &self,
295 triple: Triple,
296 valid_from: DateTime<Utc>,
297 valid_to: Option<DateTime<Utc>>,
298 metadata: Option<TemporalMetadata>,
299 ) -> Result<(), OxirsError> {
300 let temporal_triple = TemporalTriple {
301 triple: triple.clone(),
302 valid_from,
303 valid_to,
304 transaction_time: Utc::now(),
305 metadata: metadata.unwrap_or(TemporalMetadata {
306 certainty: None,
307 provenance: None,
308 predicted: false,
309 granularity: TemporalGranularity::Second,
310 }),
311 };
312
313 let bucket_time = self.get_bucket_time(valid_from);
315
316 {
318 let mut buckets = self.buckets.write().await;
319 let bucket = buckets.entry(bucket_time).or_insert_with(|| Bucket {
320 start_time: bucket_time,
321 triples: Vec::new(),
322 stats: BucketStats::default(),
323 });
324
325 bucket.triples.push(temporal_triple.clone());
326 bucket.stats.triple_count += 1;
327 bucket.stats.last_access = Utc::now();
328 }
329
330 if self.config.indexing.entity_index {
332 self.update_entity_index(&temporal_triple).await?;
333 }
334
335 if self.config.indexing.change_index {
336 self.update_change_index(&temporal_triple).await?;
337 }
338
339 let mut stats = self.stats.write().await;
341 stats.total_triples += 1;
342 if valid_to.is_none() {
343 stats.active_triples += 1;
344 } else {
345 stats.historical_triples += 1;
346 }
347
348 Ok(())
349 }
350
351 pub async fn query_at_time(
353 &self,
354 pattern: &TriplePattern,
355 time: DateTime<Utc>,
356 ) -> Result<Vec<Triple>, OxirsError> {
357 let mut results = Vec::new();
358
359 let buckets = self.buckets.read().await;
361 for (_, bucket) in buckets.iter() {
362 for temporal in &bucket.triples {
363 if temporal.valid_from <= time {
365 if let Some(valid_to) = temporal.valid_to {
366 if valid_to < time {
367 continue;
368 }
369 }
370
371 if pattern.matches(&temporal.triple) {
373 results.push(temporal.triple.clone());
374 }
375 }
376 }
377 }
378
379 Ok(results)
380 }
381
382 pub async fn query_time_range(
384 &self,
385 pattern: &TriplePattern,
386 start: DateTime<Utc>,
387 end: DateTime<Utc>,
388 ) -> Result<Vec<TemporalTriple>, OxirsError> {
389 let mut results = Vec::new();
390
391 let start_bucket = self.get_bucket_time(start);
393 let end_bucket = self.get_bucket_time(end);
394
395 let buckets = self.buckets.read().await;
396 let range = buckets.range((Bound::Included(start_bucket), Bound::Included(end_bucket)));
397
398 for (_, bucket) in range {
399 for temporal in &bucket.triples {
400 if temporal.valid_from <= end {
402 if let Some(valid_to) = temporal.valid_to {
403 if valid_to < start {
404 continue;
405 }
406 }
407
408 if pattern.matches(&temporal.triple) {
410 results.push(temporal.clone());
411 }
412 }
413 }
414 }
415
416 Ok(results)
417 }
418
419 pub async fn get_entity_history(
421 &self,
422 entity_uri: &str,
423 ) -> Result<Option<EntityHistory>, OxirsError> {
424 let entity_index = self.entity_index.read().await;
425 Ok(entity_index.entity_history.get(entity_uri).cloned())
426 }
427
428 pub async fn get_recent_changes(&self, limit: usize) -> Result<Vec<ChangeEvent>, OxirsError> {
430 let change_index = self.change_index.read().await;
431 Ok(change_index
432 .recent_changes
433 .iter()
434 .take(limit)
435 .cloned()
436 .collect())
437 }
438
439 pub async fn temporal_reason(
441 &self,
442 query: TemporalQuery,
443 ) -> Result<TemporalResult, OxirsError> {
444 match query {
445 TemporalQuery::AllenRelation {
446 triple1: _,
447 triple2: _,
448 relation: _,
449 } => {
450 Ok(TemporalResult::Boolean(false)) }
453 TemporalQuery::TemporalPath {
454 start: _,
455 end: _,
456 predicate: _,
457 max_hops: _,
458 } => {
459 Ok(TemporalResult::Paths(Vec::new())) }
462 TemporalQuery::ChangeDetection {
463 entity: _,
464 property: _,
465 threshold: _,
466 } => {
467 Ok(TemporalResult::Changes(Vec::new())) }
470 TemporalQuery::TrendAnalysis {
471 entity: _,
472 property: _,
473 window: _,
474 } => {
475 Ok(TemporalResult::Trend(TrendData::default())) }
478 }
479 }
480
481 pub async fn apply_retention(&self) -> Result<usize, OxirsError> {
483 let mut removed = 0;
484 let now = Utc::now();
485
486 let mut buckets = self.buckets.write().await;
487 let mut to_remove = Vec::new();
488
489 for (bucket_time, bucket) in buckets.iter_mut() {
490 match &self.config.retention {
491 RetentionPolicy::Days(days) => {
492 let cutoff = now - Duration::days(*days as i64);
493 if *bucket_time < cutoff {
494 to_remove.push(*bucket_time);
495 removed += bucket.triples.len();
496 }
497 }
498 RetentionPolicy::Months(months) => {
499 let cutoff = now - Duration::days((*months as i64) * 30);
500 if *bucket_time < cutoff {
501 to_remove.push(*bucket_time);
502 removed += bucket.triples.len();
503 }
504 }
505 _ => {} }
507 }
508
509 for bucket_time in to_remove {
510 buckets.remove(&bucket_time);
511 }
512
513 let mut stats = self.stats.write().await;
515 stats.total_triples = stats.total_triples.saturating_sub(removed as u64);
516
517 Ok(removed)
518 }
519
520 fn get_bucket_time(&self, time: DateTime<Utc>) -> DateTime<Utc> {
522 let bucket_seconds = self.config.bucket_duration.num_seconds();
523 let timestamp = time.timestamp();
524 let bucket_timestamp = (timestamp / bucket_seconds) * bucket_seconds;
525 DateTime::from_timestamp(bucket_timestamp, 0).expect("bucket timestamp should be valid")
526 }
527
528 async fn update_entity_index(&self, temporal: &TemporalTriple) -> Result<(), OxirsError> {
530 let mut entity_index = self.entity_index.write().await;
531
532 let entity_uri = match temporal.triple.subject() {
534 crate::model::Subject::NamedNode(nn) => nn.as_str().to_string(),
535 _ => return Ok(()), };
537
538 let history = entity_index
539 .entity_history
540 .entry(entity_uri)
541 .or_insert_with(|| EntityHistory {
542 states: BTreeMap::new(),
543 changes: Vec::new(),
544 });
545
546 let state = history
548 .states
549 .entry(temporal.valid_from)
550 .or_insert_with(|| EntityState {
551 properties: HashMap::new(),
552 relationships: HashMap::new(),
553 });
554
555 let predicate_uri = match temporal.triple.predicate() {
557 crate::model::Predicate::NamedNode(nn) => nn.as_str(),
558 crate::model::Predicate::Variable(v) => v.as_str(),
559 };
560 match temporal.triple.object() {
561 crate::model::Object::Literal(lit) => {
562 state
563 .properties
564 .entry(predicate_uri.to_string())
565 .or_insert_with(Vec::new)
566 .push(lit.clone());
567 }
568 crate::model::Object::NamedNode(nn) => {
569 state
570 .relationships
571 .entry(predicate_uri.to_string())
572 .or_insert_with(Vec::new)
573 .push(nn.as_str().to_string());
574 }
575 _ => {}
576 }
577
578 Ok(())
579 }
580
581 async fn update_change_index(&self, temporal: &TemporalTriple) -> Result<(), OxirsError> {
583 let mut change_index = self.change_index.write().await;
584
585 let change = ChangeEvent {
586 timestamp: temporal.valid_from,
587 change_type: ChangeType::Insert,
588 property: match temporal.triple.predicate() {
589 crate::model::Predicate::NamedNode(nn) => nn.as_str(),
590 crate::model::Predicate::Variable(v) => v.as_str(),
591 }
592 .to_string(),
593 old_value: None,
594 new_value: Some(Term::from_object(temporal.triple.object())),
595 };
596
597 change_index.recent_changes.push_front(change.clone());
599 if change_index.recent_changes.len() > 10000 {
600 change_index.recent_changes.pop_back();
601 }
602
603 change_index
605 .property_changes
606 .entry(change.property.clone())
607 .or_insert_with(Vec::new)
608 .push(change);
609
610 Ok(())
611 }
612}
613
614#[derive(Debug, Clone)]
616pub enum TemporalQuery {
617 AllenRelation {
619 triple1: Box<TemporalTriple>,
620 triple2: Box<TemporalTriple>,
621 relation: AllenRelation,
622 },
623 TemporalPath {
625 start: String,
626 end: String,
627 predicate: Option<String>,
628 max_hops: usize,
629 },
630 ChangeDetection {
632 entity: String,
633 property: String,
634 threshold: f64,
635 },
636 TrendAnalysis {
638 entity: String,
639 property: String,
640 window: Duration,
641 },
642}
643
/// The thirteen relations of Allen's interval algebra, listed as inverse pairs
/// followed by `Equals`.
///
/// Derives `PartialEq`, `Eq` and `Hash` so relations can be compared and used as map
/// or set keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AllenRelation {
    /// Inverse of `After`.
    Before,
    /// Inverse of `Before`.
    After,
    /// Inverse of `MetBy`.
    Meets,
    /// Inverse of `Meets`.
    MetBy,
    /// Inverse of `OverlappedBy`.
    Overlaps,
    /// Inverse of `Overlaps`.
    OverlappedBy,
    /// Inverse of `StartedBy`.
    Starts,
    /// Inverse of `Starts`.
    StartedBy,
    /// Inverse of `Contains`.
    During,
    /// Inverse of `During`.
    Contains,
    /// Inverse of `FinishedBy`.
    Finishes,
    /// Inverse of `Finishes`.
    FinishedBy,
    /// The two intervals coincide.
    Equals,
}
661
662#[derive(Debug)]
664pub enum TemporalResult {
665 Boolean(bool),
666 Paths(Vec<Vec<TemporalTriple>>),
667 Changes(Vec<ChangeEvent>),
668 Trend(TrendData),
669}
670
671#[derive(Debug, Default)]
673pub struct TrendData {
674 pub slope: f64,
675 pub intercept: f64,
676 pub r_squared: f64,
677 pub predictions: Vec<(DateTime<Utc>, f64)>,
678}
679
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::NamedNode;

    /// Opens a store with default settings rooted at `dir_name` under the platform's
    /// temporary directory (instead of a hard-coded `/tmp`, which is Unix-only).
    async fn open_storage(dir_name: &str) -> TemporalStorage {
        let config = TemporalConfig {
            path: std::env::temp_dir().join(dir_name),
            ..Default::default()
        };

        TemporalStorage::new(config)
            .await
            .expect("async operation should succeed")
    }

    /// A triple valid from 365 to 180 days ago is visible 270 days ago but not now.
    #[tokio::test]
    async fn test_temporal_storage() {
        let storage = open_storage("oxirs_temporal_test").await;

        let triple = Triple::new(
            NamedNode::new("http://example.org/person1").expect("valid IRI"),
            NamedNode::new("http://example.org/age").expect("valid IRI"),
            crate::model::Object::Literal(Literal::new("25")),
        );

        let valid_from = Utc::now() - Duration::days(365);
        let valid_to = Some(Utc::now() - Duration::days(180));

        storage
            .store_temporal(triple.clone(), valid_from, valid_to, None)
            .await
            .expect("operation should succeed");

        let query_time = Utc::now() - Duration::days(270);
        let pattern = TriplePattern::new(
            Some(crate::model::SubjectPattern::NamedNode(
                NamedNode::new("http://example.org/person1").expect("valid IRI"),
            )),
            None,
            None,
        );

        // Inside the validity interval: exactly the stored triple comes back.
        let results = storage
            .query_at_time(&pattern, query_time)
            .await
            .expect("async operation should succeed");
        assert_eq!(results.len(), 1);
        assert_eq!(results[0], triple);

        // After `valid_to`: nothing is valid any more.
        let current_results = storage
            .query_at_time(&pattern, Utc::now())
            .await
            .expect("async operation should succeed");
        assert_eq!(current_results.len(), 0);
    }

    /// Six triples with distinct `valid_from` values produce six history snapshots.
    #[tokio::test]
    async fn test_entity_history() {
        let storage = open_storage("oxirs_temporal_history").await;

        let entity = "http://example.org/person1";

        for age in 20..=25 {
            let triple = Triple::new(
                NamedNode::new(entity).expect("valid IRI"),
                NamedNode::new("http://example.org/age").expect("valid IRI"),
                crate::model::Object::Literal(Literal::new(age.to_string())),
            );

            let valid_from = Utc::now() - Duration::days((26 - age) as i64 * 365);
            storage
                .store_temporal(triple, valid_from, None, None)
                .await
                .expect("operation should succeed");
        }

        let history = storage
            .get_entity_history(entity)
            .await
            .expect("async operation should succeed");
        assert!(history.is_some());

        let history = history.expect("history should be available");
        assert_eq!(history.states.len(), 6);
    }
}