agentic_payments/consensus/
mod.rs

//! Byzantine Fault Tolerant consensus mechanisms
//!
//! Implements the Practical Byzantine Fault Tolerance (PBFT) consensus protocol
//! with reputation-based weighted voting and CRDT state synchronization.
5
6use crate::error::{Error, Result};
7use serde::{Deserialize, Serialize};
8use std::collections::HashSet;
9use std::fmt;
10use uuid::Uuid;
11
12pub mod bft;
13pub mod quorum;
14pub mod reputation;
15pub mod voting;
16
17pub use bft::{BftConfig, BftConsensus};
18pub use quorum::{Quorum, QuorumConfig};
19pub use reputation::{ReputationConfig, ReputationEntry, ReputationSystem};
20pub use voting::{VoteCollector, VotingConfig};
21
22use crate::agents::VerificationAgent;
23use crate::crypto::Signature;
24use ed25519_dalek::VerifyingKey;
25use std::sync::Arc;
26
27/// Unique identifier for consensus rounds
28#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
29pub struct RoundId(pub u64);
30
31impl fmt::Display for RoundId {
32    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
33        write!(f, "Round({})", self.0)
34    }
35}
36
37impl RoundId {
38    pub fn next(self) -> Self {
39        RoundId(self.0 + 1)
40    }
41}
42
43/// Consensus authority identifier
44#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
45pub struct AuthorityId(pub String);
46
47impl fmt::Display for AuthorityId {
48    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49        write!(f, "{}", self.0)
50    }
51}
52
53impl From<String> for AuthorityId {
54    fn from(s: String) -> Self {
55        AuthorityId(s)
56    }
57}
58
59impl From<&str> for AuthorityId {
60    fn from(s: &str) -> Self {
61        AuthorityId(s.to_string())
62    }
63}
64
65impl From<Uuid> for AuthorityId {
66    fn from(uuid: Uuid) -> Self {
67        AuthorityId(uuid.to_string())
68    }
69}
70
71/// Vote value for consensus
72#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
73pub struct VoteValue(pub Vec<u8>);
74
75impl VoteValue {
76    pub fn new(data: Vec<u8>) -> Self {
77        VoteValue(data)
78    }
79
80    pub fn from_string(s: &str) -> Self {
81        VoteValue(s.as_bytes().to_vec())
82    }
83
84    pub fn as_bytes(&self) -> &[u8] {
85        &self.0
86    }
87}
88
89/// Individual vote in consensus
90#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
91pub struct Vote {
92    pub round_id: RoundId,
93    pub authority: AuthorityId,
94    pub value: VoteValue,
95    pub weight: u64,
96    pub timestamp: u64,
97    pub signature: Vec<u8>,
98}
99
100impl Vote {
101    pub fn new(round_id: RoundId, authority: AuthorityId, value: VoteValue, weight: u64) -> Self {
102        Vote {
103            round_id,
104            authority,
105            value,
106            weight,
107            timestamp: std::time::SystemTime::now()
108                .duration_since(std::time::UNIX_EPOCH)
109                .unwrap()
110                .as_secs(),
111            signature: Vec::new(),
112        }
113    }
114
115    pub fn with_signature(mut self, signature: Vec<u8>) -> Self {
116        self.signature = signature;
117        self
118    }
119}
120
121/// Authority information
122#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
123pub struct Authority {
124    pub id: AuthorityId,
125    pub weight: u64,
126    pub reputation: f64,
127    pub is_byzantine: bool,
128}
129
130impl Authority {
131    pub fn new(id: AuthorityId, weight: u64) -> Self {
132        Authority {
133            id,
134            weight,
135            reputation: 1.0,
136            is_byzantine: false,
137        }
138    }
139
140    pub fn with_reputation(mut self, reputation: f64) -> Self {
141        self.reputation = reputation;
142        self
143    }
144}
145
146/// Consensus phase in PBFT protocol
147#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
148pub enum ConsensusPhase {
149    Idle,
150    PrePrepare,
151    Prepare,
152    Commit,
153    Decided,
154    ViewChange,
155}
156
157/// BFT Consensus result
158#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
159pub struct BftConsensusResult {
160    pub round_id: RoundId,
161    pub value: VoteValue,
162    pub total_weight: u64,
163    pub participating_authorities: HashSet<AuthorityId>,
164    pub phase: ConsensusPhase,
165}
166
167/// Legacy consensus result (for backward compatibility)
168#[derive(Debug, Clone, Serialize, Deserialize)]
169pub struct ConsensusResult {
170    /// Whether consensus was reached
171    pub reached: bool,
172    /// Number of votes in favor
173    pub votes_for: usize,
174    /// Total number of votes
175    pub total_votes: usize,
176    /// Required threshold
177    pub threshold: f64,
178    /// Participating agent IDs
179    pub agents: Vec<Uuid>,
180}
181
182impl ConsensusResult {
183    /// Check if consensus was reached
184    pub fn is_valid(&self) -> bool {
185        self.reached
186    }
187
188    /// Get the consensus percentage
189    pub fn percentage(&self) -> f64 {
190        if self.total_votes == 0 {
191            0.0
192        } else {
193            self.votes_for as f64 / self.total_votes as f64
194        }
195    }
196}
197
198/// Consensus trait for different consensus implementations
199pub trait Consensus: Send + Sync {
200    /// Submit a vote for the current round
201    fn submit_vote(&mut self, vote: Vote) -> Result<()>;
202
203    /// Check if consensus has been reached
204    fn has_consensus(&self) -> bool;
205
206    /// Get the consensus result if reached
207    fn get_result(&self) -> Option<BftConsensusResult>;
208
209    /// Get current consensus phase
210    fn get_phase(&self) -> ConsensusPhase;
211
212    /// Start a new consensus round
213    fn start_round(&mut self, round_id: RoundId, value: VoteValue) -> Result<()>;
214
215    /// Get current round ID
216    fn current_round(&self) -> RoundId;
217
218    /// Get participating authorities
219    fn authorities(&self) -> Vec<Authority>;
220
221    /// Handle timeout for current round
222    fn handle_timeout(&mut self) -> Result<()>;
223}
224
225/// Vote from an agent
226#[derive(Debug, Clone, Serialize, Deserialize)]
227pub struct AgentVote {
228    /// Agent ID
229    pub agent_id: Uuid,
230    /// Vote result (true = valid, false = invalid)
231    pub vote: bool,
232    /// Optional reasoning/error message
233    pub message: Option<String>,
234}
235
236impl AgentVote {
237    /// Create a positive vote
238    pub fn approve(agent_id: Uuid) -> Self {
239        Self {
240            agent_id,
241            vote: true,
242            message: None,
243        }
244    }
245
246    /// Create a negative vote
247    pub fn reject(agent_id: Uuid, message: String) -> Self {
248        Self {
249            agent_id,
250            vote: false,
251            message: Some(message),
252        }
253    }
254}
255
#[cfg(test)]
mod tests {
    use super::*;

