1use std::collections::{HashMap, HashSet};
2
3use json_patch::{patch, Patch};
4use log::warn;
5
6use crate::{
7 commons::{
8 channel::SenderEnd,
9 models::{
10 approval::UniqueApproval,
11 evaluation::{EvaluationRequest, SubjectContext},
12 event::Event,
13 event::Metadata,
14 state::{generate_subject_id, Subject},
15 validation::ValidationProof,
16 HashId,
17 },
18 self_signature_manager::SelfSignatureManager,
19 },
20 crypto::KeyPair,
21 governance::{stage::ValidationStage, GovernanceAPI, GovernanceInterface},
22 identifier::{Derivable, DigestIdentifier, KeyIdentifier},
23 ledger::{LedgerCommand, LedgerResponse},
24 message::{MessageConfig, MessageTaskCommand},
25 protocol::protocol_message_manager::TapleMessages,
26 request::StartRequest,
27 request::TapleRequest,
28 signature::{Signature, Signed, UniqueSignature},
29 utils::message::{
30 approval::create_approval_request, evaluator::create_evaluator_request,
31 ledger::request_gov_event, validation::create_validator_request,
32 },
33 validation::ValidationEvent,
34 ApprovalRequest, ApprovalResponse, DatabaseCollection, DigestDerivator, EvaluationResponse,
35 EventRequest, Notification, ValueWrapper,
36};
37use std::hash::Hash;
38
39use super::errors::EventError;
40use crate::database::DB;
41
/// Timeout value used by the event completer.
///
/// NOTE(review): the consumer of this constant is outside this section of the
/// file; the unit is presumably milliseconds — confirm at the call site.
const TIMEOUT: u32 = 2_000;

