uni_query/projection_store.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Per-`StorageManager` cache of materialised graph projections.
5//!
6//! Backs the `uni.graph.{project, drop, list, exists}` procedure family
//! and the V2 `(graphRef, config)` `Named` projection variant. Each
//! [`ProjectionStore`] is scoped to a `StorageManager` instance — that
//! is the closest available proxy for "this database" in the current
//! architecture (the proposal calls for a per-`Database`
//! [`ProjectionStore`]; uni-db's `Database` type hangs off
//! `StorageManager`). The process-global registry in [`for_storage`]
//! is keyed on the pointer identity of the manager's `schema_manager`
//! `Arc`, so handles that share a schema manager share one store.
14//!
15//! v1 caveats (per proposal §4.10.3): in-memory only (not persisted),
16//! no eviction policy other than `drop`, no LRU. The `bytes` field on
17//! [`ProjectionEntry`] exists for a future LRU policy. Staleness is
18//! explicit — a projection is frozen at materialisation time;
19//! recomputing it requires a `drop` + `project` cycle.
20
21use std::collections::HashMap;
22use std::sync::{Arc, Mutex, OnceLock, RwLock};
23use std::time::SystemTime;
24
25use uni_algo::algo::GraphProjection;
26use uni_store::storage::manager::StorageManager;
27
/// Origin of the rows a stored projection was built from.
///
/// `uni.graph.list` reports this value so an operator can quickly
/// distinguish projections produced by native label/edge-type scans
/// from projections produced by inner Cypher queries.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProjectionSourceKind {
    /// Built from native labels and edge types, i.e.
    /// `graphRef = {nodeLabels, edgeTypes, ...}`.
    Native,
    /// Built from a pair of inner Cypher queries, i.e.
    /// `graphRef = {nodeQuery, edgeQuery, ...}`.
    Cypher,
}
41
42impl ProjectionSourceKind {
43 /// Stable string label for diagnostic output (`uni.graph.list`).
44 #[must_use]
45 pub fn as_str(&self) -> &'static str {
46 match self {
47 Self::Native => "Native",
48 Self::Cypher => "Cypher",
49 }
50 }
51}
52
53/// One stored projection plus the bookkeeping `uni.graph.list` needs.
54#[derive(Clone)]
55pub struct ProjectionEntry {
56 /// The materialised projection. `Arc` so callers can take cheap
57 /// clones for repeated algorithm invocations.
58 pub projection: Arc<GraphProjection>,
59 /// Vertex count at materialisation time.
60 pub node_count: usize,
61 /// Edge count at materialisation time.
62 pub edge_count: usize,
63 /// Approximate memory footprint in bytes (advisory — used by a
64 /// future LRU policy that v1 does not implement).
65 pub bytes: usize,
66 /// Wall-clock instant the projection was materialised.
67 pub created_at: SystemTime,
68 /// Where the projection's rows came from.
69 pub source_kind: ProjectionSourceKind,
70}
71
72impl std::fmt::Debug for ProjectionEntry {
73 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74 f.debug_struct("ProjectionEntry")
75 .field("node_count", &self.node_count)
76 .field("edge_count", &self.edge_count)
77 .field("bytes", &self.bytes)
78 .field("source_kind", &self.source_kind)
79 .finish_non_exhaustive()
80 }
81}
82
83/// In-memory cache of named graph projections, keyed by user-chosen
84/// name strings. See module docs for scope and eviction semantics.
85#[derive(Default)]
86pub struct ProjectionStore {
87 entries: RwLock<HashMap<String, ProjectionEntry>>,
88}
89
90impl std::fmt::Debug for ProjectionStore {
91 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92 let len = self.entries.read().map(|g| g.len()).unwrap_or(0);
93 f.debug_struct("ProjectionStore")
94 .field("entries", &len)
95 .finish()
96 }
97}
98
99impl ProjectionStore {
100 /// Construct an empty store.
101 #[must_use]
102 pub fn new() -> Self {
103 Self::default()
104 }
105
106 /// Insert a projection. Returns `Err` if a projection with the same
107 /// name already exists (caller must `drop` first — the proposal
108 /// rejects implicit replace).
109 ///
110 /// # Errors
111 ///
112 /// Returns the duplicate name as `Err(String)` so the calling
113 /// procedure can surface it through `FnError`.
114 pub fn insert(&self, name: String, entry: ProjectionEntry) -> Result<(), String> {
115 let mut g = self
116 .entries
117 .write()
118 .map_err(|_| "store lock poisoned".to_owned())?;
119 if g.contains_key(&name) {
120 return Err(name);
121 }
122 g.insert(name, entry);
123 Ok(())
124 }
125
126 /// Look up a projection by name.
127 #[must_use]
128 pub fn get(&self, name: &str) -> Option<ProjectionEntry> {
129 self.entries.read().ok()?.get(name).cloned()
130 }
131
132 /// Remove a projection by name. Returns `true` if a projection was
133 /// removed, `false` if no such projection existed.
134 ///
135 /// Named `drop_by_name` (not just `drop`) because `drop` is a
136 /// reserved method-name slot in Rust's `Drop` trait resolution
137 /// and the compiler refuses ambient destructor calls.
138 pub fn drop_by_name(&self, name: &str) -> bool {
139 self.entries
140 .write()
141 .map(|mut g| g.remove(name).is_some())
142 .unwrap_or(false)
143 }
144
145 /// List every stored projection as `(name, entry)` pairs.
146 #[must_use]
147 pub fn list(&self) -> Vec<(String, ProjectionEntry)> {
148 self.entries
149 .read()
150 .map(|g| g.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
151 .unwrap_or_default()
152 }
153
154 /// Probe membership without cloning the entry.
155 #[must_use]
156 pub fn contains(&self, name: &str) -> bool {
157 self.entries
158 .read()
159 .map(|g| g.contains_key(name))
160 .unwrap_or(false)
161 }
162}
163
164/// Look up (or create) the [`ProjectionStore`] for the given
165/// `StorageManager`. Uses the backing `Arc<SchemaManager>` pointer
166/// identity as the registry key — callers sharing the same
167/// `schema_manager` Arc (e.g. a pinned transaction and the live
168/// session) see the same store, while a fork (which holds a distinct
169/// `schema_manager`) gets an isolated store.
170///
171/// The registry leaks one entry per distinct `schema_manager` over
172/// the process lifetime, which is bounded (a typical embedded use
173/// constructs one or two `Uni` instances per process).
174pub fn for_storage(storage: &Arc<StorageManager>) -> Arc<ProjectionStore> {
175 static REGISTRY: OnceLock<Mutex<HashMap<usize, Arc<ProjectionStore>>>> = OnceLock::new();
176 // Key on the `schema_manager` Arc identity, not the `StorageManager` Arc:
177 // a pinned transaction and the live session clone the *same*
178 // `schema_manager` Arc (so they must share the projection store), while a
179 // fork holds a distinct one (so it stays isolated).
180 let key = Arc::as_ptr(storage.schema_manager_arc_ref()) as usize;
181 let reg = REGISTRY.get_or_init(|| Mutex::new(HashMap::new()));
182 let mut g = match reg.lock() {
183 Ok(g) => g,
184 Err(p) => p.into_inner(),
185 };
186 g.entry(key)
187 .or_insert_with(|| Arc::new(ProjectionStore::new()))
188 .clone()
189}
190
191/// Best-effort byte-size estimate for a [`GraphProjection`]. Used by
192/// [`ProjectionEntry::bytes`] — informational only (a future LRU
193/// policy will consult it).
194#[must_use]
195pub fn estimate_bytes(p: &GraphProjection) -> usize {
196 use std::mem::size_of;
197 let v = p.vertex_count();
198 // Approximate: CSR offsets (V+1)*4 each (out + in) + neighbors
199 // count*4 each. We don't have direct accessors for the edge
200 // count, but `out_offsets[V]` would tell us; for v1 estimate
201 // very loosely as 32 bytes per vertex.
202 v * 32 + size_of::<GraphProjection>()
203}
204
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::SystemTime;

    /// Builds an entry around an empty projection with zeroed
    /// counters, stamped with the current time.
    fn empty_entry() -> ProjectionEntry {
        let projection = GraphProjection::from_rows(&[], &[], None, false).unwrap();
        ProjectionEntry {
            projection: Arc::new(projection),
            node_count: 0,
            edge_count: 0,
            bytes: 0,
            created_at: SystemTime::now(),
            source_kind: ProjectionSourceKind::Native,
        }
    }

    /// An inserted projection is visible until dropped; a second drop
    /// of the same name reports that nothing was removed.
    #[test]
    fn insert_get_drop_round_trip() {
        let store = ProjectionStore::new();
        store.insert(String::from("g"), empty_entry()).unwrap();

        assert!(store.contains("g"));
        assert!(store.get("g").is_some());

        assert!(store.drop_by_name("g"));
        assert!(!store.contains("g"));
        assert!(!store.drop_by_name("g"));
    }

    /// Inserting under an existing name fails and hands the name back.
    #[test]
    fn duplicate_insert_rejected() {
        let store = ProjectionStore::new();
        store.insert(String::from("g"), empty_entry()).unwrap();

        let rejected = store.insert(String::from("g"), empty_entry()).unwrap_err();
        assert_eq!(rejected, "g");
    }

    /// `list` returns one pair per stored projection.
    #[test]
    fn list_returns_all_entries() {
        let store = ProjectionStore::new();
        store.insert(String::from("a"), empty_entry()).unwrap();
        store.insert(String::from("b"), empty_entry()).unwrap();

        let listed = store.list();
        assert_eq!(listed.len(), 2);
        assert!(listed.iter().any(|(name, _)| name.as_str() == "a"));
        assert!(listed.iter().any(|(name, _)| name.as_str() == "b"));
    }
}
251}