Skip to main content

hyperstack_server/
sorted_cache.rs

1//! Sorted view cache for maintaining ordered entity collections.
2//!
3//! This module provides incremental maintenance of sorted entity views,
4//! enabling efficient windowed subscriptions (take/skip) with minimal
5//! recomputation on updates.
6
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::cmp::Ordering;
10use std::collections::{BTreeMap, HashMap};
11
12/// A sortable key that combines the sort value with entity key for stable ordering.
13/// Uses (sort_value, entity_key) tuple to ensure deterministic ordering even when
14/// sort values are equal.
15#[derive(Debug, Clone, PartialEq, Eq)]
16pub struct SortKey {
17    /// The extracted sort value (as comparable bytes)
18    sort_value: SortValue,
19    /// Entity key for tie-breaking
20    entity_key: String,
21}
22
23impl PartialOrd for SortKey {
24    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
25        Some(self.cmp(other))
26    }
27}
28
29impl Ord for SortKey {
30    fn cmp(&self, other: &Self) -> Ordering {
31        match self.sort_value.cmp(&other.sort_value) {
32            Ordering::Equal => self.entity_key.cmp(&other.entity_key),
33            other => other,
34        }
35    }
36}
37
/// Comparable sort value extracted from a JSON field.
///
/// Values of different kinds are ordered `Null < Bool < numbers < String`. `Integer` and
/// `Float` are compared numerically and *exactly*: integers above 2^53 that are not
/// representable as `f64` still order correctly against floats. This keeps the ordering
/// transitive, which a `BTreeMap` key requires.
///
/// Equality is defined through [`Ord`], so `Integer(1) == Float(OrderedFloat(1.0))`. This
/// keeps `PartialEq`, `PartialOrd` and `Ord` consistent with each other.
#[derive(Debug, Clone)]
pub enum SortValue {
    Null,
    Bool(bool),
    Integer(i64),
    Float(OrderedFloat),
    String(String),
}

impl Ord for SortValue {
    fn cmp(&self, other: &Self) -> Ordering {
        match (self, other) {
            (SortValue::Null, SortValue::Null) => Ordering::Equal,
            (SortValue::Null, _) => Ordering::Less,
            (_, SortValue::Null) => Ordering::Greater,
            (SortValue::Bool(a), SortValue::Bool(b)) => a.cmp(b),
            // Booleans sort before every number and string.
            (SortValue::Bool(_), _) => Ordering::Less,
            (_, SortValue::Bool(_)) => Ordering::Greater,
            (SortValue::Integer(a), SortValue::Integer(b)) => a.cmp(b),
            (SortValue::Float(a), SortValue::Float(b)) => a.cmp(b),
            (SortValue::Integer(a), SortValue::Float(b)) => cmp_int_float(*a, *b),
            (SortValue::Float(a), SortValue::Integer(b)) => cmp_int_float(*b, *a).reverse(),
            (SortValue::String(a), SortValue::String(b)) => a.cmp(b),
            // Numbers sort before strings.
            (SortValue::String(_), _) => Ordering::Greater,
            (_, SortValue::String(_)) => Ordering::Less,
        }
    }
}

