1use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::cmp::Ordering;
10use std::collections::{BTreeMap, HashMap};
11
12#[derive(Debug, Clone, PartialEq, Eq)]
16pub struct SortKey {
17 sort_value: SortValue,
19 entity_key: String,
21}
22
23impl PartialOrd for SortKey {
24 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
25 Some(self.cmp(other))
26 }
27}
28
29impl Ord for SortKey {
30 fn cmp(&self, other: &Self) -> Ordering {
31 match self.sort_value.cmp(&other.sort_value) {
32 Ordering::Equal => self.entity_key.cmp(&other.entity_key),
33 other => other,
34 }
35 }
36}
37
38#[derive(Debug, Clone, PartialEq, Eq)]
40pub enum SortValue {
41 Null,
42 Bool(bool),
43 Integer(i64),
44 Float(OrderedFloat),
45 String(String),
46}
47
48impl Ord for SortValue {
49 fn cmp(&self, other: &Self) -> Ordering {
50 match (self, other) {
51 (SortValue::Null, SortValue::Null) => Ordering::Equal,
52 (SortValue::Null, _) => Ordering::Less,
53 (_, SortValue::Null) => Ordering::Greater,
54 (SortValue::Bool(a), SortValue::Bool(b)) => a.cmp(b),
55 (SortValue::Integer(a), SortValue::Integer(b)) => a.cmp(b),
56 (SortValue::Float(a), SortValue::Float(b)) => a.cmp(b),
57 (SortValue::String(a), SortValue::String(b)) => a.cmp(b),
58 (SortValue::Integer(_), SortValue::String(_)) => Ordering::Less,
60 (SortValue::String(_), SortValue::Integer(_)) => Ordering::Greater,
61 (SortValue::Float(_), SortValue::String(_)) => Ordering::Less,
62 (SortValue::String(_), SortValue::Float(_)) => Ordering::Greater,
63 (SortValue::Integer(a), SortValue::Float(b)) => OrderedFloat(*a as f64).cmp(b),
65 (SortValue::Float(a), SortValue::Integer(b)) => a.cmp(&OrderedFloat(*b as f64)),
66 (SortValue::Bool(_), _) => Ordering::Less,
68 (_, SortValue::Bool(_)) => Ordering::Greater,
69 }
70 }
71}
72
73impl PartialOrd for SortValue {
74 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
75 Some(self.cmp(other))
76 }
77}
78
/// `f64` wrapper with a total order, usable as (part of) an ordered-map key.
///
/// NaN is treated as one value that equals itself and sorts before every other float.
/// Equality is defined through [`Ord::cmp`] rather than derived from `f64`, because the
/// derived form would make `NaN != NaN` and contradict both `Eq` and the ordering.
#[derive(Debug, Clone, Copy)]
pub struct OrderedFloat(pub f64);

impl PartialEq for OrderedFloat {
    /// Two values are equal exactly when [`Ord::cmp`] reports `Ordering::Equal`.
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}

impl Eq for OrderedFloat {}

impl Ord for OrderedFloat {
    /// Numeric comparison, with NaN placed before all other values.
    fn cmp(&self, other: &Self) -> Ordering {
        match self.0.partial_cmp(&other.0) {
            Some(ordering) => ordering,
            // `partial_cmp` only fails when at least one side is NaN.
            None => match (self.0.is_nan(), other.0.is_nan()) {
                (true, true) => Ordering::Equal,
                (true, false) => Ordering::Less,
                _ => Ordering::Greater,
            },
        }
    }
}

