Skip to main content

reifydb_sdk/state/
cache.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{collections::HashMap, hash::Hash, mem};
5
6use postcard::{from_bytes, to_allocvec};
7use reifydb_core::{
8	encoded::{key::IntoEncodedKey, row::EncodedRow},
9	util::lru::LruCache,
10};
11use reifydb_type::util::cowvec::CowVec;
12use serde::{Serialize, de::DeserializeOwned};
13
14use crate::{
15	error::{FFIError, Result},
16	operator::context::OperatorContext,
17};
18
19pub struct StateCache<K, V> {
20	cache: LruCache<K, V>,
21	/// Keys mutated since the last `flush`. `Some(value)` means a `set`,
22	/// `None` means a `remove`. The cache acts as a coalescing buffer:
23	/// repeated `set`s of the same key only flush the final value once.
24	dirty: HashMap<K, Option<V>>,
25}
26
27impl<K, V> StateCache<K, V>
28where
29	K: Hash + Eq + Clone,
30	for<'a> &'a K: IntoEncodedKey,
31	V: Clone + Serialize + DeserializeOwned,
32{
33	pub fn new(capacity: usize) -> Self {
34		Self {
35			cache: LruCache::new(capacity),
36			dirty: HashMap::new(),
37		}
38	}
39
40	pub fn get(&mut self, ctx: &mut OperatorContext, key: &K) -> Result<Option<V>> {
41		// Check cache first
42		if let Some(cached) = self.cache.get(key) {
43			return Ok(Some(cached.clone()));
44		}
45
46		// Cache miss - load from FFI
47		let encoded_key = key.into_encoded_key();
48		let state = ctx.state();
49		match state.get(&encoded_key)? {
50			Some(encoded_row) => {
51				// Deserialize and cache
52				let value: V = from_bytes(encoded_row.as_ref()).map_err(|e| {
53					FFIError::Serialization(format!("deserialization failed: {}", e))
54				})?;
55				self.cache.put(key.clone(), value.clone());
56				Ok(Some(value))
57			}
58			None => Ok(None),
59		}
60	}
61
62	pub fn set(&mut self, _ctx: &mut OperatorContext, key: &K, value: &V) -> Result<()> {
63		self.cache.put(key.clone(), value.clone());
64		self.dirty.insert(key.clone(), Some(value.clone()));
65		Ok(())
66	}
67
68	pub fn remove(&mut self, _ctx: &mut OperatorContext, key: &K) -> Result<()> {
69		self.cache.remove(key);
70		self.dirty.insert(key.clone(), None);
71		Ok(())
72	}
73
74	/// Drain dirty entries and write them through to host storage.
75	///
76	/// Called once per txn at commit time, normally from the operator's
77	/// `FFIOperator::flush_state` impl. Repeated `set`s of the same key
78	/// produce a single write here; coalesced `set` + `remove` produces a
79	/// single remove.
80	pub fn flush(&mut self, ctx: &mut OperatorContext) -> Result<()> {
81		let dirty = mem::take(&mut self.dirty);
82		for (key, slot) in dirty {
83			let encoded_key = (&key).into_encoded_key();
84			match slot {
85				Some(value) => {
86					let bytes = to_allocvec(&value).map_err(|e| {
87						FFIError::Serialization(format!("serialization failed: {}", e))
88					})?;
89					let encoded_row = EncodedRow(CowVec::new(bytes));
90					ctx.state().set(&encoded_key, &encoded_row)?;
91				}
92				None => {
93					ctx.state().remove(&encoded_key)?;
94				}
95			}
96		}
97		Ok(())
98	}
99
100	pub fn clear_cache(&mut self) {
101		self.cache.clear();
102	}
103
104	pub fn invalidate(&mut self, key: &K) {
105		self.cache.remove(key);
106	}
107
108	pub fn is_cached(&self, key: &K) -> bool {
109		self.cache.contains_key(key)
110	}
111
112	pub fn len(&self) -> usize {
113		self.cache.len()
114	}
115
116	pub fn is_empty(&self) -> bool {
117		self.cache.is_empty()
118	}
119
120	pub fn capacity(&self) -> usize {
121		self.cache.capacity()
122	}
123}
124
125impl<K, V> StateCache<K, V>
126where
127	K: Hash + Eq + Clone,
128	for<'a> &'a K: IntoEncodedKey,
129	V: Clone + Default + Serialize + DeserializeOwned,
130{
131	pub fn get_or_default(&mut self, ctx: &mut OperatorContext, key: &K) -> Result<V> {
132		match self.get(ctx, key)? {
133			Some(value) => Ok(value),
134			None => Ok(V::default()),
135		}
136	}
137
138	pub fn update<U>(&mut self, ctx: &mut OperatorContext, key: &K, updater: U) -> Result<V>
139	where
140		U: FnOnce(&mut V) -> Result<()>,
141	{
142		// Load or create default
143		let mut value = self.get_or_default(ctx, key)?;
144
145		// Apply update
146		updater(&mut value)?;
147
148		// Save (write-through)
149		self.set(ctx, key, &value)?;
150
151		Ok(value)
152	}
153}
154
#[cfg(test)]
pub mod tests {
	use reifydb_core::encoded::key::IntoEncodedKey;

