Skip to main content

shape_runtime/multi_table/
alignment.rs

//! Multi-table alignment and joining
2
3use crate::data::OwnedDataRow as RowValue;
4use shape_ast::ast::Timeframe;
5use shape_ast::error::Result;
6use std::collections::HashMap;
7
8/// Aligns right rows to match the timestamps of left rows
9pub fn align_tables(
10    left_rows: &[RowValue],
11    right_rows: &[RowValue],
12    _timeframe: Timeframe,
13) -> Result<Vec<RowValue>> {
14    let mut right_by_ts: HashMap<i64, &RowValue> = HashMap::new();
15    for row in right_rows {
16        right_by_ts.insert(row.timestamp, row);
17    }
18
19    let mut result = Vec::with_capacity(left_rows.len());
20    let mut last_valid: Option<&RowValue> = None;
21
22    for left_row in left_rows {
23        if let Some(&right_row) = right_by_ts.get(&left_row.timestamp) {
24            result.push(right_row.clone());
25            last_valid = Some(right_row);
26        } else if let Some(prev) = last_valid {
27            // Forward fill
28            let mut filled = prev.clone();
29            filled.timestamp = left_row.timestamp;
30            result.push(filled);
31        } else {
32            // No previous data - use NaN placeholder
33            result.push(RowValue::new_generic(left_row.timestamp, HashMap::new()));
34        }
35    }
36
37    Ok(result)
38}
39
40/// Aligns multiple row sets to their intersection of timestamps
41pub fn align_intersection(series_data: &[Vec<RowValue>]) -> Result<Vec<Vec<RowValue>>> {
42    if series_data.is_empty() {
43        return Ok(Vec::new());
44    }
45
46    // Find intersection of all timestamps
47    let mut common_ts: std::collections::HashSet<i64> =
48        series_data[0].iter().map(|r| r.timestamp).collect();
49    for series in &series_data[1..] {
50        let ts: std::collections::HashSet<i64> = series.iter().map(|r| r.timestamp).collect();
51        common_ts = common_ts.intersection(&ts).cloned().collect();
52    }
53
54    let mut sorted_ts: Vec<i64> = common_ts.into_iter().collect();
55    sorted_ts.sort_unstable();
56
57    let mut result = Vec::with_capacity(series_data.len());
58    for series in series_data {
59        let mut series_by_ts: HashMap<i64, &RowValue> =
60            series.iter().map(|r| (r.timestamp, r)).collect();
61        let mut aligned = Vec::with_capacity(sorted_ts.len());
62        for ts in &sorted_ts {
63            aligned.push(series_by_ts.remove(ts).unwrap().clone());
64        }
65        result.push(aligned);
66    }
67
68    Ok(result)
69}
70
71/// Aligns multiple row sets to the union of all timestamps
72pub fn align_union(series_data: &[Vec<RowValue>]) -> Result<Vec<Vec<RowValue>>> {
73    if series_data.is_empty() {
74        return Ok(Vec::new());
75    }
76
77    let mut all_ts = std::collections::HashSet::new();
78    for series in series_data {
79        for row in series {
80            all_ts.insert(row.timestamp);
81        }
82    }
83
84    let mut sorted_ts: Vec<i64> = all_ts.into_iter().collect();
85    sorted_ts.sort_unstable();
86
87    let mut result = Vec::with_capacity(series_data.len());
88    for series in series_data {
89        let series_by_ts: HashMap<i64, &RowValue> =
90            series.iter().map(|r| (r.timestamp, r)).collect();
91        let mut aligned = Vec::with_capacity(sorted_ts.len());
92        let mut last_valid: Option<RowValue> = None;
93
94        for ts in &sorted_ts {
95            if let Some(row) = series_by_ts.get(ts) {
96                let r = (*row).clone();
97                aligned.push(r.clone());
98                last_valid = Some(r);
99            } else if let Some(ref prev) = last_valid {
100                let mut filled = prev.clone();
101                filled.timestamp = *ts;
102                aligned.push(filled);
103            } else {
104                aligned.push(RowValue::new_generic(*ts, HashMap::new()));
105            }
106        }
107        result.push(aligned);
108    }
109
110    Ok(result)
111}
112
113/// Aligns one row set to a reference row set (left join)
114pub fn align_left(left: &[RowValue], right: &[RowValue]) -> Result<Vec<RowValue>> {
115    align_tables(left, right, Timeframe::default())
116}
117
118/// Joins two row sets by timestamp (inner join)
119pub fn join_tables(left_rows: &[RowValue], right_rows: &[RowValue]) -> Result<Vec<RowValue>> {
120    let mut right_by_ts: HashMap<i64, &RowValue> = HashMap::new();
121    for row in right_rows {
122        right_by_ts.insert(row.timestamp, row);
123    }
124
125    let mut result = Vec::new();
126
127    for left_row in left_rows {
128        if let Some(&right_row) = right_by_ts.get(&left_row.timestamp) {
129            // Merge fields
130            let mut fields = left_row.fields.clone();
131            for (k, v) in &right_row.fields {
132                fields.insert(k.clone(), *v);
133            }
134            result.push(RowValue::new_generic(left_row.timestamp, fields));
135        }
136    }
137
138    Ok(result)
139}