1use crate::cache::CacheKey;
8use crate::schema::Schema;
9use crate::value::Value;
10use serde::{Deserialize, Serialize};
11use std::fmt;
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
25#[non_exhaustive]
26pub enum VirtualValue {
27 Materialized { value: Value, schema: Schema },
29
30 Cached { key: CacheKey, schema: Schema },
32
33 Deferred {
36 producer_node_id: String,
37 cache_key: CacheKey,
38 schema: Schema,
39 },
40
41 Stream { source_id: String, schema: Schema },
43}
44
/// Coarse residency state of a [`VirtualValue`], as reported by
/// [`VirtualValue::status`].
///
/// Derives `Hash` (alongside `Eq`) so statuses can be used as map or set keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ValueStatus {
    /// Reported for `VirtualValue::Materialized`: the value is in memory.
    InMemory,
    /// Reported for `VirtualValue::Cached`.
    ///
    /// NOTE(review): the name assumes a disk-backed cache store. Nothing here
    /// enforces that.
    OnDisk,
    /// Reported for `VirtualValue::Deferred`: the value has not been produced.
    NotComputed,
    /// Reported for `VirtualValue::Stream`.
    Streaming,
}
57
58impl VirtualValue {
59 pub fn materialized(value: Value) -> Self {
61 let schema = Self::infer_schema(&value);
62 Self::Materialized { value, schema }
63 }
64
65 pub fn materialized_with_schema(value: Value, schema: Schema) -> Self {
67 Self::Materialized { value, schema }
68 }
69
70 pub fn cached(key: CacheKey, schema: Schema) -> Self {
72 Self::Cached { key, schema }
73 }
74
75 pub fn deferred(
77 producer_node_id: impl Into<String>,
78 cache_key: CacheKey,
79 schema: Schema,
80 ) -> Self {
81 Self::Deferred {
82 producer_node_id: producer_node_id.into(),
83 cache_key,
84 schema,
85 }
86 }
87
88 pub fn schema(&self) -> &Schema {
90 match self {
91 Self::Materialized { schema, .. }
92 | Self::Cached { schema, .. }
93 | Self::Deferred { schema, .. }
94 | Self::Stream { schema, .. } => schema,
95 }
96 }
97
98 pub fn status(&self) -> ValueStatus {
100 match self {
101 Self::Materialized { .. } => ValueStatus::InMemory,
102 Self::Cached { .. } => ValueStatus::OnDisk,
103 Self::Deferred { .. } => ValueStatus::NotComputed,
104 Self::Stream { .. } => ValueStatus::Streaming,
105 }
106 }
107
108 pub fn as_value(&self) -> Option<&Value> {
110 match self {
111 Self::Materialized { value, .. } => Some(value),
112 _ => None,
113 }
114 }
115
116 pub fn cache_key(&self) -> Option<&CacheKey> {
118 match self {
119 Self::Cached { key, .. } | Self::Deferred { cache_key: key, .. } => Some(key),
120 _ => None,
121 }
122 }
123
124 pub fn try_load(
126 &self,
127 cache: &dyn crate::cache::CacheStore,
128 ) -> crate::error::Result<Option<Value>> {
129 match self {
130 Self::Materialized { value, .. } => Ok(Some(value.clone())),
131 Self::Cached { key, .. } => cache.get(key),
132 Self::Deferred { cache_key, .. } => cache.get(cache_key),
133 _ => Ok(None),
134 }
135 }
136
137 pub fn resolve(&self, cache: &dyn crate::cache::CacheStore) -> crate::error::Result<Self> {
140 match self {
141 Self::Materialized { .. } => Ok(self.clone()),
142 Self::Cached { key, schema } => {
143 if let Some(value) = cache.get(key)? {
144 Ok(Self::Materialized {
145 value,
146 schema: schema.clone(),
147 })
148 } else {
149 Ok(self.clone()) }
151 }
152 Self::Deferred {
153 cache_key, schema, ..
154 } => {
155 if let Some(value) = cache.get(cache_key)? {
156 Ok(Self::Materialized {
157 value,
158 schema: schema.clone(),
159 })
160 } else {
161 Ok(self.clone()) }
163 }
164 _ => Ok(self.clone()),
165 }
166 }
167
168 fn infer_schema(value: &Value) -> Schema {
170 match value {
171 Value::Tensor { values: _, shape } => Schema {
172 dtype: crate::schema::DataType::Float64,
173 shape: Some(
174 shape
175 .iter()
176 .map(|&d| crate::schema::Dimension::Fixed(d))
177 .collect(),
178 ),
179 },
180 Value::Json(_) => Schema::json(),
181 Value::Bytes(_) | Value::Object(_) => Schema::bytes(),
182 Value::Empty => Schema::dynamic(crate::schema::DataType::Float64),
183 }
184 }
185}
186
187impl fmt::Display for VirtualValue {
188 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
189 match self {
190 Self::Materialized { schema, value } => {
191 write!(f, "Materialized({schema}, size={})", value.size())
192 }
193 Self::Cached { key, schema } => {
194 write!(f, "Cached({schema}, key={key})")
195 }
196 Self::Deferred {
197 producer_node_id,
198 schema,
199 ..
200 } => {
201 write!(f, "Deferred({schema}, producer={producer_node_id})")
202 }
203 Self::Stream { source_id, schema } => {
204 write!(f, "Stream({schema}, source={source_id})")
205 }
206 }
207 }
208}
209
210impl From<Value> for VirtualValue {
211 fn from(value: Value) -> Self {
212 Self::materialized(value)
213 }
214}
215
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::{CacheKey, CacheStore, EntryMeta};
    use crate::schema::{DataType, Schema};
    use std::collections::HashMap;
    use std::sync::Mutex;

    /// In-memory `CacheStore` test double backed by a `HashMap`.
    ///
    /// Only `get` and `exists` consult the map; `put` and `remove` are no-ops,
    /// which is enough for the read-only paths exercised here.
    #[derive(Default)]
    struct MapCache {
        store: Mutex<HashMap<CacheKey, Value>>,
    }

    impl MapCache {
        /// Builds a cache pre-populated with a single entry.
        fn with(key: CacheKey, value: Value) -> Self {
            let mut store = HashMap::new();
            store.insert(key, value);
            Self {
                store: Mutex::new(store),
            }
        }
    }

    impl CacheStore for MapCache {
        fn get(&self, key: &CacheKey) -> crate::error::Result<Option<Value>> {
            Ok(self.store.lock().unwrap().get(key).cloned())
        }
        fn put(&self, _: &CacheKey, _: &Value) -> crate::error::Result<()> {
            Ok(())
        }
        fn exists(&self, key: &CacheKey) -> crate::error::Result<bool> {
            Ok(self.store.lock().unwrap().contains_key(key))
        }
        fn remove(&self, _: &CacheKey) -> crate::error::Result<()> {
            Ok(())
        }
        fn metadata(&self, _: &CacheKey) -> crate::error::Result<Option<EntryMeta>> {
            Ok(None)
        }
    }

    /// Builds a `Stream` value directly; there is no dedicated constructor.
    fn stream(source_id: &str, schema: Schema) -> VirtualValue {
        VirtualValue::Stream {
            source_id: source_id.to_string(),
            schema,
        }
    }

    #[test]
    fn materialized_from_value() {
        let val = Value::tensor(vec![1.0, 2.0, 3.0], vec![3]);
        let vv = VirtualValue::materialized(val.clone());

        assert_eq!(vv.status(), ValueStatus::InMemory);
        assert_eq!(vv.as_value(), Some(&val));
        assert_eq!(vv.schema().dtype, DataType::Float64);
        assert_eq!(vv.schema().rank(), Some(1));
        assert!(vv.cache_key().is_none());
    }

    #[test]
    fn cached_reference() {
        let key = CacheKey::hash_data(b"test");
        let schema = Schema::vector(DataType::Float64, 100);
        let vv = VirtualValue::cached(key.clone(), schema.clone());

        assert_eq!(vv.status(), ValueStatus::OnDisk);
        assert_eq!(vv.cache_key(), Some(&key));
        assert_eq!(vv.schema(), &schema);
        assert!(vv.as_value().is_none());
    }

    #[test]
    fn deferred_reference() {
        let key = CacheKey::hash_data(b"deferred");
        let schema = Schema::batched(DataType::Float64, &[128]);
        let vv = VirtualValue::deferred("my_node", key.clone(), schema.clone());

        assert_eq!(vv.status(), ValueStatus::NotComputed);
        assert_eq!(vv.cache_key(), Some(&key));
        assert_eq!(vv.schema(), &schema);
        assert!(vv.as_value().is_none());
    }

    #[test]
    fn stream_reference() {
        let schema = Schema::json();
        let vv = stream("src", schema.clone());

        assert_eq!(vv.status(), ValueStatus::Streaming);
        assert_eq!(vv.schema(), &schema);
        assert!(vv.cache_key().is_none());
        assert!(vv.as_value().is_none());
    }

    #[test]
    fn schema_inferred_from_tensor() {
        let val = Value::tensor(vec![0.0; 12], vec![3, 4]);
        let vv = VirtualValue::materialized(val);
        assert_eq!(vv.schema().dtype, DataType::Float64);
        assert_eq!(vv.schema().rank(), Some(2));
    }

    #[test]
    fn schema_inferred_from_json() {
        let val = Value::json(serde_json::json!({"a": 1}));
        let vv = VirtualValue::materialized(val);
        assert_eq!(vv.schema().dtype, DataType::Json);
    }

    #[test]
    fn display_formatting() {
        let vv = VirtualValue::materialized(Value::tensor(vec![1.0], vec![1]));
        assert!(vv.to_string().contains("Materialized"));

        let vv = VirtualValue::cached(CacheKey::hash_data(b"k"), Schema::json());
        assert!(vv.to_string().contains("Cached"));

        let vv = VirtualValue::deferred("node_1", CacheKey::hash_data(b"k"), Schema::json());
        let shown = vv.to_string();
        assert!(shown.contains("Deferred"));
        assert!(shown.contains("producer=node_1"));

        let shown = stream("src_1", Schema::json()).to_string();
        assert!(shown.contains("Stream"));
        assert!(shown.contains("source=src_1"));
    }

    #[test]
    fn from_value_conversion() {
        let val = Value::tensor(vec![1.0, 2.0], vec![2]);
        let vv: VirtualValue = val.clone().into();
        assert_eq!(vv.status(), ValueStatus::InMemory);
        assert_eq!(vv.as_value(), Some(&val));
    }

    #[test]
    fn resolve_materialized_stays_materialized() {
        let val = Value::tensor(vec![1.0], vec![1]);
        let vv = VirtualValue::materialized(val.clone());
        let resolved = vv.resolve(&MapCache::default()).unwrap();
        assert_eq!(resolved.status(), ValueStatus::InMemory);
        assert_eq!(resolved.as_value(), Some(&val));
    }

    #[test]
    fn resolve_deferred_checks_cache() {
        let key = CacheKey::hash_data(b"cached_value");
        let expected = Value::tensor(vec![42.0], vec![1]);
        let cache = MapCache::with(key.clone(), expected.clone());

        let schema = Schema::vector(DataType::Float64, 1);
        let vv = VirtualValue::deferred("producer", key, schema.clone());
        assert_eq!(vv.status(), ValueStatus::NotComputed);

        let resolved = vv.resolve(&cache).unwrap();
        assert_eq!(resolved.status(), ValueStatus::InMemory);
        assert_eq!(resolved.as_value(), Some(&expected));
        assert_eq!(resolved.schema(), &schema);
    }

    #[test]
    fn resolve_cached_hit_materializes() {
        let key = CacheKey::hash_data(b"hit");
        let expected = Value::tensor(vec![7.0, 8.0], vec![2]);
        let cache = MapCache::with(key.clone(), expected.clone());

        let schema = Schema::vector(DataType::Float64, 2);
        let resolved = VirtualValue::cached(key, schema.clone())
            .resolve(&cache)
            .unwrap();
        assert_eq!(resolved.status(), ValueStatus::InMemory);
        assert_eq!(resolved.as_value(), Some(&expected));
        assert_eq!(resolved.schema(), &schema);
    }

    #[test]
    fn resolve_miss_keeps_reference() {
        let empty = MapCache::default();

        let key = CacheKey::hash_data(b"missing");
        let resolved = VirtualValue::cached(key.clone(), Schema::json())
            .resolve(&empty)
            .unwrap();
        assert_eq!(resolved.status(), ValueStatus::OnDisk);
        assert_eq!(resolved.cache_key(), Some(&key));

        let resolved = VirtualValue::deferred("p", key.clone(), Schema::json())
            .resolve(&empty)
            .unwrap();
        assert_eq!(resolved.status(), ValueStatus::NotComputed);
        assert_eq!(resolved.cache_key(), Some(&key));
    }

    #[test]
    fn try_load_reads_through_cache() {
        let key = CacheKey::hash_data(b"load");
        let expected = Value::tensor(vec![3.0], vec![1]);
        let cache = MapCache::with(key.clone(), expected.clone());

        let cached = VirtualValue::cached(key.clone(), Schema::vector(DataType::Float64, 1));
        assert_eq!(cached.try_load(&cache).unwrap(), Some(expected.clone()));

        let deferred = VirtualValue::deferred("p", key, Schema::vector(DataType::Float64, 1));
        assert_eq!(deferred.try_load(&cache).unwrap(), Some(expected.clone()));

        let materialized = VirtualValue::materialized(expected.clone());
        assert_eq!(materialized.try_load(&cache).unwrap(), Some(expected));

        let missing = VirtualValue::cached(CacheKey::hash_data(b"nope"), Schema::json());
        assert_eq!(missing.try_load(&cache).unwrap(), None);
    }

    #[test]
    fn stream_is_never_loaded() {
        let cache = MapCache::default();
        let vv = stream("src", Schema::json());

        assert_eq!(vv.try_load(&cache).unwrap(), None);
        let resolved = vv.resolve(&cache).unwrap();
        assert_eq!(resolved.status(), ValueStatus::Streaming);
    }

    #[test]
    fn serde_roundtrip() {
        let values = vec![
            VirtualValue::materialized(Value::tensor(vec![1.0], vec![1])),
            VirtualValue::cached(CacheKey::hash_data(b"k"), Schema::json()),
            VirtualValue::deferred(
                "n",
                CacheKey::hash_data(b"d"),
                Schema::vector(DataType::Float64, 10),
            ),
            stream("s", Schema::json()),
        ];
        for vv in values {
            let json = serde_json::to_string(&vv).unwrap();
            let deserialized: VirtualValue = serde_json::from_str(&json).unwrap();
            assert_eq!(vv.status(), deserialized.status());
            assert_eq!(vv.schema(), deserialized.schema());
            assert_eq!(vv.cache_key(), deserialized.cache_key());
        }
    }
}