1use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::cmp::Ordering;
10use std::collections::{BTreeMap, HashMap};
11
12#[derive(Debug, Clone, PartialEq, Eq)]
16pub struct SortKey {
17 sort_value: SortValue,
19 entity_key: String,
21}
22
23impl PartialOrd for SortKey {
24 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
25 Some(self.cmp(other))
26 }
27}
28
29impl Ord for SortKey {
30 fn cmp(&self, other: &Self) -> Ordering {
31 match self.sort_value.cmp(&other.sort_value) {
32 Ordering::Equal => self.entity_key.cmp(&other.entity_key),
33 other => other,
34 }
35 }
36}
37
38#[derive(Debug, Clone, PartialEq, Eq)]
40pub enum SortValue {
41 Null,
42 Bool(bool),
43 Integer(i64),
44 Float(OrderedFloat),
45 String(String),
46}
47
48impl Ord for SortValue {
49 fn cmp(&self, other: &Self) -> Ordering {
50 match (self, other) {
51 (SortValue::Null, SortValue::Null) => Ordering::Equal,
52 (SortValue::Null, _) => Ordering::Less,
53 (_, SortValue::Null) => Ordering::Greater,
54 (SortValue::Bool(a), SortValue::Bool(b)) => a.cmp(b),
55 (SortValue::Integer(a), SortValue::Integer(b)) => a.cmp(b),
56 (SortValue::Float(a), SortValue::Float(b)) => a.cmp(b),
57 (SortValue::String(a), SortValue::String(b)) => a.cmp(b),
58 (SortValue::Integer(_), SortValue::String(_)) => Ordering::Less,
60 (SortValue::String(_), SortValue::Integer(_)) => Ordering::Greater,
61 (SortValue::Float(_), SortValue::String(_)) => Ordering::Less,
62 (SortValue::String(_), SortValue::Float(_)) => Ordering::Greater,
63 (SortValue::Integer(a), SortValue::Float(b)) => OrderedFloat(*a as f64).cmp(b),
65 (SortValue::Float(a), SortValue::Integer(b)) => a.cmp(&OrderedFloat(*b as f64)),
66 (SortValue::Bool(_), _) => Ordering::Less,
68 (_, SortValue::Bool(_)) => Ordering::Greater,
69 }
70 }
71}
72
73impl PartialOrd for SortValue {
74 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
75 Some(self.cmp(other))
76 }
77}
78
/// `f64` wrapper with a total order, so floats can be used inside `Ord` keys.
///
/// NaN equals NaN and sorts below every other value. All other values use the
/// usual IEEE comparison (so `-0.0 == 0.0`).
#[derive(Debug, Clone, Copy)]
pub struct OrderedFloat(pub f64);

impl PartialEq for OrderedFloat {
    fn eq(&self, other: &Self) -> bool {
        // Must agree with `Ord::cmp`. Plain `f64` equality would make NaN != NaN,
        // which breaks the reflexivity that the `Eq` impl below promises.
        self.0 == other.0 || (self.0.is_nan() && other.0.is_nan())
    }
}

impl Eq for OrderedFloat {}

impl Ord for OrderedFloat {
    fn cmp(&self, other: &Self) -> Ordering {
        match self.0.partial_cmp(&other.0) {
            Some(ordering) => ordering,
            // `partial_cmp` only fails when at least one side is NaN.
            None => match (self.0.is_nan(), other.0.is_nan()) {
                (true, true) => Ordering::Equal,
                (true, false) => Ordering::Less,
                _ => Ordering::Greater,
            },
        }
    }
}