impl PartialOrd for SortValue {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl PartialEq for SortValue {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}

impl Eq for SortValue {}

/// Compares an `i64` with an `f64` exactly, without the precision loss of `int as f64`.
fn cmp_int_float(int: i64, float: OrderedFloat) -> Ordering {
    match OrderedFloat(int as f64).cmp(&float) {
        // `int as f64` rounds to nearest, and rounding is monotonic, so a strict inequality
        // against `float` also holds for `int` itself. On a tie, `float` is an integral value
        // with magnitude at most 2^63, which `i128` represents exactly.
        Ordering::Equal => i128::from(int).cmp(&(float.0 as i128)),
        unequal => unequal,
    }
}

/// Wrapper for `f64` that implements [`Ord`].
///
/// NaN compares equal to NaN and less than every other value. All other values use the
/// usual `f64` ordering (so `-0.0 == 0.0`). Equality is derived from the same ordering, so
/// `Eq` is reflexive even for NaN.
#[derive(Debug, Clone, Copy)]
pub struct OrderedFloat(pub f64);

impl PartialEq for OrderedFloat {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}

impl Eq for OrderedFloat {}

impl Ord for OrderedFloat {
    fn cmp(&self, other: &Self) -> Ordering {
        self.0.partial_cmp(&other.0).unwrap_or_else(|| {
            // `partial_cmp` only fails when at least one side is NaN.
            if self.0.is_nan() && other.0.is_nan() {
                Ordering::Equal
            } else if self.0.is_nan() {
                Ordering::Less
            } else {
                Ordering::Greater
            }
        })
    }
}

impl PartialOrd for OrderedFloat {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
104
105/// Sort order for the cache
106#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
107#[serde(rename_all = "lowercase")]
108pub enum SortOrder {
109    Asc,
110    Desc,
111}
112
113impl From<crate::materialized_view::SortOrder> for SortOrder {
114    fn from(order: crate::materialized_view::SortOrder) -> Self {
115        match order {
116            crate::materialized_view::SortOrder::Asc => SortOrder::Asc,
117            crate::materialized_view::SortOrder::Desc => SortOrder::Desc,
118        }
119    }
120}
121
122/// Delta representing a change to a client's windowed view
123#[derive(Debug, Clone, PartialEq)]
124pub enum ViewDelta {
125    /// No change to the client's window
126    None,
127    /// Entity was added to the window
128    Add { key: String, entity: Value },
129    /// Entity was removed from the window
130    Remove { key: String },
131    /// Entity in the window was updated
132    Update { key: String, entity: Value },
133}
134
135/// Sorted view cache maintaining entities in sort order
136#[derive(Debug)]
137pub struct SortedViewCache {
138    /// View identifier
139    view_id: String,
140    /// Field path to sort by (e.g., ["id", "round_id"])
141    sort_field: Vec<String>,
142    /// Sort order
143    order: SortOrder,
144    /// Sorted entries: SortKey -> entity_key (for iteration in order)
145    sorted: BTreeMap<SortKey, ()>,
146    /// Entity data: entity_key -> (SortKey, Value)
147    entities: HashMap<String, (SortKey, Value)>,
148    /// Ordered keys cache (rebuilt on structural changes)
149    keys_cache: Vec<String>,
150    /// Whether keys_cache needs rebuild
151    cache_dirty: bool,
152}
153
154impl SortedViewCache {
155    pub fn new(view_id: String, sort_field: Vec<String>, order: SortOrder) -> Self {
156        Self {
157            view_id,
158            sort_field,
159            order,
160            sorted: BTreeMap::new(),
161            entities: HashMap::new(),
162            keys_cache: Vec::new(),
163            cache_dirty: true,
164        }
165    }
166
167    pub fn view_id(&self) -> &str {
168        &self.view_id
169    }
170
171    pub fn len(&self) -> usize {
172        self.entities.len()
173    }
174
175    pub fn is_empty(&self) -> bool {
176        self.entities.is_empty()
177    }
178
179    /// Insert or update an entity, returns the position where it was inserted
180    pub fn upsert(&mut self, entity_key: String, entity: Value) -> UpsertResult {
181        let sort_value = self.extract_sort_value(&entity);
182
183        // Check if entity already exists
184        if let Some((old_sort_key, old_entity)) = self.entities.get(&entity_key).cloned() {
185            let effective_sort_value = if matches!(sort_value, SortValue::Null)
186                && !matches!(old_sort_key.sort_value, SortValue::Null)
187            {
188                old_sort_key.sort_value.clone()
189            } else {
190                sort_value
191            };
192
193            let new_sort_key = SortKey {
194                sort_value: effective_sort_value,
195                entity_key: entity_key.clone(),
196            };
197
198            // Merge incoming entity with existing to preserve fields not in the update
199            let merged_entity = Self::deep_merge(old_entity, entity);
200
201            if old_sort_key == new_sort_key {
202                // Sort key unchanged - just update entity data
203                self.entities
204                    .insert(entity_key.clone(), (new_sort_key, merged_entity));
205                // Position unchanged, no structural change
206                let position = self.find_position(&entity_key);
207                return UpsertResult::Updated { position };
208            }
209
210            // Sort key changed - need to reposition
211            self.sorted.remove(&old_sort_key);
212            self.sorted.insert(new_sort_key.clone(), ());
213            self.entities
214                .insert(entity_key.clone(), (new_sort_key, merged_entity));
215            self.cache_dirty = true;
216
217            let position = self.find_position(&entity_key);
218            return UpsertResult::Inserted { position };
219        }
220
221        let new_sort_key = SortKey {
222            sort_value,
223            entity_key: entity_key.clone(),
224        };
225
226        self.sorted.insert(new_sort_key.clone(), ());
227        self.entities
228            .insert(entity_key.clone(), (new_sort_key, entity));
229        self.cache_dirty = true;
230
231        let position = self.find_position(&entity_key);
232
233        UpsertResult::Inserted { position }
234    }
235
236    fn deep_merge(base: Value, patch: Value) -> Value {
237        match (base, patch) {
238            (Value::Object(mut base_map), Value::Object(patch_map)) => {
239                for (key, patch_value) in patch_map {
240                    if let Some(base_value) = base_map.remove(&key) {
241                        base_map.insert(key, Self::deep_merge(base_value, patch_value));
242                    } else {
243                        base_map.insert(key, patch_value);
244                    }
245                }
246                Value::Object(base_map)
247            }
248            (_, patch) => patch,
249        }
250    }
251
252    /// Remove an entity, returns the position it was at
253    pub fn remove(&mut self, entity_key: &str) -> Option<usize> {
254        if let Some((sort_key, _)) = self.entities.remove(entity_key) {
255            let position = self.find_position_by_sort_key(&sort_key);
256            self.sorted.remove(&sort_key);
257            self.cache_dirty = true;
258            Some(position)
259        } else {
260            None
261        }
262    }
263
264    /// Get entity by key
265    pub fn get(&self, entity_key: &str) -> Option<&Value> {
266        self.entities.get(entity_key).map(|(_, v)| v)
267    }
268
269    /// Get ordered keys (rebuilds cache if dirty)
270    pub fn ordered_keys(&mut self) -> &[String] {
271        if self.cache_dirty {
272            self.rebuild_keys_cache();
273        }
274        &self.keys_cache
275    }
276
277    /// Get a window of entities
278    pub fn get_window(&mut self, skip: usize, take: usize) -> Vec<(String, Value)> {
279        if self.cache_dirty {
280            self.rebuild_keys_cache();
281        }
282
283        self.keys_cache
284            .iter()
285            .skip(skip)
286            .take(take)
287            .filter_map(|key| {
288                self.entities
289                    .get(key)
290                    .map(|(_, v)| (key.clone(), v.clone()))
291            })
292            .collect()
293    }
294
295    /// Compute deltas for a client with a specific window
296    pub fn compute_window_deltas(
297        &mut self,
298        old_window_keys: &[String],
299        skip: usize,
300        take: usize,
301    ) -> Vec<ViewDelta> {
302        if self.cache_dirty {
303            self.rebuild_keys_cache();
304        }
305
306        let new_window_keys: Vec<&String> = self.keys_cache.iter().skip(skip).take(take).collect();
307
308        let old_set: std::collections::HashSet<&String> = old_window_keys.iter().collect();
309        let new_set: std::collections::HashSet<&String> = new_window_keys.iter().cloned().collect();
310
311        let mut deltas = Vec::new();
312
313        // Removed from window
314        for key in old_set.difference(&new_set) {
315            deltas.push(ViewDelta::Remove {
316                key: (*key).clone(),
317            });
318        }
319
320        // Added to window
321        for key in new_set.difference(&old_set) {
322            if let Some((_, entity)) = self.entities.get(*key) {
323                deltas.push(ViewDelta::Add {
324                    key: (*key).clone(),
325                    entity: entity.clone(),
326                });
327            }
328        }
329
330        deltas
331    }
332
333    fn extract_sort_value(&self, entity: &Value) -> SortValue {
334        let mut current = entity;
335        for segment in &self.sort_field {
336            match current.get(segment) {
337                Some(v) => current = v,
338                None => return SortValue::Null,
339            }
340        }
341
342        match self.order {
343            SortOrder::Asc => value_to_sort_value(current),
344            SortOrder::Desc => value_to_sort_value_desc(current),
345        }
346    }
347
348    fn find_position(&self, entity_key: &str) -> usize {
349        if let Some((sort_key, _)) = self.entities.get(entity_key) {
350            self.find_position_by_sort_key(sort_key)
351        } else {
352            0
353        }
354    }
355
356    fn find_position_by_sort_key(&self, sort_key: &SortKey) -> usize {
357        self.sorted.range(..sort_key).count()
358    }
359
360    fn rebuild_keys_cache(&mut self) {
361        self.keys_cache = self.sorted.keys().map(|sk| sk.entity_key.clone()).collect();
362        self.cache_dirty = false;
363    }
364}
365
/// Result of `SortedViewCache::upsert`.
///
/// `position` is always the entity's zero-based index in sort order after the upsert.
#[derive(Debug, Clone, PartialEq)]
pub enum UpsertResult {
    /// The entity was newly added, or an existing entity's sort key changed and it was
    /// moved to a new position.
    Inserted { position: usize },
    /// An existing entity was updated in place. Its sort key, and therefore its position,
    /// is unchanged.
    Updated { position: usize },
}
374
375fn value_to_sort_value(v: &Value) -> SortValue {
376    match v {
377        Value::Null => SortValue::Null,
378        Value::Bool(b) => SortValue::Bool(*b),
379        Value::Number(n) => {
380            if let Some(i) = n.as_i64() {
381                SortValue::Integer(i)
382            } else if let Some(f) = n.as_f64() {
383                SortValue::Float(OrderedFloat(f))
384            } else {
385                SortValue::Null
386            }
387        }
388        Value::String(s) => SortValue::String(s.clone()),
389        _ => SortValue::Null,
390    }
391}
392
393fn value_to_sort_value_desc(v: &Value) -> SortValue {
394    match v {
395        Value::Null => SortValue::Null,
396        Value::Bool(b) => SortValue::Bool(!*b),
397        Value::Number(n) => {
398            if let Some(i) = n.as_i64() {
399                SortValue::Integer(-i)
400            } else if let Some(f) = n.as_f64() {
401                SortValue::Float(OrderedFloat(-f))
402            } else {
403                SortValue::Null
404            }
405        }
406        Value::String(s) => {
407            // For desc strings, we'd need a more complex approach
408            // For now, just negate won't work for strings
409            // We'll handle this at the comparison level instead
410            SortValue::String(s.clone())
411        }
412        _ => SortValue::Null,
413    }
414}
415
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_sorted_cache_basic() {
        let mut cache = SortedViewCache::new(
            "test/latest".to_string(),
            vec!["id".to_string()],
            SortOrder::Desc,
        );

        cache.upsert("a".to_string(), json!({"id": 1, "name": "first"}));
        cache.upsert("b".to_string(), json!({"id": 3, "name": "third"}));
        cache.upsert("c".to_string(), json!({"id": 2, "name": "second"}));

        let keys = cache.ordered_keys();
        // Desc order: 3, 2, 1
        assert_eq!(keys, vec!["b", "c", "a"]);
    }

