// kotoba_db_cluster/lib.rs

//! # KotobaDB Cluster
//!
//! Distributed clustering and consensus implementation for KotobaDB.
//! Provides high availability, fault tolerance, and horizontal scalability.
//!
//! ## Features
//!
//! - **Raft Consensus**: Leader election and log replication
//! - **Automatic Failover**: Transparent leader failover
//! - **Horizontal Scaling**: Data partitioning across nodes
//! - **Fault Tolerance**: Survives node failures
//! - **Eventual Consistency**: Tunable consistency levels
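//!
//! ## Example
//!
//! A minimal, illustrative sketch (it assumes the crate is built under the
//! name `kotoba_db_cluster`, matching the directory name above):
//!
//! ```
//! use kotoba_db_cluster::{ClusterState, NodeId};
//!
//! // Each node keeps a `ClusterState` keyed by its own identifier.
//! let state = ClusterState::new(NodeId("node-1".to_string()));
//! assert_eq!(state.local_node, NodeId("node-1".to_string()));
//! ```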

pub mod consensus;
pub mod membership;
pub mod partitioning;
pub mod replication;

#[cfg(feature = "full")]
pub mod cluster;

use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use serde::{Deserialize, Serialize};

/// Unique identifier for cluster nodes
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct NodeId(pub String);

/// Cluster configuration
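///
/// A construction sketch (illustrative only; it assumes the crate name
/// `kotoba_db_cluster` and uses struct-update syntax over the defaults below):
///
/// ```
/// use kotoba_db_cluster::ClusterConfig;
///
/// // Keep the default partition count (64) but replicate each partition once.
/// let config = ClusterConfig {
///     replication_factor: 1,
///     ..ClusterConfig::default()
/// };
/// assert_eq!(config.partition_count, 64);
/// ```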
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterConfig {
    pub nodes: HashMap<NodeId, NodeInfo>,
    pub replication_factor: usize,
    pub partition_count: usize,
}

impl Default for ClusterConfig {
    fn default() -> Self {
        Self {
            nodes: HashMap::new(),
            replication_factor: 3,
            partition_count: 64,
        }
    }
}

/// Information about a cluster node
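///
/// A construction sketch (illustrative only; the address and port are made up,
/// and the crate name `kotoba_db_cluster` is assumed):
///
/// ```
/// use kotoba_db_cluster::{NodeId, NodeInfo, NodeRole};
///
/// // A freshly joined node starts as a follower with no partitions assigned.
/// let node = NodeInfo {
///     id: NodeId("node-1".to_string()),
///     address: "10.0.0.1".to_string(),
///     port: 7000,
///     role: NodeRole::Follower,
///     partitions: Vec::new(),
/// };
/// assert!(node.partitions.is_empty());
/// ```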
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeInfo {
    pub id: NodeId,
    pub address: String,
    pub port: u16,
    pub role: NodeRole,
    pub partitions: Vec<PartitionId>,
}

/// Node roles in the cluster
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum NodeRole {
    /// Follows leader and replicates log
    Follower,
    /// Candidate for leadership
    Candidate,
    /// Current leader, accepts client requests
    Leader,
}

/// Partition identifier for data sharding
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct PartitionId(pub u32);

/// Cluster state
#[derive(Debug)]
pub struct ClusterState {
    pub config: Arc<RwLock<ClusterConfig>>,
    pub local_node: NodeId,
    pub consensus_state: Arc<RwLock<ConsensusState>>,
    pub partition_table: Arc<RwLock<PartitionTable>>,
}

impl ClusterState {
    /// Create a new cluster state
    pub fn new(local_node: NodeId) -> Self {
        Self {
            config: Arc::new(RwLock::new(ClusterConfig::default())),
            local_node,
            consensus_state: Arc::new(RwLock::new(ConsensusState::new())),
            partition_table: Arc::new(RwLock::new(PartitionTable::new())),
        }
    }

    /// Get the current leader node
    pub async fn get_leader(&self) -> Option<NodeId> {
        let consensus = self.consensus_state.read().await;
        consensus.current_leader.clone()
    }

    /// Check if this node is the leader
    pub async fn is_leader(&self) -> bool {
        let consensus = self.consensus_state.read().await;
        consensus.current_leader.as_ref() == Some(&self.local_node)
    }

    /// Get partition assignment for a key
    pub async fn get_partition_for_key(&self, key: &[u8]) -> PartitionId {
        let table = self.partition_table.read().await;
        table.get_partition(key)
    }

    /// Get nodes responsible for a partition
    pub async fn get_nodes_for_partition(&self, partition: &PartitionId) -> Vec<NodeId> {
        let config = self.config.read().await;
        config.nodes.iter()
            .filter(|(_, info)| info.partitions.contains(partition))
            .map(|(id, _)| id.clone())
            .collect()
    }

    /// Get nodes responsible for a key
    pub async fn get_nodes_for_key(&self, key: &[u8]) -> Vec<NodeId> {
        let partition = self.get_partition_for_key(key).await;
        self.get_nodes_for_partition(&partition).await
    }

    /// Get partitions for a node
    pub async fn get_partitions_for_node(&self, node_id: &NodeId) -> Vec<PartitionId> {
        let config = self.config.read().await;
        config.nodes.get(node_id)
            .map(|info| info.partitions.clone())
            .unwrap_or_default()
    }
}
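
// Illustrative routing sketch, not part of the original API surface: it shows
// how a key resolves to a partition and then to the nodes that own it. The
// single-node layout below is made up, and the test assumes tokio's `rt` and
// `macros` features are enabled so that `#[tokio::test]` is available.
#[cfg(test)]
mod cluster_state_routing_example {
    use super::*;

    #[tokio::test]
    async fn routes_a_key_to_its_replica_set() {
        let state = ClusterState::new(NodeId("node-1".to_string()));

        // Register a single node that owns every partition, so any key
        // resolves to it.
        {
            let mut config = state.config.write().await;
            let all_partitions: Vec<PartitionId> = (0..64).map(PartitionId).collect();
            config.nodes.insert(
                NodeId("node-1".to_string()),
                NodeInfo {
                    id: NodeId("node-1".to_string()),
                    address: "127.0.0.1".to_string(),
                    port: 7000,
                    role: NodeRole::Leader,
                    partitions: all_partitions,
                },
            );
        }

        // Key -> partition -> replica set.
        let replicas = state.get_nodes_for_key(b"alice").await;
        assert_eq!(replicas, vec![NodeId("node-1".to_string())]);
    }
}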

/// Log entry for consensus
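///
/// A construction sketch (illustrative; the property values are made up and the
/// crate name `kotoba_db_cluster` is assumed):
///
/// ```
/// use std::collections::HashMap;
/// use kotoba_db_cluster::{LogEntry, Operation, Value};
///
/// let mut properties = HashMap::new();
/// properties.insert("name".to_string(), Value::String("alice".to_string()));
///
/// // An entry is a replicated operation tagged with the term and log index
/// // under which the leader proposed it.
/// let entry = LogEntry {
///     term: 1,
///     index: 1,
///     operation: Operation::CreateNode { properties },
/// };
/// assert_eq!(entry.index, 1);
/// ```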
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
    pub term: u64,
    pub index: u64,
    pub operation: Operation,
}

/// Database operations that can be replicated
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Operation {
    CreateNode {
        properties: HashMap<String, Value>,
    },
    UpdateNode {
        cid: String,
        properties: HashMap<String, Value>,
    },
    DeleteNode {
        cid: String,
    },
    CreateEdge {
        source_cid: String,
        target_cid: String,
        properties: HashMap<String, Value>,
    },
    UpdateEdge {
        cid: String,
        properties: HashMap<String, Value>,
    },
    DeleteEdge {
        cid: String,
    },
}

/// Generic value type for operations
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Value {
    String(String),
    Int(i64),
    Float(f64),
    Bool(bool),
    Bytes(Vec<u8>),
    Link(String), // CID as string
}

/// Consensus algorithm state (simplified Raft)
#[derive(Debug)]
pub struct ConsensusState {
    /// Latest term this node has seen
    pub current_term: u64,
    /// Candidate this node voted for in the current term, if any
    pub voted_for: Option<NodeId>,
    /// Node currently believed to be the leader, if known
    pub current_leader: Option<NodeId>,
    /// Replicated log of operations
    pub log: Vec<LogEntry>,
    /// Highest log index known to be committed
    pub commit_index: u64,
    /// Highest log index applied to the local state machine
    pub last_applied: u64,
}

impl ConsensusState {
    pub fn new() -> Self {
        Self {
            current_term: 0,
            voted_for: None,
            current_leader: None,
            log: Vec::new(),
            commit_index: 0,
            last_applied: 0,
        }
    }
}

/// Partition table for data distribution
#[derive(Debug)]
pub struct PartitionTable {
    partition_count: usize,
}

impl PartitionTable {
    pub fn new() -> Self {
        Self {
            partition_count: 64, // Default partition count
        }
    }

    /// Get the partition for a key by hashing the key and taking the hash
    /// modulo the partition count. (This is simple hash partitioning rather
    /// than consistent hashing, so changing the partition count remaps most keys.)
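    ///
    /// A quick determinism sketch (illustrative doc test; it assumes the crate
    /// name `kotoba_db_cluster`):
    ///
    /// ```
    /// use kotoba_db_cluster::PartitionTable;
    ///
    /// let table = PartitionTable::new();
    /// // The same key always maps to the same partition within one process.
    /// assert_eq!(table.get_partition(b"alice"), table.get_partition(b"alice"));
    /// ```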
    pub fn get_partition(&self, key: &[u8]) -> PartitionId {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        // Note: DefaultHasher's algorithm is not guaranteed to be stable across
        // Rust releases, so all nodes must be built with the same toolchain for
        // key-to-partition mappings to agree.
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        let hash = hasher.finish();

        PartitionId((hash % self.partition_count as u64) as u32)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_partition_assignment() {
        let table = PartitionTable::new();

        let key1 = b"alice";
        let key2 = b"bob";
        let key3 = b"alice"; // Same key should get same partition

        let partition1 = table.get_partition(key1);
        let partition2 = table.get_partition(key2);
        let partition3 = table.get_partition(key3);

        assert_eq!(partition1, partition3); // Same key, same partition
        assert_ne!(partition1, partition2); // Different keys, likely different partitions
    }

    #[test]
    fn test_cluster_state_creation() {
        let node_id = NodeId("node-1".to_string());
        let state = ClusterState::new(node_id.clone());

        assert_eq!(state.local_node, node_id);
    }
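
    // Additional sketch (not in the original suite): every key should land in
    // one of the 64 partitions that PartitionTable::new() configures.
    #[test]
    fn test_partition_within_range() {
        let table = PartitionTable::new();

        let keys: [&[u8]; 4] = [b"alice", b"bob", b"carol", b"a longer composite key"];
        for key in keys {
            let PartitionId(id) = table.get_partition(key);
            assert!(id < 64, "partition {} out of range for key {:?}", id, key);
        }
    }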
}