impl PartialOrd for OrderedFloat {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
104
105#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
107#[serde(rename_all = "lowercase")]
108pub enum SortOrder {
109 Asc,
110 Desc,
111}
112
113impl From<crate::materialized_view::SortOrder> for SortOrder {
114 fn from(order: crate::materialized_view::SortOrder) -> Self {
115 match order {
116 crate::materialized_view::SortOrder::Asc => SortOrder::Asc,
117 crate::materialized_view::SortOrder::Desc => SortOrder::Desc,
118 }
119 }
120}
121
122#[derive(Debug, Clone, PartialEq)]
124pub enum ViewDelta {
125 None,
127 Add { key: String, entity: Value },
129 Remove { key: String },
131 Update { key: String, entity: Value },
133}
134
135#[derive(Debug)]
137pub struct SortedViewCache {
138 view_id: String,
140 sort_field: Vec<String>,
142 order: SortOrder,
144 sorted: BTreeMap<SortKey, ()>,
146 entities: HashMap<String, (SortKey, Value)>,
148 keys_cache: Vec<String>,
150 cache_dirty: bool,
152}
153
154impl SortedViewCache {
155 pub fn new(view_id: String, sort_field: Vec<String>, order: SortOrder) -> Self {
156 Self {
157 view_id,
158 sort_field,
159 order,
160 sorted: BTreeMap::new(),
161 entities: HashMap::new(),
162 keys_cache: Vec::new(),
163 cache_dirty: true,
164 }
165 }
166
167 pub fn view_id(&self) -> &str {
168 &self.view_id
169 }
170
171 pub fn len(&self) -> usize {
172 self.entities.len()
173 }
174
175 pub fn is_empty(&self) -> bool {
176 self.entities.is_empty()
177 }
178
179 pub fn upsert(&mut self, entity_key: String, entity: Value) -> UpsertResult {
181 let sort_value = self.extract_sort_value(&entity);
182
183 if let Some((old_sort_key, old_entity)) = self.entities.get(&entity_key).cloned() {
185 let effective_sort_value = if matches!(sort_value, SortValue::Null)
186 && !matches!(old_sort_key.sort_value, SortValue::Null)
187 {
188 old_sort_key.sort_value.clone()
189 } else {
190 sort_value
191 };
192
193 let new_sort_key = SortKey {
194 sort_value: effective_sort_value,
195 entity_key: entity_key.clone(),
196 };
197
198 let merged_entity = Self::deep_merge(old_entity, entity);
200
201 if old_sort_key == new_sort_key {
202 self.entities
204 .insert(entity_key.clone(), (new_sort_key, merged_entity));
205 let position = self.find_position(&entity_key);
207 return UpsertResult::Updated { position };
208 }
209
210 self.sorted.remove(&old_sort_key);
212 self.sorted.insert(new_sort_key.clone(), ());
213 self.entities
214 .insert(entity_key.clone(), (new_sort_key, merged_entity));
215 self.cache_dirty = true;
216
217 let position = self.find_position(&entity_key);
218 return UpsertResult::Inserted { position };
219 }
220
221 let new_sort_key = SortKey {
222 sort_value,
223 entity_key: entity_key.clone(),
224 };
225
226 self.sorted.insert(new_sort_key.clone(), ());
227 self.entities
228 .insert(entity_key.clone(), (new_sort_key, entity));
229 self.cache_dirty = true;
230
231 let position = self.find_position(&entity_key);
232 UpsertResult::Inserted { position }
233 }
234
235 fn deep_merge(base: Value, patch: Value) -> Value {
236 match (base, patch) {
237 (Value::Object(mut base_map), Value::Object(patch_map)) => {
238 for (key, patch_value) in patch_map {
239 if let Some(base_value) = base_map.remove(&key) {
240 base_map.insert(key, Self::deep_merge(base_value, patch_value));
241 } else {
242 base_map.insert(key, patch_value);
243 }
244 }
245 Value::Object(base_map)
246 }
247 (_, patch) => patch,
248 }
249 }
250
251 pub fn remove(&mut self, entity_key: &str) -> Option<usize> {
253 if let Some((sort_key, _)) = self.entities.remove(entity_key) {
254 let position = self.find_position_by_sort_key(&sort_key);
255 self.sorted.remove(&sort_key);
256 self.cache_dirty = true;
257 Some(position)
258 } else {
259 None
260 }
261 }
262
263 pub fn get(&self, entity_key: &str) -> Option<&Value> {
265 self.entities.get(entity_key).map(|(_, v)| v)
266 }
267
268 pub fn ordered_keys(&mut self) -> &[String] {
270 if self.cache_dirty {
271 self.rebuild_keys_cache();
272 }
273 &self.keys_cache
274 }
275
276 pub fn get_window(&mut self, skip: usize, take: usize) -> Vec<(String, Value)> {
278 if self.cache_dirty {
279 self.rebuild_keys_cache();
280 }
281
282 self.keys_cache
283 .iter()
284 .skip(skip)
285 .take(take)
286 .filter_map(|key| {
287 self.entities
288 .get(key)
289 .map(|(_, v)| (key.clone(), v.clone()))
290 })
291 .collect()
292 }
293
294 pub fn compute_window_deltas(
296 &mut self,
297 old_window_keys: &[String],
298 skip: usize,
299 take: usize,
300 ) -> Vec<ViewDelta> {
301 if self.cache_dirty {
302 self.rebuild_keys_cache();
303 }
304
305 let new_window_keys: Vec<&String> = self.keys_cache.iter().skip(skip).take(take).collect();
306
307 let old_set: std::collections::HashSet<&String> = old_window_keys.iter().collect();
308 let new_set: std::collections::HashSet<&String> = new_window_keys.iter().cloned().collect();
309
310 let mut deltas = Vec::new();
311
312 for key in old_set.difference(&new_set) {
314 deltas.push(ViewDelta::Remove {
315 key: (*key).clone(),
316 });
317 }
318
319 for key in new_set.difference(&old_set) {
321 if let Some((_, entity)) = self.entities.get(*key) {
322 deltas.push(ViewDelta::Add {
323 key: (*key).clone(),
324 entity: entity.clone(),
325 });
326 }
327 }
328
329 deltas
330 }
331
332 fn extract_sort_value(&self, entity: &Value) -> SortValue {
333 let mut current = entity;
334 for segment in &self.sort_field {
335 match current.get(segment) {
336 Some(v) => current = v,
337 None => return SortValue::Null,
338 }
339 }
340
341 match self.order {
342 SortOrder::Asc => value_to_sort_value(current),
343 SortOrder::Desc => value_to_sort_value_desc(current),
344 }
345 }
346
347 fn find_position(&self, entity_key: &str) -> usize {
348 if let Some((sort_key, _)) = self.entities.get(entity_key) {
349 self.find_position_by_sort_key(sort_key)
350 } else {
351 0
352 }
353 }
354
355 fn find_position_by_sort_key(&self, sort_key: &SortKey) -> usize {
356 self.sorted.range(..sort_key).count()
357 }
358
359 fn rebuild_keys_cache(&mut self) {
360 self.keys_cache = self.sorted.keys().map(|sk| sk.entity_key.clone()).collect();
361 self.cache_dirty = false;
362 }
363}
364
/// Outcome of `SortedViewCache::upsert`, carrying the entity's zero-based
/// position in sort order after the operation.
#[derive(Clone, Debug, PartialEq)]
pub enum UpsertResult {
    /// The entity is new, or its sort key changed and it moved to `position`.
    Inserted { position: usize },
    /// The entity already existed and kept its sort key (and position).
    Updated { position: usize },
}
373
374fn value_to_sort_value(v: &Value) -> SortValue {
375 match v {
376 Value::Null => SortValue::Null,
377 Value::Bool(b) => SortValue::Bool(*b),
378 Value::Number(n) => {
379 if let Some(i) = n.as_i64() {
380 SortValue::Integer(i)
381 } else if let Some(f) = n.as_f64() {
382 SortValue::Float(OrderedFloat(f))
383 } else {
384 SortValue::Null
385 }
386 }
387 Value::String(s) => SortValue::String(s.clone()),
388 _ => SortValue::Null,
389 }
390}
391
392fn value_to_sort_value_desc(v: &Value) -> SortValue {
393 match v {
394 Value::Null => SortValue::Null,
395 Value::Bool(b) => SortValue::Bool(!*b),
396 Value::Number(n) => {
397 if let Some(i) = n.as_i64() {
398 SortValue::Integer(-i)
399 } else if let Some(f) = n.as_f64() {
400 SortValue::Float(OrderedFloat(-f))
401 } else {
402 SortValue::Null
403 }
404 }
405 Value::String(s) => {
406 SortValue::String(s.clone())
410 }
411 _ => SortValue::Null,
412 }
413}
414
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds a cache for the `test/latest` view, sorted by the field path `path`.
    fn cache_sorted_by(path: &[&str], order: SortOrder) -> SortedViewCache {
        SortedViewCache::new(
            "test/latest".to_string(),
            path.iter().map(|segment| segment.to_string()).collect(),
            order,
        )
    }

