hyperstack_sdk/
store.rs

1use crate::frame::{
2    parse_snapshot_entities, Frame, Operation, SortConfig, SortOrder, SubscribedFrame,
3};
4use serde::de::DeserializeOwned;
5use serde_json::Value;
6use std::cmp::Ordering;
7use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
8use std::sync::Arc;
9use tokio::sync::{broadcast, watch, RwLock};
10
/// Default maximum number of entries kept per view before eviction kicks in.
///
/// This is the limit installed by `StoreConfig::default()`. It is set to 10,000
/// as a balance between memory usage and data retention.
pub const DEFAULT_MAX_ENTRIES_PER_VIEW: usize = 10_000;
14
15/// Configuration for the SharedStore.
16#[derive(Debug, Clone)]
17pub struct StoreConfig {
18    /// Maximum number of entries to keep per view. When exceeded, oldest entries
19    /// are evicted using LRU (Least Recently Used) strategy.
20    /// Set to `None` to disable size limiting (not recommended for long-running clients).
21    pub max_entries_per_view: Option<usize>,
22}
23
24impl Default for StoreConfig {
25    fn default() -> Self {
26        Self {
27            max_entries_per_view: Some(DEFAULT_MAX_ENTRIES_PER_VIEW),
28        }
29    }
30}
31
32#[derive(Debug, Clone, PartialEq, Eq)]
33struct SortKey {
34    sort_value: SortValue,
35    entity_key: String,
36}
37
38impl PartialOrd for SortKey {
39    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
40        Some(self.cmp(other))
41    }
42}
43
44impl Ord for SortKey {
45    fn cmp(&self, other: &Self) -> Ordering {
46        match self.sort_value.cmp(&other.sort_value) {
47            Ordering::Equal => self.entity_key.cmp(&other.entity_key),
48            other => other,
49        }
50    }
51}
52
/// Scalar value an entity is sorted by.
///
/// Values of different kinds are ordered by kind
/// (`Null < Bool < Integer < String`); values of the same kind use that
/// kind's natural ordering.
#[derive(Debug, Clone, PartialEq, Eq)]
enum SortValue {
    Null,
    Bool(bool),
    Integer(i64),
    String(String),
}

impl Ord for SortValue {
    fn cmp(&self, other: &Self) -> Ordering {
        /// Position of each kind in the cross-kind ordering.
        fn kind_rank(value: &SortValue) -> u8 {
            match value {
                SortValue::Null => 0,
                SortValue::Bool(_) => 1,
                SortValue::Integer(_) => 2,
                SortValue::String(_) => 3,
            }
        }

        match (self, other) {
            (SortValue::Bool(lhs), SortValue::Bool(rhs)) => lhs.cmp(rhs),
            (SortValue::Integer(lhs), SortValue::Integer(rhs)) => lhs.cmp(rhs),
            (SortValue::String(lhs), SortValue::String(rhs)) => lhs.cmp(rhs),
            // Different kinds (or two nulls): the kind alone decides.
            _ => kind_rank(self).cmp(&kind_rank(other)),
        }
    }
}

