Skip to main content

omnigraph_server/
registry.rs

1//! `GraphRegistry` — the multi-graph routing substrate (MR-668).
2//!
3//! Holds the open `Arc<GraphHandle>` for every graph the server is currently
4//! serving. Lock-free reads via `ArcSwap<RegistrySnapshot>`; mutations
5//! serialize through `mutate: Mutex<()>` for read-modify-write atomicity.
6//!
7//! **Deletion is deferred** in v0.6.0 (MR-668 scope cut). The registry has
8//! no `tombstones` field, no `RegistryLookup::Tombstoned` variant, no
//! `tombstone()` / `clear_tombstone()` methods. When `DELETE /graphs/{id}`
//! lands in a follow-up release, those can be reintroduced without breaking
//! caller signatures; until then `Gone` is the closest semantic (the graph
//! is no longer in the registry).
13//!
14//! Engine instance survival across registry mutations:
15//! a request that grabbed `Arc<GraphHandle>` before a registry swap keeps
16//! the engine alive via its own `Arc` clone (see `server_export` at
17//! `lib.rs:1019-1033` for the spawn-and-clone pattern). The engine drops
18//! when the last `Arc<Omnigraph>` clone drops, regardless of the
19//! registry's current state.
20
21use std::collections::HashMap;
22use std::sync::Arc;
23
24use arc_swap::ArcSwap;
25use omnigraph::db::Omnigraph;
26use omnigraph::storage::normalize_root_uri;
27#[cfg(test)]
28use tokio::sync::Mutex;
29
30use crate::identity::GraphKey;
31use crate::policy::PolicyEngine;
32use crate::queries::QueryRegistry;
33
34/// Open handle for a single graph in the registry. Cheap to clone (`Arc`-wrapped
35/// engine + policy). Cluster-mode handlers extract this via
36/// `Extension<Arc<GraphHandle>>` injected by the routing middleware.
37pub struct GraphHandle {
38    /// Registry key. In Cluster mode `key.tenant_id` is always `None`.
39    pub key: GraphKey,
40    /// The URI the engine was opened from (`s3://...` or local path).
41    /// Stable for the engine's lifetime; surfaced in responses like
42    /// `BranchCreateOutput.uri`.
43    pub uri: String,
44    /// Engine. Reads/writes go directly through `&self` methods on
45    /// `Omnigraph` (no `RwLock` — MR-686 preserved).
46    pub engine: Arc<Omnigraph>,
47    /// Per-graph Cedar policy. `None` means "no policy gate on engine-layer
48    /// `_as` writers"; the HTTP-layer `require_bearer_auth` middleware still
49    /// runs regardless.
50    pub policy: Option<Arc<PolicyEngine>>,
51    /// Per-graph stored-query registry, loaded and validated at
52    /// startup. `None` means the operator declared no stored queries for
53    /// this graph — `POST /queries/{name}` then 404s. Mirrors the
54    /// optional `policy` shape.
55    pub queries: Option<Arc<QueryRegistry>>,
56}
57
58/// Immutable snapshot of the registry's current state. Replaced atomically
59/// via `ArcSwap`; readers see a consistent view of all graphs without locking.
60///
61/// Derived state (`any_per_graph_policy`) is computed at snapshot
62/// construction so request-time middleware doesn't have to walk the
63/// graph map every call. Construct only via [`RegistrySnapshot::new`]
64/// (or `Default`) so the field stays in sync with `graphs`.
65pub struct RegistrySnapshot {
66    pub graphs: HashMap<GraphKey, Arc<GraphHandle>>,
67    /// `true` iff any registered graph has a per-graph policy installed.
68    /// Used by `AppState::requires_bearer_auth` to decide whether the
69    /// auth middleware should challenge a request — a per-graph policy
70    /// implies bearer auth is required even when no server-level tokens
71    /// or policy are configured.
72    pub any_per_graph_policy: bool,
73}
74
75impl RegistrySnapshot {
76    /// Build a snapshot from a graph map, deriving cached fields.
77    /// The only construction path — direct struct-literal use elsewhere
78    /// would let derived state drift from `graphs`.
79    pub fn new(graphs: HashMap<GraphKey, Arc<GraphHandle>>) -> Self {
80        let any_per_graph_policy = graphs.values().any(|h| h.policy.is_some());
81        Self {
82            graphs,
83            any_per_graph_policy,
84        }
85    }
86}
87
88impl Default for RegistrySnapshot {
89    fn default() -> Self {
90        Self::new(HashMap::new())
91    }
92}
93
94/// Result of a registry lookup. Two-valued — `Tombstoned` deferred with DELETE.
95pub enum RegistryLookup {
96    /// Graph is open and ready to serve.
97    Ready(Arc<GraphHandle>),
98    /// Graph is not in the registry (never existed, or was unregistered in a
99    /// future release). Handlers respond with 404.
100    Gone,
101}
102
103/// Why an `insert` was rejected.
104#[derive(Debug, thiserror::Error)]
105pub enum InsertError {
106    /// Another handle already exists for this `GraphKey`. Maps to HTTP 409.
107    #[error("graph '{0}' is already registered")]
108    DuplicateKey(GraphKey),
109    /// Another handle is open against this URI. Two graphs sharing a URI
110    /// would commit through the same Lance manifest and corrupt each other.
111    /// Maps to HTTP 409.
112    #[error("URI '{0}' is already registered as another graph")]
113    DuplicateUri(String),
114    /// A handle carried an invalid graph URI. Maps to startup failure.
115    #[error("URI '{uri}' is invalid: {message}")]
116    InvalidUri { uri: String, message: String },
117}
118
119pub struct GraphRegistry {
120    snapshot: ArcSwap<RegistrySnapshot>,
121    /// Serializes runtime mutations through [`GraphRegistry::insert`].
122    /// Gated with `insert` because they share a single contract — if
123    /// the consumer goes away, so does the lock. Re-introducing one
124    /// requires re-introducing the other.
125    #[cfg(test)]
126    mutate: Mutex<()>,
127}
128
129impl GraphRegistry {
130    /// Empty registry. Used as a placeholder before startup populates it.
131    pub fn new() -> Self {
132        Self {
133            snapshot: ArcSwap::from_pointee(RegistrySnapshot::default()),
134            #[cfg(test)]
135            mutate: Mutex::new(()),
136        }
137    }
138
139    /// Build a registry from a startup-time list of open handles.
140    /// Rejects duplicate `GraphKey`s and duplicate URIs.
141    pub fn from_handles(handles: Vec<Arc<GraphHandle>>) -> Result<Self, InsertError> {
142        let mut graphs: HashMap<GraphKey, Arc<GraphHandle>> = HashMap::with_capacity(handles.len());
143        let mut seen_uris: HashMap<String, GraphKey> = HashMap::with_capacity(handles.len());
144        for handle in handles {
145            let (canonical_uri, handle) = canonicalize_handle_uri(handle)?;
146            if graphs.contains_key(&handle.key) {
147                return Err(InsertError::DuplicateKey(handle.key.clone()));
148            }
149            if seen_uris.contains_key(&canonical_uri) {
150                return Err(InsertError::DuplicateUri(handle.uri.clone()));
151            }
152            seen_uris.insert(canonical_uri, handle.key.clone());
153            graphs.insert(handle.key.clone(), handle);
154        }
155        Ok(Self {
156            snapshot: ArcSwap::from_pointee(RegistrySnapshot::new(graphs)),
157            #[cfg(test)]
158            mutate: Mutex::new(()),
159        })
160    }
161
162    /// Lock-free snapshot read. Callers that need derived state cached
163    /// on the snapshot (e.g. `any_per_graph_policy`) go through here;
164    /// callers that only need values of `graphs` should use [`list`]
165    /// or [`get`].
166    pub fn snapshot_ref(&self) -> arc_swap::Guard<Arc<RegistrySnapshot>> {
167        self.snapshot.load()
168    }
169
170    /// Lock-free read. Returns `Ready` if the graph is in the current snapshot,
171    /// `Gone` otherwise.
172    pub fn get(&self, key: &GraphKey) -> RegistryLookup {
173        let snapshot = self.snapshot.load();
174        match snapshot.graphs.get(key) {
175            Some(handle) => RegistryLookup::Ready(Arc::clone(handle)),
176            None => RegistryLookup::Gone,
177        }
178    }
179
180    /// Snapshot the full set of currently-registered handles. Ordering
181    /// matches the underlying `HashMap` iteration (intentionally
182    /// non-deterministic — callers that need a stable order sort by
183    /// `handle.key.graph_id`).
184    pub fn list(&self) -> Vec<Arc<GraphHandle>> {
185        let snapshot = self.snapshot.load();
186        snapshot.graphs.values().cloned().collect()
187    }
188
189    /// Number of registered graphs (excluding any future tombstones).
190    pub fn len(&self) -> usize {
191        self.snapshot.load().graphs.len()
192    }
193
194    pub fn is_empty(&self) -> bool {
195        self.len() == 0
196    }
197
198    /// Add a new handle. Async because the mutex is `tokio::sync::Mutex`
199    /// (a future managed-catalog flow may hold it across `.await` points
200    /// during atomic registry mutations). Rejects duplicate `GraphKey`
201    /// and duplicate `uri`.
202    ///
203    /// **Test-only surface.** No production code reaches this — startup
204    /// uses `from_handles`, and runtime add/remove is deferred. The
205    /// race-contract tests below pin the mutex linearization point so
206    /// that when a real consumer ships (managed cluster catalog), the
207    /// concurrency contract is already proven. Ungate by removing
208    /// `#[cfg(test)]` once that consumer is in scope.
209    ///
210    /// Race semantics (pinned by `concurrent_insert_same_key_exactly_one_succeeds`):
211    /// under N concurrent calls with the same key, exactly one returns
212    /// `Ok(())` and the rest return `Err(InsertError::DuplicateKey(_))`.
213    #[cfg(test)]
214    pub async fn insert(&self, handle: Arc<GraphHandle>) -> Result<(), InsertError> {
215        let _guard = self.mutate.lock().await;
216        let current = self.snapshot.load();
217        let (canonical_uri, handle) = canonicalize_handle_uri(handle)?;
218        if current.graphs.contains_key(&handle.key) {
219            return Err(InsertError::DuplicateKey(handle.key.clone()));
220        }
221        for existing in current.graphs.values() {
222            let existing_uri =
223                normalize_root_uri(&existing.uri).map_err(|err| InsertError::InvalidUri {
224                    uri: existing.uri.clone(),
225                    message: err.to_string(),
226                })?;
227            if existing_uri == canonical_uri {
228                return Err(InsertError::DuplicateUri(handle.uri.clone()));
229            }
230        }
231        let mut new_graphs = current.graphs.clone();
232        new_graphs.insert(handle.key.clone(), handle);
233        self.snapshot
234            .store(Arc::new(RegistrySnapshot::new(new_graphs)));
235        Ok(())
236    }
237}
238
239fn canonicalize_handle_uri(
240    handle: Arc<GraphHandle>,
241) -> Result<(String, Arc<GraphHandle>), InsertError> {
242    let canonical_uri = normalize_root_uri(&handle.uri).map_err(|err| InsertError::InvalidUri {
243        uri: handle.uri.clone(),
244        message: err.to_string(),
245    })?;
246    if canonical_uri == handle.uri {
247        return Ok((canonical_uri, handle));
248    }
249    let canonical_handle = Arc::new(GraphHandle {
250        key: handle.key.clone(),
251        uri: canonical_uri.clone(),
252        engine: Arc::clone(&handle.engine),
253        policy: handle.policy.clone(),
254        queries: handle.queries.clone(),
255    });
256    Ok((canonical_uri, canonical_handle))
257}
258
259impl Default for GraphRegistry {
260    fn default() -> Self {
261        Self::new()
262    }
263}
264
/// Unit and race-contract tests for `GraphRegistry`. Every test that needs
/// a handle initializes a real `Omnigraph` engine inside a temp directory.
#[cfg(test)]
mod tests {
    use std::path::Path;

