Skip to main content

oxirs_core/storage/
temporal.rs

//! Time-series optimization for temporal RDF
//!
//! This module provides specialized storage for temporal RDF data,
//! optimizing for time-based queries and temporal reasoning.
5
6use crate::model::{Literal, Term, Triple, TriplePattern};
7use crate::OxirsError;
8use chrono::{DateTime, Duration, Utc};
9use serde::{Deserialize, Serialize};
10use std::collections::{BTreeMap, HashMap, VecDeque};
11use std::ops::Bound;
12use std::path::PathBuf;
13use std::sync::Arc;
14use tokio::sync::RwLock;
15
16/// Temporal storage configuration
17#[derive(Debug, Clone)]
18pub struct TemporalConfig {
19    /// Base path for temporal data
20    pub path: PathBuf,
21    /// Time bucket duration
22    pub bucket_duration: Duration,
23    /// Retention policy
24    pub retention: RetentionPolicy,
25    /// Indexing strategy
26    pub indexing: TemporalIndexing,
27    /// Enable temporal compression
28    pub compression: bool,
29}
30
31impl Default for TemporalConfig {
32    fn default() -> Self {
33        TemporalConfig {
34            path: PathBuf::from("/var/oxirs/temporal"),
35            bucket_duration: Duration::hours(1),
36            retention: RetentionPolicy::Days(365),
37            indexing: TemporalIndexing::default(),
38            compression: true,
39        }
40    }
41}
42
43/// Retention policy for temporal data
44#[derive(Clone)]
45pub enum RetentionPolicy {
46    /// Keep data forever
47    Forever,
48    /// Keep data for N days
49    Days(u32),
50    /// Keep data for N months
51    Months(u32),
52    /// Keep last N versions
53    Versions(u32),
54    /// Custom policy function
55    Custom(Arc<dyn Fn(&TemporalTriple) -> bool + Send + Sync>),
56}
57
58impl std::fmt::Debug for RetentionPolicy {
59    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60        match self {
61            RetentionPolicy::Forever => write!(f, "Forever"),
62            RetentionPolicy::Days(n) => write!(f, "Days({n})"),
63            RetentionPolicy::Months(n) => write!(f, "Months({n})"),
64            RetentionPolicy::Versions(n) => write!(f, "Versions({n})"),
65            RetentionPolicy::Custom(_) => write!(f, "Custom(<function>)"),
66        }
67    }
68}
69
/// Switches for the secondary indexes maintained by the temporal store.
#[derive(Debug, Clone)]
pub struct TemporalIndexing {
    /// Index by time intervals
    pub interval_index: bool,
    /// Index by entity history
    pub entity_index: bool,
    /// Index by change events
    pub change_index: bool,
    /// Enable Allen interval relations
    pub allen_relations: bool,
}

