1use crate::error::Error;
7use crate::identity::Did;
8use blake3::Hasher;
9use std::collections::HashMap;
10use std::sync::Arc;
11use std::time::{SystemTime, UNIX_EPOCH};
12use tokio::sync::RwLock;
13
/// Identifier of a [`Query`]: a 32-byte digest (computed with BLAKE3 by [`Query::new`]).
pub type QueryId = [u8; 32];

/// Identifier of a [`Response`]: a 32-byte digest (computed with BLAKE3 by [`Response::new`]).
pub type ResponseId = [u8; 32];
19
/// A question submitted for agents to answer and vote on.
#[derive(Debug, Clone)]
pub struct Query {
    /// Identifier of the query; [`Query::new`] derives it by hashing the
    /// content, the submitter and the timestamp.
    pub id: QueryId,
    /// Text of the question.
    pub content: String,
    /// Identity that submitted the query.
    pub submitter: Did,
    /// Creation time; [`Query::new`] stores whole seconds since the UNIX epoch.
    pub timestamp: u64,
}
28
29impl Query {
30 pub fn new(content: String, submitter: Did) -> Self {
32 let timestamp = SystemTime::now()
33 .duration_since(UNIX_EPOCH)
34 .unwrap()
35 .as_secs();
36 let mut hasher = Hasher::new();
37 hasher.update(content.as_bytes());
38 hasher.update(submitter.to_string().as_bytes());
39 hasher.update(×tamp.to_le_bytes());
40 let hash: [u8; 32] = *hasher.finalize().as_bytes();
41 Self {
42 id: hash,
43 content,
44 submitter,
45 timestamp,
46 }
47 }
48}
49
/// A candidate answer to a [`Query`].
#[derive(Debug, Clone)]
pub struct Response {
    /// Identifier of the response; [`Response::new`] derives it by hashing the
    /// query id, the content, the responder and the timestamp.
    pub id: ResponseId,
    /// Identifier of the query this response answers.
    pub query_id: QueryId,
    /// Text of the answer.
    pub content: String,
    /// Identity that produced the answer.
    pub responder: Did,
    /// Creation time; [`Response::new`] stores whole seconds since the UNIX epoch.
    pub timestamp: u64,
}
59
60impl Response {
61 pub fn new(query_id: QueryId, content: String, responder: Did) -> Self {
63 let timestamp = SystemTime::now()
64 .duration_since(UNIX_EPOCH)
65 .unwrap()
66 .as_secs();
67 let mut hasher = Hasher::new();
68 hasher.update(&query_id);
69 hasher.update(content.as_bytes());
70 hasher.update(responder.to_string().as_bytes());
71 hasher.update(×tamp.to_le_bytes());
72 let hash: [u8; 32] = *hasher.finalize().as_bytes();
73 Self {
74 id: hash,
75 query_id,
76 content,
77 responder,
78 timestamp,
79 }
80 }
81}
82
/// Outcome of a finalized query, as returned by `AgentConsensusVoting::get_result`.
#[derive(Debug, Clone)]
pub struct ConsensusResult {
    /// The response that was finalized as the winner.
    pub response: Response,
    /// Number of votes cast for the winning response.
    pub votes: usize,
    /// Number of votes cast across all responses to the query.
    pub total_voters: usize,
    /// `votes / total_voters`, or `0.0` when no votes were cast.
    pub confidence: f64,
}
95
/// Bookkeeping kept for each submitted query.
struct QueryState {
    /// The query as it was submitted.
    query: Query,
    /// Responses received so far, keyed by response id.
    responses: HashMap<ResponseId, Response>,
    /// Voters recorded for each response, keyed by response id.
    votes: HashMap<ResponseId, Vec<Did>>,
    /// Id of the winning response once consensus has been reached; `None` until then.
    finalized: Option<ResponseId>,
}
103
/// Collects responses and votes for queries and finalizes a winning response
/// once the configured thresholds are met.
pub struct AgentConsensusVoting {
    /// Per-query state, shared behind an async read/write lock.
    queries: Arc<RwLock<HashMap<QueryId, QueryState>>>,
    /// Minimum share of all votes on a query that a response needs to win.
    threshold: f64,
    /// Minimum number of responses a query needs before it can be finalized.
    min_responses: usize,
    /// Minimum number of total votes a query needs before it can be finalized.
    min_votes: usize,
}
116
117impl AgentConsensusVoting {
118 pub fn new(threshold: f64, min_responses: usize, min_votes: usize) -> Self {
125 Self {
126 queries: Arc::new(RwLock::new(HashMap::new())),
127 threshold: threshold.clamp(0.0, 1.0),
128 min_responses,
129 min_votes,
130 }
131 }
132
133 pub async fn submit_query(&self, query: Query) -> QueryId {
135 let mut queries = self.queries.write().await;
136 let id = query.id;
137 queries.insert(
138 id,
139 QueryState {
140 query,
141 responses: HashMap::new(),
142 votes: HashMap::new(),
143 finalized: None,
144 },
145 );
146 id
147 }
148
149 pub async fn submit_response(&self, response: Response) -> Result<ResponseId, Error> {
151 let mut queries = self.queries.write().await;
152 let state = queries
153 .get_mut(&response.query_id)
154 .ok_or_else(|| Error::Consensus("Query not found".into()))?;
155
156 if state.finalized.is_some() {
157 return Err(Error::Consensus("Query already finalized".into()));
158 }
159
160 let id = response.id;
161 state.responses.insert(id, response);
162 state.votes.insert(id, Vec::new());
163 Ok(id)
164 }
165
166 pub async fn vote(
170 &self,
171 query_id: QueryId,
172 response_id: ResponseId,
173 voter: Did,
174 ) -> Result<(), Error> {
175 let mut queries = self.queries.write().await;
176 let state = queries
177 .get_mut(&query_id)
178 .ok_or_else(|| Error::Consensus("Query not found".into()))?;
179
180 if state.finalized.is_some() {
181 return Err(Error::Consensus("Query already finalized".into()));
182 }
183
184 if !state.responses.contains_key(&response_id) {
185 return Err(Error::Consensus("Response not found".into()));
186 }
187
188 for votes in state.votes.values() {
190 if votes.iter().any(|v| v == &voter) {
191 return Err(Error::Consensus("Already voted on this query".into()));
192 }
193 }
194
195 state.votes.get_mut(&response_id).unwrap().push(voter);
196
197 self.check_consensus(state);
199 Ok(())
200 }
201
202 fn check_consensus(&self, state: &mut QueryState) {
204 if state.finalized.is_some() {
205 return;
206 }
207
208 if state.responses.len() < self.min_responses {
210 return;
211 }
212
213 let total_votes: usize = state.votes.values().map(|v| v.len()).sum();
215 if total_votes < self.min_votes {
216 return;
217 }
218
219 let mut best: Option<(ResponseId, usize)> = None;
221 for (response_id, voters) in &state.votes {
222 let vote_count = voters.len();
223 let confidence = vote_count as f64 / total_votes as f64;
224
225 if confidence >= self.threshold {
226 match best {
227 None => best = Some((*response_id, vote_count)),
228 Some((_, best_count)) if vote_count > best_count => {
229 best = Some((*response_id, vote_count))
230 }
231 _ => {}
232 }
233 }
234 }
235
236 if let Some((response_id, _)) = best {
237 state.finalized = Some(response_id);
238 }
239 }
240
241 pub async fn get_result(&self, query_id: QueryId) -> Option<ConsensusResult> {
243 let queries = self.queries.read().await;
244 let state = queries.get(&query_id)?;
245 let winning_id = state.finalized?;
246 let response = state.responses.get(&winning_id)?.clone();
247 let votes = state.votes.get(&winning_id)?.len();
248 let total_voters: usize = state.votes.values().map(|v| v.len()).sum();
249
250 Some(ConsensusResult {
251 response,
252 votes,
253 total_voters,
254 confidence: if total_voters > 0 {
255 votes as f64 / total_voters as f64
256 } else {
257 0.0
258 },
259 })
260 }
261
262 pub async fn is_finalized(&self, query_id: QueryId) -> bool {
264 let queries = self.queries.read().await;
265 queries
266 .get(&query_id)
267 .map(|s| s.finalized.is_some())
268 .unwrap_or(false)
269 }
270
271 pub async fn get_responses(&self, query_id: QueryId) -> Option<Vec<Response>> {
273 let queries = self.queries.read().await;
274 let state = queries.get(&query_id)?;
275 Some(state.responses.values().cloned().collect())
276 }
277
278 pub async fn get_vote_counts(&self, query_id: QueryId) -> Option<HashMap<ResponseId, usize>> {
280 let queries = self.queries.read().await;
281 let state = queries.get(&query_id)?;
282 Some(
283 state
284 .votes
285 .iter()
286 .map(|(id, voters)| (*id, voters.len()))
287 .collect(),
288 )
289 }
290}
291
#[cfg(test)]
mod tests {
    use super::*;
    use crate::identity::DidMethod;

