use std::collections::{BTreeMap, HashMap};
use std::ops::Bound;
use std::path::Path;
use std::sync::Arc;

use parking_lot::RwLock;
use serde::{Deserialize, Serialize};

use crate::config::WatchBacklogMode;
use crate::errors::StoreError;
use crate::memory::{MemoryPressure, MemoryTracker};
use crate::metrics;
use crate::watch::{WatchEvent, WatchEventKind, WatchFilter, WatchRing, WatchSubscription};
13
/// A stored value together with its revision metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ValueEntry {
    /// Raw value bytes.
    pub value: Vec<u8>,
    /// Revision of the put that created the key; later puts carry it over unchanged.
    pub create_revision: i64,
    /// Revision of the most recent put to the key.
    pub mod_revision: i64,
    /// Number of puts since the key was created (1 for a freshly created key).
    pub version: i64,
    /// Attached lease id; values `<= 0` mean no lease is attached (puts only validate
    /// positive ids).
    pub lease: i64,
}
22
/// Bookkeeping for a granted lease.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LeaseEntry {
    /// Lease id; ids handed out by `KvStore::lease_grant` are always `> 0`.
    pub id: i64,
    /// TTL requested when the lease was granted.
    pub granted_ttl: i64,
    /// Current TTL; `KvStore::lease_keep_alive` resets it to `granted_ttl`.
    // NOTE(review): nothing in this file decrements `ttl` or expires leases, and the unit is
    // not stated (presumably seconds) — confirm with whatever drives lease expiry.
    pub ttl: i64,
}
29
/// Serializable copy of a store's contents, produced by `KvStore::snapshot_state` and
/// restored with `KvStore::load_snapshot_state`.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct SnapshotState {
    /// Every stored key with its value entry.
    pub kv: Vec<(Vec<u8>, ValueEntry)>,
    /// Every granted lease, paired with its id.
    pub leases: Vec<(i64, LeaseEntry)>,
    /// Id the next `lease_grant` without an explicit id will use (clamped to at least 1 on load).
    pub next_lease_id: i64,
    /// Store revision at snapshot time.
    pub revision: i64,
    /// Compaction watermark at snapshot time.
    pub compact_revision: i64,
}
38
/// Mutable state of a `KvStore`, kept behind its `RwLock`.
#[derive(Debug)]
struct StoreState {
    /// Current value of every live key, ordered by key.
    map: BTreeMap<Vec<u8>, ValueEntry>,
    /// Granted leases, keyed by lease id.
    leases: HashMap<i64, LeaseEntry>,
    /// Highest `mod_revision` recorded for each indexed key prefix; `KvStore::range` reads it
    /// only when the revision filter is enabled.
    prefix_max_mod_revision: HashMap<Vec<u8>, i64>,
    /// Id used by the next `lease_grant` that does not request a specific id.
    next_lease_id: i64,
    /// Latest revision applied to (or reserved in) the store.
    revision: i64,
    /// Compaction watermark: `range` rejects requested revisions `<=` this value.
    compact_revision: i64,
}
48
49impl Default for StoreState {
50 fn default() -> Self {
51 Self {
52 map: BTreeMap::new(),
53 leases: HashMap::new(),
54 prefix_max_mod_revision: HashMap::new(),
55 next_lease_id: 1,
56 revision: 0,
57 compact_revision: 0,
58 }
59 }
60}
61
/// Result of a put.
#[derive(Debug, Clone)]
pub struct PutOutput {
    /// Revision at which the put was applied.
    pub revision: i64,
    /// Entry stored under the key before the put, if the key already existed.
    pub prev: Option<ValueEntry>,
    /// Entry stored under the key after the put.
    pub current: ValueEntry,
}
68
/// Result of a range read.
#[derive(Debug, Clone)]
pub struct RangeOutput {
    /// Store revision at the time the read was served.
    pub revision: i64,
    /// Total number of matching keys, not truncated by the limit.
    pub count: i64,
    /// `true` when the limit cut `kvs` short of `count`.
    pub more: bool,
    /// Matching key/value pairs in key order (empty for count-only reads).
    pub kvs: Vec<(Vec<u8>, ValueEntry)>,
}
76
/// Result of a range delete.
#[derive(Debug, Clone)]
pub struct DeleteOutput {
    /// Revision at which the delete was applied.
    pub revision: i64,
    /// Number of keys removed.
    pub deleted: i64,
    /// Removed pairs when the caller asked for them, otherwise empty.
    pub prev_kvs: Vec<(Vec<u8>, ValueEntry)>,
}
83
/// Result of a lease grant or keep-alive.
#[derive(Debug, Clone)]
pub struct LeaseGrantOutput {
    /// Current store revision (neither operation advances it).
    pub revision: i64,
    /// Lease id.
    pub id: i64,
    /// TTL now in effect for the lease.
    pub ttl: i64,
}
90
/// Result of a lease revocation.
#[derive(Debug, Clone)]
pub struct LeaseRevokeOutput {
    /// Revision at which the revocation was applied.
    pub revision: i64,
    /// Number of keys that were attached to the lease and have been deleted.
    pub deleted: i64,
}
96
/// Result of a lease time-to-live query.
#[derive(Debug, Clone)]
pub struct LeaseTtlOutput {
    /// Current store revision.
    pub revision: i64,
    /// Lease id.
    pub id: i64,
    /// Current TTL of the lease.
    pub ttl: i64,
    /// TTL requested when the lease was granted.
    pub granted_ttl: i64,
    /// Keys attached to the lease; only filled when the caller asked for them.
    pub keys: Vec<Vec<u8>>,
}
105
/// In-memory key/value store with per-key revision metadata, leases and a watch event ring.
///
/// All key and lease state sits behind a single `RwLock`; watch events are published after
/// that lock has been released.
#[derive(Debug)]
pub struct KvStore {
    /// Keys, leases, the per-prefix revision index and the revision counters.
    state: RwLock<StoreState>,
    /// Charged for stored value bytes (key bytes are not counted).
    memory: MemoryTracker,
    /// Receives one `WatchEvent` per put and per deleted key.
    watch_ring: WatchRing,
    /// Number of most recent revisions that stay readable; older ones count as compacted.
    hot_revision_window: i64,
    /// Lets `range` answer prefix queries with no matching key without scanning.
    prefix_filter_enabled: bool,
    /// Lets `range` skip prefix queries whose keys are all older than the requested revision.
    revision_filter_enabled: bool,
}
115
116impl KvStore {
117 pub fn open(
118 data_dir: &Path,
119 max_memory_bytes: usize,
120 hot_revision_window: i64,
121 prefix_filter_enabled: bool,
122 revision_filter_enabled: bool,
123 watch_ring_capacity: usize,
124 watch_broadcast_capacity: usize,
125 watch_backlog_mode: WatchBacklogMode,
126 ) -> Result<Self, StoreError> {
127 std::fs::create_dir_all(data_dir)?;
128
129 Ok(Self {
130 state: RwLock::new(StoreState::default()),
131 memory: MemoryTracker::new(max_memory_bytes),
132 watch_ring: WatchRing::new(
133 watch_ring_capacity,
134 watch_broadcast_capacity,
135 watch_backlog_mode,
136 ),
137 hot_revision_window,
138 prefix_filter_enabled,
139 revision_filter_enabled,
140 })
141 }
142
143 fn with_indexed_prefixes(key: &[u8], mut visit: impl FnMut(&[u8])) {
144 if key.is_empty() {
145 return;
146 }
147
148 let mut segment_count = 0usize;
149 for (idx, byte) in key.iter().enumerate() {
150 if *byte != b'/' || idx == 0 {
151 continue;
152 }
153 segment_count = segment_count.saturating_add(1);
154 if segment_count > 4 {
155 break;
156 }
157 visit(&key[..=idx]);
158 }
159 visit(key);
160 }
161
162 fn update_prefix_revision_index_for_entry(state: &mut StoreState, key: &[u8], revision: i64) {
163 Self::with_indexed_prefixes(key, |prefix| {
164 let current = state
165 .prefix_max_mod_revision
166 .entry(prefix.to_vec())
167 .or_insert(revision);
168 if revision > *current {
169 *current = revision;
170 }
171 });
172 }
173
174 fn rebuild_prefix_revision_index_locked(state: &mut StoreState) {
175 state.prefix_max_mod_revision.clear();
176 let rows = state
177 .map
178 .iter()
179 .map(|(k, v)| (k.clone(), v.mod_revision))
180 .collect::<Vec<_>>();
181 for (key, revision) in rows {
182 Self::update_prefix_revision_index_for_entry(state, &key, revision);
183 }
184 }
185
186 fn refresh_compaction_locked(&self, state: &mut StoreState) {
187 let watermark = state.revision.saturating_sub(self.hot_revision_window);
188 if watermark > state.compact_revision {
189 state.compact_revision = watermark;
190 }
191 }
192
193 fn bump_revision(&self) -> i64 {
194 let mut state = self.state.write();
195 state.revision += 1;
196 self.refresh_compaction_locked(&mut state);
197 state.revision
198 }
199
200 pub fn reserve_revision(&self) -> i64 {
201 self.bump_revision()
202 }
203
204 pub fn memory_pressure(&self) -> MemoryPressure {
205 self.memory.pressure()
206 }
207
208 pub fn current_revision(&self) -> i64 {
209 self.state.read().revision
210 }
211
212 pub fn compact_revision(&self) -> i64 {
213 self.state.read().compact_revision
214 }
215
216 pub fn compact_to(&self, revision: i64) -> Result<i64, StoreError> {
217 if revision <= 0 {
218 return Err(StoreError::InvalidArgument(
219 "compact revision must be > 0".to_string(),
220 ));
221 }
222
223 let mut state = self.state.write();
224 if revision > state.revision {
225 return Err(StoreError::InvalidArgument(format!(
226 "compact revision {} is ahead of current revision {}",
227 revision, state.revision
228 )));
229 }
230 if revision > state.compact_revision {
231 state.compact_revision = revision;
232 }
233 Ok(state.compact_revision)
234 }
235
236 pub fn is_empty(&self) -> bool {
237 self.state.read().map.is_empty()
238 }
239
240 pub fn put(
241 &self,
242 key: Vec<u8>,
243 value: Vec<u8>,
244 lease: i64,
245 ignore_value: bool,
246 ignore_lease: bool,
247 ) -> Result<PutOutput, StoreError> {
248 let revision = self.bump_revision();
249 self.apply_put_at_revision(key, value, lease, ignore_value, ignore_lease, revision)
250 }
251
252 pub fn apply_put_at_revision(
253 &self,
254 key: Vec<u8>,
255 value: Vec<u8>,
256 lease: i64,
257 ignore_value: bool,
258 ignore_lease: bool,
259 revision: i64,
260 ) -> Result<PutOutput, StoreError> {
261 if key.is_empty() {
262 return Err(StoreError::InvalidArgument("empty key".to_string()));
263 }
264
265 let (prev, current) = {
266 let mut state = self.state.write();
267 let prev = state.map.get(&key).cloned();
268
269 if ignore_value && prev.is_none() {
270 return Err(StoreError::InvalidArgument(
271 "ignore_value requires existing key".to_string(),
272 ));
273 }
274
275 if ignore_lease && prev.is_none() {
276 return Err(StoreError::InvalidArgument(
277 "ignore_lease requires existing key".to_string(),
278 ));
279 }
280
281 let next_value = if ignore_value {
282 prev.as_ref().map(|v| v.value.clone()).unwrap_or_default()
283 } else {
284 value
285 };
286
287 let next_lease = if ignore_lease {
288 prev.as_ref().map(|v| v.lease).unwrap_or(0)
289 } else {
290 lease
291 };
292
293 if next_lease > 0 && !state.leases.contains_key(&next_lease) {
294 return Err(StoreError::InvalidArgument(format!(
295 "lease {} not found",
296 next_lease
297 )));
298 }
299
300 let prev_len = prev.as_ref().map(|v| v.value.len()).unwrap_or(0);
301 if next_value.len() > prev_len {
302 self.memory.try_increase(next_value.len() - prev_len)?;
303 } else {
304 self.memory.decrease(prev_len - next_value.len());
305 }
306
307 let current = ValueEntry {
308 create_revision: prev.as_ref().map(|v| v.create_revision).unwrap_or(revision),
309 mod_revision: revision,
310 version: prev.as_ref().map(|v| v.version + 1).unwrap_or(1),
311 value: next_value,
312 lease: next_lease,
313 };
314
315 state.map.insert(key.clone(), current.clone());
316 Self::update_prefix_revision_index_for_entry(&mut state, &key, revision);
317 state.revision = state.revision.max(revision);
318 self.refresh_compaction_locked(&mut state);
319
320 (prev, current)
321 };
322
323 self.watch_ring.publish(WatchEvent {
324 kind: WatchEventKind::Put,
325 key,
326 value: Arc::<[u8]>::from(current.value.clone()),
327 prev_value: Arc::<[u8]>::from(
328 prev.as_ref().map(|v| v.value.clone()).unwrap_or_default(),
329 ),
330 create_revision: current.create_revision,
331 mod_revision: current.mod_revision,
332 version: current.version,
333 lease: current.lease,
334 });
335
336 Ok(PutOutput {
337 revision,
338 prev,
339 current,
340 })
341 }
342
343 pub fn range(
344 &self,
345 key: &[u8],
346 range_end: &[u8],
347 limit: i64,
348 revision: i64,
349 keys_only: bool,
350 count_only: bool,
351 ) -> Result<RangeOutput, StoreError> {
352 let state = self.state.read();
353
354 if revision > 0 && revision <= state.compact_revision {
355 return Err(StoreError::Compacted(state.compact_revision));
356 }
357
358 let is_prefix_query =
359 !key.is_empty() && !range_end.is_empty() && range_end == prefix_end(key).as_slice();
360 if self.prefix_filter_enabled && is_prefix_query {
361 let has_prefix = state
362 .map
363 .range(key.to_vec()..)
364 .next()
365 .map(|(k, _)| k.starts_with(key))
366 .unwrap_or(false);
367 if !has_prefix {
368 metrics::inc_list_prefix_filter_skips();
369 return Ok(RangeOutput {
370 revision: state.revision,
371 count: 0,
372 more: false,
373 kvs: Vec::new(),
374 });
375 }
376 metrics::inc_list_prefix_filter_hits();
377 }
378 if self.revision_filter_enabled && is_prefix_query && revision > 0 {
379 let max_mod_revision = state
380 .prefix_max_mod_revision
381 .get(key)
382 .copied()
383 .unwrap_or_default();
384 if max_mod_revision < revision {
385 metrics::inc_list_revision_filter_skips();
386 return Ok(RangeOutput {
387 revision: state.revision,
388 count: 0,
389 more: false,
390 kvs: Vec::new(),
391 });
392 }
393 metrics::inc_list_revision_filter_hits();
394 }
395
396 let mut found = Vec::new();
397 let mut total: i64 = 0;
398 let mut more = false;
399 let limit = if limit > 0 {
400 Some(limit as usize)
401 } else {
402 None
403 };
404
405 let mut visit = |k: &Vec<u8>, v: &ValueEntry| {
406 total = total.saturating_add(1);
407 if count_only {
408 return;
409 }
410 if let Some(lim) = limit {
411 if found.len() >= lim {
412 more = true;
413 return;
414 }
415 }
416 let mut vv = v.clone();
417 if keys_only {
418 vv.value.clear();
419 }
420 found.push((k.clone(), vv));
421 };
422
423 if range_end.is_empty() {
424 if let Some(v) = state.map.get(key) {
425 visit(&key.to_vec(), v);
426 }
427 } else if key.is_empty() && range_end == [0] {
428 for (k, v) in state.map.iter() {
429 visit(k, v);
430 }
431 } else {
432 for (k, v) in state.map.range(key.to_vec()..range_end.to_vec()) {
433 visit(k, v);
434 }
435 }
436
437 Ok(RangeOutput {
438 revision: state.revision,
439 count: total,
440 more,
441 kvs: found,
442 })
443 }
444
445 pub fn delete_range(
446 &self,
447 key: &[u8],
448 range_end: &[u8],
449 prev_kv: bool,
450 ) -> Result<DeleteOutput, StoreError> {
451 let revision = self.bump_revision();
452 self.apply_delete_at_revision(key, range_end, prev_kv, revision)
453 }
454
455 pub fn apply_delete_at_revision(
456 &self,
457 key: &[u8],
458 range_end: &[u8],
459 prev_kv: bool,
460 revision: i64,
461 ) -> Result<DeleteOutput, StoreError> {
462 let mut removed = Vec::new();
463 {
464 let mut state = self.state.write();
465 let keys = state
466 .map
467 .keys()
468 .filter(|k| key_in_range(k.as_slice(), key, range_end))
469 .cloned()
470 .collect::<Vec<_>>();
471
472 for k in keys {
473 if let Some(v) = state.map.remove(&k) {
474 self.memory.decrease(v.value.len());
475 removed.push((k, v));
476 }
477 }
478 if !removed.is_empty() && self.revision_filter_enabled {
479 Self::rebuild_prefix_revision_index_locked(&mut state);
480 }
481
482 state.revision = state.revision.max(revision);
483 self.refresh_compaction_locked(&mut state);
484 }
485
486 for (k, v) in &removed {
487 self.watch_ring.publish(WatchEvent {
488 kind: WatchEventKind::Delete,
489 key: k.clone(),
490 value: Arc::<[u8]>::from(Vec::<u8>::new()),
491 prev_value: Arc::<[u8]>::from(v.value.clone()),
492 create_revision: v.create_revision,
493 mod_revision: revision,
494 version: v.version,
495 lease: v.lease,
496 });
497 }
498
499 Ok(DeleteOutput {
500 revision,
501 deleted: removed.len() as i64,
502 prev_kvs: if prev_kv { removed } else { Vec::new() },
503 })
504 }
505
506 pub fn subscribe_watch(&self, filter: WatchFilter) -> WatchSubscription {
507 self.watch_ring.subscribe(&filter)
508 }
509
510 pub fn lease_grant(&self, id: i64, ttl: i64) -> Result<LeaseGrantOutput, StoreError> {
511 if ttl <= 0 {
512 return Err(StoreError::InvalidArgument("ttl must be > 0".to_string()));
513 }
514
515 let mut state = self.state.write();
516 let lease_id = if id > 0 {
517 id
518 } else {
519 let out = state.next_lease_id.max(1);
520 state.next_lease_id = out.saturating_add(1);
521 out
522 };
523
524 state.leases.insert(
525 lease_id,
526 LeaseEntry {
527 id: lease_id,
528 granted_ttl: ttl,
529 ttl,
530 },
531 );
532 state.next_lease_id = state.next_lease_id.max(lease_id.saturating_add(1));
533
534 Ok(LeaseGrantOutput {
535 revision: state.revision,
536 id: lease_id,
537 ttl,
538 })
539 }
540
541 pub fn lease_keep_alive(&self, id: i64) -> Result<LeaseGrantOutput, StoreError> {
542 if id <= 0 {
543 return Err(StoreError::InvalidArgument(
544 "lease id must be > 0".to_string(),
545 ));
546 }
547
548 let mut state = self.state.write();
549 let lease = state.leases.get_mut(&id).ok_or(StoreError::KeyNotFound)?;
550 lease.ttl = lease.granted_ttl;
551 let ttl = lease.ttl;
552 let revision = state.revision;
553 Ok(LeaseGrantOutput { revision, id, ttl })
554 }
555
556 pub fn lease_time_to_live(
557 &self,
558 id: i64,
559 include_keys: bool,
560 ) -> Result<LeaseTtlOutput, StoreError> {
561 if id <= 0 {
562 return Err(StoreError::InvalidArgument(
563 "lease id must be > 0".to_string(),
564 ));
565 }
566 let state = self.state.read();
567 let lease = state.leases.get(&id).ok_or(StoreError::KeyNotFound)?;
568
569 let mut keys = Vec::new();
570 if include_keys {
571 for (k, v) in state.map.iter() {
572 if v.lease == id {
573 keys.push(k.clone());
574 }
575 }
576 }
577
578 Ok(LeaseTtlOutput {
579 revision: state.revision,
580 id,
581 ttl: lease.ttl,
582 granted_ttl: lease.granted_ttl,
583 keys,
584 })
585 }
586
587 pub fn lease_list(&self) -> Vec<i64> {
588 let state = self.state.read();
589 let mut leases = state.leases.keys().copied().collect::<Vec<_>>();
590 leases.sort_unstable();
591 leases
592 }
593
594 pub fn lease_revoke(&self, id: i64) -> Result<LeaseRevokeOutput, StoreError> {
595 let revision = self.bump_revision();
596 self.apply_lease_revoke_at_revision(id, revision)
597 }
598
599 pub fn apply_lease_revoke_at_revision(
600 &self,
601 id: i64,
602 revision: i64,
603 ) -> Result<LeaseRevokeOutput, StoreError> {
604 if id <= 0 {
605 return Err(StoreError::InvalidArgument(
606 "lease id must be > 0".to_string(),
607 ));
608 }
609
610 let mut removed = Vec::new();
611 {
612 let mut state = self.state.write();
613 if state.leases.remove(&id).is_none() {
614 return Err(StoreError::KeyNotFound);
615 }
616
617 let keys = state
618 .map
619 .iter()
620 .filter(|(_, v)| v.lease == id)
621 .map(|(k, _)| k.clone())
622 .collect::<Vec<_>>();
623 for k in keys {
624 if let Some(v) = state.map.remove(&k) {
625 self.memory.decrease(v.value.len());
626 removed.push((k, v));
627 }
628 }
629 if !removed.is_empty() && self.revision_filter_enabled {
630 Self::rebuild_prefix_revision_index_locked(&mut state);
631 }
632
633 state.revision = state.revision.max(revision);
634 self.refresh_compaction_locked(&mut state);
635 }
636
637 for (k, v) in &removed {
638 self.watch_ring.publish(WatchEvent {
639 kind: WatchEventKind::Delete,
640 key: k.clone(),
641 value: Arc::<[u8]>::from(Vec::<u8>::new()),
642 prev_value: Arc::<[u8]>::from(v.value.clone()),
643 create_revision: v.create_revision,
644 mod_revision: revision,
645 version: v.version,
646 lease: v.lease,
647 });
648 }
649
650 Ok(LeaseRevokeOutput {
651 revision,
652 deleted: removed.len() as i64,
653 })
654 }
655
656 pub fn snapshot_state(&self) -> SnapshotState {
657 let state = self.state.read();
658 SnapshotState {
659 kv: state
660 .map
661 .iter()
662 .map(|(k, v)| (k.clone(), v.clone()))
663 .collect::<Vec<_>>(),
664 leases: state
665 .leases
666 .iter()
667 .map(|(k, v)| (*k, v.clone()))
668 .collect::<Vec<_>>(),
669 next_lease_id: state.next_lease_id,
670 revision: state.revision,
671 compact_revision: state.compact_revision,
672 }
673 }
674
675 pub fn load_snapshot_state(&self, snapshot: SnapshotState) -> Result<(), StoreError> {
676 let mut state = self.state.write();
677
678 state.map.clear();
679 self.memory.decrease(self.memory.used_bytes());
680
681 for (k, v) in snapshot.kv {
682 self.memory.try_increase(v.value.len())?;
683 state.map.insert(k, v);
684 }
685 if self.revision_filter_enabled {
686 Self::rebuild_prefix_revision_index_locked(&mut state);
687 } else {
688 state.prefix_max_mod_revision.clear();
689 }
690 state.leases.clear();
691 for (k, v) in snapshot.leases {
692 state.leases.insert(k, v);
693 }
694 state.next_lease_id = snapshot.next_lease_id.max(1);
695
696 state.revision = snapshot.revision;
697 state.compact_revision = snapshot.compact_revision;
698 Ok(())
699 }
700}
701
/// Returns whether `candidate` falls in the key range described by `key` / `range_end`.
///
/// * empty `range_end` — only `key` itself matches;
/// * `range_end == b"\0"` — every key `>= key` matches (all keys when `key` is empty);
/// * otherwise the half-open interval `[key, range_end)`, compared byte-wise.
pub fn key_in_range(candidate: &[u8], key: &[u8], range_end: &[u8]) -> bool {
    match range_end {
        [] => candidate == key,
        // An explicit "\0" end means "no upper bound". Previously this only worked for an
        // empty `key`; for any other key it matched nothing.
        [0] => candidate >= key,
        _ => candidate >= key && candidate < range_end,
    }
}
713
/// Returns the exclusive upper bound for a scan over every key starting with `prefix`.
///
/// The last byte that is not `0xFF` is incremented and everything after it is dropped. When
/// no such byte exists (empty or all-`0xFF` prefix) there is no finite bound, and `vec![0]`
/// is returned instead. Callers treat a `range_end` of `[0]` specially (see `key_in_range`).
fn prefix_end(prefix: &[u8]) -> Vec<u8> {
    match prefix.iter().rposition(|&byte| byte != 0xFF) {
        Some(last) => {
            let mut end = prefix[..=last].to_vec();
            end[last] += 1;
            end
        }
        None => vec![0],
    }
}
725
#[cfg(test)]
mod tests {
    use std::path::Path;

