Skip to main content

astra_core/
store.rs

1use std::collections::{BTreeMap, HashMap};
2use std::path::Path;
3use std::sync::Arc;
4
5use parking_lot::RwLock;
6use serde::{Deserialize, Serialize};
7
8use crate::config::WatchBacklogMode;
9use crate::errors::StoreError;
10use crate::memory::{MemoryPressure, MemoryTracker};
11use crate::metrics;
12use crate::watch::{WatchEvent, WatchEventKind, WatchFilter, WatchRing, WatchSubscription};
13
/// A stored value together with the revision bookkeeping kept for its key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ValueEntry {
    /// Raw value bytes.
    pub value: Vec<u8>,
    /// Revision of the first write to the key; carried over unchanged on overwrite.
    pub create_revision: i64,
    /// Revision of the most recent write to the key.
    pub mod_revision: i64,
    /// Write counter: 1 for the first write, incremented by one on each overwrite.
    pub version: i64,
    /// Id of the attached lease. Only ids greater than zero are checked against
    /// the lease table when the entry is written.
    pub lease: i64,
}
22
/// A granted lease as tracked by the store.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LeaseEntry {
    /// Lease identifier (also the key in the lease table).
    pub id: i64,
    /// TTL supplied when the lease was granted; keep-alive resets `ttl` to this.
    pub granted_ttl: i64,
    /// Current TTL value. Nothing in this file decrements it.
    pub ttl: i64,
}
29
/// Serializable copy of the store contents, produced by
/// `KvStore::snapshot_state` and consumed by `KvStore::load_snapshot_state`.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct SnapshotState {
    /// All key/value entries.
    pub kv: Vec<(Vec<u8>, ValueEntry)>,
    /// All leases, keyed by lease id.
    pub leases: Vec<(i64, LeaseEntry)>,
    /// Next lease id the store would auto-assign.
    pub next_lease_id: i64,
    /// Store revision at snapshot time.
    pub revision: i64,
    /// Compaction watermark at snapshot time.
    pub compact_revision: i64,
}
38
/// Mutable store contents guarded by the `KvStore` lock.
#[derive(Debug)]
struct StoreState {
    /// Key space, ordered by key bytes.
    map: BTreeMap<Vec<u8>, ValueEntry>,
    /// Lease table keyed by lease id.
    leases: HashMap<i64, LeaseEntry>,
    /// For each indexed prefix (and each full key), the highest `mod_revision`
    /// recorded for keys under it; consulted by the revision filter in `range`.
    prefix_max_mod_revision: HashMap<Vec<u8>, i64>,
    /// Next lease id handed out when a grant does not specify one.
    next_lease_id: i64,
    /// Latest revision.
    revision: i64,
    /// Reads at or below this revision are rejected as compacted.
    compact_revision: i64,
}
48
49impl Default for StoreState {
50    fn default() -> Self {
51        Self {
52            map: BTreeMap::new(),
53            leases: HashMap::new(),
54            prefix_max_mod_revision: HashMap::new(),
55            next_lease_id: 1,
56            revision: 0,
57            compact_revision: 0,
58        }
59    }
60}
61
/// Result of a put.
#[derive(Debug, Clone)]
pub struct PutOutput {
    /// Revision the put was applied at.
    pub revision: i64,
    /// Entry that was stored under the key before the put, if any.
    pub prev: Option<ValueEntry>,
    /// Entry stored by the put.
    pub current: ValueEntry,
}
68
/// Result of a range read.
#[derive(Debug, Clone)]
pub struct RangeOutput {
    /// Store revision at the time of the read.
    pub revision: i64,
    /// Number of keys that matched the range, counted even when `kvs` is
    /// truncated by a limit or left empty for a count-only read.
    pub count: i64,
    /// True when a limit stopped `kvs` short of all matching keys.
    pub more: bool,
    /// Returned entries (values are emptied for keys-only reads).
    pub kvs: Vec<(Vec<u8>, ValueEntry)>,
}
76
/// Result of a delete.
#[derive(Debug, Clone)]
pub struct DeleteOutput {
    /// Revision the delete was applied at.
    pub revision: i64,
    /// Number of keys removed.
    pub deleted: i64,
    /// Removed entries; populated only when the caller asked for previous values.
    pub prev_kvs: Vec<(Vec<u8>, ValueEntry)>,
}
83
/// Result of a lease grant or keep-alive.
#[derive(Debug, Clone)]
pub struct LeaseGrantOutput {
    /// Store revision at the time of the call.
    pub revision: i64,
    /// Id of the lease.
    pub id: i64,
    /// TTL of the lease after the call.
    pub ttl: i64,
}
90
/// Result of a lease revoke.
#[derive(Debug, Clone)]
pub struct LeaseRevokeOutput {
    /// Revision the revoke was applied at.
    pub revision: i64,
    /// Number of keys removed because they were attached to the lease.
    pub deleted: i64,
}
96
/// Result of a lease time-to-live query.
#[derive(Debug, Clone)]
pub struct LeaseTtlOutput {
    /// Store revision at the time of the query.
    pub revision: i64,
    /// Id of the lease.
    pub id: i64,
    /// Current TTL of the lease.
    pub ttl: i64,
    /// TTL supplied when the lease was granted.
    pub granted_ttl: i64,
    /// Keys attached to the lease; empty unless the caller asked for them.
    pub keys: Vec<Vec<u8>>,
}
105
/// In-memory key/value store with revisions, leases and watch publication.
#[derive(Debug)]
pub struct KvStore {
    /// Keys, leases and revision counters behind a reader/writer lock.
    state: RwLock<StoreState>,
    /// Tracks bytes used by stored values against the configured budget.
    memory: MemoryTracker,
    /// Receives an event for every put and every removed key.
    watch_ring: WatchRing,
    /// Distance kept between the current revision and the automatic
    /// compaction watermark.
    hot_revision_window: i64,
    /// Enables the "does any key carry this prefix" short-circuit in `range`.
    prefix_filter_enabled: bool,
    /// Enables the per-prefix max-mod-revision short-circuit in `range`.
    revision_filter_enabled: bool,
}
115
116impl KvStore {
117    pub fn open(
118        data_dir: &Path,
119        max_memory_bytes: usize,
120        hot_revision_window: i64,
121        prefix_filter_enabled: bool,
122        revision_filter_enabled: bool,
123        watch_ring_capacity: usize,
124        watch_broadcast_capacity: usize,
125        watch_backlog_mode: WatchBacklogMode,
126    ) -> Result<Self, StoreError> {
127        std::fs::create_dir_all(data_dir)?;
128
129        Ok(Self {
130            state: RwLock::new(StoreState::default()),
131            memory: MemoryTracker::new(max_memory_bytes),
132            watch_ring: WatchRing::new(
133                watch_ring_capacity,
134                watch_broadcast_capacity,
135                watch_backlog_mode,
136            ),
137            hot_revision_window,
138            prefix_filter_enabled,
139            revision_filter_enabled,
140        })
141    }
142
143    fn with_indexed_prefixes(key: &[u8], mut visit: impl FnMut(&[u8])) {
144        if key.is_empty() {
145            return;
146        }
147
148        let mut segment_count = 0usize;
149        for (idx, byte) in key.iter().enumerate() {
150            if *byte != b'/' || idx == 0 {
151                continue;
152            }
153            segment_count = segment_count.saturating_add(1);
154            if segment_count > 4 {
155                break;
156            }
157            visit(&key[..=idx]);
158        }
159        visit(key);
160    }
161
162    fn update_prefix_revision_index_for_entry(state: &mut StoreState, key: &[u8], revision: i64) {
163        Self::with_indexed_prefixes(key, |prefix| {
164            let current = state
165                .prefix_max_mod_revision
166                .entry(prefix.to_vec())
167                .or_insert(revision);
168            if revision > *current {
169                *current = revision;
170            }
171        });
172    }
173
174    fn rebuild_prefix_revision_index_locked(state: &mut StoreState) {
175        state.prefix_max_mod_revision.clear();
176        let rows = state
177            .map
178            .iter()
179            .map(|(k, v)| (k.clone(), v.mod_revision))
180            .collect::<Vec<_>>();
181        for (key, revision) in rows {
182            Self::update_prefix_revision_index_for_entry(state, &key, revision);
183        }
184    }
185
186    fn refresh_compaction_locked(&self, state: &mut StoreState) {
187        let watermark = state.revision.saturating_sub(self.hot_revision_window);
188        if watermark > state.compact_revision {
189            state.compact_revision = watermark;
190        }
191    }
192
193    fn bump_revision(&self) -> i64 {
194        let mut state = self.state.write();
195        state.revision += 1;
196        self.refresh_compaction_locked(&mut state);
197        state.revision
198    }
199
200    pub fn reserve_revision(&self) -> i64 {
201        self.bump_revision()
202    }
203
204    pub fn memory_pressure(&self) -> MemoryPressure {
205        self.memory.pressure()
206    }
207
208    pub fn current_revision(&self) -> i64 {
209        self.state.read().revision
210    }
211
212    pub fn compact_revision(&self) -> i64 {
213        self.state.read().compact_revision
214    }
215
216    pub fn compact_to(&self, revision: i64) -> Result<i64, StoreError> {
217        if revision <= 0 {
218            return Err(StoreError::InvalidArgument(
219                "compact revision must be > 0".to_string(),
220            ));
221        }
222
223        let mut state = self.state.write();
224        if revision > state.revision {
225            return Err(StoreError::InvalidArgument(format!(
226                "compact revision {} is ahead of current revision {}",
227                revision, state.revision
228            )));
229        }
230        if revision > state.compact_revision {
231            state.compact_revision = revision;
232        }
233        Ok(state.compact_revision)
234    }
235
236    pub fn is_empty(&self) -> bool {
237        self.state.read().map.is_empty()
238    }
239
240    pub fn put(
241        &self,
242        key: Vec<u8>,
243        value: Vec<u8>,
244        lease: i64,
245        ignore_value: bool,
246        ignore_lease: bool,
247    ) -> Result<PutOutput, StoreError> {
248        let revision = self.bump_revision();
249        self.apply_put_at_revision(key, value, lease, ignore_value, ignore_lease, revision)
250    }
251
252    pub fn apply_put_at_revision(
253        &self,
254        key: Vec<u8>,
255        value: Vec<u8>,
256        lease: i64,
257        ignore_value: bool,
258        ignore_lease: bool,
259        revision: i64,
260    ) -> Result<PutOutput, StoreError> {
261        if key.is_empty() {
262            return Err(StoreError::InvalidArgument("empty key".to_string()));
263        }
264
265        let (prev, current) = {
266            let mut state = self.state.write();
267            let prev = state.map.get(&key).cloned();
268
269            if ignore_value && prev.is_none() {
270                return Err(StoreError::InvalidArgument(
271                    "ignore_value requires existing key".to_string(),
272                ));
273            }
274
275            if ignore_lease && prev.is_none() {
276                return Err(StoreError::InvalidArgument(
277                    "ignore_lease requires existing key".to_string(),
278                ));
279            }
280
281            let next_value = if ignore_value {
282                prev.as_ref().map(|v| v.value.clone()).unwrap_or_default()
283            } else {
284                value
285            };
286
287            let next_lease = if ignore_lease {
288                prev.as_ref().map(|v| v.lease).unwrap_or(0)
289            } else {
290                lease
291            };
292
293            if next_lease > 0 && !state.leases.contains_key(&next_lease) {
294                return Err(StoreError::InvalidArgument(format!(
295                    "lease {} not found",
296                    next_lease
297                )));
298            }
299
300            let prev_len = prev.as_ref().map(|v| v.value.len()).unwrap_or(0);
301            if next_value.len() > prev_len {
302                self.memory.try_increase(next_value.len() - prev_len)?;
303            } else {
304                self.memory.decrease(prev_len - next_value.len());
305            }
306
307            let current = ValueEntry {
308                create_revision: prev.as_ref().map(|v| v.create_revision).unwrap_or(revision),
309                mod_revision: revision,
310                version: prev.as_ref().map(|v| v.version + 1).unwrap_or(1),
311                value: next_value,
312                lease: next_lease,
313            };
314
315            state.map.insert(key.clone(), current.clone());
316            Self::update_prefix_revision_index_for_entry(&mut state, &key, revision);
317            state.revision = state.revision.max(revision);
318            self.refresh_compaction_locked(&mut state);
319
320            (prev, current)
321        };
322
323        self.watch_ring.publish(WatchEvent {
324            kind: WatchEventKind::Put,
325            key,
326            value: Arc::<[u8]>::from(current.value.clone()),
327            prev_value: Arc::<[u8]>::from(
328                prev.as_ref().map(|v| v.value.clone()).unwrap_or_default(),
329            ),
330            create_revision: current.create_revision,
331            mod_revision: current.mod_revision,
332            version: current.version,
333            lease: current.lease,
334        });
335
336        Ok(PutOutput {
337            revision,
338            prev,
339            current,
340        })
341    }
342
343    pub fn range(
344        &self,
345        key: &[u8],
346        range_end: &[u8],
347        limit: i64,
348        revision: i64,
349        keys_only: bool,
350        count_only: bool,
351    ) -> Result<RangeOutput, StoreError> {
352        let state = self.state.read();
353
354        if revision > 0 && revision <= state.compact_revision {
355            return Err(StoreError::Compacted(state.compact_revision));
356        }
357
358        let is_prefix_query =
359            !key.is_empty() && !range_end.is_empty() && range_end == prefix_end(key).as_slice();
360        if self.prefix_filter_enabled && is_prefix_query {
361            let has_prefix = state
362                .map
363                .range(key.to_vec()..)
364                .next()
365                .map(|(k, _)| k.starts_with(key))
366                .unwrap_or(false);
367            if !has_prefix {
368                metrics::inc_list_prefix_filter_skips();
369                return Ok(RangeOutput {
370                    revision: state.revision,
371                    count: 0,
372                    more: false,
373                    kvs: Vec::new(),
374                });
375            }
376            metrics::inc_list_prefix_filter_hits();
377        }
378        if self.revision_filter_enabled && is_prefix_query && revision > 0 {
379            let max_mod_revision = state
380                .prefix_max_mod_revision
381                .get(key)
382                .copied()
383                .unwrap_or_default();
384            if max_mod_revision < revision {
385                metrics::inc_list_revision_filter_skips();
386                return Ok(RangeOutput {
387                    revision: state.revision,
388                    count: 0,
389                    more: false,
390                    kvs: Vec::new(),
391                });
392            }
393            metrics::inc_list_revision_filter_hits();
394        }
395
396        let mut found = Vec::new();
397        let mut total: i64 = 0;
398        let mut more = false;
399        let limit = if limit > 0 {
400            Some(limit as usize)
401        } else {
402            None
403        };
404
405        let mut visit = |k: &Vec<u8>, v: &ValueEntry| {
406            total = total.saturating_add(1);
407            if count_only {
408                return;
409            }
410            if let Some(lim) = limit {
411                if found.len() >= lim {
412                    more = true;
413                    return;
414                }
415            }
416            let mut vv = v.clone();
417            if keys_only {
418                vv.value.clear();
419            }
420            found.push((k.clone(), vv));
421        };
422
423        if range_end.is_empty() {
424            if let Some(v) = state.map.get(key) {
425                visit(&key.to_vec(), v);
426            }
427        } else if key.is_empty() && range_end == [0] {
428            for (k, v) in state.map.iter() {
429                visit(k, v);
430            }
431        } else {
432            for (k, v) in state.map.range(key.to_vec()..range_end.to_vec()) {
433                visit(k, v);
434            }
435        }
436
437        Ok(RangeOutput {
438            revision: state.revision,
439            count: total,
440            more,
441            kvs: found,
442        })
443    }
444
445    pub fn delete_range(
446        &self,
447        key: &[u8],
448        range_end: &[u8],
449        prev_kv: bool,
450    ) -> Result<DeleteOutput, StoreError> {
451        let revision = self.bump_revision();
452        self.apply_delete_at_revision(key, range_end, prev_kv, revision)
453    }
454
455    pub fn apply_delete_at_revision(
456        &self,
457        key: &[u8],
458        range_end: &[u8],
459        prev_kv: bool,
460        revision: i64,
461    ) -> Result<DeleteOutput, StoreError> {
462        let mut removed = Vec::new();
463        {
464            let mut state = self.state.write();
465            let keys = state
466                .map
467                .keys()
468                .filter(|k| key_in_range(k.as_slice(), key, range_end))
469                .cloned()
470                .collect::<Vec<_>>();
471
472            for k in keys {
473                if let Some(v) = state.map.remove(&k) {
474                    self.memory.decrease(v.value.len());
475                    removed.push((k, v));
476                }
477            }
478            if !removed.is_empty() && self.revision_filter_enabled {
479                Self::rebuild_prefix_revision_index_locked(&mut state);
480            }
481
482            state.revision = state.revision.max(revision);
483            self.refresh_compaction_locked(&mut state);
484        }
485
486        for (k, v) in &removed {
487            self.watch_ring.publish(WatchEvent {
488                kind: WatchEventKind::Delete,
489                key: k.clone(),
490                value: Arc::<[u8]>::from(Vec::<u8>::new()),
491                prev_value: Arc::<[u8]>::from(v.value.clone()),
492                create_revision: v.create_revision,
493                mod_revision: revision,
494                version: v.version,
495                lease: v.lease,
496            });
497        }
498
499        Ok(DeleteOutput {
500            revision,
501            deleted: removed.len() as i64,
502            prev_kvs: if prev_kv { removed } else { Vec::new() },
503        })
504    }
505
506    pub fn subscribe_watch(&self, filter: WatchFilter) -> WatchSubscription {
507        self.watch_ring.subscribe(&filter)
508    }
509
510    pub fn lease_grant(&self, id: i64, ttl: i64) -> Result<LeaseGrantOutput, StoreError> {
511        if ttl <= 0 {
512            return Err(StoreError::InvalidArgument("ttl must be > 0".to_string()));
513        }
514
515        let mut state = self.state.write();
516        let lease_id = if id > 0 {
517            id
518        } else {
519            let out = state.next_lease_id.max(1);
520            state.next_lease_id = out.saturating_add(1);
521            out
522        };
523
524        state.leases.insert(
525            lease_id,
526            LeaseEntry {
527                id: lease_id,
528                granted_ttl: ttl,
529                ttl,
530            },
531        );
532        state.next_lease_id = state.next_lease_id.max(lease_id.saturating_add(1));
533
534        Ok(LeaseGrantOutput {
535            revision: state.revision,
536            id: lease_id,
537            ttl,
538        })
539    }
540
541    pub fn lease_keep_alive(&self, id: i64) -> Result<LeaseGrantOutput, StoreError> {
542        if id <= 0 {
543            return Err(StoreError::InvalidArgument(
544                "lease id must be > 0".to_string(),
545            ));
546        }
547
548        let mut state = self.state.write();
549        let lease = state.leases.get_mut(&id).ok_or(StoreError::KeyNotFound)?;
550        lease.ttl = lease.granted_ttl;
551        let ttl = lease.ttl;
552        let revision = state.revision;
553        Ok(LeaseGrantOutput { revision, id, ttl })
554    }
555
556    pub fn lease_time_to_live(
557        &self,
558        id: i64,
559        include_keys: bool,
560    ) -> Result<LeaseTtlOutput, StoreError> {
561        if id <= 0 {
562            return Err(StoreError::InvalidArgument(
563                "lease id must be > 0".to_string(),
564            ));
565        }
566        let state = self.state.read();
567        let lease = state.leases.get(&id).ok_or(StoreError::KeyNotFound)?;
568
569        let mut keys = Vec::new();
570        if include_keys {
571            for (k, v) in state.map.iter() {
572                if v.lease == id {
573                    keys.push(k.clone());
574                }
575            }
576        }
577
578        Ok(LeaseTtlOutput {
579            revision: state.revision,
580            id,
581            ttl: lease.ttl,
582            granted_ttl: lease.granted_ttl,
583            keys,
584        })
585    }
586
587    pub fn lease_list(&self) -> Vec<i64> {
588        let state = self.state.read();
589        let mut leases = state.leases.keys().copied().collect::<Vec<_>>();
590        leases.sort_unstable();
591        leases
592    }
593
594    pub fn lease_revoke(&self, id: i64) -> Result<LeaseRevokeOutput, StoreError> {
595        let revision = self.bump_revision();
596        self.apply_lease_revoke_at_revision(id, revision)
597    }
598
599    pub fn apply_lease_revoke_at_revision(
600        &self,
601        id: i64,
602        revision: i64,
603    ) -> Result<LeaseRevokeOutput, StoreError> {
604        if id <= 0 {
605            return Err(StoreError::InvalidArgument(
606                "lease id must be > 0".to_string(),
607            ));
608        }
609
610        let mut removed = Vec::new();
611        {
612            let mut state = self.state.write();
613            if state.leases.remove(&id).is_none() {
614                return Err(StoreError::KeyNotFound);
615            }
616
617            let keys = state
618                .map
619                .iter()
620                .filter(|(_, v)| v.lease == id)
621                .map(|(k, _)| k.clone())
622                .collect::<Vec<_>>();
623            for k in keys {
624                if let Some(v) = state.map.remove(&k) {
625                    self.memory.decrease(v.value.len());
626                    removed.push((k, v));
627                }
628            }
629            if !removed.is_empty() && self.revision_filter_enabled {
630                Self::rebuild_prefix_revision_index_locked(&mut state);
631            }
632
633            state.revision = state.revision.max(revision);
634            self.refresh_compaction_locked(&mut state);
635        }
636
637        for (k, v) in &removed {
638            self.watch_ring.publish(WatchEvent {
639                kind: WatchEventKind::Delete,
640                key: k.clone(),
641                value: Arc::<[u8]>::from(Vec::<u8>::new()),
642                prev_value: Arc::<[u8]>::from(v.value.clone()),
643                create_revision: v.create_revision,
644                mod_revision: revision,
645                version: v.version,
646                lease: v.lease,
647            });
648        }
649
650        Ok(LeaseRevokeOutput {
651            revision,
652            deleted: removed.len() as i64,
653        })
654    }
655
656    pub fn snapshot_state(&self) -> SnapshotState {
657        let state = self.state.read();
658        SnapshotState {
659            kv: state
660                .map
661                .iter()
662                .map(|(k, v)| (k.clone(), v.clone()))
663                .collect::<Vec<_>>(),
664            leases: state
665                .leases
666                .iter()
667                .map(|(k, v)| (*k, v.clone()))
668                .collect::<Vec<_>>(),
669            next_lease_id: state.next_lease_id,
670            revision: state.revision,
671            compact_revision: state.compact_revision,
672        }
673    }
674
675    pub fn load_snapshot_state(&self, snapshot: SnapshotState) -> Result<(), StoreError> {
676        let mut state = self.state.write();
677
678        state.map.clear();
679        self.memory.decrease(self.memory.used_bytes());
680
681        for (k, v) in snapshot.kv {
682            self.memory.try_increase(v.value.len())?;
683            state.map.insert(k, v);
684        }
685        if self.revision_filter_enabled {
686            Self::rebuild_prefix_revision_index_locked(&mut state);
687        } else {
688            state.prefix_max_mod_revision.clear();
689        }
690        state.leases.clear();
691        for (k, v) in snapshot.leases {
692            state.leases.insert(k, v);
693        }
694        state.next_lease_id = snapshot.next_lease_id.max(1);
695
696        state.revision = snapshot.revision;
697        state.compact_revision = snapshot.compact_revision;
698        Ok(())
699    }
700}
701
/// Reports whether `candidate` is selected by the `key`/`range_end` pair.
///
/// * empty `range_end` — exact match on `key`;
/// * `range_end == [0]` — every candidate `>= key`, with no upper bound (an
///   empty `key` therefore selects everything);
/// * otherwise — the half-open interval `[key, range_end)`.
///
/// `[0]` is the sentinel `prefix_end` produces for a prefix made only of
/// `0xFF` bytes, so it has to mean "unbounded" for non-empty keys as well;
/// comparing against it literally would select nothing.
pub fn key_in_range(candidate: &[u8], key: &[u8], range_end: &[u8]) -> bool {
    if range_end.is_empty() {
        return candidate == key;
    }

    if range_end == [0] {
        return candidate >= key;
    }

    candidate >= key && candidate < range_end
}
713
/// Computes the exclusive upper bound of the key range covered by `prefix`.
///
/// The last byte that is not `0xFF` is incremented and everything after it is
/// dropped. When no such byte exists (empty prefix or all `0xFF`), `[0]` is
/// returned.
fn prefix_end(prefix: &[u8]) -> Vec<u8> {
    match prefix.iter().rposition(|&byte| byte != 0xFF) {
        Some(pos) => {
            let mut end = prefix[..=pos].to_vec();
            end[pos] += 1;
            end
        }
        None => vec![0],
    }
}
725
#[cfg(test)]
mod tests {
    use anyhow::Result;

