Skip to main content

uni_query/
projection_store.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Per-`StorageManager` cache of materialised graph projections.
5//!
6//! Backs the `uni.graph.{project, drop, list, exists}` procedure family
//! and the V2 `(graphRef, config)` `Named` projection variant. Each
//! [`ProjectionStore`] is scoped to the schema manager behind a
//! `StorageManager` — that is the closest available proxy for "this
//! database" in the current architecture (the proposal calls for a
//! per-`Database` [`ProjectionStore`]; uni-db's `Database` type hangs
//! off `StorageManager`, so [`for_storage`] uses the pointer identity
//! of the manager's `schema_manager` Arc as the cache key in the
//! process-global registry below).
14//!
15//! v1 caveats (per proposal §4.10.3): in-memory only (not persisted),
16//! no eviction policy other than `drop`, no LRU. The `bytes` field on
17//! [`ProjectionEntry`] exists for a future LRU policy. Staleness is
18//! explicit — a projection is frozen at materialisation time;
19//! recomputing it requires a `drop` + `project` cycle.
20
21use std::collections::HashMap;
22use std::sync::{Arc, Mutex, OnceLock, RwLock};
23use std::time::SystemTime;
24
25use uni_algo::algo::GraphProjection;
26use uni_store::storage::manager::StorageManager;
27
/// Origin of a stored projection's rows.
///
/// Reported through `uni.graph.list`, letting operators distinguish
/// projections built from native label/edge-type scans from those
/// built by running inner Cypher queries.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProjectionSourceKind {
    /// Built from native labels and edge types
    /// (`graphRef = {nodeLabels, edgeTypes, ...}`).
    Native,
    /// Built from a pair of inner Cypher queries
    /// (`graphRef = {nodeQuery, edgeQuery, ...}`).
    Cypher,
}