    use tempfile::TempDir;

    use super::*;
    use crate::graph_id::GraphId;

    /// Minimal schema used to initialize throwaway engines.
    const TEST_SCHEMA: &str = "node Person { name: String @key }\n";

    /// Initializes an engine at `<dir>/<graph_id>` and wraps it in a handle
    /// keyed by `GraphKey::cluster(graph_id)`, with no policy and no stored
    /// queries. Panics if the path is not valid UTF-8, the id is rejected by
    /// `GraphId::try_from`, or engine init fails.
    async fn build_handle(graph_id: &str, dir: &Path) -> Arc<GraphHandle> {
        let graph_uri = dir.join(graph_id).to_str().unwrap().to_string();
        let engine = Omnigraph::init(&graph_uri, TEST_SCHEMA)
            .await
            .expect("init engine for registry test");
        Arc::new(GraphHandle {
            key: GraphKey::cluster(GraphId::try_from(graph_id).unwrap()),
            uri: graph_uri,
            engine: Arc::new(engine),
            policy: None,
            queries: None,
        })
    }

    /// A freshly constructed registry reports empty through every accessor.
    #[tokio::test]
    async fn new_registry_is_empty() {
        let registry = GraphRegistry::new();
        assert!(registry.is_empty());
        assert_eq!(registry.len(), 0);
        assert!(registry.list().is_empty());
    }