    /// Tolerance for comparing floating-point vote ratios; exact `==` on
    /// `f64` is brittle against rounding.
    const RATIO_TOLERANCE: f64 = 1e-12;

    #[test]
    fn test_consensus_result() {
        let result = ConsensusResult {
            reached: true,
            votes_for: 7,
            total_votes: 10,
            threshold: 0.67,
            // Ten distinct agent ids; `vec![Uuid::new_v4(); 10]` would clone a single id.
            agents: (0..10).map(|_| Uuid::new_v4()).collect(),
        };

        assert!(result.is_valid());
        assert!((result.percentage() - 0.7).abs() < RATIO_TOLERANCE);
    }

    #[test]
    fn test_consensus_result_without_votes() {
        let result = ConsensusResult {
            reached: false,
            votes_for: 0,
            total_votes: 0,
            threshold: 0.67,
            agents: Vec::new(),
        };

        // The zero-vote branch must yield 0.0 rather than NaN from 0/0.
        assert!(!result.is_valid());
        assert!(result.percentage().abs() < RATIO_TOLERANCE);
    }

    #[test]
    fn test_agent_vote() {
        let approve = AgentVote::approve(Uuid::new_v4());
        assert!(approve.vote);
        assert!(approve.message.is_none());

        let reject = AgentVote::reject(Uuid::new_v4(), "Invalid signature".to_string());
        assert!(!reject.vote);
        assert_eq!(reject.message.as_deref(), Some("Invalid signature"));
    }
}
285
286/// Consensus configuration
287#[derive(Debug, Clone)]
288pub struct ConsensusConfig {
289    /// Threshold for consensus (0.0-1.0)
290    pub threshold: f64,
291    /// Timeout in milliseconds
292    pub timeout_ms: u64,
293}
294
295impl ConsensusConfig {
296    /// Create a new consensus configuration
297    pub fn new(threshold: f64, timeout_ms: u64) -> Result<Self> {
298        if !(0.0..=1.0).contains(&threshold) {
299            return Err(Error::config("Consensus threshold must be between 0.0 and 1.0"));
300        }
301        Ok(Self {
302            threshold,
303            timeout_ms,
304        })
305    }
306}
307
308/// Consensus engine for multi-agent verification
309pub struct ConsensusEngine {
310    config: ConsensusConfig,
311}
312
313impl ConsensusEngine {
314    /// Create a new consensus engine
315    pub fn new(config: ConsensusConfig) -> Self {
316        Self { config }
317    }
318
319    /// Verify with consensus across multiple agents
320    pub async fn verify_with_consensus(
321        &self,
322        agents: Vec<Arc<dyn VerificationAgent>>,
323        signature: Signature,
324        message: &[u8],
325        public_key: &VerifyingKey,
326    ) -> Result<ConsensusResult> {
327        if agents.is_empty() {
328            return Err(Error::agent_pool("No agents available for consensus"));
329        }
330
331        let total_votes = agents.len();
332        let mut votes_for = 0;
333        let mut agent_ids = Vec::with_capacity(total_votes);
334
335        // Collect votes from all agents
336        let sig_bytes = signature.to_bytes();
337        for agent in &agents {
338            agent_ids.push(agent.id());
339            match agent.verify(message, &sig_bytes, public_key).await {
340                Ok(true) => votes_for += 1,
341                Ok(false) => {
342                    tracing::debug!("Agent {} voted invalid", agent.id());
343                }
344                Err(e) => {
345                    tracing::warn!("Agent {} verification error: {}", agent.id(), e);
346                }
347            }
348        }
349
350        let reached = (votes_for as f64 / total_votes as f64) >= self.config.threshold;
351
352        Ok(ConsensusResult {
353            reached,
354            votes_for,
355            total_votes,
356            threshold: self.config.threshold,
357            agents: agent_ids,
358        })
359    }
360}