Skip to main content

orca_control/
cluster_state.rs

1//! Cluster-wide state backed by Raft consensus.
2//!
3//! Reads are served locally from the store. Writes are proposed
4//! to the Raft leader for consensus.
5
6use std::collections::HashMap;
7use std::sync::Arc;
8
9use openraft::BasicNode;
10use tracing::info;
11
12use orca_core::config::ServiceConfig;
13
14use crate::raft::OrcaRaft;
15use crate::store::{Assignment, ClusterStore, NodeEntry, RaftEntry};
16
17/// Cluster state manager wrapping Raft consensus.
18pub struct ClusterState {
19    /// The Raft node.
20    pub raft: Arc<OrcaRaft>,
21    /// Local state store (reads served directly).
22    pub store: Arc<ClusterStore>,
23}
24
25impl ClusterState {
26    /// Create a new cluster state manager.
27    pub fn new(raft: Arc<OrcaRaft>, store: Arc<ClusterStore>) -> Self {
28        Self { raft, store }
29    }
30
31    // -- Read operations (local, no Raft) --
32
33    /// Get all registered nodes.
34    pub fn get_nodes(&self) -> anyhow::Result<HashMap<u64, NodeEntry>> {
35        self.store.get_all_nodes()
36    }
37
38    /// Get all service configs.
39    pub fn get_services(&self) -> anyhow::Result<HashMap<String, ServiceConfig>> {
40        self.store.get_all_services()
41    }
42
43    /// Get all workload assignments.
44    pub fn get_all_assignments(&self) -> anyhow::Result<Vec<Assignment>> {
45        self.store.get_all_assignments()
46    }
47
48    /// Get assignments for a specific service.
49    pub fn get_assignments(&self, service: &str) -> anyhow::Result<Vec<Assignment>> {
50        self.store.get_assignments(service)
51    }
52
53    /// Get assignments for a specific node.
54    pub fn get_node_assignments(&self, node_id: u64) -> anyhow::Result<Vec<Assignment>> {
55        let all = self.store.get_all_assignments()?;
56        Ok(all.into_iter().filter(|a| a.node_id == node_id).collect())
57    }
58
59    // -- Write operations (proposed to Raft leader) --
60
61    /// Deploy services — proposes SetService entries to Raft.
62    pub async fn propose_deploy(&self, services: &[ServiceConfig]) -> anyhow::Result<()> {
63        for svc in services {
64            self.propose(RaftEntry::SetService(Box::new(svc.clone())))
65                .await?;
66        }
67        info!("Proposed deploy of {} services to Raft", services.len());
68        Ok(())
69    }
70
71    /// Register a node in the cluster.
72    pub async fn register_node(
73        &self,
74        node_id: u64,
75        address: String,
76        labels: HashMap<String, String>,
77    ) -> anyhow::Result<()> {
78        // Add to Raft membership
79        let node = BasicNode {
80            addr: address.clone(),
81        };
82        let _ = self.raft.add_learner(node_id, node, true).await;
83
84        // Register in cluster state
85        self.propose(RaftEntry::RegisterNode {
86            node_id,
87            address,
88            labels,
89        })
90        .await?;
91        Ok(())
92    }
93
94    /// Assign a workload replica to a node.
95    pub async fn assign_workload(
96        &self,
97        service: &str,
98        replica_idx: u32,
99        node_id: u64,
100    ) -> anyhow::Result<()> {
101        self.propose(RaftEntry::AssignWorkload {
102            service: service.to_string(),
103            replica_idx,
104            node_id,
105        })
106        .await
107    }
108
109    /// Unassign a workload replica from a node.
110    pub async fn unassign_workload(
111        &self,
112        service: &str,
113        replica_idx: u32,
114        node_id: u64,
115    ) -> anyhow::Result<()> {
116        self.propose(RaftEntry::UnassignWorkload {
117            service: service.to_string(),
118            replica_idx,
119            node_id,
120        })
121        .await
122    }
123
124    /// Propose a Raft entry for consensus.
125    async fn propose(&self, entry: RaftEntry) -> anyhow::Result<()> {
126        self.raft
127            .client_write(entry)
128            .await
129            .map_err(|e| anyhow::anyhow!("Raft propose failed: {e}"))?;
130        Ok(())
131    }
132}