    /// After `insert`, `get` with the same key yields `Ready` with the
    /// inserted handle.
    #[tokio::test]
    async fn insert_then_get_returns_ready() {
        let dir = TempDir::new().unwrap();
        let registry = GraphRegistry::new();
        let handle = build_handle("alpha", dir.path()).await;
        registry.insert(Arc::clone(&handle)).await.unwrap();

        match registry.get(&handle.key) {
            RegistryLookup::Ready(found) => {
                // NOTE(review): pointer equality holds only when the handle's
                // URI is already canonical; otherwise `canonicalize_handle_uri`
                // stores a newly allocated handle. Presumably temp-dir paths
                // normalize to themselves — confirm on all target platforms.
                assert!(Arc::ptr_eq(&found, &handle));
            }
            RegistryLookup::Gone => panic!("expected Ready, got Gone"),
        }
    }

    /// Looking up a key that was never inserted yields `Gone`.
    #[tokio::test]
    async fn get_nonexistent_returns_gone() {
        let registry = GraphRegistry::new();
        let key = GraphKey::cluster(GraphId::try_from("ghost").unwrap());
        match registry.get(&key) {
            RegistryLookup::Gone => {}
            RegistryLookup::Ready(_) => panic!("expected Gone"),
        }
    }

    /// A second `insert` with an already-registered key is rejected with
    /// `DuplicateKey`, even though the URIs differ.
    #[tokio::test]
    async fn insert_duplicate_key_returns_error() {
        let dir = TempDir::new().unwrap();
        let registry = GraphRegistry::new();
        let h1 = build_handle("alpha", dir.path()).await;
        // Same key, different URI: build_handle uses graph_id as the subdir,
        // so a second temp dir gives a distinct path for the same id.
        let dir2 = TempDir::new().unwrap();
        let h2 = build_handle("alpha", dir2.path()).await;
        registry.insert(h1).await.unwrap();

        match registry.insert(h2).await {
            Err(InsertError::DuplicateKey(_)) => {}
            other => panic!("expected DuplicateKey, got {other:?}"),
        }
    }