    /// Builds a `Did` for tests whose id is `name` prefixed with `z6Mk`.
    fn make_did(name: &str) -> Did {
        Did {
            method: DidMethod::Lux,
            id: format!("z6Mk{}", name),
        }
    }

    /// Submitting a query hands back the query's own id.
    #[tokio::test]
    async fn test_submit_query() {
        let consensus = AgentConsensusVoting::new(0.5, 1, 1);
        let query = Query::new("What is 2+2?".into(), make_did("Alice"));
        let expected_id = query.id;
        let id = consensus.submit_query(query).await;
        assert_eq!(id, expected_id);
    }

    /// Submitting a response to a known query succeeds and returns the
    /// response's own id.
    #[tokio::test]
    async fn test_submit_response() {
        let consensus = AgentConsensusVoting::new(0.5, 1, 1);
        let query = Query::new("What is 2+2?".into(), make_did("Alice"));
        let query_id = consensus.submit_query(query).await;

        let response = Response::new(query_id, "4".into(), make_did("Bob"));
        let expected_id = response.id;
        let result = consensus.submit_response(response).await;
        assert_eq!(result.unwrap(), expected_id);
    }

    /// The query stays open until `min_votes` is reached, then finalizes.
    #[tokio::test]
    async fn test_vote_and_consensus() {
        let consensus = AgentConsensusVoting::new(0.5, 1, 2);
        let query = Query::new("What is 2+2?".into(), make_did("Alice"));
        let query_id = consensus.submit_query(query).await;

        let response = Response::new(query_id, "4".into(), make_did("Bob"));
        let response_id = consensus.submit_response(response).await.unwrap();

        // First vote: still below `min_votes`.
        consensus
            .vote(query_id, response_id, make_did("Voter1"))
            .await
            .unwrap();
        assert!(!consensus.is_finalized(query_id).await);

        // Second vote: reaches `min_votes` with a unanimous share.
        consensus
            .vote(query_id, response_id, make_did("Voter2"))
            .await
            .unwrap();
        assert!(consensus.is_finalized(query_id).await);

        let result = consensus.get_result(query_id).await.unwrap();
        assert_eq!(result.response.content, "4");
        assert_eq!(result.votes, 2);
        assert_eq!(result.confidence, 1.0);
    }

