Skip to main content

bitemporal_runtime/
queries.rs

1//! Bitemporal query operations.
2
3use chrono::{DateTime, Utc};
4
5use crate::error::BitemporalError;
6use crate::types::{BitemporalRecord, SupersessionReceipt};
7
8/// Append a new record and mark prior versions as superseded.
9///
10/// This is the core append-supersede operation:
11/// - A new row is inserted with the given `valid_time` and a new `recorded_time`
12/// - Any prior rows for the same record ID are marked as superseded
13///
14/// Returns a `SupersessionReceipt` for each superseded prior record.
15///
16/// # Arguments
17///
18/// * `records` — mutable slice of existing records (simulating a database table)
19/// * `new_record` — the new record to append
20///
21/// # Returns
22///
23/// Vector of `SupersessionReceipt` for each superseded prior version.
24pub fn append_supersede<T>(
25    records: &mut Vec<BitemporalRecord<T>>,
26    new_record: BitemporalRecord<T>,
27) -> Result<Vec<SupersessionReceipt>, BitemporalError>
28where
29    T: Clone,
30{
31    // Collect prior versions of this record ID
32    let prior_versions: Vec<_> = records
33        .iter()
34        .filter(|r| r.id == new_record.id)
35        .cloned()
36        .collect();
37
38    let mut receipts = Vec::new();
39
40    // Create supersession receipts for each prior version
41    for prior in prior_versions {
42        // We need concrete types for the receipt; use type erasure via BitemporalRecord<()>
43        // but we need the actual record IDs and times, so we use the concrete prior/new
44        let receipt = SupersessionReceipt::new(
45            BitemporalRecord {
46                id: prior.id.clone(),
47                valid_time: prior.valid_time,
48                recorded_time: prior.recorded_time,
49                value: (),
50            },
51            BitemporalRecord {
52                id: new_record.id.clone(),
53                valid_time: new_record.valid_time,
54                recorded_time: new_record.recorded_time,
55                value: (),
56            },
57        );
58        receipts.push(receipt);
59    }
60
61    // Append the new record (append-only, never mutate prior rows)
62    records.push(new_record);
63
64    Ok(receipts)
65}
66
67/// Query records valid at a specific `valid_time` as of a specific `recorded_time`.
68///
69/// This implements the bitemporal "as-of" query:
70/// - Returns records whose `valid_time` is at or before the query `valid_time`
71/// - Among those, returns the version that was current as of `recorded_time`
72///   (i.e., the latest record with `recorded_time <= query_recorded_time`)
73///
74/// # Arguments
75///
76/// * `records` — slice of all records (simulating a database table)
77/// * `valid_time` — the business time to query
78/// * `recorded_time` — the system time to query as-of
79///
80/// # Returns
81///
82/// Records valid at `valid_time` as of `recorded_time`.
83pub fn as_of_query<T>(
84    records: &[BitemporalRecord<T>],
85    valid_time: DateTime<Utc>,
86    recorded_time: DateTime<Utc>,
87) -> Vec<BitemporalRecord<T>>
88where
89    T: Clone,
90{
91    use std::collections::HashMap;
92
93    let mut latest_by_id: HashMap<String, BitemporalRecord<T>> = HashMap::new();
94
95    for record in records {
96        if record.recorded_time <= recorded_time && record.valid_time <= valid_time {
97            let existing = latest_by_id.get(&record.id);
98            if existing
99                .map(|e| record.recorded_time > e.recorded_time)
100                .unwrap_or(true)
101            {
102                latest_by_id.insert(record.id.clone(), record.clone());
103            }
104        }
105    }
106
107    latest_by_id.into_values().collect()
108}
109
110/// Return full state as of a specific `recorded_time`.
111///
112/// Returns all records that were current at the given recorded_time,
113/// one record per unique ID (the latest version as of that time).
114pub fn temporal_snapshot<T>(
115    records: &[BitemporalRecord<T>],
116    as_of_time: DateTime<Utc>,
117) -> Vec<BitemporalRecord<T>>
118where
119    T: Clone,
120{
121    as_of_query(records, as_of_time, as_of_time)
122}
123
124#[cfg(test)]
125mod tests {
126    use super::*;
127    use chrono::TimeZone;
128
129    #[test]
130    fn test_append_supersede_creates_receipt() {
131        let mut records: Vec<BitemporalRecord<&str>> = Vec::new();
132
133        let t0 = Utc.timestamp_opt(1000, 0).unwrap();
134        let v1 = BitemporalRecord {
135            id: "ep1".to_string(),
136            valid_time: t0,
137            recorded_time: t0,
138            value: "version1",
139        };
140
141        let receipts = append_supersede(&mut records, v1).unwrap();
142        assert!(receipts.is_empty());
143        assert_eq!(records.len(), 1);
144
145        let t1 = Utc.timestamp_opt(2000, 0).unwrap();
146        let v2 = BitemporalRecord {
147            id: "ep1".to_string(),
148            valid_time: t0,
149            recorded_time: t1,
150            value: "version2",
151        };
152
153        let receipts = append_supersede(&mut records, v2).unwrap();
154        assert_eq!(receipts.len(), 1);
155        assert_eq!(receipts[0].superseded.superseded_id, "ep1");
156        assert_eq!(receipts[0].superseding_id, "ep1");
157        assert_eq!(records.len(), 2);
158    }
159
160    #[test]
161    fn test_as_of_query_returns_correct_version() {
162        let t0 = Utc.timestamp_opt(1000, 0).unwrap();
163        let t1 = Utc.timestamp_opt(2000, 0).unwrap();
164        let t2 = Utc.timestamp_opt(3000, 0).unwrap();
165
166        let records: Vec<BitemporalRecord<&str>> = vec![
167            BitemporalRecord {
168                id: "ep1".to_string(),
169                valid_time: t0,
170                recorded_time: t0,
171                value: "v1",
172            },
173            BitemporalRecord {
174                id: "ep1".to_string(),
175                valid_time: t0,
176                recorded_time: t1,
177                value: "v2",
178            },
179            BitemporalRecord {
180                id: "ep1".to_string(),
181                valid_time: t1,
182                recorded_time: t2,
183                value: "v3",
184            },
185        ];
186
187        let result = as_of_query(&records, t0, t0);
188        assert_eq!(result.len(), 1);
189        assert_eq!(result[0].value, "v1");
190
191        let result = as_of_query(&records, t0, t1);
192        assert_eq!(result.len(), 1);
193        assert_eq!(result[0].value, "v2");
194
195        let result = as_of_query(&records, t1, t2);
196        assert_eq!(result.len(), 1);
197        assert_eq!(result[0].value, "v3");
198    }
199
200    #[test]
201    fn test_temporal_snapshot() {
202        let t0 = Utc.timestamp_opt(1000, 0).unwrap();
203        let t1 = Utc.timestamp_opt(2000, 0).unwrap();
204
205        let records: Vec<BitemporalRecord<&str>> = vec![
206            BitemporalRecord {
207                id: "ep1".to_string(),
208                valid_time: t0,
209                recorded_time: t0,
210                value: "v1",
211            },
212            BitemporalRecord {
213                id: "ep1".to_string(),
214                valid_time: t0,
215                recorded_time: t1,
216                value: "v2",
217            },
218            BitemporalRecord {
219                id: "ep2".to_string(),
220                valid_time: t0,
221                recorded_time: t0,
222                value: "ep2_v1",
223            },
224        ];
225
226        let snap = temporal_snapshot(&records, t0);
227        assert_eq!(snap.len(), 2);
228        let ep1 = snap.iter().find(|r| r.id == "ep1").unwrap();
229        assert_eq!(ep1.value, "v1");
230
231        let snap = temporal_snapshot(&records, t1);
232        assert_eq!(snap.len(), 2);
233        let ep1 = snap.iter().find(|r| r.id == "ep1").unwrap();
234        assert_eq!(ep1.value, "v2");
235    }
236}