/// Constant related to quorum percentages.
///
/// NOTE(review): presumably a factor applied when widening the set of signers
/// asked for a quorum — the usage is not visible here, confirm before relying
/// on this description.
const QUORUM_PORCENTAGE_AMPLIFICATION: f64 = 0.2;
45
46#[allow(dead_code)]
47pub struct EventCompleter<C: DatabaseCollection> {
48 gov_api: GovernanceAPI,
49 database: DB<C>,
50 message_channel: SenderEnd<MessageTaskCommand<TapleMessages>, ()>,
51 notification_tx: tokio::sync::mpsc::Sender<Notification>,
52 ledger_sender: SenderEnd<LedgerCommand, LedgerResponse>,
53 own_identifier: KeyIdentifier,
54 subjects_by_governance: HashMap<DigestIdentifier, HashSet<DigestIdentifier>>,
55 subjects_completing_event:
56 HashMap<DigestIdentifier, (ValidationStage, HashSet<KeyIdentifier>, (u32, u32))>,
57 event_pre_evaluations: HashMap<DigestIdentifier, EvaluationRequest>,
61 event_evaluations:
62 HashMap<DigestIdentifier, HashSet<(UniqueSignature, bool, DigestIdentifier)>>,
63 approval_eval_signatures: HashMap<DigestIdentifier, HashSet<Signature>>,
65 approval_requests: HashMap<DigestIdentifier, Signed<ApprovalRequest>>,
66 event_approvations: HashMap<DigestIdentifier, HashSet<UniqueApproval>>,
67 events_to_validate: HashMap<DigestIdentifier, Signed<Event>>,
69 event_validations: HashMap<DigestIdentifier, HashSet<UniqueSignature>>,
70 event_validation_events: HashMap<DigestIdentifier, ValidationEvent>,
71 signature_manager: SelfSignatureManager,
73 derivator: DigestDerivator,
74}
75
76#[allow(dead_code)]
77impl<C: DatabaseCollection> EventCompleter<C> {
78 pub fn new(
79 gov_api: GovernanceAPI,
80 database: DB<C>,
81 message_channel: SenderEnd<MessageTaskCommand<TapleMessages>, ()>,
82 notification_tx: tokio::sync::mpsc::Sender<Notification>,
83 ledger_sender: SenderEnd<LedgerCommand, LedgerResponse>,
84 own_identifier: KeyIdentifier,
85 signature_manager: SelfSignatureManager,
86 derivator: DigestDerivator,
87 ) -> Self {
88 Self {
89 gov_api,
90 database,
91 message_channel,
92 notification_tx,
93 ledger_sender,
94 subjects_completing_event: HashMap::new(),
95 event_pre_evaluations: HashMap::new(),
98 event_evaluations: HashMap::new(),
99 approval_eval_signatures: HashMap::new(),
100 approval_requests: HashMap::new(),
101 events_to_validate: HashMap::new(),
102 event_approvations: HashMap::new(),
103 event_validations: HashMap::new(),
104 subjects_by_governance: HashMap::new(),
105 event_validation_events: HashMap::new(),
106 own_identifier,
107 signature_manager,
108 derivator,
109 }
110 }
111
112 fn create_validation_event_from_genesis(
113 &self,
114 create_request: StartRequest,
115 event_hash: DigestIdentifier,
116 governance_version: u64,
117 subject_id: DigestIdentifier,
118 subject_keys: &KeyPair,
119 ) -> Result<ValidationEvent, EventError> {
120 let validation_proof = ValidationProof::new_from_genesis_event(
121 create_request,
122 event_hash,
123 governance_version,
124 subject_id,
125 );
126 let subject_signature = Signature::new(&validation_proof, subject_keys, self.derivator)?;
127 Ok(ValidationEvent {
128 proof: validation_proof,
129 subject_signature,
130 previous_proof: None,
131 prev_event_validation_signatures: HashSet::new(),
132 })
133 }
134
135 fn create_validation_event(
136 &self,
137 subject: &Subject,
138 event: &Signed<Event>,
139 gov_version: u64,
140 ) -> Result<ValidationEvent, EventError> {
141 let proof = match &event.content.event_request.content {
142 EventRequest::Create(_) | EventRequest::Fact(_) | EventRequest::EOL(_) => {
143 ValidationProof::new(
144 subject,
145 event.content.sn,
146 event.content.hash_prev_event.clone(),
147 event.content.hash_id(self.derivator)?,
148 gov_version,
149 )
150 }
151 EventRequest::Transfer(transfer_request) => ValidationProof::new_from_transfer_event(
152 subject,
153 event.content.sn,
154 event.content.hash_prev_event.clone(),
155 event.content.hash_id(self.derivator)?,
156 gov_version,
157 transfer_request.public_key.clone(),
158 ),
159 };
160 let (prev_event_validation_signatures, previous_proof) = {
161 let (prev_event_validation_signatures, previous_proof) = self
162 .database
163 .get_signatures(&subject.subject_id, subject.sn)
164 .map_err(|e| {
165 EventError::DatabaseError(format!(
166 "Error getting the signatures of the previous event: {}",
167 e
168 ))
169 })?;
170 (prev_event_validation_signatures, Some(previous_proof))
171 };
172 match &subject.keys {
173 Some(keys) => {
174 let subject_signature = Signature::new(&proof, keys, self.derivator)?;
175 Ok(ValidationEvent {
176 proof,
177 subject_signature,
178 previous_proof,
179 prev_event_validation_signatures,
180 })
181 }
182 None => Err(EventError::SubjectNotOwned(subject.subject_id.to_str()))?,
183 }
184 }
185
186 pub async fn init(&mut self) -> Result<(), EventError> {
187 let subjects = self.database.get_all_subjects();
189 for subject in subjects.iter() {
190 if subject.schema_id != "governance" {
191 self.subjects_by_governance
192 .entry(subject.governance_id.clone())
193 .or_insert_with(HashSet::new)
194 .insert(subject.subject_id.clone());
195 }
196 match self.database.get_prevalidated_event(&subject.subject_id) {
197 Ok(last_event) => {
198 let gov_version = self
199 .gov_api
200 .get_governance_version(
201 subject.governance_id.clone(),
202 subject.subject_id.clone(),
203 )
204 .await?;
205 let metadata = Metadata {
206 namespace: subject.namespace.clone(),
207 subject_id: subject.subject_id.clone(),
208 governance_id: subject.governance_id.clone(),
209 governance_version: gov_version, schema_id: subject.schema_id.clone(),
211 };
212 let stage = ValidationStage::Validate;
213 let (signers, quorum_size) =
214 self.get_signers_and_quorum(metadata, stage.clone()).await?;
215 let validation_event =
216 self.create_validation_event(subject, &last_event, gov_version)?;
217 let event_message = create_validator_request(validation_event.clone());
218 self.ask_signatures(
219 &subject.subject_id,
220 event_message,
221 signers.clone(),
222 quorum_size,
223 )
224 .await?;
225 let last_event_hash = DigestIdentifier::from_serializable_borsh(
226 &last_event.content,
227 self.derivator,
228 )
229 .map_err(|_| {
230 EventError::CryptoError("Error generating last event hash".to_owned())
231 })?;
232 self.event_validation_events
233 .insert(last_event_hash.clone(), validation_event);
234 self.events_to_validate.insert(last_event_hash, last_event);
235 self.subjects_completing_event.insert(
236 subject.subject_id.clone(),
237 (stage, signers, (quorum_size, 0)),
238 );
239 continue;
240 }
241 Err(error) => match error {
242 crate::DbError::EntryNotFound => {}
243 _ => return Err(EventError::DatabaseError(error.to_string())),
244 },
245 }
246 match self.database.get_request(&subject.subject_id) {
248 Ok(event_request) => {
249 self.new_event(event_request).await?;
250 }
251 Err(error) => match error {
252 crate::DbError::EntryNotFound => {}
253 _ => return Err(EventError::DatabaseError(error.to_string())),
254 },
255 }
256 }
280 Ok(())
281 }
282
283 pub async fn new_governance_version(
284 &mut self,
285 governance_id: DigestIdentifier,
286 new_version: u64,
287 ) -> Result<(), EventError> {
288 match self.subjects_by_governance.get(&governance_id).cloned() {
290 Some(subjects_affected) => {
291 for subject_id in subjects_affected.iter() {
292 match self.database.get_request(subject_id) {
293 Ok(event_request) => {
294 let EventRequest::Fact(_) = &event_request.content else {
295 return Err(EventError::GenesisInGovUpdate);
296 };
297 self.new_event(event_request).await?;
298 }
299 Err(error) => match error {
300 crate::DbError::EntryNotFound => {}
301 _ => {
302 return Err(EventError::DatabaseError(error.to_string()));
303 }
304 },
305 }
306 match self.database.get_prevalidated_event(subject_id) {
307 Ok(event_prevalidated) => {
308 if let EventRequest::Create(_) =
309 &event_prevalidated.content.event_request.content
310 {
311 self.message_channel
313 .tell(MessageTaskCommand::Cancel(String::from(format!(
314 "{}",
315 event_prevalidated.content.subject_id.to_str()
316 ))))
317 .await
318 .map_err(EventError::ChannelError)?;
319 self.subjects_completing_event.remove(&subject_id);
320 self.subjects_by_governance.remove(&subject_id);
321 self.database.del_prevalidated_event(&subject_id).map_err(
322 |error| EventError::DatabaseError(error.to_string()),
323 )?;
324 self.new_event(event_prevalidated.content.event_request)
325 .await?;
326 continue;
327 }
328 let subject = self
329 .database
330 .get_subject(subject_id)
331 .map_err(|error| EventError::DatabaseError(error.to_string()))?;
332 let metadata = Metadata {
333 namespace: subject.namespace.clone(),
334 subject_id: subject_id.clone(),
335 governance_id: subject.governance_id.clone(),
336 governance_version: new_version,
337 schema_id: subject.schema_id.clone(),
338 };
339 let validation_event = self.create_validation_event(
340 &subject,
341 &event_prevalidated,
342 new_version,
343 )?;
344 let event_message = create_validator_request(validation_event.clone());
345 let stage = ValidationStage::Validate;
346 let (signers, quorum_size) =
347 self.get_signers_and_quorum(metadata, stage.clone()).await?;
348 self.ask_signatures(
349 &subject_id,
350 event_message,
351 signers.clone(),
352 quorum_size,
353 )
354 .await?;
355 let event_prevalidated_hash =
356 DigestIdentifier::from_serializable_borsh(
357 &event_prevalidated.content,
358 self.derivator,
359 )
360 .map_err(|_| {
361 EventError::CryptoError(
362 "Error generating event prevalidated hash in NGV"
363 .to_owned(),
364 )
365 })?;
366 self.event_validation_events
367 .insert(event_prevalidated_hash, validation_event);
368 self.subjects_completing_event
370 .insert(subject_id.clone(), (stage, signers, (quorum_size, 0)));
371 }
372 Err(error) => match error {
373 crate::DbError::EntryNotFound => {}
374 _ => {
375 return Err(EventError::DatabaseError(error.to_string()));
376 }
377 },
378 }
379 }
380 }
381 None => {}
382 }
383 Ok(())
384 }
385
386 async fn process_transfer_or_eol_event(
387 &mut self,
388 event_request: Signed<EventRequest>,
389 subject: Subject,
390 gov_version: u64,
391 ) -> Result<(), EventError> {
392 let subject_id = subject.subject_id.clone();
393 let None = self.subjects_completing_event.get(&subject_id) else {
395 return Err(EventError::EventAlreadyInProgress);
396 };
397 let metadata = Metadata {
398 namespace: subject.namespace.clone(),
399 subject_id: subject_id.clone(),
400 governance_id: subject.governance_id.clone(),
401 governance_version: gov_version,
402 schema_id: subject.schema_id.clone(),
403 };
404 match &event_request.content {
405 EventRequest::Transfer(tr) => {
406 if event_request.signature.signer == self.own_identifier {
407 self.database
408 .get_keys(&tr.public_key)
409 .map_err(|_| EventError::OwnTransferKeysDbError)?;
410 }
411 }
412 EventRequest::EOL(_) => {
413 if subject.owner != event_request.signature.signer {
414 return Err(EventError::CloseNotAuthorized(
415 event_request.signature.signer.to_str(),
416 ));
417 }
418 }
419 _ => unreachable!(),
420 }
421 let event =
423 &self.create_event_prevalidated_no_eval(event_request, &subject, gov_version)?;
424 let event_hash = DigestIdentifier::from_serializable_borsh(&event.content, self.derivator)
425 .map_err(|_| EventError::CryptoError("Error generating event hash".to_owned()))?;
426 let validation_event = self.create_validation_event(&subject, &event, gov_version)?;
427 let event_message = create_validator_request(validation_event.clone());
428 self.event_validation_events
429 .insert(event_hash, validation_event);
430 let stage = ValidationStage::Validate;
431 let (signers, quorum_size) = self.get_signers_and_quorum(metadata, stage.clone()).await?;
432 self.ask_signatures(&subject_id, event_message, signers.clone(), quorum_size)
433 .await?;
434 self.subjects_completing_event
436 .insert(subject_id.clone(), (stage, signers, (quorum_size, 0)));
437 Ok(())
438 }
439
440 async fn generate_event_proposal(
441 &self,
442 event_request: &Signed<EventRequest>,
443 subject: &Subject,
444 gov_version: u64,
445 ) -> Result<Signed<ApprovalRequest>, EventError> {
446 let derivator = match self
448 .database
449 .get_signatures(&subject.subject_id, subject.sn)
450 {
451 Ok((_, proof)) => proof.event_hash.derivator,
452 Err(crate::database::Error::EntryNotFound) => self.derivator,
453 Err(e) => return Err(EventError::DatabaseError(e.to_string())),
454 };
455 let hash_prev_event = DigestIdentifier::from_serializable_borsh(
456 &self
457 .database
458 .get_event(&subject.subject_id, subject.sn)
459 .map_err(|e| EventError::DatabaseError(e.to_string()))?
460 .content,
461 derivator,
462 )
463 .map_err(|_| {
464 EventError::CryptoError("Error calculating the hash of the previous event".to_string())
465 })?;
466 let approval_request = ApprovalRequest {
467 event_request: event_request.clone(),
468 sn: subject.sn + 1,
469 gov_version,
470 patch: ValueWrapper(
471 serde_json::from_str("[]")
472 .map_err(|_| EventError::CryptoError("Error parsing empty json".to_string()))?,
473 ),
474 state_hash: subject.properties.hash_id(self.derivator)?,
475 hash_prev_event,
476 gov_id: subject.governance_id.clone(),
477 };
478 let subject_signature = Signature::new(
479 &approval_request,
480 &subject
481 .keys
482 .as_ref()
483 .expect("Llegados a aquĆ tenemos que ser owner"),
484 self.derivator,
485 )
486 .map_err(|_| {
487 EventError::CryptoError(String::from("Error signing the hash of the proposal"))
488 })?;
489 Ok(Signed::<ApprovalRequest> {
490 content: approval_request,
491 signature: subject_signature,
492 })
493 }
494
495 pub async fn pre_new_event(
496 &mut self,
497 event_request: Signed<EventRequest>,
498 ) -> Result<DigestIdentifier, EventError> {
499 warn!("Entra en pre event");
502 event_request.verify().map_err(EventError::SubjectError)?;
503 warn!("After first verify");
504 let request_id = DigestIdentifier::generate_with_blake3(&event_request)
505 .map_err(|_| EventError::HashGenerationFailed)?;
506 match self.database.get_taple_request(&request_id) {
508 Ok(_) => {
509 return Err(EventError::RequestAlreadyKnown);
510 }
511 Err(crate::DbError::EntryNotFound) => {}
512 Err(error) => return Err(EventError::DatabaseError(error.to_string())),
513 }
514 let subject_id: &DigestIdentifier = match &event_request.content {
515 EventRequest::Create(_) => return self.new_event(event_request).await,
516 EventRequest::Fact(fact_req) => &fact_req.subject_id,
517 EventRequest::Transfer(trans_req) => &trans_req.subject_id,
518 EventRequest::EOL(eol_req) => &eol_req.subject_id,
519 };
520 let None = self.subjects_completing_event.get(subject_id) else {
522 return Err(EventError::EventAlreadyInProgress);
523 };
524 self.new_event(event_request).await
525 }
526
527 pub async fn new_event(
529 &mut self,
530 event_request: Signed<EventRequest>,
531 ) -> Result<DigestIdentifier, EventError> {
532 let request_id = DigestIdentifier::generate_with_blake3(&event_request)
533 .map_err(|_| EventError::HashGenerationFailed)?;
534 if let EventRequest::Create(create_request) = &event_request.content {
535 if event_request.signature.signer != self.own_identifier {
537 return Err(EventError::ExternalGenesisEvent);
538 }
539 if create_request.public_key.public_key.is_empty() {
540 return Err(EventError::PublicKeyIsEmpty);
541 }
542 let subject_keys = match self.database.get_keys(&create_request.public_key) {
544 Ok(keys) => keys,
545 Err(crate::DbError::EntryNotFound) => {
546 return Err(EventError::SubjectKeysNotFound(
547 create_request.public_key.to_str(),
548 ));
549 }
550 Err(error) => return Err(EventError::DatabaseError(error.to_string())),
551 };
552 let (governance_version, initial_state) = if &create_request.schema_id != "governance" {
553 let governance_version = self
554 .gov_api
555 .get_governance_version(
556 create_request.governance_id.clone(),
557 DigestIdentifier::default(),
558 )
559 .await
560 .map_err(EventError::GovernanceError)?;
561 let creation_premission = self
562 .gov_api
563 .get_invoke_info(
564 Metadata {
565 namespace: create_request.namespace.clone(),
566 subject_id: DigestIdentifier::default(), governance_id: create_request.governance_id.clone(),
568 governance_version,
569 schema_id: create_request.schema_id.clone(),
570 },
571 ValidationStage::Create,
572 self.own_identifier.clone(),
573 )
574 .await
575 .map_err(EventError::GovernanceError)?;
576 if !creation_premission {
577 return Err(EventError::CreatingPermissionDenied);
578 }
579 let initial_state = self
580 .gov_api
581 .get_init_state(
582 create_request.governance_id.clone(),
583 create_request.schema_id.clone(),
584 governance_version,
585 )
586 .await?;
587 (governance_version, initial_state)
588 } else {
589 let initial_state = self
590 .gov_api
591 .get_init_state(
592 create_request.governance_id.clone(),
593 create_request.schema_id.clone(),
594 0,
595 )
596 .await?;
597 (0, initial_state)
598 };
599 let subject_id = generate_subject_id(
600 &create_request.namespace,
601 &create_request.schema_id,
602 create_request.public_key.to_str(),
603 create_request.governance_id.to_str(),
604 governance_version,
605 self.derivator,
606 )?;
607 let event = Signed::<Event>::from_genesis_request(
609 event_request.clone(),
610 &subject_keys,
611 governance_version,
612 &initial_state,
613 self.derivator,
614 )
615 .map_err(EventError::SubjectError)?;
616 let event_hash =
617 DigestIdentifier::from_serializable_borsh(&event.content, self.derivator)
618 .map_err(|_| EventError::HashGenerationFailed)?;
619 let validation_event = self.create_validation_event_from_genesis(
620 create_request.clone(),
621 event_hash.clone(),
622 governance_version,
623 subject_id.clone(),
624 &subject_keys,
625 )?;
626 let metadata = validation_event.proof.get_metadata();
627 let event_message = create_validator_request(validation_event.clone());
628 let stage = ValidationStage::Validate;
629 let (signers, quorum_size) = if &create_request.schema_id != "governance" {
630 self.get_signers_and_quorum(metadata, stage.clone()).await?
631 } else {
632 let mut hs = HashSet::new();
633 hs.insert(self.own_identifier.clone());
634 (hs, 1)
635 };
636 self.ask_signatures(&subject_id, event_message, signers.clone(), quorum_size)
637 .await?;
638 self.event_validation_events
639 .insert(event_hash.clone(), validation_event);
640 self.events_to_validate.insert(event_hash, event.clone());
642 self.database
643 .set_taple_request(&request_id, &event_request.clone().try_into()?)
644 .map_err(|error| EventError::DatabaseError(error.to_string()))?;
645 self.database
646 .set_prevalidated_event(&subject_id, event.clone())
647 .map_err(|error| EventError::DatabaseError(error.to_string()))?;
648 if create_request.schema_id != "governance" {
649 self.subjects_by_governance
650 .entry(create_request.governance_id.clone())
651 .or_insert_with(HashSet::new)
652 .insert(subject_id.clone());
653 }
654 self.subjects_completing_event
655 .insert(subject_id, (stage, signers, (quorum_size, 0)));
656 return Ok(request_id);
657 }
658 let subject_id = match &event_request.content {
659 EventRequest::Transfer(tr) => {
660 log::info!("Processing transfer event");
661 tr.subject_id.clone()
662 }
663 EventRequest::EOL(eolr) => {
664 log::info!("Processing EOL event");
665 eolr.subject_id.clone()
666 }
667 EventRequest::Fact(sr) => {
668 log::info!("Processing state event");
669 sr.subject_id.clone()
670 }
671 _ => unreachable!(),
672 };
673 let subject = match self.database.get_subject(&subject_id) {
675 Ok(subject) => subject,
676 Err(error) => match error {
677 crate::DbError::EntryNotFound => {
678 return Err(EventError::SubjectNotFound(subject_id.to_str()))
679 }
680 _ => return Err(EventError::DatabaseError(error.to_string())),
681 },
682 };
683 if !subject.active {
685 return Err(EventError::SubjectLifeEnd(subject_id.to_str()));
686 }
687 if subject.keys.is_none() {
689 return Err(EventError::SubjectNotOwned(subject_id.to_str()));
690 }
691 let gov_version = self
693 .gov_api
694 .get_governance_version(subject.governance_id.clone(), subject.subject_id.clone())
695 .await
696 .map_err(EventError::GovernanceError)?;
697 match &event_request.content {
698 EventRequest::Transfer(_) | EventRequest::EOL(_) => {
699 self.process_transfer_or_eol_event(
706 event_request.clone(),
707 subject.clone(),
708 gov_version,
709 )
710 .await?;
711 }
712 EventRequest::Fact(_) => {
713 let (metadata, stage) = (
716 Metadata {
717 namespace: subject.namespace,
718 subject_id: subject_id.clone(),
719 governance_id: subject.governance_id.clone(),
720 governance_version: gov_version,
721 schema_id: subject.schema_id,
722 },
723 ValidationStage::Evaluate,
724 );
725 if event_request.signature.signer != self.own_identifier
727 && !self
728 .gov_api
729 .get_invoke_info(
730 metadata.clone(),
731 ValidationStage::Invoke,
732 event_request.signature.signer.clone(),
733 )
734 .await
735 .map_err(EventError::GovernanceError)?
736 {
737 return Err(EventError::InvokePermissionDenied(
738 event_request.signature.signer.to_str(),
739 subject_id.to_str(),
740 ));
741 };
742 let event_preevaluation = EvaluationRequest {
743 event_request: event_request.clone(),
744 context: SubjectContext {
745 governance_id: metadata.governance_id.clone(),
746 schema_id: metadata.schema_id.clone(),
747 is_owner: subject.owner == event_request.signature.signer,
748 state: subject.properties,
749 namespace: metadata.namespace.clone(),
752 },
753 gov_version,
754 sn: subject.sn + 1,
755 };
757 let (signers, quorum_size) =
758 self.get_signers_and_quorum(metadata, stage.clone()).await?;
759 let event_preevaluation_hash =
770 DigestIdentifier::generate_with_blake3(&event_preevaluation).map_err(|_| {
771 EventError::CryptoError(String::from(
772 "Error calculating the hash of the event pre-evaluation",
773 ))
774 })?;
775 self.event_pre_evaluations
776 .insert(event_preevaluation_hash, event_preevaluation.clone());
777 let negative_quorum_size = (signers.len() as u32 - quorum_size) + 1;
779 self.subjects_completing_event.insert(
780 subject_id.clone(),
781 (stage, signers.clone(), (quorum_size, negative_quorum_size)),
782 );
783 self.ask_signatures(
784 &subject_id,
785 create_evaluator_request(event_preevaluation.clone()),
786 signers.clone(),
787 quorum_size,
788 )
789 .await?;
790 }
791 EventRequest::Create(_) => unreachable!(),
792 }
793 self.subjects_by_governance
794 .entry(subject.governance_id.clone())
795 .or_insert_with(HashSet::new)
796 .insert(subject.governance_id);
797 let mut request_data: TapleRequest = event_request.clone().try_into()?;
798 request_data.sn = Some(subject.sn + 1);
799 request_data.subject_id = Some(subject.subject_id.clone());
800 self.database
801 .set_taple_request(&request_id, &request_data)
802 .map_err(|error| EventError::DatabaseError(error.to_string()))?;
803 self.database
804 .set_request(&subject.subject_id, event_request)
805 .map_err(|error| EventError::DatabaseError(error.to_string()))?;
806 Ok(request_id)
807 }
808
809 pub async fn evaluator_signatures(
810 &mut self,
811 evaluator_response: Signed<EvaluationResponse>,
812 ) -> Result<(), EventError> {
813 let evaluation_request = match self
815 .event_pre_evaluations
816 .get(&evaluator_response.content.eval_req_hash)
817 {
818 Some(preevaluation_event) => preevaluation_event,
819 None => return Err(EventError::CryptoError(String::from(
820 "The hash of the event pre-evaluation does not match any of the pre-evaluations",
821 ))),
822 };
823
824 let subject_id = match &evaluation_request.event_request.content {
825 EventRequest::Transfer(_) => return Err(EventError::NoEvaluationForTransferEvents),
827 EventRequest::EOL(_) => return Err(EventError::NoEvaluationForEOLEvents),
828 EventRequest::Create(_) => {
829 return Err(EventError::EvaluationOrApprovationInCreationEvent)
830 }
831 EventRequest::Fact(state_request) => state_request.subject_id.clone(),
832 };
833 let Some((ValidationStage::Evaluate, signers, quorum_size)) =
835 self.subjects_completing_event.get(&subject_id)
836 else {
837 return Err(EventError::WrongEventPhase);
838 };
839 let signer = evaluator_response.signature.signer.clone();
840 if !signers.contains(&signer) {
842 return Err(EventError::CryptoError(String::from(
843 "The signer is not in the list of evaluators or we already have the signature",
844 )));
845 }
846 evaluator_response
848 .verify()
849 .map_err(|error| EventError::CryptoError(error.to_string()))?;
850 let evaluation_hash = evaluator_response.content.hash_id(self.derivator)?;
851 let subject = self
853 .database
854 .get_subject(&subject_id)
855 .map_err(|error| match error {
856 crate::DbError::EntryNotFound => EventError::SubjectNotFound(subject_id.to_str()),
857 _ => EventError::DatabaseError(error.to_string()),
858 })?;
859 let governance_version = self
861 .gov_api
862 .get_governance_version(subject.governance_id.clone(), subject.subject_id.clone())
863 .await
864 .map_err(EventError::GovernanceError)?;
865 if !hash_match_after_patch(
867 &evaluator_response.content,
868 evaluator_response.content.patch.clone(),
869 subject.properties.clone(),
870 )? {
871 return Err(EventError::CryptoError(
872 "Json patch applied to state hash does not match the new state hash".to_string(),
873 ));
874 }
875 let signatures_set = match self
877 .event_evaluations
878 .get_mut(&evaluator_response.content.eval_req_hash)
879 {
880 Some(signatures_set) => {
881 insert_or_replace_and_check(
882 signatures_set,
883 (
884 UniqueSignature {
885 signature: evaluator_response.signature.clone(),
886 },
887 evaluator_response.content.eval_success.clone(),
888 evaluation_hash.clone(),
889 ),
890 );
891 signatures_set
892 }
893 None => {
894 let mut new_signatures_set = HashSet::new();
895 new_signatures_set.insert((
896 UniqueSignature {
897 signature: evaluator_response.signature.clone(),
898 },
899 evaluator_response.content.eval_success.clone(),
900 evaluation_hash.clone(),
901 ));
902 self.event_evaluations.insert(
903 evaluator_response.content.eval_req_hash.clone(),
904 new_signatures_set,
905 );
906 self.event_evaluations
907 .get_mut(&evaluator_response.content.eval_req_hash)
908 .expect("Acabamos de insertar el conjunto de firmas, por lo que debe estar presente")
909 }
910 };
911 let (num_signatures_hash_ok, num_signatures_hash_ko) =
912 count_signatures_with_event_content_hash(&signatures_set, &evaluation_hash);
913 let (quorum_size, negative_quorum_size) = quorum_size.to_owned();
914 let quorum_reached = {
916 if num_signatures_hash_ok >= quorum_size {
917 Some(true)
918 } else if num_signatures_hash_ko >= negative_quorum_size {
919 Some(false)
920 } else {
921 None
922 }
923 };
924 if quorum_reached.is_none() {
925 let mut new_signers: HashSet<KeyIdentifier> =
926 signers.into_iter().map(|s| s.clone()).collect();
927 new_signers.remove(&signer);
928 self.ask_signatures(
929 &subject_id,
930 create_evaluator_request(evaluation_request.clone()),
931 new_signers.clone(),
932 quorum_size,
933 )
934 .await?;
935 self.subjects_completing_event.insert(
936 subject_id,
937 (
938 ValidationStage::Evaluate,
939 new_signers,
940 (quorum_size, negative_quorum_size),
941 ),
942 );
943 return Ok(()); } else {
945 let evaluator_signatures: HashSet<Signature> = signatures_set
948 .iter()
949 .filter(|(_, acceptance, hash)| {
950 hash == &evaluation_hash && quorum_reached.as_ref().unwrap() == acceptance
951 })
952 .map(|(signature, _, _)| signature.signature.clone())
953 .collect();
954 let derivator = match self
956 .database
957 .get_signatures(&subject.subject_id, subject.sn)
958 {
959 Ok((_, proof)) => proof.event_hash.derivator,
960 Err(crate::database::Error::EntryNotFound) => self.derivator,
961 Err(e) => return Err(EventError::DatabaseError(e.to_string())),
962 };
963 let hash_prev_event = DigestIdentifier::from_serializable_borsh(
964 &self
965 .database
966 .get_event(&subject.subject_id, subject.sn)
967 .map_err(|e| EventError::DatabaseError(e.to_string()))?
968 .content,
969 derivator,
970 )
971 .map_err(|_| {
972 EventError::CryptoError(
973 "Error calculating the hash of the previous event".to_owned(),
974 )
975 })?;
976 let metadata = Metadata {
977 namespace: subject.namespace.clone(),
978 subject_id: subject_id.clone(),
979 governance_id: subject.governance_id.clone(),
980 governance_version,
981 schema_id: subject.schema_id.clone(),
982 };
983 if evaluator_response.content.appr_required && !evaluator_response.content.eval_success
984 {
985 return Err(EventError::ApprovalRequiredWhenEvalFailed);
986 }
987 let (stage, event_message) = if evaluator_response.content.appr_required {
989 let approval_request = ApprovalRequest {
990 event_request: evaluation_request.event_request.clone(),
991 sn: evaluation_request.sn,
992 gov_version: governance_version,
993 patch: evaluator_response.content.patch,
994 state_hash: evaluator_response.content.state_hash,
995 hash_prev_event,
996 gov_id: subject.governance_id.clone(),
997 };
998 let approval_request_hash = approval_request
999 .hash_id(DigestDerivator::Blake3_256)
1000 .map_err(|_| {
1001 EventError::CryptoError(String::from(
1002 "Error calculating the hash of the proposal",
1003 ))
1004 })?;
1005 let subject_keys = subject
1006 .keys
1007 .as_ref()
1008 .expect("Llegados a aquĆ tenemos que ser owner");
1009 let subject_signature = Signature::new(
1010 &approval_request,
1011 &subject_keys,
1012 self.derivator,
1013 )
1014 .map_err(|_| {
1015 EventError::CryptoError(String::from("Error signing the Approval Request"))
1016 })?;
1017 let approval_request = Signed::<ApprovalRequest> {
1018 content: approval_request,
1019 signature: subject_signature,
1020 };
1021 self.approval_eval_signatures
1023 .insert(approval_request_hash.clone(), evaluator_signatures.clone());
1024 self.approval_requests
1025 .insert(approval_request_hash, approval_request.clone());
1026 let msg = create_approval_request(approval_request);
1027 (ValidationStage::Approve, msg)
1029 } else {
1030 let gov_version = self
1032 .gov_api
1033 .get_governance_version(
1034 subject.governance_id.clone(),
1035 subject.subject_id.clone(),
1036 )
1037 .await?;
1038 let event = Event {
1039 subject_id: subject_id.clone(),
1040 event_request: evaluation_request.event_request.clone(),
1041 sn: evaluation_request.sn,
1042 gov_version: governance_version,
1043 patch: evaluator_response.content.patch,
1044 state_hash: evaluator_response.content.state_hash,
1045 eval_success: evaluator_response.content.eval_success,
1046 appr_required: evaluator_response.content.appr_required,
1047 approved: true,
1048 hash_prev_event,
1049 evaluators: evaluator_signatures,
1050 approvers: HashSet::new(),
1051 };
1052 let event_hash = event.hash_id(self.derivator)?;
1053 let subject_keys = subject
1054 .keys
1055 .as_ref()
1056 .expect("Llegados a aquĆ tenemos que ser owner");
1057 let subject_signature = Signature::new(&event, &subject_keys, self.derivator)
1058 .map_err(|_| {
1059 EventError::CryptoError(String::from("Error signing the Event"))
1060 })?;
1061 let signed_event = Signed::<Event> {
1062 content: event,
1063 signature: subject_signature,
1064 };
1065 let validation_event =
1066 self.create_validation_event(&subject, &signed_event, gov_version)?;
1067 let event_message = create_validator_request(validation_event.clone());
1068 self.event_validation_events
1069 .insert(event_hash.clone(), validation_event);
1070 self.events_to_validate
1071 .insert(event_hash, signed_event.clone());
1072 self.database
1073 .set_prevalidated_event(&subject.subject_id, signed_event)
1074 .map_err(|error| EventError::DatabaseError(error.to_string()))?;
1075 self.database
1076 .del_request(&subject.subject_id)
1077 .map_err(|error| EventError::DatabaseError(error.to_string()))?;
1078 (ValidationStage::Validate, event_message)
1079 };
1080 self.event_evaluations
1082 .remove(&evaluator_response.content.eval_req_hash);
1083 self.event_pre_evaluations
1084 .remove(&evaluator_response.content.eval_req_hash);
1085 let (signers, quorum_size) =
1086 self.get_signers_and_quorum(metadata, stage.clone()).await?;
1087 self.ask_signatures(&subject_id, event_message, signers.clone(), quorum_size)
1088 .await?;
1089 let negative_quorum_size = (signers.len() as u32 - quorum_size) + 1;
1091 self.subjects_completing_event.insert(
1092 subject_id.clone(),
1093 (stage, signers, (quorum_size, negative_quorum_size)),
1094 );
1095 }
1096 Ok(())
1097 }
1098
1099 pub async fn approver_signatures(
1100 &mut self,
1101 approval: Signed<ApprovalResponse>,
1102 ) -> Result<(), EventError> {
1103 let approval_request = match self.approval_requests.get(&approval.content.appr_req_hash) {
1105 Some(event_proposal) => event_proposal,
1106 None => {
1107 return Err(EventError::CryptoError(String::from(
1108 "The hash of the event proposal does not match any of the proposals",
1109 )))
1110 }
1111 };
1112 let subject_id = match &approval_request.content.event_request.content {
1113 EventRequest::Transfer(_) => return Err(EventError::NoApprovalForTransferEvents),
1115 EventRequest::EOL(_) => return Err(EventError::NoApprovalForEOLEvents),
1117 EventRequest::Create(_) => {
1118 return Err(EventError::EvaluationOrApprovationInCreationEvent)
1119 }
1120 EventRequest::Fact(state_request) => state_request.subject_id.clone(),
1121 };
1122 let Some((ValidationStage::Approve, signers, quorum_size)) =
1123 self.subjects_completing_event.get(&subject_id)
1124 else {
1125 return Err(EventError::WrongEventPhase);
1126 };
1127 let signer = approval.signature.signer.clone();
1128 if !signers.contains(&signer) {
1130 return Err(EventError::CryptoError(String::from(
1131 "The signer is not in the list of approvers or we already have his approve",
1132 )));
1133 }
1134 approval
1136 .verify()
1137 .map_err(|error| EventError::CryptoError(error.to_string()))?;
1138 let subject = self
1140 .database
1141 .get_subject(&subject_id)
1142 .map_err(|error| match error {
1143 crate::DbError::EntryNotFound => EventError::SubjectNotFound(subject_id.to_str()),
1144 _ => EventError::DatabaseError(error.to_string()),
1145 })?;
1146 let approval_set = match self
1148 .event_approvations
1149 .get_mut(&approval.content.appr_req_hash)
1150 {
1151 Some(approval_set) => {
1152 insert_or_replace_and_check(
1153 approval_set,
1154 UniqueApproval {
1155 approval: approval.clone(),
1156 },
1157 );
1158 approval_set
1159 }
1160 None => {
1161 let mut new_approval_set = HashSet::new();
1162 new_approval_set.insert(UniqueApproval {
1163 approval: approval.clone(),
1164 });
1165 self.event_approvations
1166 .insert(approval.content.appr_req_hash.clone(), new_approval_set);
1167 self.event_approvations
1168 .get_mut(&approval.content.appr_req_hash)
1169 .expect("Acabamos de insertar el conjunto de approvals, por lo que debe estar presente")
1170 }
1171 };
1172 let num_approvals_with_same_acceptance = approval_set
1174 .iter()
1175 .filter(|unique_approval| {
1176 unique_approval.approval.content.approved == approval.content.approved
1177 })
1178 .count() as u32;
1179 let (quorum_size_now, _) = match approval.content.approved {
1180 true => (quorum_size.0, true),
1181 false => (quorum_size.1, false),
1182 };
1183 if num_approvals_with_same_acceptance < quorum_size_now {
1184 let mut new_signers: HashSet<KeyIdentifier> =
1186 signers.into_iter().map(|s| s.clone()).collect();
1187 new_signers.remove(&signer);
1188 self.ask_signatures(
1189 &subject_id,
1190 create_approval_request(approval_request.to_owned()),
1191 signers.clone(),
1192 quorum_size_now,
1193 )
1194 .await?;
1195 self.subjects_completing_event.insert(
1197 subject_id,
1198 (
1199 ValidationStage::Approve,
1200 new_signers,
1201 quorum_size.to_owned(),
1202 ),
1203 );
1204 Ok(()) } else {
1206 let governance_version = self
1207 .gov_api
1208 .get_governance_version(subject.governance_id.clone(), subject.subject_id.clone())
1209 .await
1210 .map_err(EventError::GovernanceError)?;
1211 let metadata = Metadata {
1213 namespace: subject.namespace.clone(),
1214 subject_id: subject_id.clone(),
1215 governance_id: subject.governance_id.clone(),
1216 governance_version,
1217 schema_id: subject.schema_id.clone(),
1218 };
1219 let approvals: HashSet<Signature> = approval_set
1221 .iter()
1222 .filter(|unique_approval| {
1223 unique_approval.approval.content.approved == approval.content.approved
1224 })
1225 .map(|approval| approval.approval.signature.clone())
1226 .collect();
1227 let event_proposal = self
1228 .approval_requests
1229 .get(&approval.content.appr_req_hash)
1230 .unwrap();
1231 let gov_version = self
1232 .gov_api
1233 .get_governance_version(subject.governance_id.clone(), subject.subject_id.clone())
1234 .await?;
1235 let evaluators = self
1236 .approval_eval_signatures
1237 .get(&approval.content.appr_req_hash)
1238 .unwrap()
1239 .to_owned();
1240 let event = Event {
1241 subject_id: subject_id.clone(),
1242 event_request: event_proposal.content.event_request.clone(),
1243 sn: event_proposal.content.sn,
1244 gov_version,
1245 patch: event_proposal.content.patch.clone(),
1246 state_hash: event_proposal.content.state_hash.clone(),
1247 eval_success: true,
1248 appr_required: true,
1249 approved: approval.content.approved,
1250 hash_prev_event: event_proposal.content.hash_prev_event.clone(),
1251 evaluators,
1252 approvers: approvals,
1253 };
1254 let event_hash = event.hash_id(self.derivator)?;
1255 let subject_keys = subject
1256 .keys
1257 .as_ref()
1258 .expect("Llegados a aquĆ tenemos que ser owner");
1259 let subject_signature =
1260 Signature::new(&event, &subject_keys, self.derivator).map_err(|_| {
1261 EventError::CryptoError(String::from(
1262 "Error signing the Event (Approval stage)",
1263 ))
1264 })?;
1265 let signed_event = Signed::<Event> {
1266 content: event,
1267 signature: subject_signature,
1268 };
1269 let validation_event =
1270 self.create_validation_event(&subject, &signed_event, gov_version)?;
1271 let event_message = create_validator_request(validation_event.clone());
1272 self.approval_eval_signatures
1274 .remove(&approval.content.appr_req_hash);
1275 self.approval_requests
1276 .remove(&approval.content.appr_req_hash);
1277 self.event_approvations
1278 .remove(&approval.content.appr_req_hash);
1279 let stage = ValidationStage::Validate;
1280 let (signers, quorum_size) =
1281 self.get_signers_and_quorum(metadata, stage.clone()).await?;
1282 self.ask_signatures(&subject_id, event_message, signers.clone(), quorum_size)
1283 .await?;
1284 self.event_validation_events
1285 .insert(event_hash.clone(), validation_event);
1286 self.events_to_validate
1287 .insert(event_hash, signed_event.clone());
1288 self.database
1289 .set_prevalidated_event(&subject.subject_id, signed_event)
1290 .map_err(|error| EventError::DatabaseError(error.to_string()))?;
1291 self.database
1292 .del_request(&subject.subject_id)
1293 .map_err(|error| EventError::DatabaseError(error.to_string()))?;
1294 self.subjects_completing_event
1296 .insert(subject_id, (stage, signers, (quorum_size, 0)));
1297 Ok(())
1298 }
1299 }
1300
1301 pub async fn validation_signatures(
1302 &mut self,
1303 event_hash: DigestIdentifier,
1304 signature: Signature,
1305 governance_version: u64,
1306 ) -> Result<(), EventError> {
1307 let event = match self.events_to_validate.get(&event_hash) {
1309 Some(event) => event,
1310 None => {
1311 return Err(EventError::CryptoError(String::from(
1312 "The hash of the event does not match any of the events 1",
1313 )));
1314 }
1315 };
1316 let validation_event = self
1317 .event_validation_events
1318 .get(&event_hash)
1319 .expect("Should be");
1320 let subject_id = match &event.content.event_request.content {
1321 EventRequest::Transfer(transfer_request) => transfer_request.subject_id.clone(),
1322 EventRequest::EOL(eol_request) => eol_request.subject_id.clone(),
1323 EventRequest::Create(create_request) => generate_subject_id(
1324 &create_request.namespace,
1325 &create_request.schema_id,
1326 create_request.public_key.to_str(),
1327 create_request.governance_id.to_str(),
1328 event.content.gov_version,
1329 self.derivator,
1330 )?,
1331 EventRequest::Fact(state_request) => state_request.subject_id.clone(),
1332 };
1333 let subject = match self.database.get_subject(&subject_id) {
1335 Ok(subject) => Some(subject),
1336 Err(error) => match error {
1337 crate::DbError::EntryNotFound => None,
1338 _ => return Err(EventError::DatabaseError(error.to_string())),
1339 },
1340 };
1341 let (our_governance_version, governance_id) = if event.content.sn == 0 && subject.is_none()
1342 {
1343 if let EventRequest::Create(create_request) = &event.content.event_request.content {
1344 if create_request.schema_id == "governance" {
1345 (0, create_request.governance_id.clone())
1346 } else {
1347 (
1348 self.gov_api
1349 .get_governance_version(
1350 create_request.governance_id.clone(),
1351 subject_id.clone(),
1352 )
1353 .await
1354 .map_err(EventError::GovernanceError)?,
1355 create_request.governance_id.clone(),
1356 )
1357 }
1358 } else {
1359 return Err(EventError::Event0NotCreate);
1360 }
1361 } else if subject.is_some() && event.content.sn != 0 {
1362 let subject = subject.unwrap();
1363 if subject.schema_id == "governance" {
1364 (subject.sn, subject.subject_id.clone())
1365 } else {
1366 (
1367 self.gov_api
1368 .get_governance_version(
1369 subject.governance_id.clone(),
1370 subject.subject_id.clone(),
1371 )
1372 .await
1373 .map_err(EventError::GovernanceError)?,
1374 subject.governance_id,
1375 )
1376 }
1377 } else {
1378 return Err(EventError::SubjectNotFound(subject_id.to_str()));
1379 };
1380 if our_governance_version < governance_version {
1381 let msg = request_gov_event(
1383 self.own_identifier.clone(),
1384 governance_id,
1385 our_governance_version + 1,
1386 );
1387 self.message_channel
1388 .tell(MessageTaskCommand::Request(
1389 None,
1390 msg,
1391 vec![signature.signer],
1392 MessageConfig {
1393 timeout: 2000,
1394 replication_factor: 1.0,
1395 },
1396 ))
1397 .await?;
1398 return Ok(());
1399 } else if our_governance_version > governance_version {
1400 return Ok(());
1402 }
1403 let Some((ValidationStage::Validate, signers, quorum_size)) =
1405 self.subjects_completing_event.get(&subject_id)
1406 else {
1407 return Err(EventError::WrongEventPhase);
1408 };
1409 let signer = signature.signer.clone();
1410 if !signers.contains(&signer) {
1412 return Err(EventError::CryptoError(String::from(
1413 "The signer is not in the list of validators or we already have the validation",
1414 )));
1415 }
1416 signature
1418 .verify(&validation_event.proof)
1419 .map_err(|error| EventError::CryptoError(error.to_string()))?;
1420 let validation_set = match self.event_validations.get_mut(&event_hash) {
1422 Some(validation_set) => {
1423 insert_or_replace_and_check(validation_set, UniqueSignature { signature });
1424 validation_set
1425 }
1426 None => {
1427 let mut new_validation_set = HashSet::new();
1428 new_validation_set.insert(UniqueSignature { signature });
1429 self.event_validations
1430 .insert(event_hash.clone(), new_validation_set);
1431 self.event_validations.get_mut(&event_hash).expect(
1432 "Acabamos de insertar el conjunto de validations, por lo que debe estar presente",
1433 )
1434 }
1435 };
1436 let quorum_size = quorum_size.to_owned();
1437 if (validation_set.len() as u32) < quorum_size.0 {
1439 let event_message = create_validator_request(validation_event.to_owned());
1440 let mut new_signers: HashSet<KeyIdentifier> =
1441 signers.into_iter().map(|s| s.clone()).collect();
1442 new_signers.remove(&signer);
1443 self.ask_signatures(
1444 &subject_id,
1445 event_message,
1446 new_signers.clone(),
1447 quorum_size.0,
1448 )
1449 .await?;
1450 self.subjects_completing_event.insert(
1452 subject_id,
1453 (ValidationStage::Validate, new_signers, quorum_size),
1454 );
1455 Ok(())
1456 } else {
1457 let validation_signatures: HashSet<Signature> = validation_set
1458 .iter()
1459 .map(|unique_signature| unique_signature.signature.clone())
1460 .collect();
1461 if event.content.sn == 0 {
1464 let response = self
1465 .ledger_sender
1466 .ask(LedgerCommand::Genesis {
1467 event: event.clone(),
1468 signatures: validation_signatures,
1469 validation_proof: validation_event.proof.clone(),
1470 })
1471 .await?;
1472 log::debug!("LEDGER RESPONSE: {:?}", response);
1473 } else {
1474 let response = self
1475 .ledger_sender
1476 .ask(LedgerCommand::OwnEvent {
1477 event: event.clone(),
1478 signatures: validation_signatures,
1479 validation_proof: validation_event.proof.clone(),
1480 })
1481 .await?;
1482 log::debug!("LEDGER RESPONSE: {:?}", response);
1483 }
1484 self.database
1485 .del_prevalidated_event(&subject_id)
1486 .map_err(|error| EventError::DatabaseError(error.to_string()))?;
1487 self.database
1488 .del_request(&subject_id)
1489 .map_err(|error| EventError::DatabaseError(error.to_string()))?;
1490 self.message_channel
1491 .tell(MessageTaskCommand::Cancel(String::from(format!(
1492 "{}",
1493 subject_id.to_str()
1494 ))))
1495 .await
1496 .map_err(EventError::ChannelError)?;
1497 self.events_to_validate.remove(&event_hash);
1498 self.event_validation_events.remove(&event_hash);
1499 self.event_validations.remove(&event_hash);
1500 self.subjects_completing_event.remove(&subject_id);
1501 Ok(())
1502 }
1503 }
1504
1505 pub async fn higher_governance_expected(
1506 &self,
1507 governance_id: DigestIdentifier,
1508 who_asked: KeyIdentifier,
1509 ) -> Result<(), EventError> {
1510 self.message_channel
1511 .tell(MessageTaskCommand::Request(
1512 None,
1513 TapleMessages::LedgerMessages(LedgerCommand::GetLCE {
1514 who_asked: self.own_identifier.clone(),
1515 subject_id: governance_id,
1516 }),
1517 vec![who_asked],
1518 MessageConfig {
1519 timeout: TIMEOUT,
1520 replication_factor: 1.0,
1521 },
1522 ))
1523 .await
1524 .map_err(EventError::ChannelError)
1525 }
1526
1527 async fn get_signers_and_quorum(
1528 &self,
1529 metadata: Metadata,
1530 stage: ValidationStage,
1531 ) -> Result<(HashSet<KeyIdentifier>, u32), EventError> {
1532 let signers = self
1533 .gov_api
1534 .get_signers(metadata.clone(), stage.clone())
1535 .await
1536 .map_err(EventError::GovernanceError)?;
1537 let quorum_size = self
1538 .gov_api
1539 .get_quorum(metadata, stage)
1540 .await
1541 .map_err(EventError::GovernanceError)?;
1542 Ok((signers, quorum_size))
1543 }
1544
1545 async fn ask_signatures(
1546 &self,
1547 subject_id: &DigestIdentifier,
1548 event_message: TapleMessages,
1549 signers: HashSet<KeyIdentifier>,
1550 quorum_size: u32,
1551 ) -> Result<(), EventError> {
1552 let replication_factor = extend_quorum(quorum_size, signers.len());
1553 self.message_channel
1554 .tell(MessageTaskCommand::Request(
1555 Some(String::from(format!("{}", subject_id.to_str()))),
1556 event_message,
1557 signers.into_iter().collect(),
1558 MessageConfig {
1559 timeout: TIMEOUT,
1560 replication_factor,
1561 },
1562 ))
1563 .await
1564 .map_err(EventError::ChannelError)?;
1565 Ok(())
1566 }
1567
1568 fn create_event_prevalidated_no_eval(
1569 &mut self,
1570 event_request: Signed<EventRequest>,
1571 subject: &Subject,
1572 gov_version: u64,
1573 ) -> Result<Signed<Event>, EventError> {
1574 let derivator = match self
1576 .database
1577 .get_signatures(&subject.subject_id, subject.sn)
1578 {
1579 Ok((_, proof)) => proof.event_hash.derivator,
1580 Err(crate::database::Error::EntryNotFound) => self.derivator,
1581 Err(e) => return Err(EventError::DatabaseError(e.to_string())),
1582 };
1583 let hash_prev_event = self
1584 .database
1585 .get_event(&subject.subject_id, subject.sn)
1586 .map_err(|e| EventError::DatabaseError(e.to_string()))?
1587 .content
1588 .hash_id(derivator)?;
1589 let event = Event {
1590 subject_id: subject.subject_id.clone(),
1591 event_request,
1592 sn: subject.sn + 1,
1593 gov_version,
1594 patch: ValueWrapper(
1595 serde_json::from_str("[]")
1596 .map_err(|_| EventError::CryptoError("Error parsing empty json".to_string()))?,
1597 ),
1598 state_hash: subject.properties.hash_id(self.derivator)?,
1599 eval_success: true,
1600 appr_required: false,
1601 approved: true,
1602 hash_prev_event,
1603 evaluators: HashSet::new(),
1604 approvers: HashSet::new(),
1605 };
1606 let event_content_hash = event.hash_id(self.derivator)?;
1607 let subject_keys = subject.keys.as_ref().expect("Somos propietario");
1608 let event_signature =
1609 Signature::new(&event, &subject_keys, self.derivator).map_err(|_| {
1610 EventError::CryptoError(String::from("Error signing the hash of the event content"))
1611 })?;
1612 let event = Signed::<Event> {
1613 content: event,
1614 signature: event_signature,
1615 };
1616 self.events_to_validate
1617 .insert(event_content_hash, event.clone());
1618 self.database
1619 .set_prevalidated_event(&subject.subject_id, event.clone())
1620 .map_err(|error| EventError::DatabaseError(error.to_string()))?;
1621 self.database
1622 .del_request(&subject.subject_id)
1623 .map_err(|error| EventError::DatabaseError(error.to_string()))?;
1624 Ok(event)
1625 }
1626}
1627
1628pub fn extend_quorum(quorum_size: u32, signers_len: usize) -> f64 {
1629 let quorum_extended =
1630 quorum_size + (signers_len as f64 * QUORUM_PORCENTAGE_AMPLIFICATION).ceil() as u32;
1631 quorum_extended as f64 / signers_len as f64
1632}
1633
1634fn count_signatures_with_event_content_hash(
1635 signatures: &HashSet<(UniqueSignature, bool, DigestIdentifier)>,
1636 target_event_content_hash: &DigestIdentifier,
1637) -> (u32, u32) {
1638 let mut ok: u32 = 0;
1639 let mut ko: u32 = 0;
1640 for (_, acceptance, hash) in signatures.iter() {
1641 if hash == target_event_content_hash {
1642 match acceptance {
1643 true => ok += 1,
1644 false => ko += 1,
1645 }
1646 }
1647 }
1648 (ok, ko)
1649}
1650
/// Stores `new_value` in `set`, overwriting any element that compares equal to it.
///
/// Returns `true` when an equal element was already present (and has been
/// replaced), `false` when the value is new to the set.
fn insert_or_replace_and_check<T: PartialEq + Eq + Hash>(
    set: &mut HashSet<T>,
    new_value: T,
) -> bool {
    set.replace(new_value).is_some()
}
1659
1660fn hash_match_after_patch(
1661 evaluation: &EvaluationResponse,
1662 json_patch: ValueWrapper,
1663 mut prev_properties: ValueWrapper,
1664) -> Result<bool, EventError> {
1665 if !evaluation.eval_success {
1666 let state_hash_calculated = DigestIdentifier::from_serializable_borsh(
1667 &prev_properties,
1668 evaluation.state_hash.derivator,
1669 )
1670 .map_err(|_| {
1671 EventError::CryptoError(String::from("Error calculating the hash of the state"))
1672 })?;
1673 Ok(state_hash_calculated == evaluation.state_hash)
1674 } else {
1675 let Ok(patch_json) = serde_json::from_value::<Patch>(json_patch.0) else {
1676 return Err(EventError::ErrorParsingJsonString(
1677 "Error Parsing Patch".to_owned(),
1678 ));
1679 };
1680 let Ok(()) = patch(&mut prev_properties.0, &patch_json) else {
1681 return Err(EventError::ErrorApplyingPatch(
1682 "Error applying patch".to_owned(),
1683 ));
1684 };
1685 let state_hash_calculated = DigestIdentifier::from_serializable_borsh(
1686 &prev_properties,
1687 evaluation.state_hash.derivator,
1688 )
1689 .map_err(|_| {
1690 EventError::CryptoError(String::from("Error calculating the hash of the state"))
1691 })?;
1692 Ok(state_hash_calculated == evaluation.state_hash)
1693 }
1694}