1use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::cmp::Ordering;
10use std::collections::{BTreeMap, HashMap};
11
12#[derive(Debug, Clone, PartialEq, Eq)]
16pub struct SortKey {
17 sort_value: SortValue,
19 entity_key: String,
21}
22
23impl PartialOrd for SortKey {
24 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
25 Some(self.cmp(other))
26 }
27}
28
29impl Ord for SortKey {
30 fn cmp(&self, other: &Self) -> Ordering {
31 match self.sort_value.cmp(&other.sort_value) {
32 Ordering::Equal => self.entity_key.cmp(&other.entity_key),
33 other => other,
34 }
35 }
36}
37
38#[derive(Debug, Clone, PartialEq, Eq)]
40pub enum SortValue {
41 Null,
42 Bool(bool),
43 Integer(i64),
44 Float(OrderedFloat),
45 String(String),
46}
47
48impl Ord for SortValue {
49 fn cmp(&self, other: &Self) -> Ordering {
50 match (self, other) {
51 (SortValue::Null, SortValue::Null) => Ordering::Equal,
52 (SortValue::Null, _) => Ordering::Less,
53 (_, SortValue::Null) => Ordering::Greater,
54 (SortValue::Bool(a), SortValue::Bool(b)) => a.cmp(b),
55 (SortValue::Integer(a), SortValue::Integer(b)) => a.cmp(b),
56 (SortValue::Float(a), SortValue::Float(b)) => a.cmp(b),
57 (SortValue::String(a), SortValue::String(b)) => a.cmp(b),
58 (SortValue::Integer(_), SortValue::String(_)) => Ordering::Less,
60 (SortValue::String(_), SortValue::Integer(_)) => Ordering::Greater,
61 (SortValue::Float(_), SortValue::String(_)) => Ordering::Less,
62 (SortValue::String(_), SortValue::Float(_)) => Ordering::Greater,
63 (SortValue::Integer(a), SortValue::Float(b)) => OrderedFloat(*a as f64).cmp(b),
65 (SortValue::Float(a), SortValue::Integer(b)) => a.cmp(&OrderedFloat(*b as f64)),
66 (SortValue::Bool(_), _) => Ordering::Less,
68 (_, SortValue::Bool(_)) => Ordering::Greater,
69 }
70 }
71}
72
73impl PartialOrd for SortValue {
74 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
75 Some(self.cmp(other))
76 }
77}
78
79#[derive(Debug, Clone, Copy, PartialEq)]
81pub struct OrderedFloat(pub f64);
82
83impl Eq for OrderedFloat {}
84
85impl Ord for OrderedFloat {
86 fn cmp(&self, other: &Self) -> Ordering {
87 self.0.partial_cmp(&other.0).unwrap_or_else(|| {
88 if self.0.is_nan() && other.0.is_nan() {
89 Ordering::Equal
90 } else if self.0.is_nan() {
91 Ordering::Less
92 } else {
93 Ordering::Greater
94 }
95 })
96 }
97}
98
99impl PartialOrd for OrderedFloat {
100 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
101 Some(self.cmp(other))
102 }
103}
104
105#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
107#[serde(rename_all = "lowercase")]
108pub enum SortOrder {
109 Asc,
110 Desc,
111}
112
113impl From<crate::materialized_view::SortOrder> for SortOrder {
114 fn from(order: crate::materialized_view::SortOrder) -> Self {
115 match order {
116 crate::materialized_view::SortOrder::Asc => SortOrder::Asc,
117 crate::materialized_view::SortOrder::Desc => SortOrder::Desc,
118 }
119 }
120}
121
122#[derive(Debug, Clone, PartialEq)]
124pub enum ViewDelta {
125 None,
127 Add { key: String, entity: Value },
129 Remove { key: String },
131 Update { key: String, entity: Value },
133}
134
135#[derive(Debug)]
137pub struct SortedViewCache {
138 view_id: String,
140 sort_field: Vec<String>,
142 order: SortOrder,
144 sorted: BTreeMap<SortKey, ()>,
146 entities: HashMap<String, (SortKey, Value)>,
148 keys_cache: Vec<String>,
150 cache_dirty: bool,
152}
153
154impl SortedViewCache {
155 pub fn new(view_id: String, sort_field: Vec<String>, order: SortOrder) -> Self {
156 Self {
157 view_id,
158 sort_field,
159 order,
160 sorted: BTreeMap::new(),
161 entities: HashMap::new(),
162 keys_cache: Vec::new(),
163 cache_dirty: true,
164 }
165 }
166
167 pub fn view_id(&self) -> &str {
168 &self.view_id
169 }
170
171 pub fn len(&self) -> usize {
172 self.entities.len()
173 }
174
175 pub fn is_empty(&self) -> bool {
176 self.entities.is_empty()
177 }
178
179 pub fn upsert(&mut self, entity_key: String, entity: Value) -> UpsertResult {
181 let sort_value = self.extract_sort_value(&entity);
182
183 if let Some((old_sort_key, old_entity)) = self.entities.get(&entity_key).cloned() {
185 let effective_sort_value = if matches!(sort_value, SortValue::Null)
186 && !matches!(old_sort_key.sort_value, SortValue::Null)
187 {
188 old_sort_key.sort_value.clone()
189 } else {
190 sort_value
191 };
192
193 let new_sort_key = SortKey {
194 sort_value: effective_sort_value,
195 entity_key: entity_key.clone(),
196 };
197
198 let merged_entity = Self::deep_merge(old_entity, entity);
200
201 if old_sort_key == new_sort_key {
202 self.entities
204 .insert(entity_key.clone(), (new_sort_key, merged_entity));
205 let position = self.find_position(&entity_key);
207 return UpsertResult::Updated { position };
208 }
209
210 self.sorted.remove(&old_sort_key);
212 self.sorted.insert(new_sort_key.clone(), ());
213 self.entities
214 .insert(entity_key.clone(), (new_sort_key, merged_entity));
215 self.cache_dirty = true;
216
217 let position = self.find_position(&entity_key);
218 return UpsertResult::Inserted { position };
219 }
220
221 let new_sort_key = SortKey {
222 sort_value,
223 entity_key: entity_key.clone(),
224 };
225
226 self.sorted.insert(new_sort_key.clone(), ());
227 self.entities
228 .insert(entity_key.clone(), (new_sort_key, entity));
229 self.cache_dirty = true;
230
231 let position = self.find_position(&entity_key);
232
233 UpsertResult::Inserted { position }
234 }
235
236 fn deep_merge(base: Value, patch: Value) -> Value {
237 match (base, patch) {
238 (Value::Object(mut base_map), Value::Object(patch_map)) => {
239 for (key, patch_value) in patch_map {
240 if let Some(base_value) = base_map.remove(&key) {
241 base_map.insert(key, Self::deep_merge(base_value, patch_value));
242 } else {
243 base_map.insert(key, patch_value);
244 }
245 }
246 Value::Object(base_map)
247 }
248 (_, patch) => patch,
249 }
250 }
251
252 pub fn remove(&mut self, entity_key: &str) -> Option<usize> {
254 if let Some((sort_key, _)) = self.entities.remove(entity_key) {
255 let position = self.find_position_by_sort_key(&sort_key);
256 self.sorted.remove(&sort_key);
257 self.cache_dirty = true;
258 Some(position)
259 } else {
260 None
261 }
262 }
263
264 pub fn get(&self, entity_key: &str) -> Option<&Value> {
266 self.entities.get(entity_key).map(|(_, v)| v)
267 }
268
269 pub fn ordered_keys(&mut self) -> &[String] {
271 if self.cache_dirty {
272 self.rebuild_keys_cache();
273 }
274 &self.keys_cache
275 }
276
277 pub fn get_window(&mut self, skip: usize, take: usize) -> Vec<(String, Value)> {
279 if self.cache_dirty {
280 self.rebuild_keys_cache();
281 }
282
283 self.keys_cache
284 .iter()
285 .skip(skip)
286 .take(take)
287 .filter_map(|key| {
288 self.entities
289 .get(key)
290 .map(|(_, v)| (key.clone(), v.clone()))
291 })
292 .collect()
293 }
294
295 pub fn compute_window_deltas(
297 &mut self,
298 old_window_keys: &[String],
299 skip: usize,
300 take: usize,
301 ) -> Vec<ViewDelta> {
302 if self.cache_dirty {
303 self.rebuild_keys_cache();
304 }
305
306 let new_window_keys: Vec<&String> = self.keys_cache.iter().skip(skip).take(take).collect();
307
308 let old_set: std::collections::HashSet<&String> = old_window_keys.iter().collect();
309 let new_set: std::collections::HashSet<&String> = new_window_keys.iter().cloned().collect();
310
311 let mut deltas = Vec::new();
312
313 for key in old_set.difference(&new_set) {
315 deltas.push(ViewDelta::Remove {
316 key: (*key).clone(),
317 });
318 }
319
320 for key in new_set.difference(&old_set) {
322 if let Some((_, entity)) = self.entities.get(*key) {
323 deltas.push(ViewDelta::Add {
324 key: (*key).clone(),
325 entity: entity.clone(),
326 });
327 }
328 }
329
330 deltas
331 }
332
333 fn extract_sort_value(&self, entity: &Value) -> SortValue {
334 let mut current = entity;
335 for segment in &self.sort_field {
336 match current.get(segment) {
337 Some(v) => current = v,
338 None => return SortValue::Null,
339 }
340 }
341
342 match self.order {
343 SortOrder::Asc => value_to_sort_value(current),
344 SortOrder::Desc => value_to_sort_value_desc(current),
345 }
346 }
347
348 fn find_position(&self, entity_key: &str) -> usize {
349 if let Some((sort_key, _)) = self.entities.get(entity_key) {
350 self.find_position_by_sort_key(sort_key)
351 } else {
352 0
353 }
354 }
355
356 fn find_position_by_sort_key(&self, sort_key: &SortKey) -> usize {
357 self.sorted.range(..sort_key).count()
358 }
359
360 fn rebuild_keys_cache(&mut self) {
361 self.keys_cache = self.sorted.keys().map(|sk| sk.entity_key.clone()).collect();
362 self.cache_dirty = false;
363 }
364}
365
366#[derive(Debug, Clone, PartialEq)]
368pub enum UpsertResult {
369 Inserted { position: usize },
371 Updated { position: usize },
373}
374
375fn value_to_sort_value(v: &Value) -> SortValue {
376 match v {
377 Value::Null => SortValue::Null,
378 Value::Bool(b) => SortValue::Bool(*b),
379 Value::Number(n) => {
380 if let Some(i) = n.as_i64() {
381 SortValue::Integer(i)
382 } else if let Some(f) = n.as_f64() {
383 SortValue::Float(OrderedFloat(f))
384 } else {
385 SortValue::Null
386 }
387 }
388 Value::String(s) => SortValue::String(s.clone()),
389 _ => SortValue::Null,
390 }
391}
392
393fn value_to_sort_value_desc(v: &Value) -> SortValue {
394 match v {
395 Value::Null => SortValue::Null,
396 Value::Bool(b) => SortValue::Bool(!*b),
397 Value::Number(n) => {
398 if let Some(i) = n.as_i64() {
399 SortValue::Integer(-i)
400 } else if let Some(f) = n.as_f64() {
401 SortValue::Float(OrderedFloat(-f))
402 } else {
403 SortValue::Null
404 }
405 }
406 Value::String(s) => {
407 SortValue::String(s.clone())
411 }
412 _ => SortValue::Null,
413 }
414}
415
416#[cfg(test)]
417mod tests {
418 use super::*;
419 use serde_json::json;
420
421 #[test]
422 fn test_sorted_cache_basic() {
423 let mut cache = SortedViewCache::new(
424 "test/latest".to_string(),
425 vec!["id".to_string()],
426 SortOrder::Desc,
427 );
428
429 cache.upsert("a".to_string(), json!({"id": 1, "name": "first"}));
430 cache.upsert("b".to_string(), json!({"id": 3, "name": "third"}));
431 cache.upsert("c".to_string(), json!({"id": 2, "name": "second"}));
432
433 let keys = cache.ordered_keys();
434 assert_eq!(keys, vec!["b", "c", "a"]);
436 }
437
438 #[test]
439 fn test_sorted_cache_window() {
440 let mut cache = SortedViewCache::new(
441 "test/latest".to_string(),
442 vec!["id".to_string()],
443 SortOrder::Desc,
444 );
445
446 for i in 1..=10 {
447 cache.upsert(format!("e{}", i), json!({"id": i}));
448 }
449
450 let window = cache.get_window(0, 3);
452 assert_eq!(window.len(), 3);
453 assert_eq!(window[0].0, "e10");
454 assert_eq!(window[1].0, "e9");
455 assert_eq!(window[2].0, "e8");
456
457 let window = cache.get_window(3, 3);
458 assert_eq!(window[0].0, "e7");
459 }
460
461 #[test]
462 fn test_sorted_cache_update_moves_position() {
463 let mut cache = SortedViewCache::new(
464 "test/latest".to_string(),
465 vec!["score".to_string()],
466 SortOrder::Desc,
467 );
468
469 cache.upsert("a".to_string(), json!({"score": 10}));
470 cache.upsert("b".to_string(), json!({"score": 20}));
471 cache.upsert("c".to_string(), json!({"score": 15}));
472
473 assert_eq!(cache.ordered_keys(), vec!["b", "c", "a"]);
475
476 cache.upsert("a".to_string(), json!({"score": 25}));
478
479 assert_eq!(cache.ordered_keys(), vec!["a", "b", "c"]);
481 }
482
483 #[test]
484 fn test_sorted_cache_remove() {
485 let mut cache = SortedViewCache::new(
486 "test/latest".to_string(),
487 vec!["id".to_string()],
488 SortOrder::Asc,
489 );
490
491 cache.upsert("a".to_string(), json!({"id": 1}));
492 cache.upsert("b".to_string(), json!({"id": 2}));
493 cache.upsert("c".to_string(), json!({"id": 3}));
494
495 assert_eq!(cache.len(), 3);
496
497 let pos = cache.remove("b");
498 assert_eq!(pos, Some(1));
499 assert_eq!(cache.len(), 2);
500 assert_eq!(cache.ordered_keys(), vec!["a", "c"]);
501 }
502
503 #[test]
504 fn test_compute_window_deltas() {
505 let mut cache = SortedViewCache::new(
506 "test/latest".to_string(),
507 vec!["id".to_string()],
508 SortOrder::Desc,
509 );
510
511 for i in 1..=5 {
513 cache.upsert(format!("e{}", i), json!({"id": i}));
514 }
515
516 let old_window: Vec<String> = vec!["e5".to_string(), "e4".to_string(), "e3".to_string()];
517
518 cache.upsert("e6".to_string(), json!({"id": 6}));
520
521 let deltas = cache.compute_window_deltas(&old_window, 0, 3);
524
525 assert_eq!(deltas.len(), 2);
526 assert!(deltas
528 .iter()
529 .any(|d| matches!(d, ViewDelta::Remove { key } if key == "e3")));
530 assert!(deltas
532 .iter()
533 .any(|d| matches!(d, ViewDelta::Add { key, .. } if key == "e6")));
534 }
535
536 #[test]
537 fn test_nested_sort_field() {
538 let mut cache = SortedViewCache::new(
539 "test/latest".to_string(),
540 vec!["id".to_string(), "round_id".to_string()],
541 SortOrder::Desc,
542 );
543
544 cache.upsert("a".to_string(), json!({"id": {"round_id": 1}}));
545 cache.upsert("b".to_string(), json!({"id": {"round_id": 3}}));
546 cache.upsert("c".to_string(), json!({"id": {"round_id": 2}}));
547
548 let keys = cache.ordered_keys();
549 assert_eq!(keys, vec!["b", "c", "a"]);
550 }
551
552 #[test]
553 fn test_update_with_missing_sort_field_preserves_position() {
554 let mut cache = SortedViewCache::new(
555 "test/latest".to_string(),
556 vec!["id".to_string(), "round_id".to_string()],
557 SortOrder::Desc,
558 );
559
560 cache.upsert(
561 "100".to_string(),
562 json!({"id": {"round_id": 100}, "data": "initial"}),
563 );
564 cache.upsert(
565 "200".to_string(),
566 json!({"id": {"round_id": 200}, "data": "initial"}),
567 );
568 cache.upsert(
569 "300".to_string(),
570 json!({"id": {"round_id": 300}, "data": "initial"}),
571 );
572
573 assert_eq!(cache.ordered_keys(), vec!["300", "200", "100"]);
574
575 cache.upsert("200".to_string(), json!({"data": "updated_without_id"}));
576
577 assert_eq!(
578 cache.ordered_keys(),
579 vec!["300", "200", "100"],
580 "Entity 200 should retain its position even when updated without sort field"
581 );
582
583 let entity = cache.get("200").unwrap();
584 assert_eq!(entity["data"], "updated_without_id");
585 }
586
587 #[test]
588 fn test_new_entity_with_missing_sort_field_gets_null_position() {
589 let mut cache = SortedViewCache::new(
590 "test/latest".to_string(),
591 vec!["id".to_string(), "round_id".to_string()],
592 SortOrder::Desc,
593 );
594
595 cache.upsert("100".to_string(), json!({"id": {"round_id": 100}}));
596 cache.upsert("200".to_string(), json!({"id": {"round_id": 200}}));
597
598 cache.upsert("new".to_string(), json!({"data": "no_sort_field"}));
599
600 let keys = cache.ordered_keys();
601 assert_eq!(
602 keys.first().unwrap(),
603 "new",
604 "New entity without sort field gets Null which sorts first (Null < any value)"
605 );
606 }
607}