1pub mod consensus;
15pub mod membership;
16pub mod partitioning;
17pub mod replication;
18
19#[cfg(feature = "full")]
20pub mod cluster;
21
22use std::collections::HashMap;
23use std::sync::Arc;
24use tokio::sync::RwLock;
25use serde::{Deserialize, Serialize};
26
27#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
29pub struct NodeId(pub String);
30
31#[derive(Debug, Clone, Serialize, Deserialize)]
33pub struct ClusterConfig {
34 pub nodes: HashMap<NodeId, NodeInfo>,
35 pub replication_factor: usize,
36 pub partition_count: usize,
37}
38
39impl Default for ClusterConfig {
40 fn default() -> Self {
41 Self {
42 nodes: HashMap::new(),
43 replication_factor: 3,
44 partition_count: 64,
45 }
46 }
47}
48
49#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct NodeInfo {
52 pub id: NodeId,
53 pub address: String,
54 pub port: u16,
55 pub role: NodeRole,
56 pub partitions: Vec<PartitionId>,
57}
58
59#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
61pub enum NodeRole {
62 Follower,
64 Candidate,
66 Leader,
68}
69
70#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
72pub struct PartitionId(pub u32);
73
74#[derive(Debug)]
76pub struct ClusterState {
77 pub config: Arc<RwLock<ClusterConfig>>,
78 pub local_node: NodeId,
79 pub consensus_state: Arc<RwLock<ConsensusState>>,
80 pub partition_table: Arc<RwLock<PartitionTable>>,
81}
82
83impl ClusterState {
84 pub fn new(local_node: NodeId) -> Self {
86 Self {
87 config: Arc::new(RwLock::new(ClusterConfig::default())),
88 local_node,
89 consensus_state: Arc::new(RwLock::new(ConsensusState::new())),
90 partition_table: Arc::new(RwLock::new(PartitionTable::new())),
91 }
92 }
93
94 pub async fn get_leader(&self) -> Option<NodeId> {
96 let consensus = self.consensus_state.read().await;
97 consensus.current_leader.clone()
98 }
99
100 pub async fn is_leader(&self) -> bool {
102 let consensus = self.consensus_state.read().await;
103 consensus.current_leader.as_ref() == Some(&self.local_node)
104 }
105
106 pub async fn get_partition_for_key(&self, key: &[u8]) -> PartitionId {
108 let table = self.partition_table.read().await;
109 table.get_partition(key)
110 }
111
112 pub async fn get_nodes_for_partition(&self, partition: &PartitionId) -> Vec<NodeId> {
114 let config = self.config.read().await;
115 config.nodes.iter()
116 .filter(|(_, info)| info.partitions.contains(partition))
117 .map(|(id, _)| id.clone())
118 .collect()
119 }
120
121 pub async fn get_nodes_for_key(&self, key: &[u8]) -> Vec<NodeId> {
123 let partition = self.get_partition_for_key(key).await;
124 self.get_nodes_for_partition(&partition).await
125 }
126
127 pub async fn get_partitions_for_node(&self, node_id: &NodeId) -> Vec<PartitionId> {
129 let config = self.config.read().await;
130 config.nodes.get(node_id)
131 .map(|info| info.partitions.clone())
132 .unwrap_or_default()
133 }
134}
135
136#[derive(Debug, Clone, Serialize, Deserialize)]
138pub struct LogEntry {
139 pub term: u64,
140 pub index: u64,
141 pub operation: Operation,
142}
143
144#[derive(Debug, Clone, Serialize, Deserialize)]
146pub enum Operation {
147 CreateNode {
148 properties: HashMap<String, Value>,
149 },
150 UpdateNode {
151 cid: String,
152 properties: HashMap<String, Value>,
153 },
154 DeleteNode {
155 cid: String,
156 },
157 CreateEdge {
158 source_cid: String,
159 target_cid: String,
160 properties: HashMap<String, Value>,
161 },
162 UpdateEdge {
163 cid: String,
164 properties: HashMap<String, Value>,
165 },
166 DeleteEdge {
167 cid: String,
168 },
169}
170
171#[derive(Debug, Clone, Serialize, Deserialize)]
173pub enum Value {
174 String(String),
175 Int(i64),
176 Float(f64),
177 Bool(bool),
178 Bytes(Vec<u8>),
179 Link(String), }
181
182#[derive(Debug)]
184pub struct ConsensusState {
185 pub current_term: u64,
186 pub voted_for: Option<NodeId>,
187 pub current_leader: Option<NodeId>,
188 pub log: Vec<LogEntry>,
189 pub commit_index: u64,
190 pub last_applied: u64,
191}
192
193impl ConsensusState {
194 pub fn new() -> Self {
195 Self {
196 current_term: 0,
197 voted_for: None,
198 current_leader: None,
199 log: Vec::new(),
200 commit_index: 0,
201 last_applied: 0,
202 }
203 }
204}
205
206
207#[derive(Debug)]
209pub struct PartitionTable {
210 partition_count: usize,
211}
212
213impl PartitionTable {
214 pub fn new() -> Self {
215 Self {
216 partition_count: 64, }
218 }
219
220 pub fn get_partition(&self, key: &[u8]) -> PartitionId {
222 use std::collections::hash_map::DefaultHasher;
223 use std::hash::{Hash, Hasher};
224
225 let mut hasher = DefaultHasher::new();
226 key.hash(&mut hasher);
227 let hash = hasher.finish();
228
229 PartitionId((hash % self.partition_count as u64) as u32)
230 }
231}
232
233#[cfg(test)]
234mod tests {
235 use super::*;
236
237 #[test]
238 fn test_partition_assignment() {
239 let table = PartitionTable::new();
240
241 let key1 = b"alice";
242 let key2 = b"bob";
243 let key3 = b"alice"; let partition1 = table.get_partition(key1);
246 let partition2 = table.get_partition(key2);
247 let partition3 = table.get_partition(key3);
248
249 assert_eq!(partition1, partition3); assert_ne!(partition1, partition2); }
252
253 #[test]
254 fn test_cluster_state_creation() {
255 let node_id = NodeId("node-1".to_string());
256 let state = ClusterState::new(node_id.clone());
257
258 assert_eq!(state.local_node, node_id);
259 }
260}