1use std::collections::HashMap;
22use std::sync::Arc;
23
24use arc_swap::ArcSwap;
25use omnigraph::db::Omnigraph;
26use omnigraph::storage::normalize_root_uri;
27#[cfg(test)]
28use tokio::sync::Mutex;
29
30use crate::identity::GraphKey;
31use crate::policy::PolicyEngine;
32use crate::queries::QueryRegistry;
33
34pub struct GraphHandle {
38 pub key: GraphKey,
40 pub uri: String,
44 pub engine: Arc<Omnigraph>,
47 pub policy: Option<Arc<PolicyEngine>>,
51 pub queries: Option<Arc<QueryRegistry>>,
56}
57
58pub struct RegistrySnapshot {
66 pub graphs: HashMap<GraphKey, Arc<GraphHandle>>,
67 pub any_per_graph_policy: bool,
73}
74
75impl RegistrySnapshot {
76 pub fn new(graphs: HashMap<GraphKey, Arc<GraphHandle>>) -> Self {
80 let any_per_graph_policy = graphs.values().any(|h| h.policy.is_some());
81 Self {
82 graphs,
83 any_per_graph_policy,
84 }
85 }
86}
87
88impl Default for RegistrySnapshot {
89 fn default() -> Self {
90 Self::new(HashMap::new())
91 }
92}
93
94pub enum RegistryLookup {
96 Ready(Arc<GraphHandle>),
98 Gone,
101}
102
103#[derive(Debug, thiserror::Error)]
105pub enum InsertError {
106 #[error("graph '{0}' is already registered")]
108 DuplicateKey(GraphKey),
109 #[error("URI '{0}' is already registered as another graph")]
113 DuplicateUri(String),
114 #[error("URI '{uri}' is invalid: {message}")]
116 InvalidUri { uri: String, message: String },
117}
118
119pub struct GraphRegistry {
120 snapshot: ArcSwap<RegistrySnapshot>,
121 #[cfg(test)]
126 mutate: Mutex<()>,
127}
128
129impl GraphRegistry {
130 pub fn new() -> Self {
132 Self {
133 snapshot: ArcSwap::from_pointee(RegistrySnapshot::default()),
134 #[cfg(test)]
135 mutate: Mutex::new(()),
136 }
137 }
138
139 pub fn from_handles(handles: Vec<Arc<GraphHandle>>) -> Result<Self, InsertError> {
142 let mut graphs: HashMap<GraphKey, Arc<GraphHandle>> = HashMap::with_capacity(handles.len());
143 let mut seen_uris: HashMap<String, GraphKey> = HashMap::with_capacity(handles.len());
144 for handle in handles {
145 let (canonical_uri, handle) = canonicalize_handle_uri(handle)?;
146 if graphs.contains_key(&handle.key) {
147 return Err(InsertError::DuplicateKey(handle.key.clone()));
148 }
149 if seen_uris.contains_key(&canonical_uri) {
150 return Err(InsertError::DuplicateUri(handle.uri.clone()));
151 }
152 seen_uris.insert(canonical_uri, handle.key.clone());
153 graphs.insert(handle.key.clone(), handle);
154 }
155 Ok(Self {
156 snapshot: ArcSwap::from_pointee(RegistrySnapshot::new(graphs)),
157 #[cfg(test)]
158 mutate: Mutex::new(()),
159 })
160 }
161
162 pub fn snapshot_ref(&self) -> arc_swap::Guard<Arc<RegistrySnapshot>> {
167 self.snapshot.load()
168 }
169
170 pub fn get(&self, key: &GraphKey) -> RegistryLookup {
173 let snapshot = self.snapshot.load();
174 match snapshot.graphs.get(key) {
175 Some(handle) => RegistryLookup::Ready(Arc::clone(handle)),
176 None => RegistryLookup::Gone,
177 }
178 }
179
180 pub fn list(&self) -> Vec<Arc<GraphHandle>> {
185 let snapshot = self.snapshot.load();
186 snapshot.graphs.values().cloned().collect()
187 }
188
189 pub fn len(&self) -> usize {
191 self.snapshot.load().graphs.len()
192 }
193
194 pub fn is_empty(&self) -> bool {
195 self.len() == 0
196 }
197
198 #[cfg(test)]
214 pub async fn insert(&self, handle: Arc<GraphHandle>) -> Result<(), InsertError> {
215 let _guard = self.mutate.lock().await;
216 let current = self.snapshot.load();
217 let (canonical_uri, handle) = canonicalize_handle_uri(handle)?;
218 if current.graphs.contains_key(&handle.key) {
219 return Err(InsertError::DuplicateKey(handle.key.clone()));
220 }
221 for existing in current.graphs.values() {
222 let existing_uri =
223 normalize_root_uri(&existing.uri).map_err(|err| InsertError::InvalidUri {
224 uri: existing.uri.clone(),
225 message: err.to_string(),
226 })?;
227 if existing_uri == canonical_uri {
228 return Err(InsertError::DuplicateUri(handle.uri.clone()));
229 }
230 }
231 let mut new_graphs = current.graphs.clone();
232 new_graphs.insert(handle.key.clone(), handle);
233 self.snapshot
234 .store(Arc::new(RegistrySnapshot::new(new_graphs)));
235 Ok(())
236 }
237}
238
239fn canonicalize_handle_uri(
240 handle: Arc<GraphHandle>,
241) -> Result<(String, Arc<GraphHandle>), InsertError> {
242 let canonical_uri = normalize_root_uri(&handle.uri).map_err(|err| InsertError::InvalidUri {
243 uri: handle.uri.clone(),
244 message: err.to_string(),
245 })?;
246 if canonical_uri == handle.uri {
247 return Ok((canonical_uri, handle));
248 }
249 let canonical_handle = Arc::new(GraphHandle {
250 key: handle.key.clone(),
251 uri: canonical_uri.clone(),
252 engine: Arc::clone(&handle.engine),
253 policy: handle.policy.clone(),
254 queries: handle.queries.clone(),
255 });
256 Ok((canonical_uri, canonical_handle))
257}
258
259impl Default for GraphRegistry {
260 fn default() -> Self {
261 Self::new()
262 }
263}
264
265#[cfg(test)]
266mod tests {
267 use std::path::Path;
268
269 use tempfile::TempDir;
270
271 use super::*;
272 use crate::graph_id::GraphId;
273
274 const TEST_SCHEMA: &str = "node Person { name: String @key }\n";
275
276 async fn build_handle(graph_id: &str, dir: &Path) -> Arc<GraphHandle> {
277 let graph_uri = dir.join(graph_id).to_str().unwrap().to_string();
278 let engine = Omnigraph::init(&graph_uri, TEST_SCHEMA)
279 .await
280 .expect("init engine for registry test");
281 Arc::new(GraphHandle {
282 key: GraphKey::cluster(GraphId::try_from(graph_id).unwrap()),
283 uri: graph_uri,
284 engine: Arc::new(engine),
285 policy: None,
286 queries: None,
287 })
288 }
289
290 #[tokio::test]
291 async fn new_registry_is_empty() {
292 let registry = GraphRegistry::new();
293 assert!(registry.is_empty());
294 assert_eq!(registry.len(), 0);
295 assert!(registry.list().is_empty());
296 }
297
298 #[tokio::test]
299 async fn insert_then_get_returns_ready() {
300 let dir = TempDir::new().unwrap();
301 let registry = GraphRegistry::new();
302 let handle = build_handle("alpha", dir.path()).await;
303 registry.insert(Arc::clone(&handle)).await.unwrap();
304
305 match registry.get(&handle.key) {
306 RegistryLookup::Ready(found) => {
307 assert!(Arc::ptr_eq(&found, &handle));
308 }
309 RegistryLookup::Gone => panic!("expected Ready, got Gone"),
310 }
311 }
312
313 #[tokio::test]
314 async fn get_nonexistent_returns_gone() {
315 let registry = GraphRegistry::new();
316 let key = GraphKey::cluster(GraphId::try_from("ghost").unwrap());
317 match registry.get(&key) {
318 RegistryLookup::Gone => {}
319 RegistryLookup::Ready(_) => panic!("expected Gone"),
320 }
321 }
322
323 #[tokio::test]
324 async fn insert_duplicate_key_returns_error() {
325 let dir = TempDir::new().unwrap();
326 let registry = GraphRegistry::new();
327 let h1 = build_handle("alpha", dir.path()).await;
328 let dir2 = TempDir::new().unwrap();
330 let h2 = build_handle("alpha", dir2.path()).await;
331 registry.insert(h1).await.unwrap();
332
333 match registry.insert(h2).await {
334 Err(InsertError::DuplicateKey(_)) => {}
335 other => panic!("expected DuplicateKey, got {other:?}"),
336 }
337 }
338
339 #[tokio::test]
340 async fn insert_duplicate_uri_returns_error() {
341 let dir = TempDir::new().unwrap();
342 let shared_uri = dir.path().join("shared").to_str().unwrap().to_string();
344 let engine = Omnigraph::init(&shared_uri, TEST_SCHEMA).await.unwrap();
345 let engine = Arc::new(engine);
346 let h1 = Arc::new(GraphHandle {
347 key: GraphKey::cluster(GraphId::try_from("alpha").unwrap()),
348 uri: shared_uri.clone(),
349 engine: Arc::clone(&engine),
350 policy: None,
351 queries: None,
352 });
353 let h2 = Arc::new(GraphHandle {
354 key: GraphKey::cluster(GraphId::try_from("beta").unwrap()),
355 uri: shared_uri,
356 engine,
357 policy: None,
358 queries: None,
359 });
360
361 let registry = GraphRegistry::new();
362 registry.insert(h1).await.unwrap();
363 match registry.insert(h2).await {
364 Err(InsertError::DuplicateUri(_)) => {}
365 other => panic!("expected DuplicateUri, got {other:?}"),
366 }
367 }
368
369 #[tokio::test]
370 async fn list_returns_all_inserted_handles() {
371 let dir = TempDir::new().unwrap();
372 let registry = GraphRegistry::new();
373 for name in ["alpha", "beta", "gamma"] {
374 let h = build_handle(name, dir.path()).await;
375 registry.insert(h).await.unwrap();
376 }
377 assert_eq!(registry.len(), 3);
378 let mut ids: Vec<_> = registry
379 .list()
380 .into_iter()
381 .map(|h| h.key.graph_id.as_str().to_string())
382 .collect();
383 ids.sort();
384 assert_eq!(ids, vec!["alpha", "beta", "gamma"]);
385 }
386
387 #[tokio::test]
388 async fn from_handles_bulk_init_succeeds() {
389 let dir = TempDir::new().unwrap();
390 let handles = vec![
391 build_handle("alpha", dir.path()).await,
392 build_handle("beta", dir.path()).await,
393 ];
394 let registry = GraphRegistry::from_handles(handles).unwrap();
395 assert_eq!(registry.len(), 2);
396 }
397
398 #[tokio::test]
399 async fn from_handles_rejects_duplicate_keys() {
400 let dir1 = TempDir::new().unwrap();
401 let dir2 = TempDir::new().unwrap();
402 let h1 = build_handle("alpha", dir1.path()).await;
403 let h2 = build_handle("alpha", dir2.path()).await;
404 let err = match GraphRegistry::from_handles(vec![h1, h2]) {
405 Ok(_) => panic!("expected DuplicateKey, got Ok"),
406 Err(err) => err,
407 };
408 assert!(
409 matches!(err, InsertError::DuplicateKey(_)),
410 "expected DuplicateKey, got {err}",
411 );
412 }
413
414 #[tokio::test]
415 async fn from_handles_rejects_duplicate_uris() {
416 let dir = TempDir::new().unwrap();
417 let shared_uri = dir.path().join("shared").to_str().unwrap().to_string();
418 let engine = Arc::new(Omnigraph::init(&shared_uri, TEST_SCHEMA).await.unwrap());
419 let h1 = Arc::new(GraphHandle {
420 key: GraphKey::cluster(GraphId::try_from("alpha").unwrap()),
421 uri: shared_uri.clone(),
422 engine: Arc::clone(&engine),
423 policy: None,
424 queries: None,
425 });
426 let h2 = Arc::new(GraphHandle {
427 key: GraphKey::cluster(GraphId::try_from("beta").unwrap()),
428 uri: shared_uri,
429 engine,
430 policy: None,
431 queries: None,
432 });
433 let err = match GraphRegistry::from_handles(vec![h1, h2]) {
434 Ok(_) => panic!("expected DuplicateUri, got Ok"),
435 Err(err) => err,
436 };
437 assert!(
438 matches!(err, InsertError::DuplicateUri(_)),
439 "expected DuplicateUri, got {err}",
440 );
441 }
442
443 #[tokio::test(flavor = "multi_thread")]
450 async fn concurrent_insert_same_key_exactly_one_succeeds() {
451 const N: usize = 8;
452
453 let registry = Arc::new(GraphRegistry::new());
454 let mut handles = Vec::with_capacity(N);
456 let mut dirs = Vec::with_capacity(N);
457 for _ in 0..N {
458 let d = TempDir::new().unwrap();
459 handles.push(build_handle("contested", d.path()).await);
460 dirs.push(d);
461 }
462
463 let barrier = Arc::new(tokio::sync::Barrier::new(N));
464 let mut tasks = Vec::with_capacity(N);
465 for handle in handles {
466 let registry = Arc::clone(®istry);
467 let barrier = Arc::clone(&barrier);
468 tasks.push(tokio::spawn(async move {
469 barrier.wait().await;
470 registry.insert(handle).await
471 }));
472 }
473
474 let mut ok_count = 0usize;
475 let mut dup_count = 0usize;
476 for t in tasks {
477 match t.await.unwrap() {
478 Ok(()) => ok_count += 1,
479 Err(InsertError::DuplicateKey(_)) => dup_count += 1,
480 Err(other) => panic!("unexpected error: {other:?}"),
481 }
482 }
483 assert_eq!(ok_count, 1, "exactly one insert must succeed");
484 assert_eq!(dup_count, N - 1, "the rest must return DuplicateKey");
485 assert_eq!(registry.len(), 1);
486
487 drop(dirs);
489 }
490
491 #[tokio::test(flavor = "multi_thread")]
494 async fn concurrent_insert_distinct_keys_all_succeed() {
495 const N: usize = 8;
496
497 let registry = Arc::new(GraphRegistry::new());
498 let mut handles = Vec::with_capacity(N);
500 let mut dirs = Vec::with_capacity(N);
501 for i in 0..N {
502 let d = TempDir::new().unwrap();
503 handles.push(build_handle(&format!("graph-{i}"), d.path()).await);
504 dirs.push(d);
505 }
506
507 let barrier = Arc::new(tokio::sync::Barrier::new(N));
508 let mut tasks = Vec::with_capacity(N);
509 for handle in handles {
510 let registry = Arc::clone(®istry);
511 let barrier = Arc::clone(&barrier);
512 tasks.push(tokio::spawn(async move {
513 barrier.wait().await;
514 registry.insert(handle).await
515 }));
516 }
517 for t in tasks {
518 t.await.unwrap().unwrap();
519 }
520 assert_eq!(registry.len(), N);
521 drop(dirs);
522 }
523
524 #[tokio::test(flavor = "multi_thread")]
528 async fn concurrent_reads_during_inserts_see_consistent_snapshots() {
529 let dir = TempDir::new().unwrap();
530 let registry = Arc::new(GraphRegistry::new());
531
532 const N_WRITES: usize = 10;
534 let writer_registry = Arc::clone(®istry);
535 let writer_dir = dir.path().to_path_buf();
536 let writer = tokio::spawn(async move {
537 for i in 0..N_WRITES {
538 let h = build_handle(&format!("graph-{i}"), &writer_dir).await;
539 writer_registry.insert(h).await.unwrap();
540 }
541 });
542
543 let reader_registry = Arc::clone(®istry);
547 let reader = tokio::spawn(async move {
548 for _ in 0..200 {
549 let snap = reader_registry.list();
550 assert!(snap.len() <= N_WRITES);
551 for handle in &snap {
552 match reader_registry.get(&handle.key) {
553 RegistryLookup::Ready(found) => {
554 assert!(Arc::ptr_eq(&found, handle));
555 }
556 RegistryLookup::Gone => panic!(
557 "snapshot listed key {} but get() returned Gone",
558 handle.key.graph_id
559 ),
560 }
561 }
562 tokio::task::yield_now().await;
563 }
564 });
565
566 writer.await.unwrap();
567 reader.await.unwrap();
568 assert_eq!(registry.len(), N_WRITES);
569 }
570}