    #[test]
    fn test_sorted_cache_window() {
        let mut cache = SortedViewCache::new(
            "test/latest".to_string(),
            vec!["id".to_string()],
            SortOrder::Desc,
        );

        for i in 1..=10 {
            cache.upsert(format!("e{}", i), json!({"id": i}));
        }

        // Desc order: 10, 9, 8, 7, 6, 5, 4, 3, 2, 1
        let window = cache.get_window(0, 3);
        assert_eq!(window.len(), 3);
        assert_eq!(window[0].0, "e10");
        assert_eq!(window[1].0, "e9");
        assert_eq!(window[2].0, "e8");

        let window = cache.get_window(3, 3);
        assert_eq!(window[0].0, "e7");
    }

    #[test]
    fn test_get_window_past_end() {
        let mut cache = SortedViewCache::new(
            "test/asc".to_string(),
            vec!["id".to_string()],
            SortOrder::Asc,
        );

        for i in 1..=3 {
            cache.upsert(format!("e{}", i), json!({"id": i}));
        }

        assert!(cache.get_window(5, 2).is_empty());

        let tail = cache.get_window(2, 10);
        assert_eq!(tail.len(), 1);
        assert_eq!(tail[0].0, "e3");
    }

    #[test]
    fn test_sorted_cache_update_moves_position() {
        let mut cache = SortedViewCache::new(
            "test/latest".to_string(),
            vec!["score".to_string()],
            SortOrder::Desc,
        );

        cache.upsert("a".to_string(), json!({"score": 10}));
        cache.upsert("b".to_string(), json!({"score": 20}));
        cache.upsert("c".to_string(), json!({"score": 15}));

        // Order: b(20), c(15), a(10)
        assert_eq!(cache.ordered_keys(), vec!["b", "c", "a"]);

        // Update a to have highest score
        cache.upsert("a".to_string(), json!({"score": 25}));

        // New order: a(25), b(20), c(15)
        assert_eq!(cache.ordered_keys(), vec!["a", "b", "c"]);
    }

