use serde::{Serialize, Deserialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use uuid::Uuid;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Proposal {
pub id: String,
pub proposer: String,
pub data: serde_json::Value,
pub quorum: f64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Vote {
pub proposal_id: String,
pub voter: String,
pub approve: bool,
pub weight: f64,
}
#[derive(Debug, Clone)]
pub enum ConsensusResult {
Approved,
Rejected,
Pending {
approval_rate: f64,
votes_needed: usize,
},
}
#[derive(Debug, Clone)]
struct ProposalState {
proposal: Proposal,
votes: Vec<Vote>,
created_at: std::time::Instant,
}
impl ProposalState {
fn calculate_result(&self, total_agents: usize) -> ConsensusResult {
let total_weight: f64 = self.votes.iter().map(|v| v.weight).sum();
let approval_weight: f64 = self
.votes
.iter()
.filter(|v| v.approve)
.map(|v| v.weight)
.sum();
let approval_rate = if total_weight > 0.0 {
approval_weight / total_weight
} else {
0.0
};
let votes_received = self.votes.len();
let quorum_votes = (total_agents as f64 * self.proposal.quorum).ceil() as usize;
if votes_received >= quorum_votes {
if approval_rate >= 0.5 {
ConsensusResult::Approved
} else {
ConsensusResult::Rejected
}
} else {
ConsensusResult::Pending {
approval_rate,
votes_needed: quorum_votes - votes_received,
}
}
}
}
pub struct ConsensusEngine {
proposals: Arc<RwLock<HashMap<String, ProposalState>>>,
agents: Arc<RwLock<HashMap<String, AgentInfo>>>,
}
#[derive(Debug, Clone)]
struct AgentInfo {
id: String,
weight: f64,
}
impl ConsensusEngine {
pub fn new() -> Self {
Self {
proposals: Arc::new(RwLock::new(HashMap::new())),
agents: Arc::new(RwLock::new(HashMap::new())),
}
}
pub async fn register_agent(&self, agent_id: String, weight: f64) {
let mut agents = self.agents.write().await;
agents.insert(
agent_id.clone(),
AgentInfo {
id: agent_id,
weight,
},
);
}
pub async fn submit_proposal(&self, proposal: Proposal) -> String {
let id = Uuid::new_v4().to_string();
let state = ProposalState {
proposal: Proposal {
id: id.clone(),
..proposal
},
votes: Vec::new(),
created_at: std::time::Instant::now(),
};
let mut proposals = self.proposals.write().await;
proposals.insert(id.clone(), state);
tracing::debug!("Proposal submitted: {}", id);
id
}
pub async fn vote(&self, vote: Vote) -> anyhow::Result<ConsensusResult> {
let mut proposals = self.proposals.write().await;
let state = proposals
.get_mut(&vote.proposal_id)
.ok_or_else(|| anyhow::anyhow!("Proposal not found"))?;
if state.votes.iter().any(|v| v.voter == vote.voter) {
return Err(anyhow::anyhow!("Agent already voted"));
}
state.votes.push(vote);
let agents = self.agents.read().await;
let result = state.calculate_result(agents.len());
Ok(result)
}
pub async fn get_result(&self, proposal_id: &str) -> Option<ConsensusResult> {
let proposals = self.proposals.read().await;
let agents = self.agents.read().await;
proposals
.get(proposal_id)
.map(|state| state.calculate_result(agents.len()))
}
pub async fn get_proposal(&self, proposal_id: &str) -> Option<Proposal> {
let proposals = self.proposals.read().await;
proposals.get(proposal_id).map(|s| s.proposal.clone())
}
pub async fn list_proposals(&self) -> Vec<Proposal> {
let proposals = self.proposals.read().await;
proposals.values().map(|s| s.proposal.clone()).collect()
}
pub async fn cleanup_old(&self, max_age: std::time::Duration) {
let mut proposals = self.proposals.write().await;
proposals.retain(|_, state| state.created_at.elapsed() < max_age);
}
pub async fn agent_count(&self) -> usize {
self.agents.read().await.len()
}
}
impl Default for ConsensusEngine {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_consensus_approval() {
let engine = ConsensusEngine::new();
engine.register_agent("agent1".to_string(), 1.0).await;
engine.register_agent("agent2".to_string(), 1.0).await;
engine.register_agent("agent3".to_string(), 1.0).await;
let proposal = Proposal {
id: String::new(),
proposer: "agent1".to_string(),
data: serde_json::json!({"action": "test"}),
quorum: 0.67, };
let proposal_id = engine.submit_proposal(proposal).await;
let result1 = engine
.vote(Vote {
proposal_id: proposal_id.clone(),
voter: "agent1".to_string(),
approve: true,
weight: 1.0,
})
.await
.unwrap();
assert!(matches!(result1, ConsensusResult::Pending { .. }));
let result2 = engine
.vote(Vote {
proposal_id: proposal_id.clone(),
voter: "agent2".to_string(),
approve: true,
weight: 1.0,
})
.await
.unwrap();
assert!(matches!(result2, ConsensusResult::Approved));
}
#[tokio::test]
async fn test_consensus_rejection() {
let engine = ConsensusEngine::new();
for i in 1..=3 {
engine
.register_agent(format!("agent{}", i), 1.0)
.await;
}
let proposal = Proposal {
id: String::new(),
proposer: "agent1".to_string(),
data: serde_json::json!({"action": "test"}),
quorum: 0.67,
};
let proposal_id = engine.submit_proposal(proposal).await;
engine
.vote(Vote {
proposal_id: proposal_id.clone(),
voter: "agent1".to_string(),
approve: true,
weight: 1.0,
})
.await
.unwrap();
engine
.vote(Vote {
proposal_id: proposal_id.clone(),
voter: "agent2".to_string(),
approve: false,
weight: 1.0,
})
.await
.unwrap();
let result = engine
.vote(Vote {
proposal_id: proposal_id.clone(),
voter: "agent3".to_string(),
approve: false,
weight: 1.0,
})
.await
.unwrap();
assert!(matches!(result, ConsensusResult::Rejected));
}
#[tokio::test]
async fn test_duplicate_vote() {
let engine = ConsensusEngine::new();
engine.register_agent("agent1".to_string(), 1.0).await;
let proposal = Proposal {
id: String::new(),
proposer: "agent1".to_string(),
data: serde_json::json!({}),
quorum: 0.5,
};
let proposal_id = engine.submit_proposal(proposal).await;
engine
.vote(Vote {
proposal_id: proposal_id.clone(),
voter: "agent1".to_string(),
approve: true,
weight: 1.0,
})
.await
.unwrap();
let result = engine
.vote(Vote {
proposal_id: proposal_id.clone(),
voter: "agent1".to_string(),
approve: false,
weight: 1.0,
})
.await;
assert!(result.is_err());
}
}