shape_runtime/multi_table/
alignment.rs1use crate::data::OwnedDataRow as RowValue;
4use shape_ast::ast::Timeframe;
5use shape_ast::error::Result;
6use std::collections::HashMap;
7
8pub fn align_tables(
10 left_rows: &[RowValue],
11 right_rows: &[RowValue],
12 _timeframe: Timeframe,
13) -> Result<Vec<RowValue>> {
14 let mut right_by_ts: HashMap<i64, &RowValue> = HashMap::new();
15 for row in right_rows {
16 right_by_ts.insert(row.timestamp, row);
17 }
18
19 let mut result = Vec::with_capacity(left_rows.len());
20 let mut last_valid: Option<&RowValue> = None;
21
22 for left_row in left_rows {
23 if let Some(&right_row) = right_by_ts.get(&left_row.timestamp) {
24 result.push(right_row.clone());
25 last_valid = Some(right_row);
26 } else if let Some(prev) = last_valid {
27 let mut filled = prev.clone();
29 filled.timestamp = left_row.timestamp;
30 result.push(filled);
31 } else {
32 result.push(RowValue::new_generic(left_row.timestamp, HashMap::new()));
34 }
35 }
36
37 Ok(result)
38}
39
40pub fn align_intersection(series_data: &[Vec<RowValue>]) -> Result<Vec<Vec<RowValue>>> {
42 if series_data.is_empty() {
43 return Ok(Vec::new());
44 }
45
46 let mut common_ts: std::collections::HashSet<i64> =
48 series_data[0].iter().map(|r| r.timestamp).collect();
49 for series in &series_data[1..] {
50 let ts: std::collections::HashSet<i64> = series.iter().map(|r| r.timestamp).collect();
51 common_ts = common_ts.intersection(&ts).cloned().collect();
52 }
53
54 let mut sorted_ts: Vec<i64> = common_ts.into_iter().collect();
55 sorted_ts.sort_unstable();
56
57 let mut result = Vec::with_capacity(series_data.len());
58 for series in series_data {
59 let mut series_by_ts: HashMap<i64, &RowValue> =
60 series.iter().map(|r| (r.timestamp, r)).collect();
61 let mut aligned = Vec::with_capacity(sorted_ts.len());
62 for ts in &sorted_ts {
63 aligned.push(series_by_ts.remove(ts).unwrap().clone());
64 }
65 result.push(aligned);
66 }
67
68 Ok(result)
69}
70
71pub fn align_union(series_data: &[Vec<RowValue>]) -> Result<Vec<Vec<RowValue>>> {
73 if series_data.is_empty() {
74 return Ok(Vec::new());
75 }
76
77 let mut all_ts = std::collections::HashSet::new();
78 for series in series_data {
79 for row in series {
80 all_ts.insert(row.timestamp);
81 }
82 }
83
84 let mut sorted_ts: Vec<i64> = all_ts.into_iter().collect();
85 sorted_ts.sort_unstable();
86
87 let mut result = Vec::with_capacity(series_data.len());
88 for series in series_data {
89 let series_by_ts: HashMap<i64, &RowValue> =
90 series.iter().map(|r| (r.timestamp, r)).collect();
91 let mut aligned = Vec::with_capacity(sorted_ts.len());
92 let mut last_valid: Option<RowValue> = None;
93
94 for ts in &sorted_ts {
95 if let Some(row) = series_by_ts.get(ts) {
96 let r = (*row).clone();
97 aligned.push(r.clone());
98 last_valid = Some(r);
99 } else if let Some(ref prev) = last_valid {
100 let mut filled = prev.clone();
101 filled.timestamp = *ts;
102 aligned.push(filled);
103 } else {
104 aligned.push(RowValue::new_generic(*ts, HashMap::new()));
105 }
106 }
107 result.push(aligned);
108 }
109
110 Ok(result)
111}
112
113pub fn align_left(left: &[RowValue], right: &[RowValue]) -> Result<Vec<RowValue>> {
115 align_tables(left, right, Timeframe::default())
116}
117
118pub fn join_tables(left_rows: &[RowValue], right_rows: &[RowValue]) -> Result<Vec<RowValue>> {
120 let mut right_by_ts: HashMap<i64, &RowValue> = HashMap::new();
121 for row in right_rows {
122 right_by_ts.insert(row.timestamp, row);
123 }
124
125 let mut result = Vec::new();
126
127 for left_row in left_rows {
128 if let Some(&right_row) = right_by_ts.get(&left_row.timestamp) {
129 let mut fields = left_row.fields.clone();
131 for (k, v) in &right_row.fields {
132 fields.insert(k.clone(), *v);
133 }
134 result.push(RowValue::new_generic(left_row.timestamp, fields));
135 }
136 }
137
138 Ok(result)
139}