1use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::cmp::Ordering;
10use std::collections::{BTreeMap, HashMap};
11
12#[derive(Debug, Clone, PartialEq, Eq)]
16pub struct SortKey {
17 sort_value: SortValue,
19 entity_key: String,
21}
22
23impl PartialOrd for SortKey {
24 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
25 Some(self.cmp(other))
26 }
27}
28
29impl Ord for SortKey {
30 fn cmp(&self, other: &Self) -> Ordering {
31 match self.sort_value.cmp(&other.sort_value) {
32 Ordering::Equal => self.entity_key.cmp(&other.entity_key),
33 other => other,
34 }
35 }
36}
37
38#[derive(Debug, Clone, PartialEq, Eq)]
40pub enum SortValue {
41 Null,
42 Bool(bool),
43 Integer(i64),
44 Float(OrderedFloat),
45 String(String),
46}
47
48impl Ord for SortValue {
49 fn cmp(&self, other: &Self) -> Ordering {
50 match (self, other) {
51 (SortValue::Null, SortValue::Null) => Ordering::Equal,
52 (SortValue::Null, _) => Ordering::Less,
53 (_, SortValue::Null) => Ordering::Greater,
54 (SortValue::Bool(a), SortValue::Bool(b)) => a.cmp(b),
55 (SortValue::Integer(a), SortValue::Integer(b)) => a.cmp(b),
56 (SortValue::Float(a), SortValue::Float(b)) => a.cmp(b),
57 (SortValue::String(a), SortValue::String(b)) => a.cmp(b),
58 (SortValue::Integer(_), SortValue::String(_)) => Ordering::Less,
60 (SortValue::String(_), SortValue::Integer(_)) => Ordering::Greater,
61 (SortValue::Float(_), SortValue::String(_)) => Ordering::Less,
62 (SortValue::String(_), SortValue::Float(_)) => Ordering::Greater,
63 (SortValue::Integer(a), SortValue::Float(b)) => OrderedFloat(*a as f64).cmp(b),
65 (SortValue::Float(a), SortValue::Integer(b)) => a.cmp(&OrderedFloat(*b as f64)),
66 (SortValue::Bool(_), _) => Ordering::Less,
68 (_, SortValue::Bool(_)) => Ordering::Greater,
69 }
70 }
71}
72
73impl PartialOrd for SortValue {
74 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
75 Some(self.cmp(other))
76 }
77}
78
/// `f64` wrapper that can be used where a total order is required.
#[derive(Debug, Clone, Copy)]
pub struct OrderedFloat(pub f64);

/// Equality that agrees with the `Ord` impl for this type.
///
/// A derived `PartialEq` would make NaN unequal to itself, which contradicts
/// both the `Eq` marker below and `Ord::cmp` returning `Equal` for two NaNs.
impl PartialEq for OrderedFloat {
    fn eq(&self, other: &Self) -> bool {
        self.0 == other.0 || (self.0.is_nan() && other.0.is_nan())
    }
}

