reifydb_sdk/state/
cache.rs1use std::{collections::HashMap, hash::Hash, mem};
5
6use postcard::{from_bytes, to_allocvec};
7use reifydb_core::{
8 encoded::{key::IntoEncodedKey, row::EncodedRow},
9 util::lru::LruCache,
10};
11use reifydb_type::util::cowvec::CowVec;
12use serde::{Serialize, de::DeserializeOwned};
13
14use crate::{
15 error::{FFIError, Result},
16 operator::context::OperatorContext,
17};
18
19pub struct StateCache<K, V> {
20 cache: LruCache<K, V>,
21 dirty: HashMap<K, Option<V>>,
25}
26
27impl<K, V> StateCache<K, V>
28where
29 K: Hash + Eq + Clone,
30 for<'a> &'a K: IntoEncodedKey,
31 V: Clone + Serialize + DeserializeOwned,
32{
33 pub fn new(capacity: usize) -> Self {
34 Self {
35 cache: LruCache::new(capacity),
36 dirty: HashMap::new(),
37 }
38 }
39
40 pub fn get(&mut self, ctx: &mut OperatorContext, key: &K) -> Result<Option<V>> {
41 if let Some(cached) = self.cache.get(key) {
43 return Ok(Some(cached.clone()));
44 }
45
46 let encoded_key = key.into_encoded_key();
48 let state = ctx.state();
49 match state.get(&encoded_key)? {
50 Some(encoded_row) => {
51 let value: V = from_bytes(encoded_row.as_ref()).map_err(|e| {
53 FFIError::Serialization(format!("deserialization failed: {}", e))
54 })?;
55 self.cache.put(key.clone(), value.clone());
56 Ok(Some(value))
57 }
58 None => Ok(None),
59 }
60 }
61
62 pub fn set(&mut self, _ctx: &mut OperatorContext, key: &K, value: &V) -> Result<()> {
63 self.cache.put(key.clone(), value.clone());
64 self.dirty.insert(key.clone(), Some(value.clone()));
65 Ok(())
66 }
67
68 pub fn remove(&mut self, _ctx: &mut OperatorContext, key: &K) -> Result<()> {
69 self.cache.remove(key);
70 self.dirty.insert(key.clone(), None);
71 Ok(())
72 }
73
74 pub fn flush(&mut self, ctx: &mut OperatorContext) -> Result<()> {
81 let dirty = mem::take(&mut self.dirty);
82 for (key, slot) in dirty {
83 let encoded_key = (&key).into_encoded_key();
84 match slot {
85 Some(value) => {
86 let bytes = to_allocvec(&value).map_err(|e| {
87 FFIError::Serialization(format!("serialization failed: {}", e))
88 })?;
89 let encoded_row = EncodedRow(CowVec::new(bytes));
90 ctx.state().set(&encoded_key, &encoded_row)?;
91 }
92 None => {
93 ctx.state().remove(&encoded_key)?;
94 }
95 }
96 }
97 Ok(())
98 }
99
100 pub fn clear_cache(&mut self) {
101 self.cache.clear();
102 }
103
104 pub fn invalidate(&mut self, key: &K) {
105 self.cache.remove(key);
106 }
107
108 pub fn is_cached(&self, key: &K) -> bool {
109 self.cache.contains_key(key)
110 }
111
112 pub fn len(&self) -> usize {
113 self.cache.len()
114 }
115
116 pub fn is_empty(&self) -> bool {
117 self.cache.is_empty()
118 }
119
120 pub fn capacity(&self) -> usize {
121 self.cache.capacity()
122 }
123}
124
125impl<K, V> StateCache<K, V>
126where
127 K: Hash + Eq + Clone,
128 for<'a> &'a K: IntoEncodedKey,
129 V: Clone + Default + Serialize + DeserializeOwned,
130{
131 pub fn get_or_default(&mut self, ctx: &mut OperatorContext, key: &K) -> Result<V> {
132 match self.get(ctx, key)? {
133 Some(value) => Ok(value),
134 None => Ok(V::default()),
135 }
136 }
137
138 pub fn update<U>(&mut self, ctx: &mut OperatorContext, key: &K, updater: U) -> Result<V>
139 where
140 U: FnOnce(&mut V) -> Result<()>,
141 {
142 let mut value = self.get_or_default(ctx, key)?;
144
145 updater(&mut value)?;
147
148 self.set(ctx, key, &value)?;
150
151 Ok(value)
152 }
153}
154
#[cfg(test)]
pub mod tests {
    use reifydb_core::encoded::key::IntoEncodedKey;

    use super::*;

    /// A freshly built cache reports its capacity and holds nothing.
    #[test]
    fn test_cache_capacity() {
        let fresh: StateCache<String, i32> = StateCache::new(100);
        assert_eq!(fresh.capacity(), 100);
        assert!(fresh.is_empty());
        assert_eq!(fresh.len(), 0);
    }

    /// Building a cache with zero capacity must panic.
    #[test]
    #[should_panic(expected = "capacity must be greater than 0")]
    fn test_zero_capacity_panics() {
        let _cache: StateCache<String, i32> = StateCache::new(0);
    }

    /// `&String` keys encode to a non-empty byte sequence.
    #[test]
    fn test_into_encoded_key_string() {
        let owned = "test_key".to_string();
        let bytes = (&owned).into_encoded_key();
        assert!(!bytes.as_bytes().is_empty());
    }

    /// `&str` keys encode to a non-empty byte sequence.
    #[test]
    fn test_into_encoded_key_str() {
        let borrowed = "test_key";
        let bytes = borrowed.into_encoded_key();
        assert!(!bytes.as_bytes().is_empty());
    }

    /// Two-element tuple keys encode to a non-empty byte sequence.
    #[test]
    fn test_into_encoded_key_tuple2() {
        let pair = ("base".to_string(), "quote".to_string());
        let bytes = (&pair).into_encoded_key();
        assert!(!bytes.as_bytes().is_empty());
    }

    /// Three-element tuple keys encode to a non-empty byte sequence.
    #[test]
    fn test_into_encoded_key_tuple3() {
        let triple = ("a".to_string(), "b".to_string(), "c".to_string());
        let bytes = (&triple).into_encoded_key();
        assert!(!bytes.as_bytes().is_empty());
    }

    /// Equal keys produce equal encodings.
    #[test]
    fn test_into_encoded_key_consistency() {
        let first = ("base".to_string(), "quote".to_string());
        let second = ("base".to_string(), "quote".to_string());
        let first_encoded = (&first).into_encoded_key();
        let second_encoded = (&second).into_encoded_key();
        assert_eq!(first_encoded.as_bytes(), second_encoded.as_bytes());
    }

    /// Distinct keys produce distinct encodings.
    #[test]
    fn test_into_encoded_key_different_keys() {
        let first = ("a".to_string(), "b".to_string());
        let second = ("c".to_string(), "d".to_string());
        let first_encoded = (&first).into_encoded_key();
        let second_encoded = (&second).into_encoded_key();
        assert_ne!(first_encoded.as_bytes(), second_encoded.as_bytes());
    }
}