impl Default for TemporalIndexing {
    /// Every index is switched on; Allen relation support stays off.
    fn default() -> Self {
        let enabled = true;
        Self {
            allen_relations: !enabled,
            change_index: enabled,
            entity_index: enabled,
            interval_index: enabled,
        }
    }
}
93
/// Temporal triple with time metadata
///
/// Instances are built by `TemporalStorage::store_temporal` and placed in the
/// time bucket that contains `valid_from`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TemporalTriple {
    /// The base triple
    pub triple: Triple,
    /// Valid time start. Queries in this module treat it as inclusive
    /// (`valid_from <= time`).
    pub valid_from: DateTime<Utc>,
    /// Valid time end (None means currently valid). Queries in this module
    /// skip a triple only when `valid_to` is strictly earlier than the
    /// queried instant, so the bound is inclusive.
    pub valid_to: Option<DateTime<Utc>>,
    /// Transaction time; `store_temporal` sets it to `Utc::now()`.
    pub transaction_time: DateTime<Utc>,
    /// Additional temporal metadata
    pub metadata: TemporalMetadata,
}
108
/// Temporal metadata
///
/// When a caller passes `None` to `store_temporal`, the stored metadata is
/// `certainty: None`, `provenance: None`, `predicted: false`,
/// `granularity: Second`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TemporalMetadata {
    /// Certainty factor (0.0 - 1.0).
    /// NOTE(review): the range is not validated anywhere in this file.
    pub certainty: Option<f64>,
    /// Provenance information
    pub provenance: Option<String>,
    /// Is this a predicted/inferred value
    pub predicted: bool,
    /// Temporal granularity
    pub granularity: TemporalGranularity,
}
121
/// Temporal granularity
///
/// Variants are listed from finest to coarsest. Within this file only
/// `Second` is referenced, as the default chosen by `store_temporal`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TemporalGranularity {
    Nanosecond,
    Microsecond,
    Millisecond,
    Second,
    Minute,
    Hour,
    Day,
    Month,
    Year,
}
135
/// Temporal storage engine
///
/// All state lives in memory behind `Arc<RwLock<..>>` (tokio's async lock),
/// so every accessor is `async`.
pub struct TemporalStorage {
    /// Configuration supplied to `new`; never mutated afterwards in this file.
    config: TemporalConfig,
    /// Time-bucketed storage, keyed by bucket start time (see `get_bucket_time`).
    buckets: Arc<RwLock<BTreeMap<DateTime<Utc>, Bucket>>>,
    /// Interval index
    /// Created in `new` but not consulted by any method in this file,
    /// hence the `dead_code` allowance.
    #[allow(dead_code)]
    interval_index: Arc<RwLock<IntervalIndex>>,
    /// Entity history index, maintained when `indexing.entity_index` is set.
    entity_index: Arc<RwLock<EntityIndex>>,
    /// Change event index, maintained when `indexing.change_index` is set.
    change_index: Arc<RwLock<ChangeIndex>>,
    /// Statistics
    stats: Arc<RwLock<TemporalStats>>,
}
151
/// Time bucket for efficient temporal storage
///
/// A bucket holds every stored triple whose `valid_from` maps to the same
/// bucket start time.
struct Bucket {
    /// Start time of bucket (same value as the bucket's key in the map).
    #[allow(dead_code)]
    start_time: DateTime<Utc>,
    /// Triples in this bucket, in insertion order.
    triples: Vec<TemporalTriple>,
    /// Bucket statistics
    stats: BucketStats,
}
162
/// Bucket statistics
#[derive(Debug, Default)]
struct BucketStats {
    /// Incremented once per triple pushed by `store_temporal`.
    triple_count: usize,
    /// Never assigned by code in this file (compression is not implemented here).
    #[allow(dead_code)]
    compressed_size: Option<usize>,
    /// Set to `Utc::now()` on every insert; the query methods in this file
    /// do not update it.
    last_access: DateTime<Utc>,
}
171
/// Interval index for temporal queries
///
/// Currently a thin wrapper around the placeholder `IntervalTree`; nothing
/// in this file inserts into or reads from it.
struct IntervalIndex {
    /// Interval tree for efficient range queries
    #[allow(dead_code)]
    intervals: IntervalTree<DateTime<Utc>, TemporalTriple>,
}
178
/// Entity history index
struct EntityIndex {
    /// Entity URI to history mapping. Keys are the subject IRI strings of
    /// stored triples; triples whose subject is not a named node are not indexed.
    entity_history: HashMap<String, EntityHistory>,
}
184
/// Entity history
///
/// Returned (cloned) by `TemporalStorage::get_entity_history`.
#[derive(Debug, Clone)]
pub struct EntityHistory {
    /// Chronological list of states, keyed by the `valid_from` of the
    /// triples that contributed to each state.
    states: BTreeMap<DateTime<Utc>, EntityState>,
    /// Change events.
    /// Initialised empty and never appended to by code in this file.
    #[allow(dead_code)]
    changes: Vec<ChangeEvent>,
}
194
/// Entity state at a point in time
///
/// Both maps are keyed by the predicate string of the contributing triple.
#[derive(Debug, Clone)]
struct EntityState {
    /// Properties at this time: literal objects grouped by predicate.
    properties: HashMap<String, Vec<Literal>>,
    /// Relationships at this time: named-node objects (as IRI strings)
    /// grouped by predicate.
    relationships: HashMap<String, Vec<String>>,
}
203
/// Change event
///
/// One event is recorded per stored triple when the change index is enabled.
#[derive(Debug, Clone)]
pub struct ChangeEvent {
    /// Time of change; populated from the triple's `valid_from`.
    #[allow(dead_code)]
    timestamp: DateTime<Utc>,
    /// Type of change
    #[allow(dead_code)]
    change_type: ChangeType,
    /// Changed property/relationship (the predicate string); also used as
    /// the key of the per-property change index.
    property: String,
    /// Old value; always `None` for the events created in this file.
    #[allow(dead_code)]
    old_value: Option<Term>,
    /// New value; the stored triple's object converted to a `Term`.
    #[allow(dead_code)]
    new_value: Option<Term>,
}
222
/// Type of change
///
/// Only `Insert` is constructed by code in this file; the other variants are
/// reserved, hence their `dead_code` allowances.
#[derive(Debug, Clone)]
enum ChangeType {
    Insert,
    #[allow(dead_code)]
    Update,
    #[allow(dead_code)]
    Delete,
}
232
/// Change event index
struct ChangeIndex {
    /// Recent changes queue, newest first; capped at 10000 entries by
    /// `update_change_index`.
    recent_changes: VecDeque<ChangeEvent>,
    /// Changes by property (predicate string), in insertion order.
    /// Unlike `recent_changes`, this map is never trimmed in this file.
    property_changes: HashMap<String, Vec<ChangeEvent>>,
}
240
/// Temporal statistics
///
/// The three `dead_code` fields are never written by code in this file.
#[derive(Debug, Default)]
struct TemporalStats {
    /// Incremented by `store_temporal`, reduced by `apply_retention`.
    total_triples: u64,
    /// Incremented by `store_temporal` for triples stored without a `valid_to`.
    active_triples: u64,
    /// Incremented by `store_temporal` for triples stored with a `valid_to`.
    historical_triples: u64,
    #[allow(dead_code)]
    total_buckets: u64,
    #[allow(dead_code)]
    compression_ratio: f64,
    #[allow(dead_code)]
    avg_query_time_ms: f64,
}
254
/// Interval tree for temporal queries (simplified placeholder).
///
/// Stores nothing yet: the type parameters are carried only through
/// `PhantomData` markers so the eventual key/value types are fixed.
struct IntervalTree<K, V> {
    _key: std::marker::PhantomData<K>,
    _value: std::marker::PhantomData<V>,
}

