use std::collections::HashMap;
use std::sync::Arc;
use openraft::BasicNode;
use tracing::info;
use orca_core::config::ServiceConfig;
use crate::raft::OrcaRaft;
use crate::store::{Assignment, ClusterStore, NodeEntry, RaftEntry};
/// Facade over the cluster's replicated state.
///
/// Bundles the Raft handle (used to propose changes) with the store
/// (used to read nodes, services and assignments).
pub struct ClusterState {
    /// Shared handle to the Raft instance that proposals are written through.
    pub raft: Arc<OrcaRaft>,
    /// Shared handle to the store that the read accessors query.
    pub store: Arc<ClusterStore>,
}
impl ClusterState {
pub fn new(raft: Arc<OrcaRaft>, store: Arc<ClusterStore>) -> Self {
Self { raft, store }
}
pub fn get_nodes(&self) -> anyhow::Result<HashMap<u64, NodeEntry>> {
self.store.get_all_nodes()
}
pub fn get_services(&self) -> anyhow::Result<HashMap<String, ServiceConfig>> {
self.store.get_all_services()
}
pub fn get_all_assignments(&self) -> anyhow::Result<Vec<Assignment>> {
self.store.get_all_assignments()
}
pub fn get_assignments(&self, service: &str) -> anyhow::Result<Vec<Assignment>> {
self.store.get_assignments(service)
}
pub fn get_node_assignments(&self, node_id: u64) -> anyhow::Result<Vec<Assignment>> {
let all = self.store.get_all_assignments()?;
Ok(all.into_iter().filter(|a| a.node_id == node_id).collect())
}
pub async fn propose_deploy(&self, services: &[ServiceConfig]) -> anyhow::Result<()> {
for svc in services {
self.propose(RaftEntry::SetService(Box::new(svc.clone())))
.await?;
}
info!("Proposed deploy of {} services to Raft", services.len());
Ok(())
}
pub async fn register_node(
&self,
node_id: u64,
address: String,
labels: HashMap<String, String>,
) -> anyhow::Result<()> {
let node = BasicNode {
addr: address.clone(),
};
let _ = self.raft.add_learner(node_id, node, true).await;
self.propose(RaftEntry::RegisterNode {
node_id,
address,
labels,
})
.await?;
Ok(())
}
pub async fn assign_workload(
&self,
service: &str,
replica_idx: u32,
node_id: u64,
) -> anyhow::Result<()> {
self.propose(RaftEntry::AssignWorkload {
service: service.to_string(),
replica_idx,
node_id,
})
.await
}
pub async fn unassign_workload(
&self,
service: &str,
replica_idx: u32,
node_id: u64,
) -> anyhow::Result<()> {
self.propose(RaftEntry::UnassignWorkload {
service: service.to_string(),
replica_idx,
node_id,
})
.await
}
async fn propose(&self, entry: RaftEntry) -> anyhow::Result<()> {
self.raft
.client_write(entry)
.await
.map_err(|e| anyhow::anyhow!("Raft propose failed: {e}"))?;
Ok(())
}
}