impl Eq for OrderedFloat {}
84
85impl Ord for OrderedFloat {
86 fn cmp(&self, other: &Self) -> Ordering {
87 self.0.partial_cmp(&other.0).unwrap_or_else(|| {
88 if self.0.is_nan() && other.0.is_nan() {
89 Ordering::Equal
90 } else if self.0.is_nan() {
91 Ordering::Less
92 } else {
93 Ordering::Greater
94 }
95 })
96 }
97}
98
99impl PartialOrd for OrderedFloat {
100 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
101 Some(self.cmp(other))
102 }
103}
104
105#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
107#[serde(rename_all = "lowercase")]
108pub enum SortOrder {
109 Asc,
110 Desc,
111}
112
113impl From<crate::materialized_view::SortOrder> for SortOrder {
114 fn from(order: crate::materialized_view::SortOrder) -> Self {
115 match order {
116 crate::materialized_view::SortOrder::Asc => SortOrder::Asc,
117 crate::materialized_view::SortOrder::Desc => SortOrder::Desc,
118 }
119 }
120}
121
122#[derive(Debug, Clone, PartialEq)]
124pub enum ViewDelta {
125 None,
127 Add { key: String, entity: Value },
129 Remove { key: String },
131 Update { key: String, entity: Value },
133}
134
135#[derive(Debug)]
137pub struct SortedViewCache {
138 view_id: String,
140 sort_field: Vec<String>,
142 order: SortOrder,
144 sorted: BTreeMap<SortKey, ()>,
146 entities: HashMap<String, (SortKey, Value)>,
148 keys_cache: Vec<String>,
150 cache_dirty: bool,
152}
153
154impl SortedViewCache {
155 pub fn new(view_id: String, sort_field: Vec<String>, order: SortOrder) -> Self {
156 Self {
157 view_id,
158 sort_field,
159 order,
160 sorted: BTreeMap::new(),
161 entities: HashMap::new(),
162 keys_cache: Vec::new(),
163 cache_dirty: true,
164 }
165 }
166
167 pub fn view_id(&self) -> &str {
168 &self.view_id
169 }
170
171 pub fn len(&self) -> usize {
172 self.entities.len()
173 }
174
175 pub fn is_empty(&self) -> bool {
176 self.entities.is_empty()
177 }
178
179 pub fn upsert(&mut self, entity_key: String, entity: Value) -> UpsertResult {
181 let sort_value = self.extract_sort_value(&entity);
182 let new_sort_key = SortKey {
183 sort_value,
184 entity_key: entity_key.clone(),
185 };
186
187 if let Some((old_sort_key, _old_entity)) = self.entities.get(&entity_key) {
189 if old_sort_key == &new_sort_key {
190 self.entities
192 .insert(entity_key.clone(), (new_sort_key, entity));
193 let position = self.find_position(&entity_key);
195 return UpsertResult::Updated { position };
196 }
197
198 self.sorted.remove(old_sort_key);
200 }
201
202 self.sorted.insert(new_sort_key.clone(), ());
203 self.entities
204 .insert(entity_key.clone(), (new_sort_key, entity));
205 self.cache_dirty = true;
206
207 let position = self.find_position(&entity_key);
208 UpsertResult::Inserted { position }
209 }
210
211 pub fn remove(&mut self, entity_key: &str) -> Option<usize> {
213 if let Some((sort_key, _)) = self.entities.remove(entity_key) {
214 let position = self.find_position_by_sort_key(&sort_key);
215 self.sorted.remove(&sort_key);
216 self.cache_dirty = true;
217 Some(position)
218 } else {
219 None
220 }
221 }
222
223 pub fn get(&self, entity_key: &str) -> Option<&Value> {
225 self.entities.get(entity_key).map(|(_, v)| v)
226 }
227
228 pub fn ordered_keys(&mut self) -> &[String] {
230 if self.cache_dirty {
231 self.rebuild_keys_cache();
232 }
233 &self.keys_cache
234 }
235
236 pub fn get_window(&mut self, skip: usize, take: usize) -> Vec<(String, Value)> {
238 if self.cache_dirty {
239 self.rebuild_keys_cache();
240 }
241
242 self.keys_cache
243 .iter()
244 .skip(skip)
245 .take(take)
246 .filter_map(|key| {
247 self.entities
248 .get(key)
249 .map(|(_, v)| (key.clone(), v.clone()))
250 })
251 .collect()
252 }
253
254 pub fn compute_window_deltas(
256 &mut self,
257 old_window_keys: &[String],
258 skip: usize,
259 take: usize,
260 ) -> Vec<ViewDelta> {
261 if self.cache_dirty {
262 self.rebuild_keys_cache();
263 }
264
265 let new_window_keys: Vec<&String> = self.keys_cache.iter().skip(skip).take(take).collect();
266
267 let old_set: std::collections::HashSet<&String> = old_window_keys.iter().collect();
268 let new_set: std::collections::HashSet<&String> = new_window_keys.iter().cloned().collect();
269
270 let mut deltas = Vec::new();
271
272 for key in old_set.difference(&new_set) {
274 deltas.push(ViewDelta::Remove {
275 key: (*key).clone(),
276 });
277 }
278
279 for key in new_set.difference(&old_set) {
281 if let Some((_, entity)) = self.entities.get(*key) {
282 deltas.push(ViewDelta::Add {
283 key: (*key).clone(),
284 entity: entity.clone(),
285 });
286 }
287 }
288
289 deltas
290 }
291
292 fn extract_sort_value(&self, entity: &Value) -> SortValue {
293 let mut current = entity;
294 for segment in &self.sort_field {
295 match current.get(segment) {
296 Some(v) => current = v,
297 None => return SortValue::Null,
298 }
299 }
300
301 match self.order {
302 SortOrder::Asc => value_to_sort_value(current),
303 SortOrder::Desc => value_to_sort_value_desc(current),
304 }
305 }
306
307 fn find_position(&self, entity_key: &str) -> usize {
308 if let Some((sort_key, _)) = self.entities.get(entity_key) {
309 self.find_position_by_sort_key(sort_key)
310 } else {
311 0
312 }
313 }
314
315 fn find_position_by_sort_key(&self, sort_key: &SortKey) -> usize {
316 self.sorted.range(..sort_key).count()
317 }
318
319 fn rebuild_keys_cache(&mut self) {
320 self.keys_cache = self.sorted.keys().map(|sk| sk.entity_key.clone()).collect();
321 self.cache_dirty = false;
322 }
323}
324
/// Outcome of `SortedViewCache::upsert`.
#[derive(Clone, Debug, PartialEq)]
pub enum UpsertResult {
    /// The entity was placed at `position` in the sorted order.
    Inserted { position: usize },
    /// The entity was replaced in place and stays at `position`.
    Updated { position: usize },
}
333
334fn value_to_sort_value(v: &Value) -> SortValue {
335 match v {
336 Value::Null => SortValue::Null,
337 Value::Bool(b) => SortValue::Bool(*b),
338 Value::Number(n) => {
339 if let Some(i) = n.as_i64() {
340 SortValue::Integer(i)
341 } else if let Some(f) = n.as_f64() {
342 SortValue::Float(OrderedFloat(f))
343 } else {
344 SortValue::Null
345 }
346 }
347 Value::String(s) => SortValue::String(s.clone()),
348 _ => SortValue::Null,
349 }
350}
351
352fn value_to_sort_value_desc(v: &Value) -> SortValue {
353 match v {
354 Value::Null => SortValue::Null,
355 Value::Bool(b) => SortValue::Bool(!*b),
356 Value::Number(n) => {
357 if let Some(i) = n.as_i64() {
358 SortValue::Integer(-i)
359 } else if let Some(f) = n.as_f64() {
360 SortValue::Float(OrderedFloat(-f))
361 } else {
362 SortValue::Null
363 }
364 }
365 Value::String(s) => {
366 SortValue::String(s.clone())
370 }
371 _ => SortValue::Null,
372 }
373}
374
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds an empty cache for the `test/latest` view sorted by `sort_field`.
    fn new_cache(sort_field: &[&str], order: SortOrder) -> SortedViewCache {
        let path = sort_field.iter().map(|segment| segment.to_string()).collect();
        SortedViewCache::new("test/latest".to_string(), path, order)
    }

    #[test]
    fn test_sorted_cache_basic() {
        let mut cache = new_cache(&["id"], SortOrder::Desc);

        cache.upsert("a".to_string(), json!({"id": 1, "name": "first"}));
        cache.upsert("b".to_string(), json!({"id": 3, "name": "third"}));
        cache.upsert("c".to_string(), json!({"id": 2, "name": "second"}));

        // Descending by id: 3, 2, 1.
        let keys = cache.ordered_keys();
        assert_eq!(keys, vec!["b", "c", "a"]);
    }

    #[test]
    fn test_sorted_cache_window() {
        let mut cache = new_cache(&["id"], SortOrder::Desc);

        for i in 1..=10 {
            cache.upsert(format!("e{}", i), json!({"id": i}));
        }

        let first_page = cache.get_window(0, 3);
        assert_eq!(first_page.len(), 3);
        assert_eq!(first_page[0].0, "e10");
        assert_eq!(first_page[1].0, "e9");
        assert_eq!(first_page[2].0, "e8");

        let second_page = cache.get_window(3, 3);
        assert_eq!(second_page[0].0, "e7");
    }

    #[test]
    fn test_sorted_cache_update_moves_position() {
        let mut cache = new_cache(&["score"], SortOrder::Desc);

        cache.upsert("a".to_string(), json!({"score": 10}));
        cache.upsert("b".to_string(), json!({"score": 20}));
        cache.upsert("c".to_string(), json!({"score": 15}));

        assert_eq!(cache.ordered_keys(), vec!["b", "c", "a"]);

        // Raising a's score above the others moves it to the front.
        cache.upsert("a".to_string(), json!({"score": 25}));

        assert_eq!(cache.ordered_keys(), vec!["a", "b", "c"]);
    }

    #[test]
    fn test_sorted_cache_remove() {
        let mut cache = new_cache(&["id"], SortOrder::Asc);

        cache.upsert("a".to_string(), json!({"id": 1}));
        cache.upsert("b".to_string(), json!({"id": 2}));
        cache.upsert("c".to_string(), json!({"id": 3}));

        assert_eq!(cache.len(), 3);

        let pos = cache.remove("b");
        assert_eq!(pos, Some(1));
        assert_eq!(cache.len(), 2);
        assert_eq!(cache.ordered_keys(), vec!["a", "c"]);
    }

    #[test]
    fn test_compute_window_deltas() {
        let mut cache = new_cache(&["id"], SortOrder::Desc);

        for i in 1..=5 {
            cache.upsert(format!("e{}", i), json!({"id": i}));
        }

        // Top-3 window before the new entity arrives.
        let old_window: Vec<String> = vec!["e5".to_string(), "e4".to_string(), "e3".to_string()];

        cache.upsert("e6".to_string(), json!({"id": 6}));

        let deltas = cache.compute_window_deltas(&old_window, 0, 3);

        // e6 enters the window and pushes e3 out.
        assert_eq!(deltas.len(), 2);
        assert!(deltas
            .iter()
            .any(|d| matches!(d, ViewDelta::Remove { key } if key == "e3")));
        assert!(deltas
            .iter()
            .any(|d| matches!(d, ViewDelta::Add { key, .. } if key == "e6")));
    }

    #[test]
    fn test_nested_sort_field() {
        let mut cache = new_cache(&["id", "round_id"], SortOrder::Desc);

        cache.upsert("a".to_string(), json!({"id": {"round_id": 1}}));
        cache.upsert("b".to_string(), json!({"id": {"round_id": 3}}));
        cache.upsert("c".to_string(), json!({"id": {"round_id": 2}}));

        let keys = cache.ordered_keys();
        assert_eq!(keys, vec!["b", "c", "a"]);
    }
}