    #[test]
    fn test_upsert_reports_positions() {
        let mut cache = SortedViewCache::new(
            "test/asc".to_string(),
            vec!["id".to_string()],
            SortOrder::Asc,
        );

        assert_eq!(
            cache.upsert("b".to_string(), json!({"id": 2})),
            UpsertResult::Inserted { position: 0 }
        );
        assert_eq!(
            cache.upsert("a".to_string(), json!({"id": 1})),
            UpsertResult::Inserted { position: 0 }
        );
        assert_eq!(
            cache.upsert("c".to_string(), json!({"id": 3})),
            UpsertResult::Inserted { position: 2 }
        );
        // Same sort value: the entity stays put and is reported as an update.
        assert_eq!(
            cache.upsert("c".to_string(), json!({"id": 3, "x": true})),
            UpsertResult::Updated { position: 2 }
        );
        // Moving an existing entity is reported as an insertion at its new index.
        assert_eq!(
            cache.upsert("c".to_string(), json!({"id": 0})),
            UpsertResult::Inserted { position: 0 }
        );
    }

    #[test]
    fn test_upsert_deep_merges_existing_entity() {
        let mut cache = SortedViewCache::new(
            "test/asc".to_string(),
            vec!["id".to_string()],
            SortOrder::Asc,
        );

        cache.upsert("a".to_string(), json!({"id": 1, "meta": {"x": 1, "y": 2}}));
        cache.upsert("a".to_string(), json!({"meta": {"y": 3}}));

        assert_eq!(cache.len(), 1);
        assert_eq!(
            cache.get("a"),
            Some(&json!({"id": 1, "meta": {"x": 1, "y": 3}}))
        );
    }

