Skip to main content

hyperstack_sdk/
store.rs

1use crate::frame::{
2    parse_snapshot_entities, Frame, Operation, SortConfig, SortOrder, SubscribedFrame,
3};
4use serde::de::DeserializeOwned;
5use serde_json::Value;
6use std::cmp::Ordering;
7use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
8use std::sync::Arc;
9use tokio::sync::{broadcast, watch, RwLock};
10
11/// Default maximum number of entries per view before LRU eviction kicks in.
12/// Set to 10,000 to provide a reasonable balance between memory usage and data retention.
13pub const DEFAULT_MAX_ENTRIES_PER_VIEW: usize = 10_000;
14
15/// Configuration for the SharedStore.
16#[derive(Debug, Clone)]
17pub struct StoreConfig {
18    /// Maximum number of entries to keep per view. When exceeded, oldest entries
19    /// are evicted using LRU (Least Recently Used) strategy.
20    /// Set to `None` to disable size limiting (not recommended for long-running clients).
21    pub max_entries_per_view: Option<usize>,
22}
23
24impl Default for StoreConfig {
25    fn default() -> Self {
26        Self {
27            max_entries_per_view: Some(DEFAULT_MAX_ENTRIES_PER_VIEW),
28        }
29    }
30}
31
32#[derive(Debug, Clone, PartialEq, Eq)]
33struct SortKey {
34    sort_value: SortValue,
35    entity_key: String,
36}
37
38impl PartialOrd for SortKey {
39    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
40        Some(self.cmp(other))
41    }
42}
43
44impl Ord for SortKey {
45    fn cmp(&self, other: &Self) -> Ordering {
46        match self.sort_value.cmp(&other.sort_value) {
47            Ordering::Equal => self.entity_key.cmp(&other.entity_key),
48            other => other,
49        }
50    }
51}
52
53#[derive(Debug, Clone, PartialEq, Eq)]
54enum SortValue {
55    Null,
56    Bool(bool),
57    Integer(i64),
58    String(String),
59}
60
61impl Ord for SortValue {
62    fn cmp(&self, other: &Self) -> Ordering {
63        match (self, other) {
64            (SortValue::Null, SortValue::Null) => Ordering::Equal,
65            (SortValue::Null, _) => Ordering::Less,
66            (_, SortValue::Null) => Ordering::Greater,
67            (SortValue::Bool(a), SortValue::Bool(b)) => a.cmp(b),
68            (SortValue::Integer(a), SortValue::Integer(b)) => a.cmp(b),
69            (SortValue::String(a), SortValue::String(b)) => a.cmp(b),
70            (SortValue::Integer(_), SortValue::String(_)) => Ordering::Less,
71            (SortValue::String(_), SortValue::Integer(_)) => Ordering::Greater,
72            (SortValue::Bool(_), _) => Ordering::Less,
73            (_, SortValue::Bool(_)) => Ordering::Greater,
74        }
75    }
76}
77
78impl PartialOrd for SortValue {
79    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
80        Some(self.cmp(other))
81    }
82}
83
84fn extract_sort_value(entity: &Value, field_path: &[String]) -> SortValue {
85    let mut current = entity;
86    for segment in field_path {
87        match current.get(segment) {
88            Some(v) => current = v,
89            None => return SortValue::Null,
90        }
91    }
92
93    match current {
94        Value::Null => SortValue::Null,
95        Value::Bool(b) => SortValue::Bool(*b),
96        Value::Number(n) => {
97            if let Some(i) = n.as_i64() {
98                SortValue::Integer(i)
99            } else if let Some(f) = n.as_f64() {
100                SortValue::Integer(f as i64)
101            } else {
102                SortValue::Null
103            }
104        }
105        Value::String(s) => SortValue::String(s.clone()),
106        _ => SortValue::Null,
107    }
108}
109
110struct ViewData {
111    entities: HashMap<String, serde_json::Value>,
112    access_order: VecDeque<String>,
113    sort_config: Option<SortConfig>,
114    sorted_keys: BTreeMap<SortKey, ()>,
115}
116
117pub fn deep_merge_with_append(
118    target: &mut Value,
119    patch: &Value,
120    append_paths: &[String],
121    current_path: &str,
122) {
123    match (target, patch) {
124        (Value::Object(target_map), Value::Object(patch_map)) => {
125            for (key, patch_value) in patch_map {
126                let field_path = if current_path.is_empty() {
127                    key.clone()
128                } else {
129                    format!("{}.{}", current_path, key)
130                };
131                match target_map.get_mut(key) {
132                    Some(target_value) => {
133                        deep_merge_with_append(target_value, patch_value, append_paths, &field_path)
134                    }
135                    None => {
136                        target_map.insert(key.clone(), patch_value.clone());
137                    }
138                }
139            }
140        }
141        (Value::Array(target_arr), Value::Array(patch_arr))
142            if append_paths.contains(&current_path.to_string()) =>
143        {
144            target_arr.extend(patch_arr.iter().cloned());
145        }
146        (target, patch) => {
147            *target = patch.clone();
148        }
149    }
150}
151
152#[derive(Debug, Clone)]
153pub struct StoreUpdate {
154    pub view: String,
155    pub key: String,
156    pub operation: Operation,
157    pub data: Option<serde_json::Value>,
158    pub previous: Option<serde_json::Value>,
159    /// The raw patch data for Patch operations (before merging into full state).
160    /// This allows consumers to see exactly what fields changed without diffing.
161    pub patch: Option<serde_json::Value>,
162}
163
164pub struct SharedStore {
165    views: Arc<RwLock<HashMap<String, ViewData>>>,
166    view_configs: Arc<RwLock<HashMap<String, SortConfig>>>,
167    updates_tx: broadcast::Sender<StoreUpdate>,
168    ready_views: Arc<RwLock<HashSet<String>>>,
169    ready_tx: watch::Sender<HashSet<String>>,
170    ready_rx: watch::Receiver<HashSet<String>>,
171    config: StoreConfig,
172}
173
174impl ViewData {
175    fn new() -> Self {
176        Self {
177            entities: HashMap::new(),
178            access_order: VecDeque::new(),
179            sort_config: None,
180            sorted_keys: BTreeMap::new(),
181        }
182    }
183
184    fn with_sort_config(sort_config: SortConfig) -> Self {
185        Self {
186            entities: HashMap::new(),
187            access_order: VecDeque::new(),
188            sort_config: Some(sort_config),
189            sorted_keys: BTreeMap::new(),
190        }
191    }
192
193    fn set_sort_config(&mut self, config: SortConfig) {
194        if self.sort_config.is_some() {
195            return;
196        }
197        self.sort_config = Some(config);
198        self.rebuild_sorted_keys();
199    }
200
201    fn rebuild_sorted_keys(&mut self) {
202        self.sorted_keys.clear();
203        if let Some(ref config) = self.sort_config {
204            for (key, value) in &self.entities {
205                let sort_value = extract_sort_value(value, &config.field);
206                let sort_key = SortKey {
207                    sort_value,
208                    entity_key: key.clone(),
209                };
210                self.sorted_keys.insert(sort_key, ());
211            }
212        }
213        self.access_order.clear();
214    }
215
216    fn touch(&mut self, key: &str) {
217        if self.sort_config.is_some() {
218            return;
219        }
220        self.access_order.retain(|k| k != key);
221        self.access_order.push_back(key.to_string());
222    }
223
224    fn insert(&mut self, key: String, value: serde_json::Value) {
225        if let Some(ref config) = self.sort_config {
226            if let Some(old_value) = self.entities.get(&key) {
227                let old_sort_value = extract_sort_value(old_value, &config.field);
228                let old_sort_key = SortKey {
229                    sort_value: old_sort_value,
230                    entity_key: key.clone(),
231                };
232                self.sorted_keys.remove(&old_sort_key);
233            }
234
235            let sort_value = extract_sort_value(&value, &config.field);
236            let sort_key = SortKey {
237                sort_value,
238                entity_key: key.clone(),
239            };
240            self.sorted_keys.insert(sort_key, ());
241        } else if !self.entities.contains_key(&key) {
242            self.access_order.push_back(key.clone());
243        } else {
244            self.touch(&key);
245        }
246        self.entities.insert(key, value);
247    }
248
249    fn remove(&mut self, key: &str) -> Option<serde_json::Value> {
250        if let Some(ref config) = self.sort_config {
251            if let Some(value) = self.entities.get(key) {
252                let sort_value = extract_sort_value(value, &config.field);
253                let sort_key = SortKey {
254                    sort_value,
255                    entity_key: key.to_string(),
256                };
257                self.sorted_keys.remove(&sort_key);
258            }
259        } else {
260            self.access_order.retain(|k| k != key);
261        }
262        self.entities.remove(key)
263    }
264
265    fn evict_oldest(&mut self) -> Option<String> {
266        if self.sort_config.is_some() {
267            if let Some((sort_key, _)) = self
268                .sorted_keys
269                .iter()
270                .next_back()
271                .map(|(k, v)| (k.clone(), *v))
272            {
273                self.sorted_keys.remove(&sort_key);
274                self.entities.remove(&sort_key.entity_key);
275                return Some(sort_key.entity_key);
276            }
277            return None;
278        }
279
280        if let Some(oldest_key) = self.access_order.pop_front() {
281            self.entities.remove(&oldest_key);
282            Some(oldest_key)
283        } else {
284            None
285        }
286    }
287
288    fn len(&self) -> usize {
289        self.entities.len()
290    }
291
292    #[allow(dead_code)]
293    fn ordered_keys(&self) -> Vec<String> {
294        if let Some(ref config) = self.sort_config {
295            let keys: Vec<String> = self
296                .sorted_keys
297                .keys()
298                .map(|sk| sk.entity_key.clone())
299                .collect();
300            match config.order {
301                SortOrder::Asc => keys,
302                SortOrder::Desc => keys.into_iter().rev().collect(),
303            }
304        } else {
305            self.entities.keys().cloned().collect()
306        }
307    }
308
309    fn ordered_values(&self) -> Vec<serde_json::Value> {
310        if let Some(ref config) = self.sort_config {
311            let values: Vec<serde_json::Value> = self
312                .sorted_keys
313                .keys()
314                .filter_map(|sk| self.entities.get(&sk.entity_key).cloned())
315                .collect();
316            match config.order {
317                SortOrder::Asc => values,
318                SortOrder::Desc => values.into_iter().rev().collect(),
319            }
320        } else {
321            self.entities.values().cloned().collect()
322        }
323    }
324}
325
326impl SharedStore {
327    pub fn new() -> Self {
328        Self::with_config(StoreConfig::default())
329    }
330
331    pub fn with_config(config: StoreConfig) -> Self {
332        let (updates_tx, _) = broadcast::channel(1000);
333        let (ready_tx, ready_rx) = watch::channel(HashSet::new());
334        Self {
335            views: Arc::new(RwLock::new(HashMap::new())),
336            view_configs: Arc::new(RwLock::new(HashMap::new())),
337            updates_tx,
338            ready_views: Arc::new(RwLock::new(HashSet::new())),
339            ready_tx,
340            ready_rx,
341            config,
342        }
343    }
344
345    fn enforce_max_entries(&self, view_data: &mut ViewData) {
346        if let Some(max) = self.config.max_entries_per_view {
347            while view_data.len() > max {
348                if let Some(evicted_key) = view_data.evict_oldest() {
349                    tracing::debug!("evicted oldest entry: {}", evicted_key);
350                }
351            }
352        }
353    }
354
355    pub async fn apply_frame(&self, frame: Frame) {
356        let view_path = &frame.entity;
357        tracing::debug!(
358            "apply_frame: view={}, key={}, op={}",
359            view_path,
360            frame.key,
361            frame.op,
362        );
363
364        let operation = frame.operation();
365
366        if operation == Operation::Snapshot {
367            self.apply_snapshot(&frame).await;
368            return;
369        }
370
371        let sort_config = self.view_configs.read().await.get(view_path).cloned();
372
373        let mut views = self.views.write().await;
374        let view_data = views.entry(view_path.to_string()).or_insert_with(|| {
375            if let Some(config) = sort_config {
376                ViewData::with_sort_config(config)
377            } else {
378                ViewData::new()
379            }
380        });
381
382        let previous = view_data.entities.get(&frame.key).cloned();
383
384        let (current, patch) = match operation {
385            Operation::Upsert | Operation::Create => {
386                view_data.insert(frame.key.clone(), frame.data.clone());
387                self.enforce_max_entries(view_data);
388                (Some(frame.data), None)
389            }
390            Operation::Patch => {
391                let raw_patch = frame.data.clone();
392                let entry = view_data
393                    .entities
394                    .entry(frame.key.clone())
395                    .or_insert_with(|| serde_json::json!({}));
396                deep_merge_with_append(entry, &frame.data, &frame.append, "");
397                let merged = entry.clone();
398                view_data.touch(&frame.key);
399                self.enforce_max_entries(view_data);
400                (Some(merged), Some(raw_patch))
401            }
402            Operation::Delete => {
403                view_data.remove(&frame.key);
404                (None, None)
405            }
406            Operation::Snapshot | Operation::Subscribed => unreachable!(),
407        };
408
409        let _ = self.updates_tx.send(StoreUpdate {
410            view: view_path.to_string(),
411            key: frame.key,
412            operation,
413            data: current,
414            previous,
415            patch,
416        });
417
418        self.mark_view_ready(view_path).await;
419    }
420
421    async fn apply_snapshot(&self, frame: &Frame) {
422        let view_path = &frame.entity;
423        let snapshot_entities = parse_snapshot_entities(&frame.data);
424
425        tracing::debug!(
426            "apply_snapshot: view={}, count={}",
427            view_path,
428            snapshot_entities.len()
429        );
430
431        let sort_config = self.view_configs.read().await.get(view_path).cloned();
432
433        let mut views = self.views.write().await;
434        let view_data = views.entry(view_path.to_string()).or_insert_with(|| {
435            if let Some(config) = sort_config {
436                ViewData::with_sort_config(config)
437            } else {
438                ViewData::new()
439            }
440        });
441
442        for entity in snapshot_entities {
443            let previous = view_data.entities.get(&entity.key).cloned();
444            view_data.insert(entity.key.clone(), entity.data.clone());
445
446            let _ = self.updates_tx.send(StoreUpdate {
447                view: view_path.to_string(),
448                key: entity.key,
449                operation: Operation::Upsert,
450                data: Some(entity.data),
451                previous,
452                patch: None,
453            });
454        }
455
456        self.enforce_max_entries(view_data);
457        drop(views);
458        self.mark_view_ready(view_path).await;
459    }
460
461    pub async fn mark_view_ready(&self, view: &str) {
462        let mut ready = self.ready_views.write().await;
463        if ready.insert(view.to_string()) {
464            let _ = self.ready_tx.send(ready.clone());
465        }
466    }
467
468    pub async fn wait_for_view_ready(&self, view: &str, timeout: std::time::Duration) -> bool {
469        if self.ready_views.read().await.contains(view) {
470            return true;
471        }
472
473        let mut rx = self.ready_rx.clone();
474        let deadline = tokio::time::Instant::now() + timeout;
475
476        loop {
477            let timeout_remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
478            if timeout_remaining.is_zero() {
479                return false;
480            }
481
482            tokio::select! {
483                result = rx.changed() => {
484                    if result.is_err() {
485                        return false;
486                    }
487                    if rx.borrow().contains(view) {
488                        return true;
489                    }
490                }
491                _ = tokio::time::sleep(timeout_remaining) => {
492                    return false;
493                }
494            }
495        }
496    }
497
498    pub async fn get<T: DeserializeOwned>(&self, view: &str, key: &str) -> Option<T> {
499        let views = self.views.read().await;
500        views
501            .get(view)?
502            .entities
503            .get(key)
504            .and_then(|v| serde_json::from_value(v.clone()).ok())
505    }
506
507    pub async fn list<T: DeserializeOwned>(&self, view: &str) -> Vec<T> {
508        let views = self.views.read().await;
509        views
510            .get(view)
511            .map(|view_data| {
512                view_data
513                    .ordered_values()
514                    .into_iter()
515                    .filter_map(|v| serde_json::from_value(v).ok())
516                    .collect()
517            })
518            .unwrap_or_default()
519    }
520
521    pub async fn all_raw(&self, view: &str) -> HashMap<String, serde_json::Value> {
522        let views = self.views.read().await;
523        views
524            .get(view)
525            .map(|view_data| view_data.entities.clone())
526            .unwrap_or_default()
527    }
528
529    /// Synchronously get a single entity by key.
530    ///
531    /// Returns `None` if the view doesn't exist or the key is not found.
532    /// This is a non-blocking operation using `try_read()`.
533    ///
534    /// Use this in synchronous contexts where you can't await.
535    /// If the lock is held by another task, returns `None`.
536    pub fn get_sync<T: DeserializeOwned>(&self, view: &str, key: &str) -> Option<T> {
537        let views = self.views.try_read().ok()?;
538        views
539            .get(view)?
540            .entities
541            .get(key)
542            .and_then(|v| serde_json::from_value(v.clone()).ok())
543    }
544
545    /// Synchronously get all entities from a view.
546    ///
547    /// Returns an empty vector if the view doesn't exist.
548    /// This is a non-blocking operation using `try_read()`.
549    ///
550    /// Use this in synchronous contexts where you can't await.
551    /// If the lock is held by another task, returns an empty vector.
552    pub fn list_sync<T: DeserializeOwned>(&self, view: &str) -> Vec<T> {
553        let Some(views) = self.views.try_read().ok() else {
554            return Vec::new();
555        };
556        views
557            .get(view)
558            .map(|view_data| {
559                view_data
560                    .ordered_values()
561                    .into_iter()
562                    .filter_map(|v| serde_json::from_value(v).ok())
563                    .collect()
564            })
565            .unwrap_or_default()
566    }
567
568    pub fn subscribe(&self) -> broadcast::Receiver<StoreUpdate> {
569        self.updates_tx.subscribe()
570    }
571
572    pub async fn apply_subscribed_frame(&self, frame: SubscribedFrame) {
573        let view_path = &frame.view;
574        tracing::debug!(
575            "apply_subscribed_frame: view={}, mode={:?}, sort={:?}",
576            view_path,
577            frame.mode,
578            frame.sort,
579        );
580
581        if let Some(sort_config) = frame.sort {
582            self.view_configs
583                .write()
584                .await
585                .insert(view_path.to_string(), sort_config.clone());
586
587            let mut views = self.views.write().await;
588            if let Some(view_data) = views.get_mut(view_path) {
589                view_data.set_sort_config(sort_config);
590            }
591        }
592    }
593
594    pub async fn get_view_sort_config(&self, view: &str) -> Option<SortConfig> {
595        self.view_configs.read().await.get(view).cloned()
596    }
597
598    pub async fn set_view_sort_config(&self, view: &str, config: SortConfig) {
599        self.view_configs
600            .write()
601            .await
602            .insert(view.to_string(), config.clone());
603
604        let mut views = self.views.write().await;
605        if let Some(view_data) = views.get_mut(view) {
606            view_data.set_sort_config(config);
607        }
608    }
609}
610
611impl Default for SharedStore {
612    fn default() -> Self {
613        Self::new()
614    }
615}
616
617impl Clone for SharedStore {
618    fn clone(&self) -> Self {
619        Self {
620            views: self.views.clone(),
621            view_configs: self.view_configs.clone(),
622            updates_tx: self.updates_tx.clone(),
623            ready_views: self.ready_views.clone(),
624            ready_tx: self.ready_tx.clone(),
625            ready_rx: self.ready_rx.clone(),
626            config: self.config.clone(),
627        }
628    }
629}