impl<K, V> IntervalTree<K, V> {
    /// Builds the empty placeholder tree.
    fn new() -> Self {
        let _key = std::marker::PhantomData;
        let _value = std::marker::PhantomData;
        Self { _key, _value }
    }
}
269
270impl TemporalStorage {
271    /// Create new temporal storage
272    pub async fn new(config: TemporalConfig) -> Result<Self, OxirsError> {
273        std::fs::create_dir_all(&config.path)?;
274
275        Ok(TemporalStorage {
276            config,
277            buckets: Arc::new(RwLock::new(BTreeMap::new())),
278            interval_index: Arc::new(RwLock::new(IntervalIndex {
279                intervals: IntervalTree::new(),
280            })),
281            entity_index: Arc::new(RwLock::new(EntityIndex {
282                entity_history: HashMap::new(),
283            })),
284            change_index: Arc::new(RwLock::new(ChangeIndex {
285                recent_changes: VecDeque::with_capacity(10000),
286                property_changes: HashMap::new(),
287            })),
288            stats: Arc::new(RwLock::new(TemporalStats::default())),
289        })
290    }
291
292    /// Store a temporal triple
293    pub async fn store_temporal(
294        &self,
295        triple: Triple,
296        valid_from: DateTime<Utc>,
297        valid_to: Option<DateTime<Utc>>,
298        metadata: Option<TemporalMetadata>,
299    ) -> Result<(), OxirsError> {
300        let temporal_triple = TemporalTriple {
301            triple: triple.clone(),
302            valid_from,
303            valid_to,
304            transaction_time: Utc::now(),
305            metadata: metadata.unwrap_or(TemporalMetadata {
306                certainty: None,
307                provenance: None,
308                predicted: false,
309                granularity: TemporalGranularity::Second,
310            }),
311        };
312
313        // Determine bucket
314        let bucket_time = self.get_bucket_time(valid_from);
315
316        // Store in bucket
317        {
318            let mut buckets = self.buckets.write().await;
319            let bucket = buckets.entry(bucket_time).or_insert_with(|| Bucket {
320                start_time: bucket_time,
321                triples: Vec::new(),
322                stats: BucketStats::default(),
323            });
324
325            bucket.triples.push(temporal_triple.clone());
326            bucket.stats.triple_count += 1;
327            bucket.stats.last_access = Utc::now();
328        }
329
330        // Update indexes
331        if self.config.indexing.entity_index {
332            self.update_entity_index(&temporal_triple).await?;
333        }
334
335        if self.config.indexing.change_index {
336            self.update_change_index(&temporal_triple).await?;
337        }
338
339        // Update stats
340        let mut stats = self.stats.write().await;
341        stats.total_triples += 1;
342        if valid_to.is_none() {
343            stats.active_triples += 1;
344        } else {
345            stats.historical_triples += 1;
346        }
347
348        Ok(())
349    }
350
351    /// Query triples at a specific time point
352    pub async fn query_at_time(
353        &self,
354        pattern: &TriplePattern,
355        time: DateTime<Utc>,
356    ) -> Result<Vec<Triple>, OxirsError> {
357        let mut results = Vec::new();
358
359        // Search relevant buckets
360        let buckets = self.buckets.read().await;
361        for (_, bucket) in buckets.iter() {
362            for temporal in &bucket.triples {
363                // Check if triple is valid at the given time
364                if temporal.valid_from <= time {
365                    if let Some(valid_to) = temporal.valid_to {
366                        if valid_to < time {
367                            continue;
368                        }
369                    }
370
371                    // Check if pattern matches
372                    if pattern.matches(&temporal.triple) {
373                        results.push(temporal.triple.clone());
374                    }
375                }
376            }
377        }
378
379        Ok(results)
380    }
381
382    /// Query triples within a time range
383    pub async fn query_time_range(
384        &self,
385        pattern: &TriplePattern,
386        start: DateTime<Utc>,
387        end: DateTime<Utc>,
388    ) -> Result<Vec<TemporalTriple>, OxirsError> {
389        let mut results = Vec::new();
390
391        // Get relevant buckets
392        let start_bucket = self.get_bucket_time(start);
393        let end_bucket = self.get_bucket_time(end);
394
395        let buckets = self.buckets.read().await;
396        let range = buckets.range((Bound::Included(start_bucket), Bound::Included(end_bucket)));
397
398        for (_, bucket) in range {
399            for temporal in &bucket.triples {
400                // Check temporal overlap
401                if temporal.valid_from <= end {
402                    if let Some(valid_to) = temporal.valid_to {
403                        if valid_to < start {
404                            continue;
405                        }
406                    }
407
408                    // Check pattern match
409                    if pattern.matches(&temporal.triple) {
410                        results.push(temporal.clone());
411                    }
412                }
413            }
414        }
415
416        Ok(results)
417    }
418
419    /// Get entity history
420    pub async fn get_entity_history(
421        &self,
422        entity_uri: &str,
423    ) -> Result<Option<EntityHistory>, OxirsError> {
424        let entity_index = self.entity_index.read().await;
425        Ok(entity_index.entity_history.get(entity_uri).cloned())
426    }
427
428    /// Get recent changes
429    pub async fn get_recent_changes(&self, limit: usize) -> Result<Vec<ChangeEvent>, OxirsError> {
430        let change_index = self.change_index.read().await;
431        Ok(change_index
432            .recent_changes
433            .iter()
434            .take(limit)
435            .cloned()
436            .collect())
437    }
438
439    /// Perform temporal reasoning
440    pub async fn temporal_reason(
441        &self,
442        query: TemporalQuery,
443    ) -> Result<TemporalResult, OxirsError> {
444        match query {
445            TemporalQuery::AllenRelation {
446                triple1: _,
447                triple2: _,
448                relation: _,
449            } => {
450                // Implement Allen's interval algebra
451                Ok(TemporalResult::Boolean(false)) // Placeholder
452            }
453            TemporalQuery::TemporalPath {
454                start: _,
455                end: _,
456                predicate: _,
457                max_hops: _,
458            } => {
459                // Find temporal paths between entities
460                Ok(TemporalResult::Paths(Vec::new())) // Placeholder
461            }
462            TemporalQuery::ChangeDetection {
463                entity: _,
464                property: _,
465                threshold: _,
466            } => {
467                // Detect significant changes
468                Ok(TemporalResult::Changes(Vec::new())) // Placeholder
469            }
470            TemporalQuery::TrendAnalysis {
471                entity: _,
472                property: _,
473                window: _,
474            } => {
475                // Analyze trends over time
476                Ok(TemporalResult::Trend(TrendData::default())) // Placeholder
477            }
478        }
479    }
480
481    /// Apply retention policy
482    pub async fn apply_retention(&self) -> Result<usize, OxirsError> {
483        let mut removed = 0;
484        let now = Utc::now();
485
486        let mut buckets = self.buckets.write().await;
487        let mut to_remove = Vec::new();
488
489        for (bucket_time, bucket) in buckets.iter_mut() {
490            match &self.config.retention {
491                RetentionPolicy::Days(days) => {
492                    let cutoff = now - Duration::days(*days as i64);
493                    if *bucket_time < cutoff {
494                        to_remove.push(*bucket_time);
495                        removed += bucket.triples.len();
496                    }
497                }
498                RetentionPolicy::Months(months) => {
499                    let cutoff = now - Duration::days((*months as i64) * 30);
500                    if *bucket_time < cutoff {
501                        to_remove.push(*bucket_time);
502                        removed += bucket.triples.len();
503                    }
504                }
505                _ => {} // Other policies not implemented in this example
506            }
507        }
508
509        for bucket_time in to_remove {
510            buckets.remove(&bucket_time);
511        }
512
513        // Update stats
514        let mut stats = self.stats.write().await;
515        stats.total_triples = stats.total_triples.saturating_sub(removed as u64);
516
517        Ok(removed)
518    }
519
520    /// Get bucket time for a given timestamp
521    fn get_bucket_time(&self, time: DateTime<Utc>) -> DateTime<Utc> {
522        let bucket_seconds = self.config.bucket_duration.num_seconds();
523        let timestamp = time.timestamp();
524        let bucket_timestamp = (timestamp / bucket_seconds) * bucket_seconds;
525        DateTime::from_timestamp(bucket_timestamp, 0).expect("bucket timestamp should be valid")
526    }
527
528    /// Update entity index
529    async fn update_entity_index(&self, temporal: &TemporalTriple) -> Result<(), OxirsError> {
530        let mut entity_index = self.entity_index.write().await;
531
532        // Extract entity URI from subject
533        let entity_uri = match temporal.triple.subject() {
534            crate::model::Subject::NamedNode(nn) => nn.as_str().to_string(),
535            _ => return Ok(()), // Skip non-URI subjects
536        };
537
538        let history = entity_index
539            .entity_history
540            .entry(entity_uri)
541            .or_insert_with(|| EntityHistory {
542                states: BTreeMap::new(),
543                changes: Vec::new(),
544            });
545
546        // Update entity state
547        let state = history
548            .states
549            .entry(temporal.valid_from)
550            .or_insert_with(|| EntityState {
551                properties: HashMap::new(),
552                relationships: HashMap::new(),
553            });
554
555        // Add property or relationship
556        let predicate_uri = match temporal.triple.predicate() {
557            crate::model::Predicate::NamedNode(nn) => nn.as_str(),
558            crate::model::Predicate::Variable(v) => v.as_str(),
559        };
560        match temporal.triple.object() {
561            crate::model::Object::Literal(lit) => {
562                state
563                    .properties
564                    .entry(predicate_uri.to_string())
565                    .or_insert_with(Vec::new)
566                    .push(lit.clone());
567            }
568            crate::model::Object::NamedNode(nn) => {
569                state
570                    .relationships
571                    .entry(predicate_uri.to_string())
572                    .or_insert_with(Vec::new)
573                    .push(nn.as_str().to_string());
574            }
575            _ => {}
576        }
577
578        Ok(())
579    }
580
581    /// Update change index
582    async fn update_change_index(&self, temporal: &TemporalTriple) -> Result<(), OxirsError> {
583        let mut change_index = self.change_index.write().await;
584
585        let change = ChangeEvent {
586            timestamp: temporal.valid_from,
587            change_type: ChangeType::Insert,
588            property: match temporal.triple.predicate() {
589                crate::model::Predicate::NamedNode(nn) => nn.as_str(),
590                crate::model::Predicate::Variable(v) => v.as_str(),
591            }
592            .to_string(),
593            old_value: None,
594            new_value: Some(Term::from_object(temporal.triple.object())),
595        };
596
597        // Add to recent changes
598        change_index.recent_changes.push_front(change.clone());
599        if change_index.recent_changes.len() > 10000 {
600            change_index.recent_changes.pop_back();
601        }
602
603        // Index by property
604        change_index
605            .property_changes
606            .entry(change.property.clone())
607            .or_insert_with(Vec::new)
608            .push(change);
609
610        Ok(())
611    }
612}
613
/// Temporal query types
///
/// Input to `TemporalStorage::temporal_reason`. In this file every variant
/// is answered with a placeholder result and its fields are not yet read, so
/// the field notes below describe intent inferred from the names only.
#[derive(Debug, Clone)]
pub enum TemporalQuery {
    /// Allen interval relation query
    /// (presumably: does `relation` hold between `triple1` and `triple2` — TODO confirm).
    AllenRelation {
        triple1: Box<TemporalTriple>,
        triple2: Box<TemporalTriple>,
        relation: AllenRelation,
    },
    /// Temporal path query
    /// (presumably from entity `start` to entity `end`, optionally restricted
    /// to `predicate`, within `max_hops` — TODO confirm).
    TemporalPath {
        start: String,
        end: String,
        predicate: Option<String>,
        max_hops: usize,
    },
    /// Change detection query
    /// (units and meaning of `threshold` are not defined in this file).
    ChangeDetection {
        entity: String,
        property: String,
        threshold: f64,
    },
    /// Trend analysis query
    /// (`window` is presumably the time span to analyse — TODO confirm).
    TrendAnalysis {
        entity: String,
        property: String,
        window: Duration,
    },
}
643
/// Allen's interval relations
///
/// The thirteen basic relations of Allen's interval algebra between two
/// intervals X and Y. The per-variant notes use the standard definitions;
/// `TemporalStorage::temporal_reason` does not evaluate them yet.
///
/// The enum is fieldless, so it is `Copy` and can be compared, hashed and
/// used as a map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AllenRelation {
    /// X ends before Y starts, with a gap between them.
    Before,
    /// Inverse of `Before`.
    After,
    /// X ends exactly where Y starts.
    Meets,
    /// Inverse of `Meets`.
    MetBy,
    /// X starts first and ends inside Y.
    Overlaps,
    /// Inverse of `Overlaps`.
    OverlappedBy,
    /// X and Y start together and X ends first.
    Starts,
    /// Inverse of `Starts`.
    StartedBy,
    /// X lies strictly inside Y.
    During,
    /// Inverse of `During`.
    Contains,
    /// X and Y end together and X starts later.
    Finishes,
    /// Inverse of `Finishes`.
    FinishedBy,
    /// X and Y have the same start and end.
    Equals,
}
661
/// Temporal query result
///
/// One variant per `TemporalQuery` kind, as produced by
/// `TemporalStorage::temporal_reason`.
#[derive(Debug)]
pub enum TemporalResult {
    /// Answer to an `AllenRelation` query.
    Boolean(bool),
    /// Answer to a `TemporalPath` query; each inner vector is one path.
    Paths(Vec<Vec<TemporalTriple>>),
    /// Answer to a `ChangeDetection` query.
    Changes(Vec<ChangeEvent>),
    /// Answer to a `TrendAnalysis` query.
    Trend(TrendData),
}
670
/// Trend analysis data
///
/// Nothing in this file computes these values; `temporal_reason` returns the
/// `Default` instance. The field names suggest a linear fit
/// (presumably `value = slope * t + intercept` — TODO confirm).
#[derive(Debug, Default)]
pub struct TrendData {
    pub slope: f64,
    pub intercept: f64,
    pub r_squared: f64,
    /// Pairs of (time, value); presumably forecast points — TODO confirm.
    pub predictions: Vec<(DateTime<Utc>, f64)>,
}
679
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::NamedNode;

    /// Builds a default configuration rooted in the platform's temporary
    /// directory, so the tests do not depend on a Unix-style `/tmp`.
    fn test_config(dir_name: &str) -> TemporalConfig {
        TemporalConfig {
            path: std::env::temp_dir().join(dir_name),
            ..Default::default()
        }
    }

    /// A triple valid from 365 to 180 days ago is found by a query 270 days
    /// ago and absent from a query at the current time.
    #[tokio::test]
    async fn test_temporal_storage() {
        let storage = TemporalStorage::new(test_config("oxirs_temporal_test"))
            .await
            .expect("async operation should succeed");

        // Create temporal triple
        let triple = Triple::new(
            NamedNode::new("http://example.org/person1").expect("valid IRI"),
            NamedNode::new("http://example.org/age").expect("valid IRI"),
            crate::model::Object::Literal(Literal::new("25")),
        );

        let valid_from = Utc::now() - Duration::days(365);
        let valid_to = Some(Utc::now() - Duration::days(180));

        // Store temporal triple
        storage
            .store_temporal(triple.clone(), valid_from, valid_to, None)
            .await
            .expect("operation should succeed");

        // Query at a time when triple was valid
        let query_time = Utc::now() - Duration::days(270);
        let pattern = TriplePattern::new(
            Some(crate::model::SubjectPattern::NamedNode(
                NamedNode::new("http://example.org/person1").expect("valid IRI"),
            )),
            None,
            None,
        );

        let results = storage
            .query_at_time(&pattern, query_time)
            .await
            .expect("async operation should succeed");
        assert_eq!(results.len(), 1);
        assert_eq!(results[0], triple);

        // Query at current time (should be empty as triple is no longer valid)
        let current_results = storage
            .query_at_time(&pattern, Utc::now())
            .await
            .expect("async operation should succeed");
        assert_eq!(current_results.len(), 0);
    }

    /// Six versions of one property, each with a distinct `valid_from`,
    /// produce six states in the entity's history.
    #[tokio::test]
    async fn test_entity_history() {
        let storage = TemporalStorage::new(test_config("oxirs_temporal_history"))
            .await
            .expect("async operation should succeed");

        let entity = "http://example.org/person1";

        // Store multiple versions of age
        for age in 20..=25 {
            let triple = Triple::new(
                NamedNode::new(entity).expect("valid IRI"),
                NamedNode::new("http://example.org/age").expect("valid IRI"),
                crate::model::Object::Literal(Literal::new(age.to_string())),
            );

            let valid_from = Utc::now() - Duration::days((26 - age) as i64 * 365);
            storage
                .store_temporal(triple, valid_from, None, None)
                .await
                .expect("operation should succeed");
        }

        // Get entity history
        let history = storage
            .get_entity_history(entity)
            .await
            .expect("async operation should succeed");
        assert!(history.is_some());

        let history = history.expect("history should be available");
        assert_eq!(history.states.len(), 6);
    }
}