    #[test]
    fn test_sorted_cache_remove() {
        let mut cache = SortedViewCache::new(
            "test/latest".to_string(),
            vec!["id".to_string()],
            SortOrder::Asc,
        );

        cache.upsert("a".to_string(), json!({"id": 1}));
        cache.upsert("b".to_string(), json!({"id": 2}));
        cache.upsert("c".to_string(), json!({"id": 3}));

        assert_eq!(cache.len(), 3);

        let pos = cache.remove("b");
        assert_eq!(pos, Some(1));
        assert_eq!(cache.len(), 2);
        assert_eq!(cache.ordered_keys(), vec!["a", "c"]);
    }

    #[test]
    fn test_remove_missing_key_returns_none() {
        let mut cache = SortedViewCache::new(
            "test/asc".to_string(),
            vec!["id".to_string()],
            SortOrder::Asc,
        );

        cache.upsert("a".to_string(), json!({"id": 1}));

        assert_eq!(cache.remove("missing"), None);
        assert_eq!(cache.len(), 1);
    }

    #[test]
    fn test_compute_window_deltas() {
        let mut cache = SortedViewCache::new(
            "test/latest".to_string(),
            vec!["id".to_string()],
            SortOrder::Desc,
        );

        // Initial: 5, 4, 3, 2, 1
        for i in 1..=5 {
            cache.upsert(format!("e{}", i), json!({"id": i}));
        }

        let old_window: Vec<String> = vec!["e5".to_string(), "e4".to_string(), "e3".to_string()];

        // Add e6 (new top)
        cache.upsert("e6".to_string(), json!({"id": 6}));

        // New order: 6, 5, 4, 3, 2, 1
        // New top 3: e6, e5, e4
        let deltas = cache.compute_window_deltas(&old_window, 0, 3);

        assert_eq!(deltas.len(), 2);
        // e3 removed from window
        assert!(deltas
            .iter()
            .any(|d| matches!(d, ViewDelta::Remove { key } if key == "e3")));
        // e6 added to window
        assert!(deltas
            .iter()
            .any(|d| matches!(d, ViewDelta::Add { key, .. } if key == "e6")));
    }