    #[test]
    fn test_sorted_cache_basic() {
        let mut cache = cache_sorted_by(&["id"], SortOrder::Desc);
        for &(key, id, name) in &[("a", 1, "first"), ("b", 3, "third"), ("c", 2, "second")] {
            cache.upsert(key.to_string(), json!({ "id": id, "name": name }));
        }

        assert_eq!(cache.ordered_keys(), vec!["b", "c", "a"]);
    }

    #[test]
    fn test_sorted_cache_window() {
        let mut cache = cache_sorted_by(&["id"], SortOrder::Desc);
        for i in 1..=10 {
            cache.upsert(format!("e{}", i), json!({ "id": i }));
        }

        let first_page: Vec<String> = cache
            .get_window(0, 3)
            .into_iter()
            .map(|(key, _)| key)
            .collect();
        assert_eq!(first_page, vec!["e10", "e9", "e8"]);

        let second_page = cache.get_window(3, 3);
        assert_eq!(second_page[0].0, "e7");
    }

    #[test]
    fn test_sorted_cache_update_moves_position() {
        let mut cache = cache_sorted_by(&["score"], SortOrder::Desc);
        for &(key, score) in &[("a", 10), ("b", 20), ("c", 15)] {
            cache.upsert(key.to_string(), json!({ "score": score }));
        }
        assert_eq!(cache.ordered_keys(), vec!["b", "c", "a"]);

        cache.upsert("a".to_string(), json!({ "score": 25 }));
        assert_eq!(cache.ordered_keys(), vec!["a", "b", "c"]);
    }

