// orlando_cluster/cross_cluster_directory.rs

use std::collections::HashMap;
use std::sync::{Mutex, MutexGuard};
use std::time::SystemTime;

use async_trait::async_trait;
use orlando_core::{ClusterId, GrainId};
7
8/// Tracks which cluster owns each grain activation globally.
9/// Implementations must provide CAS semantics: only one cluster can own
10/// a grain at a time. Concurrent registrations are resolved by first-writer-wins.
11#[async_trait]
12pub trait CrossClusterDirectory: Send + Sync + 'static {
13    /// Look up which cluster owns a grain.
14    async fn lookup(&self, grain_id: &GrainId) -> Result<Option<GrainOwnership>, DirectoryError>;
15
16    /// CAS register. Returns actual owner (may differ from requested if another
17    /// cluster won the race). The epoch is a monotonic fencing token — stale
18    /// registrations with a lower epoch are rejected.
19    async fn register(
20        &self,
21        grain_id: &GrainId,
22        cluster_id: &ClusterId,
23        epoch: u64,
24    ) -> Result<GrainOwnership, DirectoryError>;
25
26    /// Release ownership. Only the owning cluster can deregister.
27    async fn deregister(
28        &self,
29        grain_id: &GrainId,
30        cluster_id: &ClusterId,
31    ) -> Result<(), DirectoryError>;
32
33    /// Extend TTL for ownership. No-op for backends without TTL (e.g., PostgreSQL).
34    async fn renew(
35        &self,
36        _grain_id: &GrainId,
37        _cluster_id: &ClusterId,
38    ) -> Result<(), DirectoryError> {
39        Ok(())
40    }
41
42    /// Enumerate grains currently owned by a given cluster. Used by failover
43    /// to discover which grains require promotion when a peer is declared dead.
44    ///
45    /// Default returns an empty list so existing implementors compile without
46    /// modification; backends should override to enable failover support.
47    async fn list_owned_by(
48        &self,
49        _cluster_id: &ClusterId,
50    ) -> Result<Vec<(GrainId, GrainOwnership)>, DirectoryError> {
51        Ok(Vec::new())
52    }
53}
54
55/// Who owns a grain activation.
56#[derive(Debug, Clone)]
57pub struct GrainOwnership {
58    pub cluster_id: ClusterId,
59    pub epoch: u64,
60    pub registered_at: SystemTime,
61}
62
63#[derive(Debug, thiserror::Error)]
64#[non_exhaustive]
65pub enum DirectoryError {
66    #[error("directory backend unavailable: {0}")]
67    Unavailable(String),
68    #[error("stale epoch: current is {current}, requested {requested}")]
69    StaleEpoch { current: u64, requested: u64 },
70    #[error("directory backend error: {0}")]
71    Backend(String),
72}
73
74/// In-memory implementation for testing and single-process multi-cluster setups.
75#[derive(Debug, Default)]
76pub struct InMemoryCrossClusterDirectory {
77    entries: Mutex<HashMap<String, (GrainId, GrainOwnership)>>,
78}
79
80impl InMemoryCrossClusterDirectory {
81    pub fn new() -> Self {
82        Self::default()
83    }
84
85    fn key(grain_id: &GrainId) -> String {
86        format!("{}/{}", grain_id.type_name, grain_id.key)
87    }
88}
89
90#[async_trait]
91impl CrossClusterDirectory for InMemoryCrossClusterDirectory {
92    async fn lookup(&self, grain_id: &GrainId) -> Result<Option<GrainOwnership>, DirectoryError> {
93        let entries = self
94            .entries
95            .lock()
96            .map_err(|e| DirectoryError::Backend(e.to_string()))?;
97        Ok(entries.get(&Self::key(grain_id)).map(|(_, o)| o.clone()))
98    }
99
100    async fn register(
101        &self,
102        grain_id: &GrainId,
103        cluster_id: &ClusterId,
104        epoch: u64,
105    ) -> Result<GrainOwnership, DirectoryError> {
106        let mut entries = self
107            .entries
108            .lock()
109            .map_err(|e| DirectoryError::Backend(e.to_string()))?;
110        let key = Self::key(grain_id);
111
112        // CAS: first writer wins, but higher epoch can reclaim
113        if let Some((_, existing)) = entries.get(&key) {
114            if existing.cluster_id != *cluster_id && epoch <= existing.epoch {
115                // Another cluster owns it at a higher or equal epoch
116                return Ok(existing.clone());
117            }
118            if existing.cluster_id == *cluster_id && epoch <= existing.epoch {
119                // We already own it at this or higher epoch
120                return Ok(existing.clone());
121            }
122            // Higher epoch — allow re-registration (failover promotion)
123        }
124
125        let ownership = GrainOwnership {
126            cluster_id: cluster_id.clone(),
127            epoch,
128            registered_at: SystemTime::now(),
129        };
130        entries.insert(key, (grain_id.clone(), ownership.clone()));
131        Ok(ownership)
132    }
133
134    async fn deregister(
135        &self,
136        grain_id: &GrainId,
137        cluster_id: &ClusterId,
138    ) -> Result<(), DirectoryError> {
139        let mut entries = self
140            .entries
141            .lock()
142            .map_err(|e| DirectoryError::Backend(e.to_string()))?;
143        let key = Self::key(grain_id);
144
145        // Only the owning cluster can deregister
146        if let Some((_, existing)) = entries.get(&key)
147            && existing.cluster_id == *cluster_id
148        {
149            entries.remove(&key);
150        }
151        Ok(())
152    }
153
154    async fn list_owned_by(
155        &self,
156        cluster_id: &ClusterId,
157    ) -> Result<Vec<(GrainId, GrainOwnership)>, DirectoryError> {
158        let entries = self
159            .entries
160            .lock()
161            .map_err(|e| DirectoryError::Backend(e.to_string()))?;
162        Ok(entries
163            .values()
164            .filter(|(_, o)| o.cluster_id == *cluster_id)
165            .cloned()
166            .collect())
167    }
168}