1use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::cmp::Ordering;
10use std::collections::{BTreeMap, HashMap};
11
12#[derive(Debug, Clone, PartialEq, Eq)]
16pub struct SortKey {
17 sort_value: SortValue,
19 entity_key: String,
21}
22
23impl PartialOrd for SortKey {
24 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
25 Some(self.cmp(other))
26 }
27}
28
29impl Ord for SortKey {
30 fn cmp(&self, other: &Self) -> Ordering {
31 match self.sort_value.cmp(&other.sort_value) {
32 Ordering::Equal => self.entity_key.cmp(&other.entity_key),
33 other => other,
34 }
35 }
36}
37
38#[derive(Debug, Clone, PartialEq, Eq)]
40pub enum SortValue {
41 Null,
42 Bool(bool),
43 Integer(i64),
44 Float(OrderedFloat),
45 String(String),
46}
47
48impl Ord for SortValue {
49 fn cmp(&self, other: &Self) -> Ordering {
50 match (self, other) {
51 (SortValue::Null, SortValue::Null) => Ordering::Equal,
52 (SortValue::Null, _) => Ordering::Less,
53 (_, SortValue::Null) => Ordering::Greater,
54 (SortValue::Bool(a), SortValue::Bool(b)) => a.cmp(b),
55 (SortValue::Integer(a), SortValue::Integer(b)) => a.cmp(b),
56 (SortValue::Float(a), SortValue::Float(b)) => a.cmp(b),
57 (SortValue::String(a), SortValue::String(b)) => a.cmp(b),
58 (SortValue::Integer(_), SortValue::String(_)) => Ordering::Less,
60 (SortValue::String(_), SortValue::Integer(_)) => Ordering::Greater,
61 (SortValue::Float(_), SortValue::String(_)) => Ordering::Less,
62 (SortValue::String(_), SortValue::Float(_)) => Ordering::Greater,
63 (SortValue::Integer(a), SortValue::Float(b)) => OrderedFloat(*a as f64).cmp(b),
65 (SortValue::Float(a), SortValue::Integer(b)) => a.cmp(&OrderedFloat(*b as f64)),
66 (SortValue::Bool(_), _) => Ordering::Less,
68 (_, SortValue::Bool(_)) => Ordering::Greater,
69 }
70 }
71}
72
73impl PartialOrd for SortValue {
74 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
75 Some(self.cmp(other))
76 }
77}
78
79#[derive(Debug, Clone, Copy, PartialEq)]
81pub struct OrderedFloat(pub f64);
82
83impl Eq for OrderedFloat {}
84
85impl Ord for OrderedFloat {
86 fn cmp(&self, other: &Self) -> Ordering {
87 self.0.partial_cmp(&other.0).unwrap_or_else(|| {
88 if self.0.is_nan() && other.0.is_nan() {
89 Ordering::Equal
90 } else if self.0.is_nan() {
91 Ordering::Less
92 } else {
93 Ordering::Greater
94 }
95 })
96 }
97}
98
99impl PartialOrd for OrderedFloat {
100 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
101 Some(self.cmp(other))
102 }
103}
104
105#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
107#[serde(rename_all = "lowercase")]
108pub enum SortOrder {
109 Asc,
110 Desc,
111}
112
113impl From<crate::materialized_view::SortOrder> for SortOrder {
114 fn from(order: crate::materialized_view::SortOrder) -> Self {
115 match order {
116 crate::materialized_view::SortOrder::Asc => SortOrder::Asc,
117 crate::materialized_view::SortOrder::Desc => SortOrder::Desc,
118 }
119 }
120}
121
122#[derive(Debug, Clone, PartialEq)]
124pub enum ViewDelta {
125 None,
127 Add { key: String, entity: Value },
129 Remove { key: String },
131 Update { key: String, entity: Value },
133}
134
135#[derive(Debug)]
137pub struct SortedViewCache {
138 view_id: String,
140 sort_field: Vec<String>,
142 order: SortOrder,
144 sorted: BTreeMap<SortKey, ()>,
146 entities: HashMap<String, (SortKey, Value)>,
148 keys_cache: Vec<String>,
150 cache_dirty: bool,
152}
153
154impl SortedViewCache {
155 pub fn new(view_id: String, sort_field: Vec<String>, order: SortOrder) -> Self {
156 Self {
157 view_id,
158 sort_field,
159 order,
160 sorted: BTreeMap::new(),
161 entities: HashMap::new(),
162 keys_cache: Vec::new(),
163 cache_dirty: true,
164 }
165 }
166
167 pub fn view_id(&self) -> &str {
168 &self.view_id
169 }
170
171 pub fn len(&self) -> usize {
172 self.entities.len()
173 }
174
175 pub fn is_empty(&self) -> bool {
176 self.entities.is_empty()
177 }
178
179 pub fn upsert(&mut self, entity_key: String, entity: Value) -> UpsertResult {
181 let debug_computed = std::env::var("HYPERSTACK_DEBUG_COMPUTED").is_ok();
182
183 if debug_computed {
184 if let Some(results) = entity.get("results") {
185 if results.get("rng").is_some() {
186 tracing::warn!(
187 "[SORTED_CACHE_UPSERT] view={} key={} entity.results: rng={:?} winning_square={:?} did_hit_motherlode={:?}",
188 self.view_id,
189 entity_key,
190 results.get("rng"),
191 results.get("winning_square"),
192 results.get("did_hit_motherlode")
193 );
194 }
195 }
196 }
197
198 let sort_value = self.extract_sort_value(&entity);
199
200 if let Some((old_sort_key, _)) = self.entities.get(&entity_key).cloned() {
202 let effective_sort_value = if matches!(sort_value, SortValue::Null)
203 && !matches!(old_sort_key.sort_value, SortValue::Null)
204 {
205 old_sort_key.sort_value.clone()
206 } else {
207 sort_value
208 };
209
210 let new_sort_key = SortKey {
211 sort_value: effective_sort_value,
212 entity_key: entity_key.clone(),
213 };
214
215 if old_sort_key == new_sort_key {
216 self.entities
218 .insert(entity_key.clone(), (new_sort_key, entity));
219 let position = self.find_position(&entity_key);
221 return UpsertResult::Updated { position };
222 }
223
224 self.sorted.remove(&old_sort_key);
226 self.sorted.insert(new_sort_key.clone(), ());
227 self.entities
228 .insert(entity_key.clone(), (new_sort_key, entity));
229 self.cache_dirty = true;
230
231 let position = self.find_position(&entity_key);
232 return UpsertResult::Inserted { position };
233 }
234
235 let new_sort_key = SortKey {
236 sort_value,
237 entity_key: entity_key.clone(),
238 };
239
240 self.sorted.insert(new_sort_key.clone(), ());
241 self.entities
242 .insert(entity_key.clone(), (new_sort_key, entity));
243 self.cache_dirty = true;
244
245 let position = self.find_position(&entity_key);
246 UpsertResult::Inserted { position }
247 }
248
249 pub fn remove(&mut self, entity_key: &str) -> Option<usize> {
251 if let Some((sort_key, _)) = self.entities.remove(entity_key) {
252 let position = self.find_position_by_sort_key(&sort_key);
253 self.sorted.remove(&sort_key);
254 self.cache_dirty = true;
255 Some(position)
256 } else {
257 None
258 }
259 }
260
261 pub fn get(&self, entity_key: &str) -> Option<&Value> {
263 self.entities.get(entity_key).map(|(_, v)| v)
264 }
265
266 pub fn ordered_keys(&mut self) -> &[String] {
268 if self.cache_dirty {
269 self.rebuild_keys_cache();
270 }
271 &self.keys_cache
272 }
273
274 pub fn get_window(&mut self, skip: usize, take: usize) -> Vec<(String, Value)> {
276 if self.cache_dirty {
277 self.rebuild_keys_cache();
278 }
279
280 self.keys_cache
281 .iter()
282 .skip(skip)
283 .take(take)
284 .filter_map(|key| {
285 self.entities
286 .get(key)
287 .map(|(_, v)| (key.clone(), v.clone()))
288 })
289 .collect()
290 }
291
292 pub fn compute_window_deltas(
294 &mut self,
295 old_window_keys: &[String],
296 skip: usize,
297 take: usize,
298 ) -> Vec<ViewDelta> {
299 if self.cache_dirty {
300 self.rebuild_keys_cache();
301 }
302
303 let new_window_keys: Vec<&String> = self.keys_cache.iter().skip(skip).take(take).collect();
304
305 let old_set: std::collections::HashSet<&String> = old_window_keys.iter().collect();
306 let new_set: std::collections::HashSet<&String> = new_window_keys.iter().cloned().collect();
307
308 let mut deltas = Vec::new();
309
310 for key in old_set.difference(&new_set) {
312 deltas.push(ViewDelta::Remove {
313 key: (*key).clone(),
314 });
315 }
316
317 for key in new_set.difference(&old_set) {
319 if let Some((_, entity)) = self.entities.get(*key) {
320 deltas.push(ViewDelta::Add {
321 key: (*key).clone(),
322 entity: entity.clone(),
323 });
324 }
325 }
326
327 deltas
328 }
329
330 fn extract_sort_value(&self, entity: &Value) -> SortValue {
331 let mut current = entity;
332 for segment in &self.sort_field {
333 match current.get(segment) {
334 Some(v) => current = v,
335 None => return SortValue::Null,
336 }
337 }
338
339 match self.order {
340 SortOrder::Asc => value_to_sort_value(current),
341 SortOrder::Desc => value_to_sort_value_desc(current),
342 }
343 }
344
345 fn find_position(&self, entity_key: &str) -> usize {
346 if let Some((sort_key, _)) = self.entities.get(entity_key) {
347 self.find_position_by_sort_key(sort_key)
348 } else {
349 0
350 }
351 }
352
353 fn find_position_by_sort_key(&self, sort_key: &SortKey) -> usize {
354 self.sorted.range(..sort_key).count()
355 }
356
357 fn rebuild_keys_cache(&mut self) {
358 self.keys_cache = self.sorted.keys().map(|sk| sk.entity_key.clone()).collect();
359 self.cache_dirty = false;
360 }
361}
362
363#[derive(Debug, Clone, PartialEq)]
365pub enum UpsertResult {
366 Inserted { position: usize },
368 Updated { position: usize },
370}
371
372fn value_to_sort_value(v: &Value) -> SortValue {
373 match v {
374 Value::Null => SortValue::Null,
375 Value::Bool(b) => SortValue::Bool(*b),
376 Value::Number(n) => {
377 if let Some(i) = n.as_i64() {
378 SortValue::Integer(i)
379 } else if let Some(f) = n.as_f64() {
380 SortValue::Float(OrderedFloat(f))
381 } else {
382 SortValue::Null
383 }
384 }
385 Value::String(s) => SortValue::String(s.clone()),
386 _ => SortValue::Null,
387 }
388}
389
390fn value_to_sort_value_desc(v: &Value) -> SortValue {
391 match v {
392 Value::Null => SortValue::Null,
393 Value::Bool(b) => SortValue::Bool(!*b),
394 Value::Number(n) => {
395 if let Some(i) = n.as_i64() {
396 SortValue::Integer(-i)
397 } else if let Some(f) = n.as_f64() {
398 SortValue::Float(OrderedFloat(-f))
399 } else {
400 SortValue::Null
401 }
402 }
403 Value::String(s) => {
404 SortValue::String(s.clone())
408 }
409 _ => SortValue::Null,
410 }
411}
412
413#[cfg(test)]
414mod tests {
415 use super::*;
416 use serde_json::json;
417
418 #[test]
419 fn test_sorted_cache_basic() {
420 let mut cache = SortedViewCache::new(
421 "test/latest".to_string(),
422 vec!["id".to_string()],
423 SortOrder::Desc,
424 );
425
426 cache.upsert("a".to_string(), json!({"id": 1, "name": "first"}));
427 cache.upsert("b".to_string(), json!({"id": 3, "name": "third"}));
428 cache.upsert("c".to_string(), json!({"id": 2, "name": "second"}));
429
430 let keys = cache.ordered_keys();
431 assert_eq!(keys, vec!["b", "c", "a"]);
433 }
434
435 #[test]
436 fn test_sorted_cache_window() {
437 let mut cache = SortedViewCache::new(
438 "test/latest".to_string(),
439 vec!["id".to_string()],
440 SortOrder::Desc,
441 );
442
443 for i in 1..=10 {
444 cache.upsert(format!("e{}", i), json!({"id": i}));
445 }
446
447 let window = cache.get_window(0, 3);
449 assert_eq!(window.len(), 3);
450 assert_eq!(window[0].0, "e10");
451 assert_eq!(window[1].0, "e9");
452 assert_eq!(window[2].0, "e8");
453
454 let window = cache.get_window(3, 3);
455 assert_eq!(window[0].0, "e7");
456 }
457
458 #[test]
459 fn test_sorted_cache_update_moves_position() {
460 let mut cache = SortedViewCache::new(
461 "test/latest".to_string(),
462 vec!["score".to_string()],
463 SortOrder::Desc,
464 );
465
466 cache.upsert("a".to_string(), json!({"score": 10}));
467 cache.upsert("b".to_string(), json!({"score": 20}));
468 cache.upsert("c".to_string(), json!({"score": 15}));
469
470 assert_eq!(cache.ordered_keys(), vec!["b", "c", "a"]);
472
473 cache.upsert("a".to_string(), json!({"score": 25}));
475
476 assert_eq!(cache.ordered_keys(), vec!["a", "b", "c"]);
478 }
479
480 #[test]
481 fn test_sorted_cache_remove() {
482 let mut cache = SortedViewCache::new(
483 "test/latest".to_string(),
484 vec!["id".to_string()],
485 SortOrder::Asc,
486 );
487
488 cache.upsert("a".to_string(), json!({"id": 1}));
489 cache.upsert("b".to_string(), json!({"id": 2}));
490 cache.upsert("c".to_string(), json!({"id": 3}));
491
492 assert_eq!(cache.len(), 3);
493
494 let pos = cache.remove("b");
495 assert_eq!(pos, Some(1));
496 assert_eq!(cache.len(), 2);
497 assert_eq!(cache.ordered_keys(), vec!["a", "c"]);
498 }
499
500 #[test]
501 fn test_compute_window_deltas() {
502 let mut cache = SortedViewCache::new(
503 "test/latest".to_string(),
504 vec!["id".to_string()],
505 SortOrder::Desc,
506 );
507
508 for i in 1..=5 {
510 cache.upsert(format!("e{}", i), json!({"id": i}));
511 }
512
513 let old_window: Vec<String> = vec!["e5".to_string(), "e4".to_string(), "e3".to_string()];
514
515 cache.upsert("e6".to_string(), json!({"id": 6}));
517
518 let deltas = cache.compute_window_deltas(&old_window, 0, 3);
521
522 assert_eq!(deltas.len(), 2);
523 assert!(deltas
525 .iter()
526 .any(|d| matches!(d, ViewDelta::Remove { key } if key == "e3")));
527 assert!(deltas
529 .iter()
530 .any(|d| matches!(d, ViewDelta::Add { key, .. } if key == "e6")));
531 }
532
533 #[test]
534 fn test_nested_sort_field() {
535 let mut cache = SortedViewCache::new(
536 "test/latest".to_string(),
537 vec!["id".to_string(), "round_id".to_string()],
538 SortOrder::Desc,
539 );
540
541 cache.upsert("a".to_string(), json!({"id": {"round_id": 1}}));
542 cache.upsert("b".to_string(), json!({"id": {"round_id": 3}}));
543 cache.upsert("c".to_string(), json!({"id": {"round_id": 2}}));
544
545 let keys = cache.ordered_keys();
546 assert_eq!(keys, vec!["b", "c", "a"]);
547 }
548
549 #[test]
550 fn test_update_with_missing_sort_field_preserves_position() {
551 let mut cache = SortedViewCache::new(
552 "test/latest".to_string(),
553 vec!["id".to_string(), "round_id".to_string()],
554 SortOrder::Desc,
555 );
556
557 cache.upsert(
558 "100".to_string(),
559 json!({"id": {"round_id": 100}, "data": "initial"}),
560 );
561 cache.upsert(
562 "200".to_string(),
563 json!({"id": {"round_id": 200}, "data": "initial"}),
564 );
565 cache.upsert(
566 "300".to_string(),
567 json!({"id": {"round_id": 300}, "data": "initial"}),
568 );
569
570 assert_eq!(cache.ordered_keys(), vec!["300", "200", "100"]);
571
572 cache.upsert("200".to_string(), json!({"data": "updated_without_id"}));
573
574 assert_eq!(
575 cache.ordered_keys(),
576 vec!["300", "200", "100"],
577 "Entity 200 should retain its position even when updated without sort field"
578 );
579
580 let entity = cache.get("200").unwrap();
581 assert_eq!(entity["data"], "updated_without_id");
582 }
583
584 #[test]
585 fn test_new_entity_with_missing_sort_field_gets_null_position() {
586 let mut cache = SortedViewCache::new(
587 "test/latest".to_string(),
588 vec!["id".to_string(), "round_id".to_string()],
589 SortOrder::Desc,
590 );
591
592 cache.upsert("100".to_string(), json!({"id": {"round_id": 100}}));
593 cache.upsert("200".to_string(), json!({"id": {"round_id": 200}}));
594
595 cache.upsert("new".to_string(), json!({"data": "no_sort_field"}));
596
597 let keys = cache.ordered_keys();
598 assert_eq!(
599 keys.first().unwrap(),
600 "new",
601 "New entity without sort field gets Null which sorts first (Null < any value)"
602 );
603 }
604}