    #[test]
    fn test_compute_window_deltas_unchanged_window_is_empty() {
        let mut cache = SortedViewCache::new(
            "test/latest".to_string(),
            vec!["id".to_string()],
            SortOrder::Desc,
        );

        for i in 1..=3 {
            cache.upsert(format!("e{}", i), json!({"id": i}));
        }

        // Desc order: e3, e2, e1
        let old_window: Vec<String> = vec!["e3".to_string(), "e2".to_string()];
        assert!(cache.compute_window_deltas(&old_window, 0, 2).is_empty());
    }

    #[test]
    fn test_nested_sort_field() {
        let mut cache = SortedViewCache::new(
            "test/latest".to_string(),
            vec!["id".to_string(), "round_id".to_string()],
            SortOrder::Desc,
        );

        cache.upsert("a".to_string(), json!({"id": {"round_id": 1}}));
        cache.upsert("b".to_string(), json!({"id": {"round_id": 3}}));
        cache.upsert("c".to_string(), json!({"id": {"round_id": 2}}));

        let keys = cache.ordered_keys();
        assert_eq!(keys, vec!["b", "c", "a"]);
    }

    #[test]
    fn test_update_with_missing_sort_field_preserves_position() {
        let mut cache = SortedViewCache::new(
            "test/latest".to_string(),
            vec!["id".to_string(), "round_id".to_string()],
            SortOrder::Desc,
        );

        cache.upsert(
            "100".to_string(),
            json!({"id": {"round_id": 100}, "data": "initial"}),
        );
        cache.upsert(
            "200".to_string(),
            json!({"id": {"round_id": 200}, "data": "initial"}),
        );
        cache.upsert(
            "300".to_string(),
            json!({"id": {"round_id": 300}, "data": "initial"}),
        );

        assert_eq!(cache.ordered_keys(), vec!["300", "200", "100"]);

        cache.upsert("200".to_string(), json!({"data": "updated_without_id"}));

        assert_eq!(
            cache.ordered_keys(),
            vec!["300", "200", "100"],
            "Entity 200 should retain its position even when updated without sort field"
        );

        let entity = cache.get("200").unwrap();
        assert_eq!(entity["data"], "updated_without_id");
    }

    #[test]
    fn test_new_entity_with_missing_sort_field_gets_null_position() {
        let mut cache = SortedViewCache::new(
            "test/latest".to_string(),
            vec!["id".to_string(), "round_id".to_string()],
            SortOrder::Desc,
        );

        cache.upsert("100".to_string(), json!({"id": {"round_id": 100}}));
        cache.upsert("200".to_string(), json!({"id": {"round_id": 200}}));

        cache.upsert("new".to_string(), json!({"data": "no_sort_field"}));

        let keys = cache.ordered_keys();
        assert_eq!(
            keys.first().unwrap(),
            "new",
            "New entity without sort field gets Null which sorts first (Null < any value)"
        );
    }
}