use std::collections::BTreeMap;
use std::io::Cursor;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use openraft::storage::RaftStateMachine;
use openraft::{
BasicNode, Entry, EntryPayload, LogId, RaftSnapshotBuilder, Snapshot, SnapshotMeta,
StorageError, StorageIOError, StoredMembership,
};
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
use super::lease::ConsolidationLease;
use super::types::*;
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct StateMachineData {
pub last_applied_log: Option<LogId<NodeId>>,
pub last_membership: StoredMembership<NodeId, BasicNode>,
pub realm_owners: BTreeMap<String, NodeId>,
pub nodes: BTreeMap<NodeId, String>,
pub leases: BTreeMap<String, ConsolidationLease>,
}
#[derive(Debug)]
struct StoredSnapshot {
meta: SnapshotMeta<NodeId, BasicNode>,
data: Vec<u8>,
}
#[derive(Debug)]
pub struct HirnStateMachine {
data: RwLock<StateMachineData>,
snapshot_idx: AtomicU64,
current_snapshot: RwLock<Option<StoredSnapshot>>,
}
impl Default for HirnStateMachine {
fn default() -> Self {
Self {
data: RwLock::new(StateMachineData::default()),
snapshot_idx: AtomicU64::new(0),
current_snapshot: RwLock::new(None),
}
}
}
impl HirnStateMachine {
pub fn new() -> Self {
Self::default()
}
pub async fn realm_owner(&self, realm: &str) -> Option<NodeId> {
self.data.read().await.realm_owners.get(realm).copied()
}
pub async fn realm_owners(&self) -> BTreeMap<String, NodeId> {
self.data.read().await.realm_owners.clone()
}
pub async fn node_addr(&self, node_id: NodeId) -> Option<String> {
self.data.read().await.nodes.get(&node_id).cloned()
}
pub async fn nodes(&self) -> BTreeMap<NodeId, String> {
self.data.read().await.nodes.clone()
}
pub async fn active_lease(&self, realm: &str) -> Option<ConsolidationLease> {
let data = self.data.read().await;
data.leases.get(realm).and_then(|l| {
if l.is_expired() {
None
} else {
Some(l.clone())
}
})
}
fn apply_request(data: &mut StateMachineData, req: &RaftRequest) -> RaftResponse {
match req {
RaftRequest::AssignRealm { realm, owner_node } => {
info!(realm = %realm, owner = owner_node, "assigning realm to node");
data.realm_owners.insert(realm.clone(), *owner_node);
RaftResponse::RealmAssigned {
realm: realm.clone(),
owner: *owner_node,
}
}
RaftRequest::ReleaseRealm { realm } => {
info!(realm = %realm, "releasing realm ownership");
data.realm_owners.remove(realm);
RaftResponse::Ok
}
RaftRequest::AcquireLease {
realm,
holder,
duration_secs,
} => {
if let Some(existing) = data.leases.get(realm) {
if !existing.is_expired() && existing.holder != *holder {
debug!(
realm = %realm,
current_holder = existing.holder,
requester = holder,
"lease conflict — already held"
);
return RaftResponse::LeaseConflict {
holder: existing.holder,
expires_at_epoch_secs: existing.expires_at,
};
}
}
let lease = ConsolidationLease::new(realm.clone(), *holder, *duration_secs);
info!(realm = %realm, holder = holder, duration = duration_secs, "lease acquired");
data.leases.insert(realm.clone(), lease);
RaftResponse::Ok
}
RaftRequest::ReleaseLease { realm, holder } => {
if let Some(existing) = data.leases.get(realm) {
if existing.holder == *holder {
info!(realm = %realm, holder = holder, "lease released");
data.leases.remove(realm);
} else {
warn!(
realm = %realm,
holder = holder,
actual_holder = existing.holder,
"attempted to release lease not held by requester"
);
}
}
RaftResponse::Ok
}
RaftRequest::RenewLease {
realm,
holder,
duration_secs,
} => {
if let Some(lease) = data.leases.get_mut(realm) {
if lease.holder == *holder {
lease.renew(*duration_secs);
debug!(realm = %realm, holder = holder, "lease renewed");
return RaftResponse::Ok;
}
}
warn!(realm = %realm, holder = holder, "lease renewal failed — not held by requester");
RaftResponse::LeaseRenewalFailed {
realm: realm.clone(),
}
}
RaftRequest::RegisterNode { node_id, addr } => {
info!(node_id = node_id, addr = %addr, "node registered");
data.nodes.insert(*node_id, addr.clone());
RaftResponse::NodeRegistered { node_id: *node_id }
}
RaftRequest::DeregisterNode { node_id } => {
info!(node_id = node_id, "node deregistered");
data.nodes.remove(node_id);
data.realm_owners.retain(|_, owner| *owner != *node_id);
data.leases.retain(|_, lease| lease.holder != *node_id);
RaftResponse::Ok
}
}
}
}
impl RaftSnapshotBuilder<TypeConfig> for Arc<HirnStateMachine> {
async fn build_snapshot(&mut self) -> Result<Snapshot<TypeConfig>, StorageError<NodeId>> {
let (data, last_applied_log, last_membership) = {
let sm = self.data.read().await;
let data =
serde_json::to_vec(&*sm).map_err(|e| StorageIOError::read_state_machine(&e))?;
(data, sm.last_applied_log, sm.last_membership.clone())
};
let snapshot_idx = self.snapshot_idx.fetch_add(1, Ordering::Relaxed) + 1;
let snapshot_id = if let Some(last) = last_applied_log {
format!("{}-{}-{}", last.leader_id, last.index, snapshot_idx)
} else {
format!("--{snapshot_idx}")
};
let meta = SnapshotMeta {
last_log_id: last_applied_log,
last_membership,
snapshot_id,
};
let stored = StoredSnapshot {
meta: meta.clone(),
data: data.clone(),
};
*self.current_snapshot.write().await = Some(stored);
Ok(Snapshot {
meta,
snapshot: Box::new(Cursor::new(data)),
})
}
}
impl RaftStateMachine<TypeConfig> for Arc<HirnStateMachine> {
type SnapshotBuilder = Self;
async fn applied_state(
&mut self,
) -> Result<(Option<LogId<NodeId>>, StoredMembership<NodeId, BasicNode>), StorageError<NodeId>>
{
let sm = self.data.read().await;
Ok((sm.last_applied_log, sm.last_membership.clone()))
}
async fn apply<I>(&mut self, entries: I) -> Result<Vec<RaftResponse>, StorageError<NodeId>>
where
I: IntoIterator<Item = Entry<TypeConfig>> + Send,
{
let mut responses = Vec::new();
let mut sm = self.data.write().await;
for entry in entries {
sm.last_applied_log = Some(entry.log_id);
match entry.payload {
EntryPayload::Blank => {
responses.push(RaftResponse::Ok);
}
EntryPayload::Normal(ref req) => {
let resp = HirnStateMachine::apply_request(&mut sm, req);
responses.push(resp);
}
EntryPayload::Membership(ref mem) => {
sm.last_membership = StoredMembership::new(Some(entry.log_id), mem.clone());
responses.push(RaftResponse::Ok);
}
}
}
Ok(responses)
}
async fn get_snapshot_builder(&mut self) -> Self::SnapshotBuilder {
self.clone()
}
async fn begin_receiving_snapshot(
&mut self,
) -> Result<Box<Cursor<Vec<u8>>>, StorageError<NodeId>> {
Ok(Box::new(Cursor::new(Vec::new())))
}
async fn install_snapshot(
&mut self,
meta: &SnapshotMeta<NodeId, BasicNode>,
snapshot: Box<Cursor<Vec<u8>>>,
) -> Result<(), StorageError<NodeId>> {
let new_data: StateMachineData = serde_json::from_slice(snapshot.get_ref())
.map_err(|e| StorageIOError::read_snapshot(Some(meta.signature()), &e))?;
{
let mut sm = self.data.write().await;
*sm = new_data;
}
*self.current_snapshot.write().await = Some(StoredSnapshot {
meta: meta.clone(),
data: snapshot.into_inner(),
});
Ok(())
}
async fn get_current_snapshot(
&mut self,
) -> Result<Option<Snapshot<TypeConfig>>, StorageError<NodeId>> {
match &*self.current_snapshot.read().await {
Some(snapshot) => {
let data = snapshot.data.clone();
Ok(Some(Snapshot {
meta: snapshot.meta.clone(),
snapshot: Box::new(Cursor::new(data)),
}))
}
None => Ok(None),
}
}
}