orlando_cluster/
cross_cluster_directory.rs1use std::collections::HashMap;
2use std::sync::Mutex;
3use std::time::SystemTime;
4
5use async_trait::async_trait;
6use orlando_core::{ClusterId, GrainId};
7
8#[async_trait]
12pub trait CrossClusterDirectory: Send + Sync + 'static {
13 async fn lookup(&self, grain_id: &GrainId) -> Result<Option<GrainOwnership>, DirectoryError>;
15
16 async fn register(
20 &self,
21 grain_id: &GrainId,
22 cluster_id: &ClusterId,
23 epoch: u64,
24 ) -> Result<GrainOwnership, DirectoryError>;
25
26 async fn deregister(
28 &self,
29 grain_id: &GrainId,
30 cluster_id: &ClusterId,
31 ) -> Result<(), DirectoryError>;
32
33 async fn renew(
35 &self,
36 _grain_id: &GrainId,
37 _cluster_id: &ClusterId,
38 ) -> Result<(), DirectoryError> {
39 Ok(())
40 }
41
42 async fn list_owned_by(
48 &self,
49 _cluster_id: &ClusterId,
50 ) -> Result<Vec<(GrainId, GrainOwnership)>, DirectoryError> {
51 Ok(Vec::new())
52 }
53}
54
55#[derive(Debug, Clone)]
57pub struct GrainOwnership {
58 pub cluster_id: ClusterId,
59 pub epoch: u64,
60 pub registered_at: SystemTime,
61}
62
63#[derive(Debug, thiserror::Error)]
64#[non_exhaustive]
65pub enum DirectoryError {
66 #[error("directory backend unavailable: {0}")]
67 Unavailable(String),
68 #[error("stale epoch: current is {current}, requested {requested}")]
69 StaleEpoch { current: u64, requested: u64 },
70 #[error("directory backend error: {0}")]
71 Backend(String),
72}
73
74#[derive(Debug, Default)]
76pub struct InMemoryCrossClusterDirectory {
77 entries: Mutex<HashMap<String, (GrainId, GrainOwnership)>>,
78}
79
80impl InMemoryCrossClusterDirectory {
81 pub fn new() -> Self {
82 Self::default()
83 }
84
85 fn key(grain_id: &GrainId) -> String {
86 format!("{}/{}", grain_id.type_name, grain_id.key)
87 }
88}
89
90#[async_trait]
91impl CrossClusterDirectory for InMemoryCrossClusterDirectory {
92 async fn lookup(&self, grain_id: &GrainId) -> Result<Option<GrainOwnership>, DirectoryError> {
93 let entries = self
94 .entries
95 .lock()
96 .map_err(|e| DirectoryError::Backend(e.to_string()))?;
97 Ok(entries.get(&Self::key(grain_id)).map(|(_, o)| o.clone()))
98 }
99
100 async fn register(
101 &self,
102 grain_id: &GrainId,
103 cluster_id: &ClusterId,
104 epoch: u64,
105 ) -> Result<GrainOwnership, DirectoryError> {
106 let mut entries = self
107 .entries
108 .lock()
109 .map_err(|e| DirectoryError::Backend(e.to_string()))?;
110 let key = Self::key(grain_id);
111
112 if let Some((_, existing)) = entries.get(&key) {
114 if existing.cluster_id != *cluster_id && epoch <= existing.epoch {
115 return Ok(existing.clone());
117 }
118 if existing.cluster_id == *cluster_id && epoch <= existing.epoch {
119 return Ok(existing.clone());
121 }
122 }
124
125 let ownership = GrainOwnership {
126 cluster_id: cluster_id.clone(),
127 epoch,
128 registered_at: SystemTime::now(),
129 };
130 entries.insert(key, (grain_id.clone(), ownership.clone()));
131 Ok(ownership)
132 }
133
134 async fn deregister(
135 &self,
136 grain_id: &GrainId,
137 cluster_id: &ClusterId,
138 ) -> Result<(), DirectoryError> {
139 let mut entries = self
140 .entries
141 .lock()
142 .map_err(|e| DirectoryError::Backend(e.to_string()))?;
143 let key = Self::key(grain_id);
144
145 if let Some((_, existing)) = entries.get(&key)
147 && existing.cluster_id == *cluster_id
148 {
149 entries.remove(&key);
150 }
151 Ok(())
152 }
153
154 async fn list_owned_by(
155 &self,
156 cluster_id: &ClusterId,
157 ) -> Result<Vec<(GrainId, GrainOwnership)>, DirectoryError> {
158 let entries = self
159 .entries
160 .lock()
161 .map_err(|e| DirectoryError::Backend(e.to_string()))?;
162 Ok(entries
163 .values()
164 .filter(|(_, o)| o.cluster_id == *cluster_id)
165 .cloned()
166 .collect())
167 }
168}