    /// A second `insert` with a new key but an already-registered URI is
    /// rejected with `DuplicateUri`.
    #[tokio::test]
    async fn insert_duplicate_uri_returns_error() {
        let dir = TempDir::new().unwrap();
        // Two handles with the same URI but different keys.
        let shared_uri = dir.path().join("shared").to_str().unwrap().to_string();
        let engine = Omnigraph::init(&shared_uri, TEST_SCHEMA).await.unwrap();
        let engine = Arc::new(engine);
        let h1 = Arc::new(GraphHandle {
            key: GraphKey::cluster(GraphId::try_from("alpha").unwrap()),
            uri: shared_uri.clone(),
            engine: Arc::clone(&engine),
            policy: None,
            queries: None,
        });
        let h2 = Arc::new(GraphHandle {
            key: GraphKey::cluster(GraphId::try_from("beta").unwrap()),
            uri: shared_uri,
            engine,
            policy: None,
            queries: None,
        });

        let registry = GraphRegistry::new();
        registry.insert(h1).await.unwrap();
        match registry.insert(h2).await {
            Err(InsertError::DuplicateUri(_)) => {}
            other => panic!("expected DuplicateUri, got {other:?}"),
        }
    }

    /// `list` returns every inserted handle; ids are sorted before comparing
    /// because `list` makes no ordering promise.
    #[tokio::test]
    async fn list_returns_all_inserted_handles() {
        let dir = TempDir::new().unwrap();
        let registry = GraphRegistry::new();
        for name in ["alpha", "beta", "gamma"] {
            let h = build_handle(name, dir.path()).await;
            registry.insert(h).await.unwrap();
        }
        assert_eq!(registry.len(), 3);
        let mut ids: Vec<_> = registry
            .list()
            .into_iter()
            .map(|h| h.key.graph_id.as_str().to_string())
            .collect();
        ids.sort();
        assert_eq!(ids, vec!["alpha", "beta", "gamma"]);
    }