    use anyhow::Result;

    use crate::config::WatchBacklogMode;
    use crate::errors::StoreError;

    use super::{key_in_range, prefix_end, KvStore};

    /// Opens a store with a 4 MiB budget, both range filters on, small watch buffers and the
    /// given hot revision window.
    fn open_store(dir: &Path, hot_revision_window: i64) -> Result<KvStore> {
        Ok(KvStore::open(
            dir,
            4 * 1024 * 1024,
            hot_revision_window,
            true,
            true,
            1024,
            1024,
            WatchBacklogMode::Strict,
        )?)
    }

    #[test]
    fn put_get_delete_roundtrip() -> Result<()> {
        let tmp = tempfile::tempdir()?;
        let store = open_store(tmp.path(), 100)?;

        let out = store.put(b"a".to_vec(), b"1".to_vec(), 0, false, false)?;
        assert_eq!(out.current.mod_revision, 1);

        let range = store.range(b"a", b"", 0, 0, false, false)?;
        assert_eq!(range.count, 1);

        let del = store.delete_range(b"a", b"", true)?;
        assert_eq!(del.deleted, 1);
        assert_eq!(del.prev_kvs.len(), 1);

        let after = store.range(b"a", b"", 0, 0, false, false)?;
        assert_eq!(after.count, 0);

        Ok(())
    }

