1use crate::stats::bitonics::CountingBitonic;
2use crate::table::lotable::LOTable;
3use anyhow::*;
4use itertools::Itertools;
5use slice_group_by::GroupBy;
6use std::any::Any;
7use std::borrow::{Borrow, Cow};
8use std::ops::{Deref, Range, RangeBounds};
9use std::sync::Arc;
10
11#[derive(Default, Clone, Debug)]
14pub struct Zone {
15 pub min: usize,
16 pub max: usize,
17 pub selectivity: usize,
18 pub stats: CountingBitonic,
19}
20
21unsafe impl Send for Zone {}
22unsafe impl Sync for Zone {}
23
24impl Zone {
25 pub fn hits(&self) -> usize {
27 self.stats.get()
28 }
29
30 pub fn zone_triple(&self) -> (usize, usize, usize) {
33 (self.min, self.max, self.selectivity)
34 }
35}
36
37impl From<(usize, usize)> for Zone {
38 fn from(r: (usize, usize)) -> Self {
39 Zone {
40 min: r.0,
41 max: r.1,
42 ..Self::default()
43 }
44 }
45}
46
47impl From<(usize, usize, usize)> for Zone {
48 fn from(r: (usize, usize, usize)) -> Self {
49 Zone {
50 min: r.0,
51 max: r.1,
52 selectivity: r.2,
53 ..Self::default()
54 }
55 }
56}
57
58#[derive(Debug, Clone)]
61pub struct ColumnZoneData {
62 zones: LOTable<usize, Zone>,
64}
65
66unsafe impl Send for ColumnZoneData {}
67unsafe impl Sync for ColumnZoneData {}
68
69impl ColumnZoneData {
70 pub fn new() -> ColumnZoneData {
73 Self {
74 zones: LOTable::new(),
75 }
76 }
77
78 pub fn insert(&self, zone_id: usize, zone_data: Zone) -> Result<Arc<Option<Zone>>> {
82 self.zones.insert(zone_id, zone_data)
83 }
84
85 pub fn batch_insert(&self, zones: Vec<(usize, Zone)>) {
88 zones.iter().for_each(|(zid, zdata)| {
89 let _ = self.zones.insert(*zid, zdata.clone());
90 })
91 }
92
93 pub fn update(&self, zone_id: usize, min: usize, max: usize, selectivity: usize) {
96 let zone = Zone {
97 min,
98 max,
99 selectivity,
100 ..Default::default()
101 };
102 let _ = self.zones.insert(zone_id, zone);
103 }
104
105 pub fn update_zone(&self, zone_id: usize, zone_data: Zone) {
108 let _ = self.zones.insert(zone_id, zone_data);
109 }
110
111 pub fn selectivity(&self, zone_id: usize) -> usize {
114 self.zones
115 .replace_with(&zone_id, |z| {
116 z.map_or(Some(Zone::default()), |z| {
117 z.stats.traverse(zone_id);
118 Some(z.to_owned())
119 })
120 })
121 .map_or(0, |z| z.selectivity)
122 }
123
124 pub fn selectivity_range<R>(&self, range_min: R, range_max: R, data: &[R]) -> usize
127 where
128 R: PartialOrd + std::fmt::Debug,
129 {
130 self.zones
131 .values()
132 .into_iter()
133 .filter(|z| {
134 let (zl, zr, _) = z.zone_triple();
135 (&data[zl]..=&data[zr]).contains(&&range_min)
136 || (&data[zl]..=&data[zr]).contains(&&range_max)
137 })
138 .map(|z| z.selectivity)
139 .sum()
140 }
141
142 pub fn scan_range<R>(&self, range_min: R, range_max: R, data: &[R]) -> (usize, usize)
145 where
146 R: PartialOrd,
147 {
148 self.zones
149 .values()
150 .into_iter()
151 .filter(|z| {
152 let (zl, zr, _) = z.zone_triple();
153 (&data[zl]..=&data[zr]).contains(&&range_min)
154 || (&data[zl]..=&data[zr]).contains(&&range_max)
155 })
156 .fold((usize::MAX, 0_usize), |mut acc, e| {
157 acc.0 = acc.0.min(e.min);
158 acc.1 = acc.1.max(e.max);
159 acc
160 })
161 }
162
163 pub fn zone_hits(&self, zone_id: usize) -> usize {
165 self.zones.get(&zone_id).map_or(0, |z| z.hits())
166 }
167}
168
169#[derive(Debug, Clone)]
172pub struct ZoneMap {
173 col_zones: LOTable<String, ColumnZoneData>,
174}
175
176impl ZoneMap {
177 pub fn new() -> ZoneMap {
180 Self {
181 col_zones: LOTable::new(),
182 }
183 }
184
185 pub fn insert<T>(
189 &self,
190 column: T,
191 zone_data: ColumnZoneData,
192 ) -> Result<Arc<Option<ColumnZoneData>>>
193 where
194 T: Into<String>,
195 {
196 self.col_zones.insert(column.into(), zone_data)
197 }
198
199 pub fn selectivity_range<C, R>(
202 &self,
203 column: C,
204 range_min: R,
205 range_max: R,
206 data: &[R],
207 ) -> usize
208 where
209 C: Into<String>,
210 R: PartialOrd + std::fmt::Debug,
211 {
212 self.col_zones
213 .get(&column.into())
214 .map_or(0_usize, |c| c.selectivity_range(range_min, range_max, data))
215 }
216
217 pub fn scan_range<C, R>(
220 &self,
221 column: C,
222 range_min: R,
223 range_max: R,
224 data: &[R],
225 ) -> (usize, usize)
226 where
227 C: Into<String>,
228 R: PartialOrd + std::fmt::Debug,
229 {
230 self.col_zones
231 .get(&column.into())
232 .map_or((0, 0), |c| c.scan_range(range_min, range_max, data))
233 }
234}
235
236impl<'a, T, R> From<Vec<(T, &'a [R])>> for ZoneMap
237where
238 T: Into<String>,
239 R: PartialOrd,
240{
241 fn from(data: Vec<(T, &'a [R])>) -> Self {
242 let zm = ZoneMap::new();
243 data.into_iter().for_each(|(col, d)| {
244 let mut row_id = 0_usize;
245 let czm = ColumnZoneData::new();
246 d.linear_group_by(|l, r| l < r).for_each(|d| {
247 let r = d.len();
248 let offset = row_id;
249 let z = Zone::from((row_id, row_id + r - 1, r));
250 row_id += r;
251 let _ = czm.insert(offset, z);
252 });
253
254 let _ = zm.insert(col.into(), czm);
255 });
256 zm
257 }
258}
259
#[cfg(test)]
mod tests_zone_map {
    use super::*;

    /// Builds the three-column fixture shared by the tests and returns the
    /// `customers` column together with the zone map built over all columns.
    ///
    /// `customers` splits into these strictly increasing runs (row ranges):
    /// `[1]` 0, `[0]` 1, `[-1]` 2, `[-2, 1]` 3..=4, `[0]` 5, `[-1]` 6,
    /// `[-2, 1, 2, 3, 4]` 7..=11, `[1, 2, 3, 4]` 12..=15, `[1, 2, 3, 4]` 16..=19.
    fn fixture() -> (Vec<i32>, ZoneMap) {
        let customers: Vec<i32> =
            vec![vec![1, 0, -1, -2].repeat(2), vec![1, 2, 3, 4].repeat(3)].concat();
        let products = vec![4, 3, 2, 1].repeat(100);
        let payouts = vec![4, 2, 6, 7].repeat(100);

        let zone_map = ZoneMap::from(vec![
            ("customers", customers.as_slice()),
            ("products", products.as_slice()),
            ("payouts", payouts.as_slice()),
        ]);

        (customers, zone_map)
    }

    #[test]
    fn test_zone_selectivity() {
        let (customers, zone_map) = fixture();

        // Only the last three runs contain the value 4: 5 + 4 + 4 rows.
        let selectivity = zone_map.selectivity_range("customers", 4, 4, customers.as_slice());
        assert_eq!(selectivity, 13);
    }

    #[test]
    fn test_zone_scan_range() {
        let (customers, zone_map) = fixture();

        // The runs containing the value 4 span rows 7 through 19.
        let scan = zone_map.scan_range("customers", 4, 4, customers.as_slice());
        assert_eq!(scan, (7, 19));
    }
}