impl PartialOrd for OrderedFloat {
    /// Always `Some`: the order is total and delegates to [`Ord::cmp`].
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
104
/// Direction in which a view is sorted.
///
/// With `rename_all = "lowercase"` the variants (de)serialize as `"asc"` and `"desc"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum SortOrder {
    /// Ascending order.
    Asc,
    /// Descending order.
    Desc,
}
112
113impl From<crate::materialized_view::SortOrder> for SortOrder {
114 fn from(order: crate::materialized_view::SortOrder) -> Self {
115 match order {
116 crate::materialized_view::SortOrder::Asc => SortOrder::Asc,
117 crate::materialized_view::SortOrder::Desc => SortOrder::Desc,
118 }
119 }
120}
121
/// A single change to the contents of a view window.
///
/// `SortedViewCache::compute_window_deltas` in this file produces only `Add` and `Remove`;
/// `None` and `Update` are presumably produced by other callers (not visible here).
#[derive(Debug, Clone, PartialEq)]
pub enum ViewDelta {
    /// No change.
    None,
    /// The entity stored under `key` entered the window; `entity` is its payload.
    Add { key: String, entity: Value },
    /// The entity stored under `key` left the window.
    Remove { key: String },
    /// The entity stored under `key` carries a new payload.
    Update { key: String, entity: Value },
}
134
135#[derive(Debug)]
137pub struct SortedViewCache {
138 view_id: String,
140 sort_field: Vec<String>,
142 order: SortOrder,
144 sorted: BTreeMap<SortKey, ()>,
146 entities: HashMap<String, (SortKey, Value)>,
148 keys_cache: Vec<String>,
150 cache_dirty: bool,
152}
153
154impl SortedViewCache {
155 pub fn new(view_id: String, sort_field: Vec<String>, order: SortOrder) -> Self {
156 Self {
157 view_id,
158 sort_field,
159 order,
160 sorted: BTreeMap::new(),
161 entities: HashMap::new(),
162 keys_cache: Vec::new(),
163 cache_dirty: true,
164 }
165 }
166
167 pub fn view_id(&self) -> &str {
168 &self.view_id
169 }
170
171 pub fn len(&self) -> usize {
172 self.entities.len()
173 }
174
175 pub fn is_empty(&self) -> bool {
176 self.entities.is_empty()
177 }
178
179 pub fn upsert(&mut self, entity_key: String, entity: Value) -> UpsertResult {
181 let sort_value = self.extract_sort_value(&entity);
182
183 if let Some((old_sort_key, _)) = self.entities.get(&entity_key).cloned() {
185 let effective_sort_value = if matches!(sort_value, SortValue::Null)
186 && !matches!(old_sort_key.sort_value, SortValue::Null)
187 {
188 old_sort_key.sort_value.clone()
189 } else {
190 sort_value
191 };
192
193 let new_sort_key = SortKey {
194 sort_value: effective_sort_value,
195 entity_key: entity_key.clone(),
196 };
197
198 if old_sort_key == new_sort_key {
199 self.entities
201 .insert(entity_key.clone(), (new_sort_key, entity));
202 let position = self.find_position(&entity_key);
204 return UpsertResult::Updated { position };
205 }
206
207 self.sorted.remove(&old_sort_key);
209 self.sorted.insert(new_sort_key.clone(), ());
210 self.entities
211 .insert(entity_key.clone(), (new_sort_key, entity));
212 self.cache_dirty = true;
213
214 let position = self.find_position(&entity_key);
215 return UpsertResult::Inserted { position };
216 }
217
218 let new_sort_key = SortKey {
219 sort_value,
220 entity_key: entity_key.clone(),
221 };
222
223 self.sorted.insert(new_sort_key.clone(), ());
224 self.entities
225 .insert(entity_key.clone(), (new_sort_key, entity));
226 self.cache_dirty = true;
227
228 let position = self.find_position(&entity_key);
229 UpsertResult::Inserted { position }
230 }
231
232 pub fn remove(&mut self, entity_key: &str) -> Option<usize> {
234 if let Some((sort_key, _)) = self.entities.remove(entity_key) {
235 let position = self.find_position_by_sort_key(&sort_key);
236 self.sorted.remove(&sort_key);
237 self.cache_dirty = true;
238 Some(position)
239 } else {
240 None
241 }
242 }
243
244 pub fn get(&self, entity_key: &str) -> Option<&Value> {
246 self.entities.get(entity_key).map(|(_, v)| v)
247 }
248
249 pub fn ordered_keys(&mut self) -> &[String] {
251 if self.cache_dirty {
252 self.rebuild_keys_cache();
253 }
254 &self.keys_cache
255 }
256
257 pub fn get_window(&mut self, skip: usize, take: usize) -> Vec<(String, Value)> {
259 if self.cache_dirty {
260 self.rebuild_keys_cache();
261 }
262
263 self.keys_cache
264 .iter()
265 .skip(skip)
266 .take(take)
267 .filter_map(|key| {
268 self.entities
269 .get(key)
270 .map(|(_, v)| (key.clone(), v.clone()))
271 })
272 .collect()
273 }
274
275 pub fn compute_window_deltas(
277 &mut self,
278 old_window_keys: &[String],
279 skip: usize,
280 take: usize,
281 ) -> Vec<ViewDelta> {
282 if self.cache_dirty {
283 self.rebuild_keys_cache();
284 }
285
286 let new_window_keys: Vec<&String> = self.keys_cache.iter().skip(skip).take(take).collect();
287
288 let old_set: std::collections::HashSet<&String> = old_window_keys.iter().collect();
289 let new_set: std::collections::HashSet<&String> = new_window_keys.iter().cloned().collect();
290
291 let mut deltas = Vec::new();
292
293 for key in old_set.difference(&new_set) {
295 deltas.push(ViewDelta::Remove {
296 key: (*key).clone(),
297 });
298 }
299
300 for key in new_set.difference(&old_set) {
302 if let Some((_, entity)) = self.entities.get(*key) {
303 deltas.push(ViewDelta::Add {
304 key: (*key).clone(),
305 entity: entity.clone(),
306 });
307 }
308 }
309
310 deltas
311 }
312
313 fn extract_sort_value(&self, entity: &Value) -> SortValue {
314 let mut current = entity;
315 for segment in &self.sort_field {
316 match current.get(segment) {
317 Some(v) => current = v,
318 None => return SortValue::Null,
319 }
320 }
321
322 match self.order {
323 SortOrder::Asc => value_to_sort_value(current),
324 SortOrder::Desc => value_to_sort_value_desc(current),
325 }
326 }
327
328 fn find_position(&self, entity_key: &str) -> usize {
329 if let Some((sort_key, _)) = self.entities.get(entity_key) {
330 self.find_position_by_sort_key(sort_key)
331 } else {
332 0
333 }
334 }
335
336 fn find_position_by_sort_key(&self, sort_key: &SortKey) -> usize {
337 self.sorted.range(..sort_key).count()
338 }
339
340 fn rebuild_keys_cache(&mut self) {
341 self.keys_cache = self.sorted.keys().map(|sk| sk.entity_key.clone()).collect();
342 self.cache_dirty = false;
343 }
344}
345
/// Outcome of `SortedViewCache::upsert`.
///
/// In both variants `position` is the number of cached entries that sort before the entity.
#[derive(Debug, Clone, PartialEq)]
pub enum UpsertResult {
    /// The entity was new, or it already existed and its sort key changed.
    Inserted { position: usize },
    /// The entity already existed and kept its sort key; only the payload was replaced.
    Updated { position: usize },
}
354
355fn value_to_sort_value(v: &Value) -> SortValue {
356 match v {
357 Value::Null => SortValue::Null,
358 Value::Bool(b) => SortValue::Bool(*b),
359 Value::Number(n) => {
360 if let Some(i) = n.as_i64() {
361 SortValue::Integer(i)
362 } else if let Some(f) = n.as_f64() {
363 SortValue::Float(OrderedFloat(f))
364 } else {
365 SortValue::Null
366 }
367 }
368 Value::String(s) => SortValue::String(s.clone()),
369 _ => SortValue::Null,
370 }
371}
372
373fn value_to_sort_value_desc(v: &Value) -> SortValue {
374 match v {
375 Value::Null => SortValue::Null,
376 Value::Bool(b) => SortValue::Bool(!*b),
377 Value::Number(n) => {
378 if let Some(i) = n.as_i64() {
379 SortValue::Integer(-i)
380 } else if let Some(f) = n.as_f64() {
381 SortValue::Float(OrderedFloat(-f))
382 } else {
383 SortValue::Null
384 }
385 }
386 Value::String(s) => {
387 SortValue::String(s.clone())
391 }
392 _ => SortValue::Null,
393 }
394}
395
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds an empty cache for the view id shared by all tests, sorted by `path`.
    fn cache_sorted_by(path: &[&str], order: SortOrder) -> SortedViewCache {
        let sort_field = path.iter().map(|segment| segment.to_string()).collect();
        SortedViewCache::new("test/latest".to_string(), sort_field, order)
    }

    /// Descending order places the highest `id` first.
    #[test]
    fn test_sorted_cache_basic() {
        let mut cache = cache_sorted_by(&["id"], SortOrder::Desc);

        let rows = vec![
            ("a", json!({"id": 1, "name": "first"})),
            ("b", json!({"id": 3, "name": "third"})),
            ("c", json!({"id": 2, "name": "second"})),
        ];
        for (key, entity) in rows {
            cache.upsert(key.to_string(), entity);
        }

        let keys = cache.ordered_keys();
        assert_eq!(keys, vec!["b", "c", "a"]);
    }

    /// Windows honour both the offset and the length.
    #[test]
    fn test_sorted_cache_window() {
        let mut cache = cache_sorted_by(&["id"], SortOrder::Desc);

        for i in 1..=10 {
            cache.upsert(format!("e{}", i), json!({"id": i}));
        }

        let first_page = cache.get_window(0, 3);
        assert_eq!(first_page.len(), 3);
        assert_eq!(first_page[0].0, "e10");
        assert_eq!(first_page[1].0, "e9");
        assert_eq!(first_page[2].0, "e8");

        let second_page = cache.get_window(3, 3);
        assert_eq!(second_page[0].0, "e7");
    }

    /// Changing an entity's sort value moves it within the ordering.
    #[test]
    fn test_sorted_cache_update_moves_position() {
        let mut cache = cache_sorted_by(&["score"], SortOrder::Desc);

        for (key, score) in vec![("a", 10), ("b", 20), ("c", 15)] {
            cache.upsert(key.to_string(), json!({"score": score}));
        }

        assert_eq!(cache.ordered_keys(), vec!["b", "c", "a"]);

        cache.upsert("a".to_string(), json!({"score": 25}));

        assert_eq!(cache.ordered_keys(), vec!["a", "b", "c"]);
    }

    /// Removal reports the vacated position and shrinks the cache.
    #[test]
    fn test_sorted_cache_remove() {
        let mut cache = cache_sorted_by(&["id"], SortOrder::Asc);

        for (key, id) in vec![("a", 1), ("b", 2), ("c", 3)] {
            cache.upsert(key.to_string(), json!({"id": id}));
        }

        assert_eq!(cache.len(), 3);

        let removed_at = cache.remove("b");
        assert_eq!(removed_at, Some(1));
        assert_eq!(cache.len(), 2);
        assert_eq!(cache.ordered_keys(), vec!["a", "c"]);
    }

    /// A new top entry pushes the last window entry out: one `Add`, one `Remove`.
    #[test]
    fn test_compute_window_deltas() {
        let mut cache = cache_sorted_by(&["id"], SortOrder::Desc);

        for i in 1..=5 {
            cache.upsert(format!("e{}", i), json!({"id": i}));
        }

        let old_window: Vec<String> = ["e5", "e4", "e3"]
            .iter()
            .map(|key| key.to_string())
            .collect();

        cache.upsert("e6".to_string(), json!({"id": 6}));

        let deltas = cache.compute_window_deltas(&old_window, 0, 3);

        assert_eq!(deltas.len(), 2);

        let removed_e3 = deltas
            .iter()
            .any(|d| matches!(d, ViewDelta::Remove { key } if key == "e3"));
        assert!(removed_e3);

        let added_e6 = deltas
            .iter()
            .any(|d| matches!(d, ViewDelta::Add { key, .. } if key == "e6"));
        assert!(added_e6);
    }

    /// The sort value may sit below the entity root.
    #[test]
    fn test_nested_sort_field() {
        let mut cache = cache_sorted_by(&["id", "round_id"], SortOrder::Desc);

        for (key, round) in vec![("a", 1), ("b", 3), ("c", 2)] {
            cache.upsert(key.to_string(), json!({"id": {"round_id": round}}));
        }

        let keys = cache.ordered_keys();
        assert_eq!(keys, vec!["b", "c", "a"]);
    }

    /// An update that lacks the sort field keeps the entity where it was.
    #[test]
    fn test_update_with_missing_sort_field_preserves_position() {
        let mut cache = cache_sorted_by(&["id", "round_id"], SortOrder::Desc);

        for round in vec![100, 200, 300] {
            cache.upsert(
                round.to_string(),
                json!({"id": {"round_id": round}, "data": "initial"}),
            );
        }

        assert_eq!(cache.ordered_keys(), vec!["300", "200", "100"]);

        cache.upsert("200".to_string(), json!({"data": "updated_without_id"}));

        assert_eq!(
            cache.ordered_keys(),
            vec!["300", "200", "100"],
            "Entity 200 should retain its position even when updated without sort field"
        );

        let stored = cache.get("200").unwrap();
        assert_eq!(stored["data"], "updated_without_id");
    }

    /// A brand-new entity without the sort field is keyed by `Null` and sorts first.
    #[test]
    fn test_new_entity_with_missing_sort_field_gets_null_position() {
        let mut cache = cache_sorted_by(&["id", "round_id"], SortOrder::Desc);

        for round in vec![100, 200] {
            cache.upsert(round.to_string(), json!({"id": {"round_id": round}}));
        }

        cache.upsert("new".to_string(), json!({"data": "no_sort_field"}));

        let keys = cache.ordered_keys();
        assert_eq!(
            keys.first().unwrap(),
            "new",
            "New entity without sort field gets Null which sorts first (Null < any value)"
        );
    }
}