    #[test]
    fn compacted_error_for_old_revision() -> Result<()> {
        let tmp = tempfile::tempdir()?;
        let store = open_store(tmp.path(), 1)?;

        store.put(b"a".to_vec(), b"1".to_vec(), 0, false, false)?;
        store.put(b"b".to_vec(), b"2".to_vec(), 0, false, false)?;

        // Revision 2 with a one-revision window leaves revision 1 compacted.
        assert_eq!(store.compact_revision(), 1);
        let res = store.range(b"a", b"", 0, 1, false, false);
        assert!(matches!(res, Err(StoreError::Compacted(1))));

        Ok(())
    }

    #[test]
    fn revision_filter_skips_prefix_when_window_is_stale() -> Result<()> {
        let tmp = tempfile::tempdir()?;
        let store = open_store(tmp.path(), 100)?;

        let prefix = b"/registry/configmaps/default/";
        let range_end = prefix_end(prefix);

        store.put(
            b"/registry/configmaps/default/cm-a".to_vec(),
            b"v1".to_vec(),
            0,
            false,
            false,
        )?;
        let fresh = store.range(prefix, &range_end, 0, 1, false, false)?;
        assert_eq!(fresh.count, 1);

        let stale = store.range(prefix, &range_end, 0, 5, false, false)?;
        assert_eq!(stale.count, 0);

        Ok(())
    }

    #[test]
    fn prefix_end_increments_last_non_ff_byte() {
        assert_eq!(prefix_end(b"abc"), b"abd".to_vec());
        assert_eq!(prefix_end(&[b'a', 0xff]), b"b".to_vec());
        assert_eq!(prefix_end(&[0xff, 0xff]), vec![0u8]);
        assert_eq!(prefix_end(b""), vec![0u8]);
    }

    #[test]
    fn key_in_range_handles_single_key_and_intervals() {
        assert!(key_in_range(b"a", b"a", b""));
        assert!(!key_in_range(b"ab", b"a", b""));
        assert!(key_in_range(b"b", b"a", b"c"));
        assert!(!key_in_range(b"c", b"a", b"c"));
        assert!(key_in_range(b"anything", b"", b"\0"));
    }
}