impl ProjectionSourceKind {
    /// Returns the fixed label for this kind, as shown in
    /// `uni.graph.list` diagnostic output.
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        // Kept exhaustive (no `_` arm) so that adding a variant forces a
        // label to be chosen for it.
        match *self {
            ProjectionSourceKind::Cypher => "Cypher",
            ProjectionSourceKind::Native => "Native",
        }
    }
}
52
53/// One stored projection plus the bookkeeping `uni.graph.list` needs.
54#[derive(Clone)]
55pub struct ProjectionEntry {
56    /// The materialised projection. `Arc` so callers can take cheap
57    /// clones for repeated algorithm invocations.
58    pub projection: Arc<GraphProjection>,
59    /// Vertex count at materialisation time.
60    pub node_count: usize,
61    /// Edge count at materialisation time.
62    pub edge_count: usize,
63    /// Approximate memory footprint in bytes (advisory — used by a
64    /// future LRU policy that v1 does not implement).
65    pub bytes: usize,
66    /// Wall-clock instant the projection was materialised.
67    pub created_at: SystemTime,
68    /// Where the projection's rows came from.
69    pub source_kind: ProjectionSourceKind,
70}
71
72impl std::fmt::Debug for ProjectionEntry {
73    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74        f.debug_struct("ProjectionEntry")
75            .field("node_count", &self.node_count)
76            .field("edge_count", &self.edge_count)
77            .field("bytes", &self.bytes)
78            .field("source_kind", &self.source_kind)
79            .finish_non_exhaustive()
80    }
81}
82
83/// In-memory cache of named graph projections, keyed by user-chosen
84/// name strings. See module docs for scope and eviction semantics.
85#[derive(Default)]
86pub struct ProjectionStore {
87    entries: RwLock<HashMap<String, ProjectionEntry>>,
88}
89
90impl std::fmt::Debug for ProjectionStore {
91    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92        let len = self.entries.read().map(|g| g.len()).unwrap_or(0);
93        f.debug_struct("ProjectionStore")
94            .field("entries", &len)
95            .finish()
96    }
97}
98
99impl ProjectionStore {
100    /// Construct an empty store.
101    #[must_use]
102    pub fn new() -> Self {
103        Self::default()
104    }
105
106    /// Insert a projection. Returns `Err` if a projection with the same
107    /// name already exists (caller must `drop` first — the proposal
108    /// rejects implicit replace).
109    ///
110    /// # Errors
111    ///
112    /// Returns the duplicate name as `Err(String)` so the calling
113    /// procedure can surface it through `FnError`.
114    pub fn insert(&self, name: String, entry: ProjectionEntry) -> Result<(), String> {
115        let mut g = self
116            .entries
117            .write()
118            .map_err(|_| "store lock poisoned".to_owned())?;
119        if g.contains_key(&name) {
120            return Err(name);
121        }
122        g.insert(name, entry);
123        Ok(())
124    }
125
126    /// Look up a projection by name.
127    #[must_use]
128    pub fn get(&self, name: &str) -> Option<ProjectionEntry> {
129        self.entries.read().ok()?.get(name).cloned()
130    }
131
132    /// Remove a projection by name. Returns `true` if a projection was
133    /// removed, `false` if no such projection existed.
134    ///
135    /// Named `drop_by_name` (not just `drop`) because `drop` is a
136    /// reserved method-name slot in Rust's `Drop` trait resolution
137    /// and the compiler refuses ambient destructor calls.
138    pub fn drop_by_name(&self, name: &str) -> bool {
139        self.entries
140            .write()
141            .map(|mut g| g.remove(name).is_some())
142            .unwrap_or(false)
143    }
144
145    /// List every stored projection as `(name, entry)` pairs.
146    #[must_use]
147    pub fn list(&self) -> Vec<(String, ProjectionEntry)> {
148        self.entries
149            .read()
150            .map(|g| g.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
151            .unwrap_or_default()
152    }
153
154    /// Probe membership without cloning the entry.
155    #[must_use]
156    pub fn contains(&self, name: &str) -> bool {
157        self.entries
158            .read()
159            .map(|g| g.contains_key(name))
160            .unwrap_or(false)
161    }
162}
163
164/// Look up (or create) the [`ProjectionStore`] for the given
165/// `StorageManager`. Uses the backing `Arc<SchemaManager>` pointer
166/// identity as the registry key — callers sharing the same
167/// `schema_manager` Arc (e.g. a pinned transaction and the live
168/// session) see the same store, while a fork (which holds a distinct
169/// `schema_manager`) gets an isolated store.
170///
171/// The registry leaks one entry per distinct `schema_manager` over
172/// the process lifetime, which is bounded (a typical embedded use
173/// constructs one or two `Uni` instances per process).
174pub fn for_storage(storage: &Arc<StorageManager>) -> Arc<ProjectionStore> {
175    static REGISTRY: OnceLock<Mutex<HashMap<usize, Arc<ProjectionStore>>>> = OnceLock::new();
176    // Key on the `schema_manager` Arc identity, not the `StorageManager` Arc:
177    // a pinned transaction and the live session clone the *same*
178    // `schema_manager` Arc (so they must share the projection store), while a
179    // fork holds a distinct one (so it stays isolated).
180    let key = Arc::as_ptr(storage.schema_manager_arc_ref()) as usize;
181    let reg = REGISTRY.get_or_init(|| Mutex::new(HashMap::new()));
182    let mut g = match reg.lock() {
183        Ok(g) => g,
184        Err(p) => p.into_inner(),
185    };
186    g.entry(key)
187        .or_insert_with(|| Arc::new(ProjectionStore::new()))
188        .clone()
189}
190
191/// Best-effort byte-size estimate for a [`GraphProjection`]. Used by
192/// [`ProjectionEntry::bytes`] — informational only (a future LRU
193/// policy will consult it).
194#[must_use]
195pub fn estimate_bytes(p: &GraphProjection) -> usize {
196    use std::mem::size_of;
197    let v = p.vertex_count();
198    // Approximate: CSR offsets (V+1)*4 each (out + in) + neighbors
199    // count*4 each. We don't have direct accessors for the edge
200    // count, but `out_offsets[V]` would tell us; for v1 estimate
201    // very loosely as 32 bytes per vertex.
202    v * 32 + size_of::<GraphProjection>()
203}
204
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::SystemTime;

    /// Builds an entry around an empty projection with zeroed counters.
    /// The tests below only exercise the store's bookkeeping.
    fn empty_entry() -> ProjectionEntry {
        let projection = GraphProjection::from_rows(&[], &[], None, false).unwrap();
        ProjectionEntry {
            projection: Arc::new(projection),
            node_count: 0,
            edge_count: 0,
            bytes: 0,
            created_at: SystemTime::now(),
            source_kind: ProjectionSourceKind::Native,
        }
    }

    /// An inserted entry is visible to `contains`/`get` and goes away
    /// after `drop_by_name`.
    #[test]
    fn insert_get_drop_round_trip() {
        let store = ProjectionStore::new();
        store.insert("g".to_owned(), empty_entry()).unwrap();

        assert!(store.contains("g"));
        assert!(store.get("g").is_some());

        // The first drop removes the entry; the second finds nothing.
        assert!(store.drop_by_name("g"));
        assert!(!store.contains("g"));
        assert!(!store.drop_by_name("g"));
    }

    /// A second insert under the same name fails and returns that name.
    #[test]
    fn duplicate_insert_rejected() {
        let store = ProjectionStore::new();
        store.insert("g".to_owned(), empty_entry()).unwrap();

        let rejected = store.insert("g".to_owned(), empty_entry());
        assert_eq!(rejected.unwrap_err(), "g");
    }

    /// `list` returns every stored name; order is not asserted.
    #[test]
    fn list_returns_all_entries() {
        let store = ProjectionStore::new();
        for name in ["a", "b"] {
            store.insert(name.to_owned(), empty_entry()).unwrap();
        }

        let listed = store.list();
        assert_eq!(listed.len(), 2);
        for expected in ["a", "b"] {
            assert!(listed.iter().any(|(name, _)| name == expected));
        }
    }
}