Skip to main content

zap/
agent_consensus.rs

1//! Agentic consensus for response voting
2//!
3//! Agents vote on responses to queries. No trust needed - majority wins.
4//! As long as majority are honest, you get correct results.
5
6use crate::error::Error;
7use crate::identity::Did;
8use blake3::Hasher;
9use std::collections::HashMap;
10use std::sync::Arc;
11use std::time::{SystemTime, UNIX_EPOCH};
12use tokio::sync::RwLock;
13
14/// Query ID (32-byte hash)
15pub type QueryId = [u8; 32];
16
17/// Response ID (32-byte hash)
18pub type ResponseId = [u8; 32];
19
20/// A query submitted to the agent network
21#[derive(Debug, Clone)]
22pub struct Query {
23    pub id: QueryId,
24    pub content: String,
25    pub submitter: Did,
26    pub timestamp: u64,
27}
28
29impl Query {
30    /// Create a new query
31    pub fn new(content: String, submitter: Did) -> Self {
32        let timestamp = SystemTime::now()
33            .duration_since(UNIX_EPOCH)
34            .unwrap()
35            .as_secs();
36        let mut hasher = Hasher::new();
37        hasher.update(content.as_bytes());
38        hasher.update(submitter.to_string().as_bytes());
39        hasher.update(&timestamp.to_le_bytes());
40        let hash: [u8; 32] = *hasher.finalize().as_bytes();
41        Self {
42            id: hash,
43            content,
44            submitter,
45            timestamp,
46        }
47    }
48}
49
50/// A response to a query
51#[derive(Debug, Clone)]
52pub struct Response {
53    pub id: ResponseId,
54    pub query_id: QueryId,
55    pub content: String,
56    pub responder: Did,
57    pub timestamp: u64,
58}
59
60impl Response {
61    /// Create a new response
62    pub fn new(query_id: QueryId, content: String, responder: Did) -> Self {
63        let timestamp = SystemTime::now()
64            .duration_since(UNIX_EPOCH)
65            .unwrap()
66            .as_secs();
67        let mut hasher = Hasher::new();
68        hasher.update(&query_id);
69        hasher.update(content.as_bytes());
70        hasher.update(responder.to_string().as_bytes());
71        hasher.update(&timestamp.to_le_bytes());
72        let hash: [u8; 32] = *hasher.finalize().as_bytes();
73        Self {
74            id: hash,
75            query_id,
76            content,
77            responder,
78            timestamp,
79        }
80    }
81}
82
83/// Result of consensus voting
84#[derive(Debug, Clone)]
85pub struct ConsensusResult {
86    /// The winning response
87    pub response: Response,
88    /// Number of votes for this response
89    pub votes: usize,
90    /// Total number of voters
91    pub total_voters: usize,
92    /// Confidence (votes / total)
93    pub confidence: f64,
94}
95
96/// Internal state for a query
97struct QueryState {
98    query: Query,
99    responses: HashMap<ResponseId, Response>,
100    votes: HashMap<ResponseId, Vec<Did>>,
101    finalized: Option<ResponseId>,
102}
103
104/// Agentic consensus for response voting
105///
106/// Agents submit responses and vote. Majority wins.
107pub struct AgentConsensusVoting {
108    queries: Arc<RwLock<HashMap<QueryId, QueryState>>>,
109    /// Threshold for consensus (e.g., 0.5 for simple majority)
110    threshold: f64,
111    /// Minimum responses before consensus can be reached
112    min_responses: usize,
113    /// Minimum votes before consensus can be reached
114    min_votes: usize,
115}
116
117impl AgentConsensusVoting {
118    /// Create new consensus instance
119    ///
120    /// # Arguments
121    /// * `threshold` - Fraction of votes needed (0.5 = majority)
122    /// * `min_responses` - Minimum responses before checking consensus
123    /// * `min_votes` - Minimum votes before checking consensus
124    pub fn new(threshold: f64, min_responses: usize, min_votes: usize) -> Self {
125        Self {
126            queries: Arc::new(RwLock::new(HashMap::new())),
127            threshold: threshold.clamp(0.0, 1.0),
128            min_responses,
129            min_votes,
130        }
131    }
132
133    /// Submit a new query
134    pub async fn submit_query(&self, query: Query) -> QueryId {
135        let mut queries = self.queries.write().await;
136        let id = query.id;
137        queries.insert(
138            id,
139            QueryState {
140                query,
141                responses: HashMap::new(),
142                votes: HashMap::new(),
143                finalized: None,
144            },
145        );
146        id
147    }
148
149    /// Submit a response to a query
150    pub async fn submit_response(&self, response: Response) -> Result<ResponseId, Error> {
151        let mut queries = self.queries.write().await;
152        let state = queries
153            .get_mut(&response.query_id)
154            .ok_or_else(|| Error::Consensus("Query not found".into()))?;
155
156        if state.finalized.is_some() {
157            return Err(Error::Consensus("Query already finalized".into()));
158        }
159
160        let id = response.id;
161        state.responses.insert(id, response);
162        state.votes.insert(id, Vec::new());
163        Ok(id)
164    }
165
166    /// Vote for a response
167    ///
168    /// Each agent can only vote once per query (across all responses)
169    pub async fn vote(
170        &self,
171        query_id: QueryId,
172        response_id: ResponseId,
173        voter: Did,
174    ) -> Result<(), Error> {
175        let mut queries = self.queries.write().await;
176        let state = queries
177            .get_mut(&query_id)
178            .ok_or_else(|| Error::Consensus("Query not found".into()))?;
179
180        if state.finalized.is_some() {
181            return Err(Error::Consensus("Query already finalized".into()));
182        }
183
184        if !state.responses.contains_key(&response_id) {
185            return Err(Error::Consensus("Response not found".into()));
186        }
187
188        // Check if voter already voted
189        for votes in state.votes.values() {
190            if votes.iter().any(|v| v == &voter) {
191                return Err(Error::Consensus("Already voted on this query".into()));
192            }
193        }
194
195        state.votes.get_mut(&response_id).unwrap().push(voter);
196
197        // Check if consensus reached
198        self.check_consensus(state);
199        Ok(())
200    }
201
202    /// Check if consensus has been reached
203    fn check_consensus(&self, state: &mut QueryState) {
204        if state.finalized.is_some() {
205            return;
206        }
207
208        // Need minimum responses
209        if state.responses.len() < self.min_responses {
210            return;
211        }
212
213        // Count total votes
214        let total_votes: usize = state.votes.values().map(|v| v.len()).sum();
215        if total_votes < self.min_votes {
216            return;
217        }
218
219        // Find response with most votes that meets threshold
220        let mut best: Option<(ResponseId, usize)> = None;
221        for (response_id, voters) in &state.votes {
222            let vote_count = voters.len();
223            let confidence = vote_count as f64 / total_votes as f64;
224
225            if confidence >= self.threshold {
226                match best {
227                    None => best = Some((*response_id, vote_count)),
228                    Some((_, best_count)) if vote_count > best_count => {
229                        best = Some((*response_id, vote_count))
230                    }
231                    _ => {}
232                }
233            }
234        }
235
236        if let Some((response_id, _)) = best {
237            state.finalized = Some(response_id);
238        }
239    }
240
241    /// Get the consensus result for a query
242    pub async fn get_result(&self, query_id: QueryId) -> Option<ConsensusResult> {
243        let queries = self.queries.read().await;
244        let state = queries.get(&query_id)?;
245        let winning_id = state.finalized?;
246        let response = state.responses.get(&winning_id)?.clone();
247        let votes = state.votes.get(&winning_id)?.len();
248        let total_voters: usize = state.votes.values().map(|v| v.len()).sum();
249
250        Some(ConsensusResult {
251            response,
252            votes,
253            total_voters,
254            confidence: if total_voters > 0 {
255                votes as f64 / total_voters as f64
256            } else {
257                0.0
258            },
259        })
260    }
261
262    /// Check if a query has reached consensus
263    pub async fn is_finalized(&self, query_id: QueryId) -> bool {
264        let queries = self.queries.read().await;
265        queries
266            .get(&query_id)
267            .map(|s| s.finalized.is_some())
268            .unwrap_or(false)
269    }
270
271    /// Get all responses for a query
272    pub async fn get_responses(&self, query_id: QueryId) -> Option<Vec<Response>> {
273        let queries = self.queries.read().await;
274        let state = queries.get(&query_id)?;
275        Some(state.responses.values().cloned().collect())
276    }
277
278    /// Get vote counts for a query
279    pub async fn get_vote_counts(&self, query_id: QueryId) -> Option<HashMap<ResponseId, usize>> {
280        let queries = self.queries.read().await;
281        let state = queries.get(&query_id)?;
282        Some(
283            state
284                .votes
285                .iter()
286                .map(|(id, voters)| (*id, voters.len()))
287                .collect(),
288        )
289    }
290}
291
292#[cfg(test)]
293mod tests {
294    use super::*;
295    use crate::identity::DidMethod;
296
297    fn make_did(name: &str) -> Did {
298        Did {
299            method: DidMethod::Lux,
300            id: format!("z6Mk{}", name),
301        }
302    }
303
304    #[tokio::test]
305    async fn test_submit_query() {
306        let consensus = AgentConsensusVoting::new(0.5, 1, 1);
307        let query = Query::new("What is 2+2?".into(), make_did("Alice"));
308        let id = consensus.submit_query(query.clone()).await;
309        assert_eq!(id, query.id);
310    }
311
312    #[tokio::test]
313    async fn test_submit_response() {
314        let consensus = AgentConsensusVoting::new(0.5, 1, 1);
315        let query = Query::new("What is 2+2?".into(), make_did("Alice"));
316        let query_id = consensus.submit_query(query).await;
317
318        let response = Response::new(query_id, "4".into(), make_did("Bob"));
319        let result = consensus.submit_response(response.clone()).await;
320        assert!(result.is_ok());
321    }
322
323    #[tokio::test]
324    async fn test_vote_and_consensus() {
325        let consensus = AgentConsensusVoting::new(0.5, 1, 2);
326        let query = Query::new("What is 2+2?".into(), make_did("Alice"));
327        let query_id = consensus.submit_query(query).await;
328
329        let response = Response::new(query_id, "4".into(), make_did("Bob"));
330        let response_id = consensus.submit_response(response).await.unwrap();
331
332        // First vote - not enough yet
333        consensus
334            .vote(query_id, response_id, make_did("Voter1"))
335            .await
336            .unwrap();
337        assert!(!consensus.is_finalized(query_id).await);
338
339        // Second vote - should reach consensus
340        consensus
341            .vote(query_id, response_id, make_did("Voter2"))
342            .await
343            .unwrap();
344        assert!(consensus.is_finalized(query_id).await);
345
346        let result = consensus.get_result(query_id).await.unwrap();
347        assert_eq!(result.response.content, "4");
348        assert_eq!(result.votes, 2);
349        assert_eq!(result.confidence, 1.0);
350    }
351
352    #[tokio::test]
353    async fn test_double_vote_prevented() {
354        let consensus = AgentConsensusVoting::new(0.5, 1, 1);
355        let query = Query::new("Test".into(), make_did("Alice"));
356        let query_id = consensus.submit_query(query).await;
357
358        let response = Response::new(query_id, "Answer".into(), make_did("Bob"));
359        let response_id = consensus.submit_response(response).await.unwrap();
360
361        let voter = make_did("Voter1");
362        consensus.vote(query_id, response_id, voter.clone()).await.unwrap();
363
364        // Second vote from same voter should fail
365        let result = consensus.vote(query_id, response_id, voter).await;
366        assert!(result.is_err());
367    }
368
369    #[tokio::test]
370    async fn test_majority_wins() {
371        let consensus = AgentConsensusVoting::new(0.5, 2, 3);
372        let query = Query::new("Best language?".into(), make_did("Alice"));
373        let query_id = consensus.submit_query(query).await;
374
375        let r1 = Response::new(query_id, "Rust".into(), make_did("Bob"));
376        let r1_id = consensus.submit_response(r1).await.unwrap();
377
378        let r2 = Response::new(query_id, "Python".into(), make_did("Carol"));
379        let r2_id = consensus.submit_response(r2).await.unwrap();
380
381        // Vote: 2 for Rust, 1 for Python
382        consensus.vote(query_id, r1_id, make_did("V1")).await.unwrap();
383        consensus.vote(query_id, r1_id, make_did("V2")).await.unwrap();
384        consensus.vote(query_id, r2_id, make_did("V3")).await.unwrap();
385
386        assert!(consensus.is_finalized(query_id).await);
387        let result = consensus.get_result(query_id).await.unwrap();
388        assert_eq!(result.response.content, "Rust");
389        assert_eq!(result.votes, 2);
390        assert_eq!(result.total_voters, 3);
391    }
392
393    #[tokio::test]
394    async fn test_no_consensus_below_threshold() {
395        let consensus = AgentConsensusVoting::new(0.6, 2, 3);
396        let query = Query::new("Test".into(), make_did("Alice"));
397        let query_id = consensus.submit_query(query).await;
398
399        let r1 = Response::new(query_id, "A".into(), make_did("Bob"));
400        let r1_id = consensus.submit_response(r1).await.unwrap();
401
402        let r2 = Response::new(query_id, "B".into(), make_did("Carol"));
403        let r2_id = consensus.submit_response(r2).await.unwrap();
404
405        // Split vote: 1-1-1 (none reaches 60%)
406        let r3 = Response::new(query_id, "C".into(), make_did("Dave"));
407        let r3_id = consensus.submit_response(r3).await.unwrap();
408
409        consensus.vote(query_id, r1_id, make_did("V1")).await.unwrap();
410        consensus.vote(query_id, r2_id, make_did("V2")).await.unwrap();
411        consensus.vote(query_id, r3_id, make_did("V3")).await.unwrap();
412
413        // No consensus - 33% each, threshold is 60%
414        assert!(!consensus.is_finalized(query_id).await);
415    }
416
417    #[tokio::test]
418    async fn test_get_vote_counts() {
419        let consensus = AgentConsensusVoting::new(0.5, 1, 1);
420        let query = Query::new("Test".into(), make_did("Alice"));
421        let query_id = consensus.submit_query(query).await;
422
423        let r1 = Response::new(query_id, "A".into(), make_did("Bob"));
424        let r1_id = consensus.submit_response(r1).await.unwrap();
425
426        consensus.vote(query_id, r1_id, make_did("V1")).await.unwrap();
427
428        let counts = consensus.get_vote_counts(query_id).await.unwrap();
429        assert_eq!(counts.get(&r1_id), Some(&1));
430    }
431}