impl PartialOrd for SortValue {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        // The ordering is total, so defer to `Ord`.
        Some(self.cmp(other))
    }
}
83
84fn extract_sort_value(entity: &Value, field_path: &[String]) -> SortValue {
85    let mut current = entity;
86    for segment in field_path {
87        match current.get(segment) {
88            Some(v) => current = v,
89            None => return SortValue::Null,
90        }
91    }
92
93    match current {
94        Value::Null => SortValue::Null,
95        Value::Bool(b) => SortValue::Bool(*b),
96        Value::Number(n) => {
97            if let Some(i) = n.as_i64() {
98                SortValue::Integer(i)
99            } else if let Some(f) = n.as_f64() {
100                SortValue::Integer(f as i64)
101            } else {
102                SortValue::Null
103            }
104        }
105        Value::String(s) => SortValue::String(s.clone()),
106        _ => SortValue::Null,
107    }
108}
109
110
111
/// In-memory state held for a single view.
struct ViewData {
    /// Current entity payloads, keyed by entity key.
    entities: HashMap<String, serde_json::Value>,
    /// Entity keys ordered from least to most recently written. Only
    /// maintained while `sort_config` is `None`; it is cleared when a sort
    /// configuration is installed.
    access_order: VecDeque<String>,
    /// Sort configuration of the view, if one has been set.
    sort_config: Option<SortConfig>,
    /// Ordered index over the entities, keyed by (sort value, entity key).
    /// Only populated while `sort_config` is set; the unit value is unused.
    sorted_keys: BTreeMap<SortKey, ()>,
}
118
119fn deep_merge_with_append(
120    target: &mut Value,
121    patch: &Value,
122    append_paths: &[String],
123    current_path: &str,
124) {
125    match (target, patch) {
126        (Value::Object(target_map), Value::Object(patch_map)) => {
127            for (key, patch_value) in patch_map {
128                let field_path = if current_path.is_empty() {
129                    key.clone()
130                } else {
131                    format!("{}.{}", current_path, key)
132                };
133                match target_map.get_mut(key) {
134                    Some(target_value) => {
135                        deep_merge_with_append(target_value, patch_value, append_paths, &field_path)
136                    }
137                    None => {
138                        target_map.insert(key.clone(), patch_value.clone());
139                    }
140                }
141            }
142        }
143        (Value::Array(target_arr), Value::Array(patch_arr))
144            if append_paths.contains(&current_path.to_string()) =>
145        {
146            target_arr.extend(patch_arr.iter().cloned());
147        }
148        (target, patch) => {
149            *target = patch.clone();
150        }
151    }
152}
153
/// Change notification broadcast to subscribers whenever the store applies a
/// frame to an entity.
#[derive(Debug, Clone)]
pub struct StoreUpdate {
    /// Path of the view the entity belongs to.
    pub view: String,
    /// Key of the affected entity.
    pub key: String,
    /// Operation that was applied. Entities delivered in a snapshot are
    /// reported as `Operation::Upsert`.
    pub operation: Operation,
    /// Entity value after the operation; `None` for deletes.
    pub data: Option<serde_json::Value>,
    /// Entity value stored before the operation, if there was one.
    pub previous: Option<serde_json::Value>,
    /// The raw patch data for Patch operations (before merging into full state).
    /// This allows consumers to see exactly what fields changed without diffing.
    pub patch: Option<serde_json::Value>,
}
165
/// Store of entity state grouped by view, with change notifications.
///
/// Cloning a `SharedStore` produces a handle onto the same underlying state:
/// the maps are behind `Arc`s and the channel endpoints are cloned.
pub struct SharedStore {
    /// Entity data per view path.
    views: Arc<RwLock<HashMap<String, ViewData>>>,
    /// Sort configuration per view path; consulted when a view is first created.
    view_configs: Arc<RwLock<HashMap<String, SortConfig>>>,
    /// Sender side of the broadcast channel carrying `StoreUpdate`s.
    updates_tx: broadcast::Sender<StoreUpdate>,
    /// Views that have received at least one frame or snapshot.
    ready_views: Arc<RwLock<HashSet<String>>>,
    /// Publishes the ready set each time a new view becomes ready.
    ready_tx: watch::Sender<HashSet<String>>,
    /// Receiver kept so waiters can clone it to observe readiness changes.
    ready_rx: watch::Receiver<HashSet<String>>,
    /// Store-wide settings such as the per-view entry limit.
    config: StoreConfig,
}
175
176impl ViewData {
177    fn new() -> Self {
178        Self {
179            entities: HashMap::new(),
180            access_order: VecDeque::new(),
181            sort_config: None,
182            sorted_keys: BTreeMap::new(),
183        }
184    }
185
186    fn with_sort_config(sort_config: SortConfig) -> Self {
187        Self {
188            entities: HashMap::new(),
189            access_order: VecDeque::new(),
190            sort_config: Some(sort_config),
191            sorted_keys: BTreeMap::new(),
192        }
193    }
194
195    fn set_sort_config(&mut self, config: SortConfig) {
196        if self.sort_config.is_some() {
197            return;
198        }
199        self.sort_config = Some(config);
200        self.rebuild_sorted_keys();
201    }
202
203    fn rebuild_sorted_keys(&mut self) {
204        self.sorted_keys.clear();
205        if let Some(ref config) = self.sort_config {
206            for (key, value) in &self.entities {
207                let sort_value = extract_sort_value(value, &config.field);
208                let sort_key = SortKey {
209                    sort_value,
210                    entity_key: key.clone(),
211                };
212                self.sorted_keys.insert(sort_key, ());
213            }
214        }
215        self.access_order.clear();
216    }
217
218    fn touch(&mut self, key: &str) {
219        if self.sort_config.is_some() {
220            return;
221        }
222        self.access_order.retain(|k| k != key);
223        self.access_order.push_back(key.to_string());
224    }
225
226    fn insert(&mut self, key: String, value: serde_json::Value) {
227        if let Some(ref config) = self.sort_config {
228            if let Some(old_value) = self.entities.get(&key) {
229                let old_sort_value = extract_sort_value(old_value, &config.field);
230                let old_sort_key = SortKey {
231                    sort_value: old_sort_value,
232                    entity_key: key.clone(),
233                };
234                self.sorted_keys.remove(&old_sort_key);
235            }
236
237            let sort_value = extract_sort_value(&value, &config.field);
238            let sort_key = SortKey {
239                sort_value,
240                entity_key: key.clone(),
241            };
242            self.sorted_keys.insert(sort_key, ());
243        } else if !self.entities.contains_key(&key) {
244            self.access_order.push_back(key.clone());
245        } else {
246            self.touch(&key);
247        }
248        self.entities.insert(key, value);
249    }
250
251    fn remove(&mut self, key: &str) -> Option<serde_json::Value> {
252        if let Some(ref config) = self.sort_config {
253            if let Some(value) = self.entities.get(key) {
254                let sort_value = extract_sort_value(value, &config.field);
255                let sort_key = SortKey {
256                    sort_value,
257                    entity_key: key.to_string(),
258                };
259                self.sorted_keys.remove(&sort_key);
260            }
261        } else {
262            self.access_order.retain(|k| k != key);
263        }
264        self.entities.remove(key)
265    }
266
267    fn evict_oldest(&mut self) -> Option<String> {
268        if self.sort_config.is_some() {
269            if let Some((sort_key, _)) = self
270                .sorted_keys
271                .iter()
272                .next_back()
273                .map(|(k, v)| (k.clone(), *v))
274            {
275                self.sorted_keys.remove(&sort_key);
276                self.entities.remove(&sort_key.entity_key);
277                return Some(sort_key.entity_key);
278            }
279            return None;
280        }
281
282        if let Some(oldest_key) = self.access_order.pop_front() {
283            self.entities.remove(&oldest_key);
284            Some(oldest_key)
285        } else {
286            None
287        }
288    }
289
290    fn len(&self) -> usize {
291        self.entities.len()
292    }
293
294    #[allow(dead_code)]
295    fn ordered_keys(&self) -> Vec<String> {
296        if let Some(ref config) = self.sort_config {
297            let keys: Vec<String> = self
298                .sorted_keys
299                .keys()
300                .map(|sk| sk.entity_key.clone())
301                .collect();
302            match config.order {
303                SortOrder::Asc => keys,
304                SortOrder::Desc => keys.into_iter().rev().collect(),
305            }
306        } else {
307            self.entities.keys().cloned().collect()
308        }
309    }
310
311    fn ordered_values(&self) -> Vec<serde_json::Value> {
312        if let Some(ref config) = self.sort_config {
313            let values: Vec<serde_json::Value> = self
314                .sorted_keys
315                .keys()
316                .filter_map(|sk| self.entities.get(&sk.entity_key).cloned())
317                .collect();
318            match config.order {
319                SortOrder::Asc => values,
320                SortOrder::Desc => values.into_iter().rev().collect(),
321            }
322        } else {
323            self.entities.values().cloned().collect()
324        }
325    }
326}
327
328impl SharedStore {
329    pub fn new() -> Self {
330        Self::with_config(StoreConfig::default())
331    }
332
333    pub fn with_config(config: StoreConfig) -> Self {
334        let (updates_tx, _) = broadcast::channel(1000);
335        let (ready_tx, ready_rx) = watch::channel(HashSet::new());
336        Self {
337            views: Arc::new(RwLock::new(HashMap::new())),
338            view_configs: Arc::new(RwLock::new(HashMap::new())),
339            updates_tx,
340            ready_views: Arc::new(RwLock::new(HashSet::new())),
341            ready_tx,
342            ready_rx,
343            config,
344        }
345    }
346
347    fn enforce_max_entries(&self, view_data: &mut ViewData) {
348        if let Some(max) = self.config.max_entries_per_view {
349            while view_data.len() > max {
350                if let Some(evicted_key) = view_data.evict_oldest() {
351                    tracing::debug!("evicted oldest entry: {}", evicted_key);
352                }
353            }
354        }
355    }
356
357    pub async fn apply_frame(&self, frame: Frame) {
358        let view_path = &frame.entity;
359        tracing::debug!(
360            "apply_frame: view={}, key={}, op={}",
361            view_path,
362            frame.key,
363            frame.op,
364        );
365
366        let operation = frame.operation();
367
368        if operation == Operation::Snapshot {
369            self.apply_snapshot(&frame).await;
370            return;
371        }
372
373        let sort_config = self.view_configs.read().await.get(view_path).cloned();
374
375        let mut views = self.views.write().await;
376        let view_data = views.entry(view_path.to_string()).or_insert_with(|| {
377            if let Some(config) = sort_config {
378                ViewData::with_sort_config(config)
379            } else {
380                ViewData::new()
381            }
382        });
383
384        let previous = view_data.entities.get(&frame.key).cloned();
385
386        let (current, patch) = match operation {
387            Operation::Upsert | Operation::Create => {
388                view_data.insert(frame.key.clone(), frame.data.clone());
389                self.enforce_max_entries(view_data);
390                (Some(frame.data), None)
391            }
392            Operation::Patch => {
393                let raw_patch = frame.data.clone();
394                let entry = view_data
395                    .entities
396                    .entry(frame.key.clone())
397                    .or_insert_with(|| serde_json::json!({}));
398                deep_merge_with_append(entry, &frame.data, &frame.append, "");
399                let merged = entry.clone();
400                view_data.touch(&frame.key);
401                self.enforce_max_entries(view_data);
402                (Some(merged), Some(raw_patch))
403            }
404            Operation::Delete => {
405                view_data.remove(&frame.key);
406                (None, None)
407            }
408            Operation::Snapshot | Operation::Subscribed => unreachable!(),
409        };
410
411        let _ = self.updates_tx.send(StoreUpdate {
412            view: view_path.to_string(),
413            key: frame.key,
414            operation,
415            data: current,
416            previous,
417            patch,
418        });
419
420        self.mark_view_ready(view_path).await;
421    }
422
423    async fn apply_snapshot(&self, frame: &Frame) {
424        let view_path = &frame.entity;
425        let snapshot_entities = parse_snapshot_entities(&frame.data);
426
427        tracing::debug!(
428            "apply_snapshot: view={}, count={}",
429            view_path,
430            snapshot_entities.len()
431        );
432
433        let sort_config = self.view_configs.read().await.get(view_path).cloned();
434
435        let mut views = self.views.write().await;
436        let view_data = views.entry(view_path.to_string()).or_insert_with(|| {
437            if let Some(config) = sort_config {
438                ViewData::with_sort_config(config)
439            } else {
440                ViewData::new()
441            }
442        });
443
444        for entity in snapshot_entities {
445            let previous = view_data.entities.get(&entity.key).cloned();
446            view_data.insert(entity.key.clone(), entity.data.clone());
447
448            let _ = self.updates_tx.send(StoreUpdate {
449                view: view_path.to_string(),
450                key: entity.key,
451                operation: Operation::Upsert,
452                data: Some(entity.data),
453                previous,
454                patch: None,
455            });
456        }
457
458        self.enforce_max_entries(view_data);
459        drop(views);
460        self.mark_view_ready(view_path).await;
461    }
462
463    pub async fn mark_view_ready(&self, view: &str) {
464        let mut ready = self.ready_views.write().await;
465        if ready.insert(view.to_string()) {
466            let _ = self.ready_tx.send(ready.clone());
467        }
468    }
469
470    pub async fn wait_for_view_ready(&self, view: &str, timeout: std::time::Duration) -> bool {
471        if self.ready_views.read().await.contains(view) {
472            return true;
473        }
474
475        let mut rx = self.ready_rx.clone();
476        let deadline = tokio::time::Instant::now() + timeout;
477
478        loop {
479            let timeout_remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
480            if timeout_remaining.is_zero() {
481                return false;
482            }
483
484            tokio::select! {
485                result = rx.changed() => {
486                    if result.is_err() {
487                        return false;
488                    }
489                    if rx.borrow().contains(view) {
490                        return true;
491                    }
492                }
493                _ = tokio::time::sleep(timeout_remaining) => {
494                    return false;
495                }
496            }
497        }
498    }
499
500    pub async fn get<T: DeserializeOwned>(&self, view: &str, key: &str) -> Option<T> {
501        let views = self.views.read().await;
502        views
503            .get(view)?
504            .entities
505            .get(key)
506            .and_then(|v| serde_json::from_value(v.clone()).ok())
507    }
508
509    pub async fn list<T: DeserializeOwned>(&self, view: &str) -> Vec<T> {
510        let views = self.views.read().await;
511        views
512            .get(view)
513            .map(|view_data| {
514                view_data
515                    .ordered_values()
516                    .into_iter()
517                    .filter_map(|v| serde_json::from_value(v).ok())
518                    .collect()
519            })
520            .unwrap_or_default()
521    }
522
523    pub async fn all_raw(&self, view: &str) -> HashMap<String, serde_json::Value> {
524        let views = self.views.read().await;
525        views
526            .get(view)
527            .map(|view_data| view_data.entities.clone())
528            .unwrap_or_default()
529    }
530
531    pub fn subscribe(&self) -> broadcast::Receiver<StoreUpdate> {
532        self.updates_tx.subscribe()
533    }
534
535    pub async fn apply_subscribed_frame(&self, frame: SubscribedFrame) {
536        let view_path = &frame.view;
537        tracing::debug!(
538            "apply_subscribed_frame: view={}, mode={:?}, sort={:?}",
539            view_path,
540            frame.mode,
541            frame.sort,
542        );
543
544        if let Some(sort_config) = frame.sort {
545            self.view_configs
546                .write()
547                .await
548                .insert(view_path.to_string(), sort_config.clone());
549
550            let mut views = self.views.write().await;
551            if let Some(view_data) = views.get_mut(view_path) {
552                view_data.set_sort_config(sort_config);
553            }
554        }
555    }
556
557    pub async fn get_view_sort_config(&self, view: &str) -> Option<SortConfig> {
558        self.view_configs.read().await.get(view).cloned()
559    }
560
561    pub async fn set_view_sort_config(&self, view: &str, config: SortConfig) {
562        self.view_configs
563            .write()
564            .await
565            .insert(view.to_string(), config.clone());
566
567        let mut views = self.views.write().await;
568        if let Some(view_data) = views.get_mut(view) {
569            view_data.set_sort_config(config);
570        }
571    }
572}
573
574impl Default for SharedStore {
575    fn default() -> Self {
576        Self::new()
577    }
578}
579
580impl Clone for SharedStore {
581    fn clone(&self) -> Self {
582        Self {
583            views: self.views.clone(),
584            view_configs: self.view_configs.clone(),
585            updates_tx: self.updates_tx.clone(),
586            ready_views: self.ready_views.clone(),
587            ready_tx: self.ready_tx.clone(),
588            ready_rx: self.ready_rx.clone(),
589            config: self.config.clone(),
590        }
591    }
592}