// 38_distributed_computing.ruchy - Distributed computing and clustering
import std::cluster
import std::rpc
import std::consensus
import std::net::tcp // assumed module path for tcp::listen used by the RPC server below
import std::metrics // assumed module path for the metrics:: helpers used by the service mesh
fn main() {
println("=== Distributed Computing ===\n")
// Cluster setup
println("=== Cluster Setup ===")
struct Node {
id: string,
address: string,
port: int,
role: NodeRole
}
enum NodeRole {
Leader,
Follower,
Candidate
}
struct Cluster {
nodes: list<Node>,
self_id: string,
state: ClusterState
}
impl Cluster {
fn new(config: ClusterConfig) -> Cluster {
let self_id = config.node_id.unwrap_or_else(|| generate_node_id())
Cluster {
nodes: config.seeds.map(addr => Node {
id: "", // Will be discovered
address: addr.split(":")[0],
port: addr.split(":")[1].to_int(),
role: NodeRole::Follower
}),
self_id: self_id,
state: ClusterState::Initializing
}
}
fn join(mut self) {
println(f"Node {self.self_id} joining cluster...")
// Discover other nodes
for node in self.nodes {
match rpc::call(node.address, node.port, "handshake", self.self_id) {
Ok(response) => {
node.id = response.node_id
println(f"Connected to node {node.id}")
},
Err(e) => {
println(f"Failed to connect to {node.address}: {e}")
}
}
}
self.state = ClusterState::Active
self.start_heartbeat()
}
fn start_heartbeat(self) {
spawn async {
loop {
sleep_ms(1000)
self.send_heartbeat() // send_heartbeat (not shown) pings each peer to keep liveness state fresh
}
}
}
}
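// Illustrative usage (a sketch): the ClusterConfig fields below (node_id, seeds)
// are assumed from how the constructor above reads them.
let mut cluster = Cluster::new(ClusterConfig {
node_id: Some("node-1"),
seeds: ["10.0.0.2:7000", "10.0.0.3:7000"]
})
cluster.join()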
// Distributed data structures
println("\n=== Distributed Data Structures ===")
// Distributed HashMap
struct DistributedHashMap<K, V> {
local_data: map<K, V>,
cluster: Cluster,
replication_factor: int = 3
}
impl<K, V> DistributedHashMap<K, V> {
fn get(self, key: K) -> Option<V> {
let node = self.get_node_for_key(key)
if node.id == self.cluster.self_id {
self.local_data.get(key)
} else {
rpc::call(node.address, node.port, "get", key)
.ok()
.and_then(|response| response.value)
}
}
fn put(mut self, key: K, value: V) {
let primary = self.get_node_for_key(key)
let replicas = self.get_replicas(key, self.replication_factor) // get_replicas (not shown) picks the next N nodes after the primary
// Write to primary
if primary.id == self.cluster.self_id {
self.local_data[key] = value
} else {
rpc::call(primary.address, primary.port, "put", { key, value })
}
// Replicate to other nodes
for replica in replicas {
spawn async {
rpc::call(replica.address, replica.port, "replicate", { key, value })
}
}
}
fn get_node_for_key(self, key: K) -> Node {
// Simple modulo partitioning for the demo; a production system would use
// consistent hashing so that adding or removing nodes remaps only a fraction of keys
let key_hash = hash(key)
let node_index = key_hash % self.cluster.nodes.len()
self.cluster.nodes[node_index]
}
}
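// Illustrative usage (a sketch) of the distributed map, reusing the cluster
// created above; the node that owns the key answers the get.
let mut dmap = DistributedHashMap {
local_data: {},
cluster: cluster,
replication_factor: 3
}
dmap.put("user:42", "Ada Lovelace")
match dmap.get("user:42") {
Some(name) => println(f"user:42 -> {name}"),
None => println("user:42 not found")
}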
// RPC framework
println("\n=== RPC Framework ===")
struct RpcServer {
handlers: map<string, fn(any) -> any>
}
impl RpcServer {
fn register(mut self, method: string, handler: fn(any) -> any) {
self.handlers[method] = handler
}
fn start(self, port: int) {
tcp::listen(port, |connection| {
let request = connection.read_json()
match self.handlers.get(request.method) {
Some(handler) => {
let result = handler(request.params)
connection.write_json({
id: request.id,
result: result
})
},
None => {
connection.write_json({
id: request.id,
error: f"Unknown method: {request.method}"
})
}
}
})
}
}
let mut server = RpcServer { handlers: {} }
server.register("add", |params| {
params.a + params.b
})
server.register("multiply", |params| {
params.x * params.y
})
// Run the server in the background so the rest of the demo can continue
spawn async {
server.start(8080)
}
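// Illustrative client call (a sketch) against the server registered above,
// using the same rpc::call(address, port, method, params) shape as Cluster::join.
sleep_ms(100) // give the server a moment to start listening
match rpc::call("127.0.0.1", 8080, "add", { a: 2, b: 3 }) {
Ok(sum) => println(f"add(2, 3) = {sum}"),
Err(e) => println(f"RPC failed: {e}")
}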
// Consensus protocols
println("\n=== Consensus Protocols ===")
// Raft consensus
struct RaftNode {
id: string,
term: int,
state: NodeRole,
voted_for: Option<string>,
log: list<LogEntry>,
commit_index: int,
last_applied: int,
cluster: Cluster // peers contacted when campaigning in start_election below
}
impl RaftNode {
// Simplified RequestVote: a full implementation also checks that the
// candidate's log (last_log_index) is at least as up to date as ours
fn request_vote(mut self, candidate_id: string, term: int, last_log_index: int) -> VoteResponse {
if term < self.term {
return VoteResponse { term: self.term, granted: false }
}
if term > self.term {
self.term = term
self.voted_for = None
self.state = NodeRole::Follower
}
let vote_granted = match self.voted_for {
None => {
self.voted_for = Some(candidate_id)
true
},
Some(id) if id == candidate_id => true,
_ => false
}
VoteResponse { term: self.term, granted: vote_granted }
}
fn append_entries(mut self, term: int, leader_id: string, entries: list<LogEntry>) -> bool {
if term < self.term {
return false
}
self.term = term
self.state = NodeRole::Follower
// Append new entries (simplified; real Raft also checks prev_log_index/prev_log_term)
self.log.extend(entries)
true
}
fn start_election(mut self) {
self.state = NodeRole::Candidate
self.term += 1
self.voted_for = Some(self.id)
let mut votes = 1 // Vote for self
let majority = (self.cluster.nodes.len() / 2) + 1
// Request votes from other nodes
let futures = self.cluster.nodes.map(node => {
spawn async {
rpc::call(node, "request_vote", {
candidate_id: self.id,
term: self.term,
last_log_index: self.log.len() - 1
})
}
})
for future in futures {
match await future {
Ok(response) if response.granted => votes += 1,
_ => {}
}
if votes >= majority {
self.become_leader() // become_leader (not shown) switches state and starts sending heartbeats
return
}
}
}
}
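// Illustrative vote exchange (a sketch): a follower at term 1 grants a vote
// to a candidate campaigning for term 2. Field values are example assumptions.
let mut follower = RaftNode {
id: "node-2",
term: 1,
state: NodeRole::Follower,
voted_for: None,
log: [],
commit_index: 0,
last_applied: 0,
cluster: cluster
}
let vote = follower.request_vote("node-1", 2, 0)
println(f"Vote granted: {vote.granted} at term {vote.term}")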
// MapReduce
println("\n=== MapReduce ===")
struct MapReduceJob<K1, V1, K2, V2, V3> {
map_fn: fn(K1, V1) -> list<(K2, V2)>,
reduce_fn: fn(K2, list<V2>) -> V3,
input_splits: list<(K1, V1)>
}
impl<K1, V1, K2, V2, V3> MapReduceJob<K1, V1, K2, V2, V3> {
fn run(self, cluster: Cluster) -> map<K2, V3> {
// Map phase - distribute to workers
let map_tasks = self.input_splits.chunks(cluster.nodes.len())
let mut map_results = []
for (node, chunk) in cluster.nodes.zip(map_tasks) {
let future = spawn async {
rpc::call(node, "map_task", {
mapper: self.map_fn,
data: chunk
})
}
map_results.append(future)
}
// Collect map results
let mut intermediate = {}
for future in map_results {
let results = await future
for (key, value) in results {
if !intermediate.contains_key(key) {
intermediate[key] = []
}
intermediate[key].append(value)
}
}
// Shuffle and sort
let sorted_keys = intermediate.keys().sorted()
// Reduce phase
let mut reduce_results = {}
for key in sorted_keys {
let values = intermediate[key]
reduce_results[key] = self.reduce_fn(key, values)
}
reduce_results
}
}
// Word count example (sample documents defined inline; `cluster` was created
// in the setup section above)
let documents = [
("doc1", "the quick brown fox"),
("doc2", "the quick lazy dog")
]
let word_count = MapReduceJob {
map_fn: |_, text| {
text.split(" ").map(word => (word, 1))
},
reduce_fn: |word, counts| {
counts.sum()
},
input_splits: documents
}
let results = word_count.run(cluster)
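// A sketch of consuming the reduced output: each word maps to its total count
for (word, count) in results {
println(f"{word}: {count}")
}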
// Distributed transactions
println("\n=== Distributed Transactions ===")
struct TwoPhaseCommit {
coordinator: Node,
participants: list<Node>
}
impl TwoPhaseCommit {
fn execute(self, transaction: Transaction) -> Result<(), Error> {
// Phase 1: Prepare
let prepare_votes = self.participants.map(node => {
spawn async {
rpc::call(node, "prepare", transaction)
}
})
let all_prepared = prepare_votes.all(future => {
match await future {
Ok(vote) => vote == "YES",
Err(_) => false
}
})
if !all_prepared {
// Abort transaction
for node in self.participants {
spawn async {
rpc::call(node, "abort", transaction.id)
}
}
return Err(Error::TransactionAborted)
}
// Phase 2: Commit
for node in self.participants {
spawn async {
rpc::call(node, "commit", transaction.id)
}
}
Ok(())
}
}
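// Illustrative commit attempt (a sketch): the Transaction fields used here
// (id, ops) are assumed for the example.
let tpc = TwoPhaseCommit {
coordinator: cluster.nodes[0],
participants: cluster.nodes
}
match tpc.execute(Transaction { id: "tx-1", ops: [] }) {
Ok(_) => println("Transaction committed on all participants"),
Err(e) => println(f"Transaction aborted: {e}")
}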
// Service mesh
println("\n=== Service Mesh ===")
struct ServiceMesh {
services: map<string, list<ServiceInstance>>,
load_balancer: LoadBalancer,
circuit_breakers: map<string, CircuitBreaker>
}
impl ServiceMesh {
fn call(self, service: string, method: string, params: any) -> Result<any, Error> {
let breaker = self.circuit_breakers[service]
breaker.call(|| {
let instance = self.load_balancer.select(self.services[service])
let start = now_ms() // now_ms() assumed: wall-clock time in milliseconds
let result = rpc::call(instance, method, params)
.with_timeout(5000)
.with_retry(3)
// Update metrics
metrics::increment(f"{service}.{method}.calls")
metrics::histogram(f"{service}.{method}.latency", now_ms() - start)
result
})
}
}
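// Illustrative call through the mesh (a sketch, left as comments because it
// assumes a LoadBalancer and per-service CircuitBreakers were constructed):
// let mesh = ServiceMesh { services: registry, load_balancer: lb, circuit_breakers: breakers }
// match mesh.call("billing", "charge", { user: "u1", amount: 42 }) {
// Ok(receipt) => println(f"Charged: {receipt}"),
// Err(e) => println(f"Billing call failed: {e}")
// }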
// Distributed caching
println("\n=== Distributed Cache ===")
struct DistributedCache {
local_cache: LRUCache,
cluster: Cluster,
consistency: CacheConsistency
}
enum CacheConsistency {
Strong, // All replicas must acknowledge
Eventual, // Write to local, replicate async
Quorum // Majority must acknowledge
}
impl DistributedCache {
fn get(self, key: string) -> Option<any> {
// Check local cache first
if let Some(value) = self.local_cache.get(key) {
return Some(value)
}
// Check remote nodes
let node = self.consistent_hash(key) // consistent_hash (not shown) maps the key to its owning node
match rpc::call(node, "cache_get", key) {
Ok(value) => {
self.local_cache.put(key, value)
Some(value)
},
Err(_) => None
}
}
fn put(self, key: string, value: any, ttl: int = 3600) {
match self.consistency {
CacheConsistency::Strong => self.put_strong(key, value, ttl),
CacheConsistency::Eventual => self.put_eventual(key, value, ttl),
CacheConsistency::Quorum => self.put_quorum(key, value, ttl)
}
}
}
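// Illustrative usage (a sketch): LRUCache::new and its capacity argument are
// assumed; eventual consistency writes locally and replicates asynchronously.
let cache = DistributedCache {
local_cache: LRUCache::new(1024),
cluster: cluster,
consistency: CacheConsistency::Eventual
}
cache.put("session:abc", { user: "u1", expires: 3600 })
match cache.get("session:abc") {
Some(session) => println(f"Cache hit: {session}"),
None => println("Cache miss")
}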
println("Distributed computing setup complete!")
}