    /// `from_handles` accepts a list of handles with distinct keys and URIs.
    #[tokio::test]
    async fn from_handles_bulk_init_succeeds() {
        let dir = TempDir::new().unwrap();
        let handles = vec![
            build_handle("alpha", dir.path()).await,
            build_handle("beta", dir.path()).await,
        ];
        let registry = GraphRegistry::from_handles(handles).unwrap();
        assert_eq!(registry.len(), 2);
    }

    /// `from_handles` rejects two handles that share a key (distinct URIs).
    #[tokio::test]
    async fn from_handles_rejects_duplicate_keys() {
        let dir1 = TempDir::new().unwrap();
        let dir2 = TempDir::new().unwrap();
        let h1 = build_handle("alpha", dir1.path()).await;
        let h2 = build_handle("alpha", dir2.path()).await;
        // Matched by hand rather than `unwrap_err`, which would require
        // `GraphRegistry: Debug`.
        let err = match GraphRegistry::from_handles(vec![h1, h2]) {
            Ok(_) => panic!("expected DuplicateKey, got Ok"),
            Err(err) => err,
        };
        assert!(
            matches!(err, InsertError::DuplicateKey(_)),
            "expected DuplicateKey, got {err}",
        );
    }

    /// `from_handles` rejects two handles that share a URI (distinct keys).
    #[tokio::test]
    async fn from_handles_rejects_duplicate_uris() {
        let dir = TempDir::new().unwrap();
        let shared_uri = dir.path().join("shared").to_str().unwrap().to_string();
        let engine = Arc::new(Omnigraph::init(&shared_uri, TEST_SCHEMA).await.unwrap());
        let h1 = Arc::new(GraphHandle {
            key: GraphKey::cluster(GraphId::try_from("alpha").unwrap()),
            uri: shared_uri.clone(),
            engine: Arc::clone(&engine),
            policy: None,
            queries: None,
        });
        let h2 = Arc::new(GraphHandle {
            key: GraphKey::cluster(GraphId::try_from("beta").unwrap()),
            uri: shared_uri,
            engine,
            policy: None,
            queries: None,
        });
        let err = match GraphRegistry::from_handles(vec![h1, h2]) {
            Ok(_) => panic!("expected DuplicateUri, got Ok"),
            Err(err) => err,
        };
        assert!(
            matches!(err, InsertError::DuplicateUri(_)),
            "expected DuplicateUri, got {err}",
        );
    }

    /// Race test modeled on `actor_admission_race_does_not_exceed_cap`
    /// at `tests/server.rs:3596+`. Spawn N concurrent inserts with the
    /// same `GraphKey` (each constructing its own `GraphHandle` against
    /// its own tempdir). Exactly one must succeed; the others must
    /// return `DuplicateKey`. No `unwrap` panic: the `Mutex<()>` +
    /// in-mutex re-check is the linearization point.
    #[tokio::test(flavor = "multi_thread")]
    async fn concurrent_insert_same_key_exactly_one_succeeds() {
        const N: usize = 8;

        let registry = Arc::new(GraphRegistry::new());
        // Pre-create N handles (each in its own tempdir; same key).
        let mut handles = Vec::with_capacity(N);
        let mut dirs = Vec::with_capacity(N);
        for _ in 0..N {
            let d = TempDir::new().unwrap();
            handles.push(build_handle("contested", d.path()).await);
            dirs.push(d);
        }

        // The barrier releases all N tasks together so the inserts actually
        // contend for the mutex.
        let barrier = Arc::new(tokio::sync::Barrier::new(N));
        let mut tasks = Vec::with_capacity(N);
        for handle in handles {
            let registry = Arc::clone(&registry);
            let barrier = Arc::clone(&barrier);
            tasks.push(tokio::spawn(async move {
                barrier.wait().await;
                registry.insert(handle).await
            }));
        }

        let mut ok_count = 0usize;
        let mut dup_count = 0usize;
        for t in tasks {
            match t.await.unwrap() {
                Ok(()) => ok_count += 1,
                Err(InsertError::DuplicateKey(_)) => dup_count += 1,
                Err(other) => panic!("unexpected error: {other:?}"),
            }
        }
        assert_eq!(ok_count, 1, "exactly one insert must succeed");
        assert_eq!(dup_count, N - 1, "the rest must return DuplicateKey");
        assert_eq!(registry.len(), 1);

        // Drop the dirs at the end (preserves engines until tasks finish).
        drop(dirs);
    }

