orca_control/
cluster_state.rs1use std::collections::HashMap;
7use std::sync::Arc;
8
9use openraft::BasicNode;
10use tracing::info;
11
12use orca_core::config::ServiceConfig;
13
14use crate::raft::OrcaRaft;
15use crate::store::{Assignment, ClusterStore, NodeEntry, RaftEntry};
16
17pub struct ClusterState {
19 pub raft: Arc<OrcaRaft>,
21 pub store: Arc<ClusterStore>,
23}
24
25impl ClusterState {
26 pub fn new(raft: Arc<OrcaRaft>, store: Arc<ClusterStore>) -> Self {
28 Self { raft, store }
29 }
30
31 pub fn get_nodes(&self) -> anyhow::Result<HashMap<u64, NodeEntry>> {
35 self.store.get_all_nodes()
36 }
37
38 pub fn get_services(&self) -> anyhow::Result<HashMap<String, ServiceConfig>> {
40 self.store.get_all_services()
41 }
42
43 pub fn get_all_assignments(&self) -> anyhow::Result<Vec<Assignment>> {
45 self.store.get_all_assignments()
46 }
47
48 pub fn get_assignments(&self, service: &str) -> anyhow::Result<Vec<Assignment>> {
50 self.store.get_assignments(service)
51 }
52
53 pub fn get_node_assignments(&self, node_id: u64) -> anyhow::Result<Vec<Assignment>> {
55 let all = self.store.get_all_assignments()?;
56 Ok(all.into_iter().filter(|a| a.node_id == node_id).collect())
57 }
58
59 pub async fn propose_deploy(&self, services: &[ServiceConfig]) -> anyhow::Result<()> {
63 for svc in services {
64 self.propose(RaftEntry::SetService(Box::new(svc.clone())))
65 .await?;
66 }
67 info!("Proposed deploy of {} services to Raft", services.len());
68 Ok(())
69 }
70
71 pub async fn register_node(
73 &self,
74 node_id: u64,
75 address: String,
76 labels: HashMap<String, String>,
77 ) -> anyhow::Result<()> {
78 let node = BasicNode {
80 addr: address.clone(),
81 };
82 let _ = self.raft.add_learner(node_id, node, true).await;
83
84 self.propose(RaftEntry::RegisterNode {
86 node_id,
87 address,
88 labels,
89 })
90 .await?;
91 Ok(())
92 }
93
94 pub async fn assign_workload(
96 &self,
97 service: &str,
98 replica_idx: u32,
99 node_id: u64,
100 ) -> anyhow::Result<()> {
101 self.propose(RaftEntry::AssignWorkload {
102 service: service.to_string(),
103 replica_idx,
104 node_id,
105 })
106 .await
107 }
108
109 pub async fn unassign_workload(
111 &self,
112 service: &str,
113 replica_idx: u32,
114 node_id: u64,
115 ) -> anyhow::Result<()> {
116 self.propose(RaftEntry::UnassignWorkload {
117 service: service.to_string(),
118 replica_idx,
119 node_id,
120 })
121 .await
122 }
123
124 async fn propose(&self, entry: RaftEntry) -> anyhow::Result<()> {
126 self.raft
127 .client_write(entry)
128 .await
129 .map_err(|e| anyhow::anyhow!("Raft propose failed: {e}"))?;
130 Ok(())
131 }
132}