    #[test]
    fn test_sorted_cache_remove() {
        let mut cache = cache_sorted_by(&["id"], SortOrder::Asc);
        for &(key, id) in &[("a", 1), ("b", 2), ("c", 3)] {
            cache.upsert(key.to_string(), json!({ "id": id }));
        }
        assert_eq!(cache.len(), 3);

        assert_eq!(cache.remove("b"), Some(1));
        assert_eq!(cache.len(), 2);
        assert_eq!(cache.ordered_keys(), vec!["a", "c"]);
    }

    #[test]
    fn test_compute_window_deltas() {
        let mut cache = cache_sorted_by(&["id"], SortOrder::Desc);
        for i in 1..=5 {
            cache.upsert(format!("e{}", i), json!({ "id": i }));
        }
        let old_window: Vec<String> = ["e5", "e4", "e3"].iter().map(|k| k.to_string()).collect();

        cache.upsert("e6".to_string(), json!({ "id": 6 }));
        let deltas = cache.compute_window_deltas(&old_window, 0, 3);

        assert_eq!(deltas.len(), 2);
        let removed_e3 = deltas
            .iter()
            .any(|d| matches!(d, ViewDelta::Remove { key } if key == "e3"));
        let added_e6 = deltas
            .iter()
            .any(|d| matches!(d, ViewDelta::Add { key, .. } if key == "e6"));
        assert!(removed_e3);
        assert!(added_e6);
    }

    #[test]
    fn test_nested_sort_field() {
        let mut cache = cache_sorted_by(&["id", "round_id"], SortOrder::Desc);
        for &(key, round) in &[("a", 1), ("b", 3), ("c", 2)] {
            cache.upsert(key.to_string(), json!({ "id": { "round_id": round } }));
        }

        assert_eq!(cache.ordered_keys(), vec!["b", "c", "a"]);
    }

    #[test]
    fn test_update_with_missing_sort_field_preserves_position() {
        let mut cache = cache_sorted_by(&["id", "round_id"], SortOrder::Desc);
        for &round in &[100, 200, 300] {
            cache.upsert(
                round.to_string(),
                json!({ "id": { "round_id": round }, "data": "initial" }),
            );
        }
        assert_eq!(cache.ordered_keys(), vec!["300", "200", "100"]);

        cache.upsert("200".to_string(), json!({ "data": "updated_without_id" }));
        assert_eq!(
            cache.ordered_keys(),
            vec!["300", "200", "100"],
            "Entity 200 should retain its position even when updated without sort field"
        );

        assert_eq!(cache.get("200").unwrap()["data"], "updated_without_id");
    }

    #[test]
    fn test_new_entity_with_missing_sort_field_gets_null_position() {
        let mut cache = cache_sorted_by(&["id", "round_id"], SortOrder::Desc);
        for &round in &[100, 200] {
            cache.upsert(round.to_string(), json!({ "id": { "round_id": round } }));
        }

        cache.upsert("new".to_string(), json!({ "data": "no_sort_field" }));

        let keys = cache.ordered_keys();
        assert_eq!(
            keys.first().map(String::as_str),
            Some("new"),
            "New entity without sort field gets Null which sorts first (Null < any value)"
        );
    }
}