use std::collections::HashMap;
use std::sync::Mutex;
use std::time::SystemTime;
use async_trait::async_trait;
use orlando_core::{ClusterId, GrainId};
#[async_trait]
pub trait CrossClusterDirectory: Send + Sync + 'static {
async fn lookup(&self, grain_id: &GrainId) -> Result<Option<GrainOwnership>, DirectoryError>;
async fn register(
&self,
grain_id: &GrainId,
cluster_id: &ClusterId,
epoch: u64,
) -> Result<GrainOwnership, DirectoryError>;
async fn deregister(
&self,
grain_id: &GrainId,
cluster_id: &ClusterId,
) -> Result<(), DirectoryError>;
async fn renew(
&self,
_grain_id: &GrainId,
_cluster_id: &ClusterId,
) -> Result<(), DirectoryError> {
Ok(())
}
async fn list_owned_by(
&self,
_cluster_id: &ClusterId,
) -> Result<Vec<(GrainId, GrainOwnership)>, DirectoryError> {
Ok(Vec::new())
}
}
#[derive(Debug, Clone)]
pub struct GrainOwnership {
pub cluster_id: ClusterId,
pub epoch: u64,
pub registered_at: SystemTime,
}
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum DirectoryError {
#[error("directory backend unavailable: {0}")]
Unavailable(String),
#[error("stale epoch: current is {current}, requested {requested}")]
StaleEpoch { current: u64, requested: u64 },
#[error("directory backend error: {0}")]
Backend(String),
}
#[derive(Debug, Default)]
pub struct InMemoryCrossClusterDirectory {
entries: Mutex<HashMap<String, (GrainId, GrainOwnership)>>,
}
impl InMemoryCrossClusterDirectory {
pub fn new() -> Self {
Self::default()
}
fn key(grain_id: &GrainId) -> String {
format!("{}/{}", grain_id.type_name, grain_id.key)
}
}
#[async_trait]
impl CrossClusterDirectory for InMemoryCrossClusterDirectory {
async fn lookup(&self, grain_id: &GrainId) -> Result<Option<GrainOwnership>, DirectoryError> {
let entries = self
.entries
.lock()
.map_err(|e| DirectoryError::Backend(e.to_string()))?;
Ok(entries.get(&Self::key(grain_id)).map(|(_, o)| o.clone()))
}
async fn register(
&self,
grain_id: &GrainId,
cluster_id: &ClusterId,
epoch: u64,
) -> Result<GrainOwnership, DirectoryError> {
let mut entries = self
.entries
.lock()
.map_err(|e| DirectoryError::Backend(e.to_string()))?;
let key = Self::key(grain_id);
if let Some((_, existing)) = entries.get(&key) {
if existing.cluster_id != *cluster_id && epoch <= existing.epoch {
return Ok(existing.clone());
}
if existing.cluster_id == *cluster_id && epoch <= existing.epoch {
return Ok(existing.clone());
}
}
let ownership = GrainOwnership {
cluster_id: cluster_id.clone(),
epoch,
registered_at: SystemTime::now(),
};
entries.insert(key, (grain_id.clone(), ownership.clone()));
Ok(ownership)
}
async fn deregister(
&self,
grain_id: &GrainId,
cluster_id: &ClusterId,
) -> Result<(), DirectoryError> {
let mut entries = self
.entries
.lock()
.map_err(|e| DirectoryError::Backend(e.to_string()))?;
let key = Self::key(grain_id);
if let Some((_, existing)) = entries.get(&key)
&& existing.cluster_id == *cluster_id
{
entries.remove(&key);
}
Ok(())
}
async fn list_owned_by(
&self,
cluster_id: &ClusterId,
) -> Result<Vec<(GrainId, GrainOwnership)>, DirectoryError> {
let entries = self
.entries
.lock()
.map_err(|e| DirectoryError::Backend(e.to_string()))?;
Ok(entries
.values()
.filter(|(_, o)| o.cluster_id == *cluster_id)
.cloned()
.collect())
}
}