    /// A voter cannot vote twice on the same query.
    #[tokio::test]
    async fn test_double_vote_prevented() {
        // `min_votes` is 2 so the first vote does not finalize the query and
        // the second attempt really exercises the duplicate-voter check.
        let consensus = AgentConsensusVoting::new(0.5, 1, 2);
        let query = Query::new("Test".into(), make_did("Alice"));
        let query_id = consensus.submit_query(query).await;

        let response = Response::new(query_id, "Answer".into(), make_did("Bob"));
        let response_id = consensus.submit_response(response).await.unwrap();

        let voter = make_did("Voter1");
        consensus
            .vote(query_id, response_id, voter.clone())
            .await
            .unwrap();
        assert!(!consensus.is_finalized(query_id).await);

        let result = consensus.vote(query_id, response_id, voter).await;
        assert!(result.is_err());
    }

    /// The response with the majority of votes is finalized.
    #[tokio::test]
    async fn test_majority_wins() {
        let consensus = AgentConsensusVoting::new(0.5, 2, 3);
        let query = Query::new("Best language?".into(), make_did("Alice"));
        let query_id = consensus.submit_query(query).await;

        let r1 = Response::new(query_id, "Rust".into(), make_did("Bob"));
        let r1_id = consensus.submit_response(r1).await.unwrap();

        let r2 = Response::new(query_id, "Python".into(), make_did("Carol"));
        let r2_id = consensus.submit_response(r2).await.unwrap();

        consensus.vote(query_id, r1_id, make_did("V1")).await.unwrap();
        consensus.vote(query_id, r1_id, make_did("V2")).await.unwrap();
        consensus.vote(query_id, r2_id, make_did("V3")).await.unwrap();

        assert!(consensus.is_finalized(query_id).await);
        let result = consensus.get_result(query_id).await.unwrap();
        assert_eq!(result.response.content, "Rust");
        assert_eq!(result.votes, 2);
        assert_eq!(result.total_voters, 3);
    }

    /// A three-way split never reaches a 0.6 share, so nothing is finalized.
    #[tokio::test]
    async fn test_no_consensus_below_threshold() {
        let consensus = AgentConsensusVoting::new(0.6, 2, 3);
        let query = Query::new("Test".into(), make_did("Alice"));
        let query_id = consensus.submit_query(query).await;

        let r1 = Response::new(query_id, "A".into(), make_did("Bob"));
        let r1_id = consensus.submit_response(r1).await.unwrap();

        let r2 = Response::new(query_id, "B".into(), make_did("Carol"));
        let r2_id = consensus.submit_response(r2).await.unwrap();

        let r3 = Response::new(query_id, "C".into(), make_did("Dave"));
        let r3_id = consensus.submit_response(r3).await.unwrap();

        consensus.vote(query_id, r1_id, make_did("V1")).await.unwrap();
        consensus.vote(query_id, r2_id, make_did("V2")).await.unwrap();
        consensus.vote(query_id, r3_id, make_did("V3")).await.unwrap();

        assert!(!consensus.is_finalized(query_id).await);
    }

    /// Vote counts are reported per response id.
    #[tokio::test]
    async fn test_get_vote_counts() {
        let consensus = AgentConsensusVoting::new(0.5, 1, 1);
        let query = Query::new("Test".into(), make_did("Alice"));
        let query_id = consensus.submit_query(query).await;

        let r1 = Response::new(query_id, "A".into(), make_did("Bob"));
        let r1_id = consensus.submit_response(r1).await.unwrap();

        consensus.vote(query_id, r1_id, make_did("V1")).await.unwrap();

        let counts = consensus.get_vote_counts(query_id).await.unwrap();
        assert_eq!(counts.get(&r1_id), Some(&1));
    }
}