    use crate::config::WatchBacklogMode;

    use super::KvStore;

    /// Opens a store in `dir` with both range filters enabled, a 4 MiB memory
    /// budget, and the given hot revision window.
    fn open_store(dir: &std::path::Path, hot_revision_window: i64) -> Result<KvStore> {
        let store = KvStore::open(
            dir,
            4 * 1024 * 1024,
            hot_revision_window,
            true,
            true,
            1024,
            1024,
            WatchBacklogMode::Strict,
        )?;
        Ok(store)
    }

    /// A key can be written, read back, and deleted.
    #[test]
    fn put_get_delete_roundtrip() -> Result<()> {
        let tmp = tempfile::tempdir()?;
        let store = open_store(tmp.path(), 100)?;

        let written = store.put(b"a".to_vec(), b"1".to_vec(), 0, false, false)?;
        assert_eq!(written.current.mod_revision, 1);

        let read = store.range(b"a", b"", 0, 0, false, false)?;
        assert_eq!(read.count, 1);

        let removed = store.delete_range(b"a", b"", true)?;
        assert_eq!(removed.deleted, 1);

        Ok(())
    }

    /// With a window of 1, two writes push revision 1 under the watermark.
    #[test]
    fn compacted_error_for_old_revision() -> Result<()> {
        let tmp = tempfile::tempdir()?;
        let store = open_store(tmp.path(), 1)?;

        store.put(b"a".to_vec(), b"1".to_vec(), 0, false, false)?;
        store.put(b"b".to_vec(), b"2".to_vec(), 0, false, false)?;

        let outcome = store.range(b"a", b"", 0, 1, false, false);
        assert!(outcome.is_err());

        Ok(())
    }

    /// A prefix read at a revision newer than anything under the prefix is
    /// short-circuited to an empty result.
    #[test]
    fn revision_filter_skips_prefix_when_window_is_stale() -> Result<()> {
        let tmp = tempfile::tempdir()?;
        let store = open_store(tmp.path(), 100)?;

        let prefix = b"/registry/configmaps/default/";
        let prefix_end = super::prefix_end(prefix);

        store.put(
            b"/registry/configmaps/default/cm-a".to_vec(),
            b"v1".to_vec(),
            0,
            false,
            false,
        )?;

        let fresh = store.range(prefix, &prefix_end, 0, 1, false, false)?;
        assert_eq!(fresh.count, 1);

        let stale = store.range(prefix, &prefix_end, 0, 5, false, false)?;
        assert_eq!(stale.count, 0);

        Ok(())
    }
}