	use super::*;

	/// A freshly built cache reports its configured capacity and holds nothing.
	#[test]
	fn test_cache_capacity() {
		let fresh: StateCache<String, i32> = StateCache::new(100);
		assert!(fresh.is_empty());
		assert_eq!(fresh.len(), 0);
		assert_eq!(fresh.capacity(), 100);
	}

	/// Building a cache with zero capacity is rejected.
	#[test]
	#[should_panic(expected = "capacity must be greater than 0")]
	fn test_zero_capacity_panics() {
		let _rejected = StateCache::<String, i32>::new(0);
	}

	/// An owned `String` key encodes to a non-empty byte key.
	#[test]
	fn test_into_encoded_key_string() {
		let owned = String::from("test_key");
		let bytes_key = (&owned).into_encoded_key();
		assert!(!bytes_key.as_bytes().is_empty());
	}

	/// A string-slice key encodes to a non-empty byte key.
	#[test]
	fn test_into_encoded_key_str() {
		let bytes_key = "test_key".into_encoded_key();
		assert!(!bytes_key.as_bytes().is_empty());
	}

	/// A two-element tuple key encodes to a non-empty byte key.
	#[test]
	fn test_into_encoded_key_tuple2() {
		let pair = (String::from("base"), String::from("quote"));
		let bytes_key = (&pair).into_encoded_key();
		assert!(!bytes_key.as_bytes().is_empty());
	}

	/// A three-element tuple key encodes to a non-empty byte key.
	#[test]
	fn test_into_encoded_key_tuple3() {
		let triple = (String::from("a"), String::from("b"), String::from("c"));
		let bytes_key = (&triple).into_encoded_key();
		assert!(!bytes_key.as_bytes().is_empty());
	}

	/// Equal keys built independently encode to identical bytes.
	#[test]
	fn test_into_encoded_key_consistency() {
		let first = (String::from("base"), String::from("quote"));
		let second = first.clone();
		let first_bytes = (&first).into_encoded_key();
		let second_bytes = (&second).into_encoded_key();
		assert_eq!(first_bytes.as_bytes(), second_bytes.as_bytes());
	}

	/// Distinct keys encode to distinct bytes.
	#[test]
	fn test_into_encoded_key_different_keys() {
		let ab = (String::from("a"), String::from("b"));
		let cd = (String::from("c"), String::from("d"));
		let ab_bytes = (&ab).into_encoded_key();
		let cd_bytes = (&cd).into_encoded_key();
		assert_ne!(ab_bytes.as_bytes(), cd_bytes.as_bytes());
	}
}