Skip to main content

somatize_core/
virtual_value.rs

1//! Virtual values — lazy references to data that can be materialized on demand.
2//!
3//! Instead of eagerly loading all intermediate results, [`VirtualValue`]
4//! keeps references (Materialized, Cached, Deferred, Stream) and only
5//! materializes when a filter actually needs the data.
6
7use crate::cache::CacheKey;
8use crate::schema::Schema;
9use crate::value::Value;
10use serde::{Deserialize, Serialize};
11use std::fmt;
12
13/// A lazy reference to a value that can be materialized on demand.
14///
15/// This is the core of Soma's data virtualization. Instead of computing
16/// and storing every intermediate result, values are represented as
17/// references that carry enough information to:
18///
19/// - Inspect schema without loading data
20/// - Check whether the data is already available
21/// - Compute it when needed
22///
23/// Like Denodo's data virtualization, but for computation rather than SQL.
24#[derive(Debug, Clone, Serialize, Deserialize)]
25#[non_exhaustive]
26pub enum VirtualValue {
27    /// Already computed and in memory. Ready to use.
28    Materialized { value: Value, schema: Schema },
29
30    /// Stored in cache (K/V store). Can be loaded on demand.
31    Cached { key: CacheKey, schema: Schema },
32
33    /// Not computed yet. Carries the "recipe" to produce it:
34    /// which node produces it, and what its cache key would be.
35    Deferred {
36        producer_node_id: String,
37        cache_key: CacheKey,
38        schema: Schema,
39    },
40
41    /// A stream that materializes chunk by chunk.
42    Stream { source_id: String, schema: Schema },
43}
44
/// Coarse availability state of a [`VirtualValue`], reported without
/// inspecting the underlying data.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ValueStatus {
    /// Held in memory and ready to use.
    InMemory,
    /// Held on disk or in the cache; must be loaded first.
    OnDisk,
    /// Not computed yet; execution is required.
    NotComputed,
    /// Streaming; only partial data is available.
    Streaming,
}
57
58impl VirtualValue {
59    /// Create a materialized value with auto-inferred schema.
60    pub fn materialized(value: Value) -> Self {
61        let schema = Self::infer_schema(&value);
62        Self::Materialized { value, schema }
63    }
64
65    /// Create a materialized value with explicit schema.
66    pub fn materialized_with_schema(value: Value, schema: Schema) -> Self {
67        Self::Materialized { value, schema }
68    }
69
70    /// Create a cached reference.
71    pub fn cached(key: CacheKey, schema: Schema) -> Self {
72        Self::Cached { key, schema }
73    }
74
75    /// Create a deferred (not yet computed) reference.
76    pub fn deferred(
77        producer_node_id: impl Into<String>,
78        cache_key: CacheKey,
79        schema: Schema,
80    ) -> Self {
81        Self::Deferred {
82            producer_node_id: producer_node_id.into(),
83            cache_key,
84            schema,
85        }
86    }
87
88    /// Get the schema without materializing the value.
89    pub fn schema(&self) -> &Schema {
90        match self {
91            Self::Materialized { schema, .. }
92            | Self::Cached { schema, .. }
93            | Self::Deferred { schema, .. }
94            | Self::Stream { schema, .. } => schema,
95        }
96    }
97
98    /// Get the current status.
99    pub fn status(&self) -> ValueStatus {
100        match self {
101            Self::Materialized { .. } => ValueStatus::InMemory,
102            Self::Cached { .. } => ValueStatus::OnDisk,
103            Self::Deferred { .. } => ValueStatus::NotComputed,
104            Self::Stream { .. } => ValueStatus::Streaming,
105        }
106    }
107
108    /// Get the materialized value if already in memory.
109    pub fn as_value(&self) -> Option<&Value> {
110        match self {
111            Self::Materialized { value, .. } => Some(value),
112            _ => None,
113        }
114    }
115
116    /// Get the cache key (if this value is cached or deferred).
117    pub fn cache_key(&self) -> Option<&CacheKey> {
118        match self {
119            Self::Cached { key, .. } | Self::Deferred { cache_key: key, .. } => Some(key),
120            _ => None,
121        }
122    }
123
124    /// Materialize from a cache store. Returns the value if found, None if not cached.
125    pub fn try_load(
126        &self,
127        cache: &dyn crate::cache::CacheStore,
128    ) -> crate::error::Result<Option<Value>> {
129        match self {
130            Self::Materialized { value, .. } => Ok(Some(value.clone())),
131            Self::Cached { key, .. } => cache.get(key),
132            Self::Deferred { cache_key, .. } => cache.get(cache_key),
133            _ => Ok(None),
134        }
135    }
136
137    /// Upgrade this reference: if Deferred, check cache; if Cached, load.
138    /// Returns a new VirtualValue that may be closer to Materialized.
139    pub fn resolve(&self, cache: &dyn crate::cache::CacheStore) -> crate::error::Result<Self> {
140        match self {
141            Self::Materialized { .. } => Ok(self.clone()),
142            Self::Cached { key, schema } => {
143                if let Some(value) = cache.get(key)? {
144                    Ok(Self::Materialized {
145                        value,
146                        schema: schema.clone(),
147                    })
148                } else {
149                    Ok(self.clone()) // still cached but value missing
150                }
151            }
152            Self::Deferred {
153                cache_key, schema, ..
154            } => {
155                if let Some(value) = cache.get(cache_key)? {
156                    Ok(Self::Materialized {
157                        value,
158                        schema: schema.clone(),
159                    })
160                } else {
161                    Ok(self.clone()) // still deferred
162                }
163            }
164            _ => Ok(self.clone()),
165        }
166    }
167
168    /// Infer schema from a concrete Value.
169    fn infer_schema(value: &Value) -> Schema {
170        match value {
171            Value::Tensor { values: _, shape } => Schema {
172                dtype: crate::schema::DataType::Float64,
173                shape: Some(
174                    shape
175                        .iter()
176                        .map(|&d| crate::schema::Dimension::Fixed(d))
177                        .collect(),
178                ),
179            },
180            Value::Json(_) => Schema::json(),
181            Value::Bytes(_) => Schema::bytes(),
182            Value::Empty => Schema::dynamic(crate::schema::DataType::Float64),
183        }
184    }
185}
186
187impl fmt::Display for VirtualValue {
188    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
189        match self {
190            Self::Materialized { schema, value } => {
191                write!(f, "Materialized({schema}, size={})", value.size())
192            }
193            Self::Cached { key, schema } => {
194                write!(f, "Cached({schema}, key={key})")
195            }
196            Self::Deferred {
197                producer_node_id,
198                schema,
199                ..
200            } => {
201                write!(f, "Deferred({schema}, producer={producer_node_id})")
202            }
203            Self::Stream { source_id, schema } => {
204                write!(f, "Stream({schema}, source={source_id})")
205            }
206        }
207    }
208}
209
210impl From<Value> for VirtualValue {
211    fn from(value: Value) -> Self {
212        Self::materialized(value)
213    }
214}
215
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::{CacheKey, CacheStore, EntryMeta};
    use crate::schema::{DataType, Schema};
    use std::collections::HashMap;
    use std::sync::Mutex;

    /// In-memory `CacheStore` stand-in shared by all tests.
    ///
    /// Only `get` and `exists` are backed by real data; the mutating
    /// operations are accepted and ignored.
    struct MapCache {
        entries: Mutex<HashMap<CacheKey, Value>>,
    }

    impl MapCache {
        /// A cache in which every lookup misses.
        fn empty() -> Self {
            Self {
                entries: Mutex::new(HashMap::new()),
            }
        }

        /// A cache holding exactly one entry.
        fn with(key: CacheKey, value: Value) -> Self {
            let mut entries = HashMap::new();
            entries.insert(key, value);
            Self {
                entries: Mutex::new(entries),
            }
        }
    }

    impl CacheStore for MapCache {
        fn get(&self, key: &CacheKey) -> crate::error::Result<Option<Value>> {
            Ok(self.entries.lock().unwrap().get(key).cloned())
        }
        fn put(&self, _: &CacheKey, _: &Value) -> crate::error::Result<()> {
            Ok(())
        }
        fn exists(&self, key: &CacheKey) -> crate::error::Result<bool> {
            Ok(self.entries.lock().unwrap().contains_key(key))
        }
        fn remove(&self, _: &CacheKey) -> crate::error::Result<()> {
            Ok(())
        }
        fn metadata(&self, _: &CacheKey) -> crate::error::Result<Option<EntryMeta>> {
            Ok(None)
        }
    }

    /// Builds a `Stream` reference via the variant literal.
    fn stream_value() -> VirtualValue {
        VirtualValue::Stream {
            source_id: "src".to_string(),
            schema: Schema::json(),
        }
    }

    #[test]
    fn materialized_from_value() {
        let val = Value::tensor(vec![1.0, 2.0, 3.0], vec![3]);
        let vv = VirtualValue::materialized(val.clone());

        assert_eq!(vv.status(), ValueStatus::InMemory);
        assert_eq!(vv.as_value(), Some(&val));
        assert!(vv.cache_key().is_none());
        assert_eq!(vv.schema().dtype, DataType::Float64);
        assert_eq!(vv.schema().rank(), Some(1));
    }

    #[test]
    fn materialized_with_explicit_schema_keeps_it() {
        let val = Value::tensor(vec![1.0], vec![1]);
        let schema = Schema::json();
        let vv = VirtualValue::materialized_with_schema(val.clone(), schema.clone());

        assert_eq!(vv.status(), ValueStatus::InMemory);
        assert_eq!(vv.as_value(), Some(&val));
        assert_eq!(vv.schema(), &schema);
    }

    #[test]
    fn cached_reference() {
        let key = CacheKey::hash_data(b"test");
        let schema = Schema::vector(DataType::Float64, 100);
        let vv = VirtualValue::cached(key.clone(), schema.clone());

        assert_eq!(vv.status(), ValueStatus::OnDisk);
        assert_eq!(vv.cache_key(), Some(&key));
        assert_eq!(vv.schema(), &schema);
        assert!(vv.as_value().is_none());
    }

    #[test]
    fn deferred_reference() {
        let key = CacheKey::hash_data(b"deferred");
        let schema = Schema::batched(DataType::Float64, &[128]);
        let vv = VirtualValue::deferred("my_node", key.clone(), schema.clone());

        assert_eq!(vv.status(), ValueStatus::NotComputed);
        assert_eq!(vv.cache_key(), Some(&key));
        assert_eq!(vv.schema(), &schema);
        assert!(vv.as_value().is_none());
    }

    #[test]
    fn stream_reference() {
        let vv = stream_value();

        assert_eq!(vv.status(), ValueStatus::Streaming);
        assert!(vv.cache_key().is_none());
        assert!(vv.as_value().is_none());
        assert_eq!(vv.schema(), &Schema::json());
    }

    #[test]
    fn schema_inferred_from_tensor() {
        let val = Value::tensor(vec![0.0; 12], vec![3, 4]);
        let vv = VirtualValue::materialized(val);
        assert_eq!(vv.schema().dtype, DataType::Float64);
        assert_eq!(vv.schema().rank(), Some(2));
    }

    #[test]
    fn schema_inferred_from_json() {
        let val = Value::json(serde_json::json!({"a": 1}));
        let vv = VirtualValue::materialized(val);
        assert_eq!(vv.schema().dtype, DataType::Json);
    }

    #[test]
    fn display_formatting() {
        let vv = VirtualValue::materialized(Value::tensor(vec![1.0], vec![1]));
        assert!(vv.to_string().contains("Materialized"));

        let vv = VirtualValue::cached(CacheKey::hash_data(b"k"), Schema::json());
        assert!(vv.to_string().contains("Cached"));

        let vv = VirtualValue::deferred("node_1", CacheKey::hash_data(b"k"), Schema::json());
        let text = vv.to_string();
        assert!(text.contains("Deferred"));
        assert!(text.contains("producer=node_1"));

        let text = stream_value().to_string();
        assert!(text.contains("Stream"));
        assert!(text.contains("source=src"));
    }

    #[test]
    fn from_value_conversion() {
        let val = Value::tensor(vec![1.0, 2.0], vec![2]);
        let vv: VirtualValue = val.clone().into();
        assert_eq!(vv.status(), ValueStatus::InMemory);
        assert_eq!(vv.as_value(), Some(&val));
    }

    #[test]
    fn try_load_materialized_returns_clone() {
        let val = Value::tensor(vec![7.0], vec![1]);
        let vv = VirtualValue::materialized(val.clone());
        assert_eq!(vv.try_load(&MapCache::empty()).unwrap(), Some(val));
    }

    #[test]
    fn try_load_reads_cache_for_cached_and_deferred() {
        let key = CacheKey::hash_data(b"try_load");
        let stored = Value::tensor(vec![3.0], vec![1]);
        let schema = Schema::vector(DataType::Float64, 1);
        let hit = MapCache::with(key.clone(), stored.clone());
        let miss = MapCache::empty();

        let cached = VirtualValue::cached(key.clone(), schema.clone());
        assert_eq!(cached.try_load(&hit).unwrap(), Some(stored.clone()));
        assert_eq!(cached.try_load(&miss).unwrap(), None);

        let deferred = VirtualValue::deferred("producer", key, schema);
        assert_eq!(deferred.try_load(&hit).unwrap(), Some(stored));
        assert_eq!(deferred.try_load(&miss).unwrap(), None);
    }

    #[test]
    fn try_load_stream_is_none() {
        assert_eq!(stream_value().try_load(&MapCache::empty()).unwrap(), None);
    }

    #[test]
    fn resolve_materialized_stays_materialized() {
        let val = Value::tensor(vec![1.0], vec![1]);
        let vv = VirtualValue::materialized(val.clone());
        let resolved = vv.resolve(&MapCache::empty()).unwrap();
        assert_eq!(resolved.status(), ValueStatus::InMemory);
        assert_eq!(resolved.as_value(), Some(&val));
    }

    #[test]
    fn resolve_deferred_checks_cache() {
        let key = CacheKey::hash_data(b"cached_value");
        let expected = Value::tensor(vec![42.0], vec![1]);
        let cache = MapCache::with(key.clone(), expected.clone());

        // Deferred -> resolve -> Materialized (if found in cache)
        let vv = VirtualValue::deferred("producer", key, Schema::vector(DataType::Float64, 1));
        assert_eq!(vv.status(), ValueStatus::NotComputed);

        let resolved = vv.resolve(&cache).unwrap();
        assert_eq!(resolved.status(), ValueStatus::InMemory);
        assert_eq!(resolved.as_value(), Some(&expected));
    }

    #[test]
    fn resolve_cached_loads_and_keeps_schema() {
        let key = CacheKey::hash_data(b"resolve_cached");
        let expected = Value::tensor(vec![5.0], vec![1]);
        let schema = Schema::vector(DataType::Float64, 1);
        let cache = MapCache::with(key.clone(), expected.clone());

        let resolved = VirtualValue::cached(key, schema.clone())
            .resolve(&cache)
            .unwrap();
        assert_eq!(resolved.status(), ValueStatus::InMemory);
        assert_eq!(resolved.as_value(), Some(&expected));
        assert_eq!(resolved.schema(), &schema);
    }

    #[test]
    fn resolve_miss_keeps_reference_kind() {
        let cache = MapCache::empty();
        let key = CacheKey::hash_data(b"missing");
        let schema = Schema::vector(DataType::Float64, 1);

        let cached = VirtualValue::cached(key.clone(), schema.clone());
        let resolved = cached.resolve(&cache).unwrap();
        assert_eq!(resolved.status(), ValueStatus::OnDisk);
        assert_eq!(resolved.cache_key(), Some(&key));

        let deferred = VirtualValue::deferred("producer", key.clone(), schema);
        let resolved = deferred.resolve(&cache).unwrap();
        assert_eq!(resolved.status(), ValueStatus::NotComputed);
        assert_eq!(resolved.cache_key(), Some(&key));
    }

    #[test]
    fn resolve_stream_is_unchanged() {
        let resolved = stream_value().resolve(&MapCache::empty()).unwrap();
        assert_eq!(resolved.status(), ValueStatus::Streaming);
        assert_eq!(resolved.schema(), &Schema::json());
    }

    #[test]
    fn serde_roundtrip() {
        let values = vec![
            VirtualValue::materialized(Value::tensor(vec![1.0], vec![1])),
            VirtualValue::cached(CacheKey::hash_data(b"k"), Schema::json()),
            VirtualValue::deferred(
                "n",
                CacheKey::hash_data(b"d"),
                Schema::vector(DataType::Float64, 10),
            ),
            stream_value(),
        ];
        for vv in values {
            let json = serde_json::to_string(&vv).unwrap();
            let deserialized: VirtualValue = serde_json::from_str(&json).unwrap();
            assert_eq!(vv.status(), deserialized.status());
            assert_eq!(vv.schema(), deserialized.schema());
            assert_eq!(vv.cache_key(), deserialized.cache_key());
        }
    }
}
391}