    /// Concurrent inserts with **distinct** keys all succeed.
    /// Linearizability over the mutex still serializes them.
    #[tokio::test(flavor = "multi_thread")]
    async fn concurrent_insert_distinct_keys_all_succeed() {
        const N: usize = 8;

        let registry = Arc::new(GraphRegistry::new());
        // Pre-create N handles with distinct ids, each in its own tempdir.
        let mut handles = Vec::with_capacity(N);
        let mut dirs = Vec::with_capacity(N);
        for i in 0..N {
            let d = TempDir::new().unwrap();
            handles.push(build_handle(&format!("graph-{i}"), d.path()).await);
            dirs.push(d);
        }

        // Same barrier pattern as the same-key race test: start all inserts
        // at once.
        let barrier = Arc::new(tokio::sync::Barrier::new(N));
        let mut tasks = Vec::with_capacity(N);
        for handle in handles {
            let registry = Arc::clone(&registry);
            let barrier = Arc::clone(&barrier);
            tasks.push(tokio::spawn(async move {
                barrier.wait().await;
                registry.insert(handle).await
            }));
        }
        // First unwrap: the task did not panic; second: the insert succeeded.
        for t in tasks {
            t.await.unwrap().unwrap();
        }
        assert_eq!(registry.len(), N);
        drop(dirs);
    }

    /// Concurrent reads during a write must always see a consistent
    /// snapshot (no torn state). With `ArcSwap`, the read either sees
    /// the old snapshot or the new one — never both, never neither.
    #[tokio::test(flavor = "multi_thread")]
    async fn concurrent_reads_during_inserts_see_consistent_snapshots() {
        let dir = TempDir::new().unwrap();
        let registry = Arc::new(GraphRegistry::new());

        // Spawn a writer that inserts graph-0..graph-9 sequentially.
        const N_WRITES: usize = 10;
        let writer_registry = Arc::clone(&registry);
        let writer_dir = dir.path().to_path_buf();
        let writer = tokio::spawn(async move {
            for i in 0..N_WRITES {
                let h = build_handle(&format!("graph-{i}"), &writer_dir).await;
                writer_registry.insert(h).await.unwrap();
            }
        });

        // Reader loop: take 200 snapshots of the registry, yielding between
        // them, while the writer runs. Every snapshot's len must be in
        // [0, N_WRITES], and for every key g in the snapshot, get(g) must
        // return Ready. The follow-up get() may hit a newer snapshot, but
        // the writer only adds entries, so a listed key stays present.
        let reader_registry = Arc::clone(&registry);
        let reader = tokio::spawn(async move {
            for _ in 0..200 {
                let snap = reader_registry.list();
                assert!(snap.len() <= N_WRITES);
                for handle in &snap {
                    match reader_registry.get(&handle.key) {
                        RegistryLookup::Ready(found) => {
                            assert!(Arc::ptr_eq(&found, handle));
                        }
                        RegistryLookup::Gone => panic!(
                            "snapshot listed key {} but get() returned Gone",
                            handle.key.graph_id
                        ),
                    }
                }
                tokio::task::yield_now().await;
            }
        });

        writer.await.unwrap();
        reader.await.unwrap();
        assert_eq!(registry.len(), N_WRITES);
    }
}