taple_core/event/
event_completer.rs

1use std::collections::{HashMap, HashSet};
2
3use json_patch::{patch, Patch};
4use log::warn;
5
6use crate::{
7    commons::{
8        channel::SenderEnd,
9        models::{
10            approval::UniqueApproval,
11            evaluation::{EvaluationRequest, SubjectContext},
12            event::Event,
13            event::Metadata,
14            state::{generate_subject_id, Subject},
15            validation::ValidationProof,
16            HashId,
17        },
18        self_signature_manager::SelfSignatureManager,
19    },
20    crypto::KeyPair,
21    governance::{stage::ValidationStage, GovernanceAPI, GovernanceInterface},
22    identifier::{Derivable, DigestIdentifier, KeyIdentifier},
23    ledger::{LedgerCommand, LedgerResponse},
24    message::{MessageConfig, MessageTaskCommand},
25    protocol::protocol_message_manager::TapleMessages,
26    request::StartRequest,
27    request::TapleRequest,
28    signature::{Signature, Signed, UniqueSignature},
29    utils::message::{
30        approval::create_approval_request, evaluator::create_evaluator_request,
31        ledger::request_gov_event, validation::create_validator_request,
32    },
33    validation::ValidationEvent,
34    ApprovalRequest, ApprovalResponse, DatabaseCollection, DigestDerivator, EvaluationResponse,
35    EventRequest, Notification, ValueWrapper,
36};
37use std::hash::Hash;
38
39use super::errors::EventError;
40use crate::database::DB;
41
// Timeout value used by this module; the code that consumes it is outside the visible part of
// the file.
// NOTE(review): presumably milliseconds for signature-request messages — confirm at the use site.
const TIMEOUT: u32 = 2000;
// const GET_ALL: isize = 200;
// Amplification factor related to quorum percentages (the "PORCENTAGE" spelling is kept as-is so
// existing references keep resolving).
// NOTE(review): how the factor is applied is not visible here — confirm at the use site.
const QUORUM_PORCENTAGE_AMPLIFICATION: f64 = 0.2;
45
/// State holder for the event completion workflow.
///
/// It keeps the handles to the other components (governance, database, messaging, ledger) and the
/// in-memory bookkeeping for events that are being evaluated, approved and validated.
#[allow(dead_code)]
pub struct EventCompleter<C: DatabaseCollection> {
    /// Governance client; queried for governance versions, invocation permissions and the
    /// initial state of schemas.
    gov_api: GovernanceAPI,
    /// Node database (subjects, requests, pre-validated events, signatures and keys).
    database: DB<C>,
    /// Channel to the message task; used, among other things, to cancel pending signature
    /// requests.
    message_channel: SenderEnd<MessageTaskCommand<TapleMessages>, ()>,
    /// Channel on which `Notification` values are sent.
    notification_tx: tokio::sync::mpsc::Sender<Notification>,
    /// Request/response channel to the ledger (`LedgerCommand` -> `LedgerResponse`).
    ledger_sender: SenderEnd<LedgerCommand, LedgerResponse>,
    /// Identifier of this node; compared against the signer of incoming requests.
    own_identifier: KeyIdentifier,
    /// Governance id -> ids of the non-governance subjects registered under it.
    subjects_by_governance: HashMap<DigestIdentifier, HashSet<DigestIdentifier>>,
    /// Subject id -> (stage in progress, signers asked, (quorum size, counter starting at 0)).
    /// NOTE(review): the meaning of the second counter is not visible here — confirm.
    subjects_completing_event:
        HashMap<DigestIdentifier, (ValidationStage, HashSet<KeyIdentifier>, (u32, u32))>,
    // actual_sn: HashMap<DigestIdentifier, u64>,
    // virtual_state: HashMap<DigestIdentifier, Value>,
    // Evaluation HashMaps
    /// Pending evaluation requests. NOTE(review): presumably keyed by event/request hash — confirm.
    event_pre_evaluations: HashMap<DigestIdentifier, EvaluationRequest>,
    /// Evaluation results collected so far, stored as (signature, bool, digest) tuples.
    /// NOTE(review): semantics of the tuple members are not visible here — confirm.
    event_evaluations:
        HashMap<DigestIdentifier, HashSet<(UniqueSignature, bool, DigestIdentifier)>>,
    // Approval HashMaps
    /// Evaluator signatures kept for the approval phase.
    approval_eval_signatures: HashMap<DigestIdentifier, HashSet<Signature>>,
    /// Signed approval requests currently in flight.
    approval_requests: HashMap<DigestIdentifier, Signed<ApprovalRequest>>,
    /// Approvals collected so far.
    event_approvations: HashMap<DigestIdentifier, HashSet<UniqueApproval>>,
    // Validation HashMaps
    /// Event hash -> signed event waiting for validation signatures.
    events_to_validate: HashMap<DigestIdentifier, Signed<Event>>,
    /// Validation signatures collected so far.
    event_validations: HashMap<DigestIdentifier, HashSet<UniqueSignature>>,
    /// Event hash -> validation event that was sent to the validators.
    event_validation_events: HashMap<DigestIdentifier, ValidationEvent>,
    // SignatureManager
    /// Helper that produces this node's own signatures.
    signature_manager: SelfSignatureManager,
    /// Digest derivator used when hashing and signing.
    derivator: DigestDerivator,
}
75
76#[allow(dead_code)]
77impl<C: DatabaseCollection> EventCompleter<C> {
78    pub fn new(
79        gov_api: GovernanceAPI,
80        database: DB<C>,
81        message_channel: SenderEnd<MessageTaskCommand<TapleMessages>, ()>,
82        notification_tx: tokio::sync::mpsc::Sender<Notification>,
83        ledger_sender: SenderEnd<LedgerCommand, LedgerResponse>,
84        own_identifier: KeyIdentifier,
85        signature_manager: SelfSignatureManager,
86        derivator: DigestDerivator,
87    ) -> Self {
88        Self {
89            gov_api,
90            database,
91            message_channel,
92            notification_tx,
93            ledger_sender,
94            subjects_completing_event: HashMap::new(),
95            // actual_sn: HashMap::new(),
96            // virtual_state: HashMap::new(),
97            event_pre_evaluations: HashMap::new(),
98            event_evaluations: HashMap::new(),
99            approval_eval_signatures: HashMap::new(),
100            approval_requests: HashMap::new(),
101            events_to_validate: HashMap::new(),
102            event_approvations: HashMap::new(),
103            event_validations: HashMap::new(),
104            subjects_by_governance: HashMap::new(),
105            event_validation_events: HashMap::new(),
106            own_identifier,
107            signature_manager,
108            derivator,
109        }
110    }
111
112    fn create_validation_event_from_genesis(
113        &self,
114        create_request: StartRequest,
115        event_hash: DigestIdentifier,
116        governance_version: u64,
117        subject_id: DigestIdentifier,
118        subject_keys: &KeyPair,
119    ) -> Result<ValidationEvent, EventError> {
120        let validation_proof = ValidationProof::new_from_genesis_event(
121            create_request,
122            event_hash,
123            governance_version,
124            subject_id,
125        );
126        let subject_signature = Signature::new(&validation_proof, subject_keys, self.derivator)?;
127        Ok(ValidationEvent {
128            proof: validation_proof,
129            subject_signature,
130            previous_proof: None,
131            prev_event_validation_signatures: HashSet::new(),
132        })
133    }
134
135    fn create_validation_event(
136        &self,
137        subject: &Subject,
138        event: &Signed<Event>,
139        gov_version: u64,
140    ) -> Result<ValidationEvent, EventError> {
141        let proof = match &event.content.event_request.content {
142            EventRequest::Create(_) | EventRequest::Fact(_) | EventRequest::EOL(_) => {
143                ValidationProof::new(
144                    subject,
145                    event.content.sn,
146                    event.content.hash_prev_event.clone(),
147                    event.content.hash_id(self.derivator)?,
148                    gov_version,
149                )
150            }
151            EventRequest::Transfer(transfer_request) => ValidationProof::new_from_transfer_event(
152                subject,
153                event.content.sn,
154                event.content.hash_prev_event.clone(),
155                event.content.hash_id(self.derivator)?,
156                gov_version,
157                transfer_request.public_key.clone(),
158            ),
159        };
160        let (prev_event_validation_signatures, previous_proof) = {
161            let (prev_event_validation_signatures, previous_proof) = self
162                .database
163                .get_signatures(&subject.subject_id, subject.sn)
164                .map_err(|e| {
165                    EventError::DatabaseError(format!(
166                        "Error getting the signatures of the previous event: {}",
167                        e
168                    ))
169                })?;
170            (prev_event_validation_signatures, Some(previous_proof))
171        };
172        match &subject.keys {
173            Some(keys) => {
174                let subject_signature = Signature::new(&proof, keys, self.derivator)?;
175                Ok(ValidationEvent {
176                    proof,
177                    subject_signature,
178                    previous_proof,
179                    prev_event_validation_signatures,
180                })
181            }
182            None => Err(EventError::SubjectNotOwned(subject.subject_id.to_str()))?,
183        }
184    }
185
186    pub async fn init(&mut self) -> Result<(), EventError> {
187        // Fill actual_sn with the last sn of last event created (not necessarily validated) of each subject
188        let subjects = self.database.get_all_subjects();
189        for subject in subjects.iter() {
190            if subject.schema_id != "governance" {
191                self.subjects_by_governance
192                    .entry(subject.governance_id.clone())
193                    .or_insert_with(HashSet::new)
194                    .insert(subject.subject_id.clone());
195            }
196            match self.database.get_prevalidated_event(&subject.subject_id) {
197                Ok(last_event) => {
198                    let gov_version = self
199                        .gov_api
200                        .get_governance_version(
201                            subject.governance_id.clone(),
202                            subject.subject_id.clone(),
203                        )
204                        .await?;
205                    let metadata = Metadata {
206                        namespace: subject.namespace.clone(),
207                        subject_id: subject.subject_id.clone(),
208                        governance_id: subject.governance_id.clone(),
209                        governance_version: gov_version, // Not needed
210                        schema_id: subject.schema_id.clone(),
211                    };
212                    let stage = ValidationStage::Validate;
213                    let (signers, quorum_size) =
214                        self.get_signers_and_quorum(metadata, stage.clone()).await?;
215                    let validation_event =
216                        self.create_validation_event(subject, &last_event, gov_version)?;
217                    let event_message = create_validator_request(validation_event.clone());
218                    self.ask_signatures(
219                        &subject.subject_id,
220                        event_message,
221                        signers.clone(),
222                        quorum_size,
223                    )
224                    .await?;
225                    let last_event_hash = DigestIdentifier::from_serializable_borsh(
226                        &last_event.content,
227                        self.derivator,
228                    )
229                    .map_err(|_| {
230                        EventError::CryptoError("Error generating last event hash".to_owned())
231                    })?;
232                    self.event_validation_events
233                        .insert(last_event_hash.clone(), validation_event);
234                    self.events_to_validate.insert(last_event_hash, last_event);
235                    self.subjects_completing_event.insert(
236                        subject.subject_id.clone(),
237                        (stage, signers, (quorum_size, 0)),
238                    );
239                    continue;
240                }
241                Err(error) => match error {
242                    crate::DbError::EntryNotFound => {}
243                    _ => return Err(EventError::DatabaseError(error.to_string())),
244                },
245            }
246            // Check if there are requests in the database that correspond to events that have not yet reached the validation phase and should be restarted from requesting evaluations.
247            match self.database.get_request(&subject.subject_id) {
248                Ok(event_request) => {
249                    self.new_event(event_request).await?;
250                }
251                Err(error) => match error {
252                    crate::DbError::EntryNotFound => {}
253                    _ => return Err(EventError::DatabaseError(error.to_string())),
254                },
255            }
256            // self.virtual_state.insert(
257            //     subject.subject_id.to_owned(),
258            //     serde_json::from_str(&subject.properties).expect("This should be OK"),
259            // );
260            // let mut last_event_sn = subject.sn;
261            // let mut post_validated_events = self
262            //     .database
263            //     .get_events_by_range(&subject.subject_id, Some((subject.sn + 1) as i64), GET_ALL)
264            //     .map_err(|error| EventError::DatabaseError(error.to_string()))?;
265            // for event in post_validated_events.into_iter() {
266            //     last_event_sn = event.event_proposal.sn;
267            //     if event.execution {
268            //         let vs = self.virtual_state.get_mut(&subject.subject_id).unwrap();
269            //         let Ok(patch_json) = serde_json::from_str::<Patch>(&event.event_proposal.evaluation.json_patch) else {
270            //             return Err(EventError::ErrorParsingJsonString(event.event_proposal.evaluation.json_patch));
271            //         };
272            //         let Ok(()) = patch(vs, &patch_json) else {
273            //             return Err(EventError::ErrorApplyingPatch(event.event_proposal.evaluation.json_patch));
274            //         };
275            //     }
276            // }
277            // self.actual_sn
278            //     .insert(subject.subject_id.to_owned(), last_event_sn);
279        }
280        Ok(())
281    }
282
283    pub async fn new_governance_version(
284        &mut self,
285        governance_id: DigestIdentifier,
286        new_version: u64,
287    ) -> Result<(), EventError> {
288        // Ask for event requests for each subject_id of the set and launch new_event with them
289        match self.subjects_by_governance.get(&governance_id).cloned() {
290            Some(subjects_affected) => {
291                for subject_id in subjects_affected.iter() {
292                    match self.database.get_request(subject_id) {
293                        Ok(event_request) => {
294                            let EventRequest::Fact(_) = &event_request.content else {
295                                return Err(EventError::GenesisInGovUpdate);
296                            };
297                            self.new_event(event_request).await?;
298                        }
299                        Err(error) => match error {
300                            crate::DbError::EntryNotFound => {}
301                            _ => {
302                                return Err(EventError::DatabaseError(error.to_string()));
303                            }
304                        },
305                    }
306                    match self.database.get_prevalidated_event(subject_id) {
307                        Ok(event_prevalidated) => {
308                            if let EventRequest::Create(_) =
309                                &event_prevalidated.content.event_request.content
310                            {
311                                // Cancel signature request
312                                self.message_channel
313                                    .tell(MessageTaskCommand::Cancel(String::from(format!(
314                                        "{}",
315                                        event_prevalidated.content.subject_id.to_str()
316                                    ))))
317                                    .await
318                                    .map_err(EventError::ChannelError)?;
319                                self.subjects_completing_event.remove(&subject_id);
320                                self.subjects_by_governance.remove(&subject_id);
321                                self.database.del_prevalidated_event(&subject_id).map_err(
322                                    |error| EventError::DatabaseError(error.to_string()),
323                                )?;
324                                self.new_event(event_prevalidated.content.event_request)
325                                    .await?;
326                                continue;
327                            }
328                            let subject = self
329                                .database
330                                .get_subject(subject_id)
331                                .map_err(|error| EventError::DatabaseError(error.to_string()))?;
332                            let metadata = Metadata {
333                                namespace: subject.namespace.clone(),
334                                subject_id: subject_id.clone(),
335                                governance_id: subject.governance_id.clone(),
336                                governance_version: new_version,
337                                schema_id: subject.schema_id.clone(),
338                            };
339                            let validation_event = self.create_validation_event(
340                                &subject,
341                                &event_prevalidated,
342                                new_version,
343                            )?;
344                            let event_message = create_validator_request(validation_event.clone());
345                            let stage = ValidationStage::Validate;
346                            let (signers, quorum_size) =
347                                self.get_signers_and_quorum(metadata, stage.clone()).await?;
348                            self.ask_signatures(
349                                &subject_id,
350                                event_message,
351                                signers.clone(),
352                                quorum_size,
353                            )
354                            .await?;
355                            let event_prevalidated_hash =
356                                DigestIdentifier::from_serializable_borsh(
357                                    &event_prevalidated.content,
358                                    self.derivator,
359                                )
360                                .map_err(|_| {
361                                    EventError::CryptoError(
362                                        "Error generating event prevalidated hash in NGV"
363                                            .to_owned(),
364                                    )
365                                })?;
366                            self.event_validation_events
367                                .insert(event_prevalidated_hash, validation_event);
368                            // Make update of the phase the event is going through
369                            self.subjects_completing_event
370                                .insert(subject_id.clone(), (stage, signers, (quorum_size, 0)));
371                        }
372                        Err(error) => match error {
373                            crate::DbError::EntryNotFound => {}
374                            _ => {
375                                return Err(EventError::DatabaseError(error.to_string()));
376                            }
377                        },
378                    }
379                }
380            }
381            None => {}
382        }
383        Ok(())
384    }
385
386    async fn process_transfer_or_eol_event(
387        &mut self,
388        event_request: Signed<EventRequest>,
389        subject: Subject,
390        gov_version: u64,
391    ) -> Result<(), EventError> {
392        let subject_id = subject.subject_id.clone();
393        // Check if we already have an event for that subject
394        let None = self.subjects_completing_event.get(&subject_id) else {
395            return Err(EventError::EventAlreadyInProgress);
396        };
397        let metadata = Metadata {
398            namespace: subject.namespace.clone(),
399            subject_id: subject_id.clone(),
400            governance_id: subject.governance_id.clone(),
401            governance_version: gov_version,
402            schema_id: subject.schema_id.clone(),
403        };
404        match &event_request.content {
405            EventRequest::Transfer(tr) => {
406                if event_request.signature.signer == self.own_identifier {
407                    self.database
408                        .get_keys(&tr.public_key)
409                        .map_err(|_| EventError::OwnTransferKeysDbError)?;
410                }
411            }
412            EventRequest::EOL(_) => {
413                if subject.owner != event_request.signature.signer {
414                    return Err(EventError::CloseNotAuthorized(
415                        event_request.signature.signer.to_str(),
416                    ));
417                }
418            }
419            _ => unreachable!(),
420        }
421        // Add to the hashmap to be able to access it when the validator signatures arrive.
422        let event =
423            &self.create_event_prevalidated_no_eval(event_request, &subject, gov_version)?;
424        let event_hash = DigestIdentifier::from_serializable_borsh(&event.content, self.derivator)
425            .map_err(|_| EventError::CryptoError("Error generating event hash".to_owned()))?;
426        let validation_event = self.create_validation_event(&subject, &event, gov_version)?;
427        let event_message = create_validator_request(validation_event.clone());
428        self.event_validation_events
429            .insert(event_hash, validation_event);
430        let stage = ValidationStage::Validate;
431        let (signers, quorum_size) = self.get_signers_and_quorum(metadata, stage.clone()).await?;
432        self.ask_signatures(&subject_id, event_message, signers.clone(), quorum_size)
433            .await?;
434        // Make update of the phase the event is going through
435        self.subjects_completing_event
436            .insert(subject_id.clone(), (stage, signers, (quorum_size, 0)));
437        Ok(())
438    }
439
440    async fn generate_event_proposal(
441        &self,
442        event_request: &Signed<EventRequest>,
443        subject: &Subject,
444        gov_version: u64,
445    ) -> Result<Signed<ApprovalRequest>, EventError> {
446        // We need to get the derivator from the previous validation proof if any
447        let derivator = match self
448            .database
449            .get_signatures(&subject.subject_id, subject.sn)
450        {
451            Ok((_, proof)) => proof.event_hash.derivator,
452            Err(crate::database::Error::EntryNotFound) => self.derivator,
453            Err(e) => return Err(EventError::DatabaseError(e.to_string())),
454        };
455        let hash_prev_event = DigestIdentifier::from_serializable_borsh(
456            &self
457                .database
458                .get_event(&subject.subject_id, subject.sn)
459                .map_err(|e| EventError::DatabaseError(e.to_string()))?
460                .content,
461            derivator,
462        )
463        .map_err(|_| {
464            EventError::CryptoError("Error calculating the hash of the previous event".to_string())
465        })?;
466        let approval_request = ApprovalRequest {
467            event_request: event_request.clone(),
468            sn: subject.sn + 1,
469            gov_version,
470            patch: ValueWrapper(
471                serde_json::from_str("[]")
472                    .map_err(|_| EventError::CryptoError("Error parsing empty json".to_string()))?,
473            ),
474            state_hash: subject.properties.hash_id(self.derivator)?,
475            hash_prev_event,
476            gov_id: subject.governance_id.clone(),
477        };
478        let subject_signature = Signature::new(
479            &approval_request,
480            &subject
481                .keys
482                .as_ref()
483                .expect("Llegados a aquĆ­ tenemos que ser owner"),
484            self.derivator,
485        )
486        .map_err(|_| {
487            EventError::CryptoError(String::from("Error signing the hash of the proposal"))
488        })?;
489        Ok(Signed::<ApprovalRequest> {
490            content: approval_request,
491            signature: subject_signature,
492        })
493    }
494
495    pub async fn pre_new_event(
496        &mut self,
497        event_request: Signed<EventRequest>,
498    ) -> Result<DigestIdentifier, EventError> {
499        // Check if the content is correct (signature, invoker, etc)
500        // Signature check:
501        warn!("Entra en pre event");
502        event_request.verify().map_err(EventError::SubjectError)?;
503        warn!("After first verify");
504        let request_id = DigestIdentifier::generate_with_blake3(&event_request)
505            .map_err(|_| EventError::HashGenerationFailed)?;
506        // Comprobamos si ya tenemos la request registrada en el sistema
507        match self.database.get_taple_request(&request_id) {
508            Ok(_) => {
509                return Err(EventError::RequestAlreadyKnown);
510            }
511            Err(crate::DbError::EntryNotFound) => {}
512            Err(error) => return Err(EventError::DatabaseError(error.to_string())),
513        }
514        let subject_id: &DigestIdentifier = match &event_request.content {
515            EventRequest::Create(_) => return self.new_event(event_request).await,
516            EventRequest::Fact(fact_req) => &fact_req.subject_id,
517            EventRequest::Transfer(trans_req) => &trans_req.subject_id,
518            EventRequest::EOL(eol_req) => &eol_req.subject_id,
519        };
520        // Check if we already have an event for that subject
521        let None = self.subjects_completing_event.get(subject_id) else {
522            return Err(EventError::EventAlreadyInProgress);
523        };
524        self.new_event(event_request).await
525    }
526
527    /// Function that is called when a new event request arrives at the system, either invoked by the controller or externally
528    pub async fn new_event(
529        &mut self,
530        event_request: Signed<EventRequest>,
531    ) -> Result<DigestIdentifier, EventError> {
532        let request_id = DigestIdentifier::generate_with_blake3(&event_request)
533            .map_err(|_| EventError::HashGenerationFailed)?;
534        if let EventRequest::Create(create_request) = &event_request.content {
535            // Check if it is governance, then anything goes, otherwise check that the invoker is me and I can do it.
536            if event_request.signature.signer != self.own_identifier {
537                return Err(EventError::ExternalGenesisEvent);
538            }
539            if create_request.public_key.public_key.is_empty() {
540                return Err(EventError::PublicKeyIsEmpty);
541            }
542            // Check if i have the keys
543            let subject_keys = match self.database.get_keys(&create_request.public_key) {
544                Ok(keys) => keys,
545                Err(crate::DbError::EntryNotFound) => {
546                    return Err(EventError::SubjectKeysNotFound(
547                        create_request.public_key.to_str(),
548                    ));
549                }
550                Err(error) => return Err(EventError::DatabaseError(error.to_string())),
551            };
552            let (governance_version, initial_state) = if &create_request.schema_id != "governance" {
553                let governance_version = self
554                    .gov_api
555                    .get_governance_version(
556                        create_request.governance_id.clone(),
557                        DigestIdentifier::default(),
558                    )
559                    .await
560                    .map_err(EventError::GovernanceError)?;
561                let creation_premission = self
562                    .gov_api
563                    .get_invoke_info(
564                        Metadata {
565                            namespace: create_request.namespace.clone(),
566                            subject_id: DigestIdentifier::default(), // Not necessary for this method
567                            governance_id: create_request.governance_id.clone(),
568                            governance_version,
569                            schema_id: create_request.schema_id.clone(),
570                        },
571                        ValidationStage::Create,
572                        self.own_identifier.clone(),
573                    )
574                    .await
575                    .map_err(EventError::GovernanceError)?;
576                if !creation_premission {
577                    return Err(EventError::CreatingPermissionDenied);
578                }
579                let initial_state = self
580                    .gov_api
581                    .get_init_state(
582                        create_request.governance_id.clone(),
583                        create_request.schema_id.clone(),
584                        governance_version,
585                    )
586                    .await?;
587                (governance_version, initial_state)
588            } else {
589                let initial_state = self
590                    .gov_api
591                    .get_init_state(
592                        create_request.governance_id.clone(),
593                        create_request.schema_id.clone(),
594                        0,
595                    )
596                    .await?;
597                (0, initial_state)
598            };
599            let subject_id = generate_subject_id(
600                &create_request.namespace,
601                &create_request.schema_id,
602                create_request.public_key.to_str(),
603                create_request.governance_id.to_str(),
604                governance_version,
605                self.derivator,
606            )?;
607            // Once everything goes well, we create the pre-validated event and send it to validation.
608            let event = Signed::<Event>::from_genesis_request(
609                event_request.clone(),
610                &subject_keys,
611                governance_version,
612                &initial_state,
613                self.derivator,
614            )
615            .map_err(EventError::SubjectError)?;
616            let event_hash =
617                DigestIdentifier::from_serializable_borsh(&event.content, self.derivator)
618                    .map_err(|_| EventError::HashGenerationFailed)?;
619            let validation_event = self.create_validation_event_from_genesis(
620                create_request.clone(),
621                event_hash.clone(),
622                governance_version,
623                subject_id.clone(),
624                &subject_keys,
625            )?;
626            let metadata = validation_event.proof.get_metadata();
627            let event_message = create_validator_request(validation_event.clone());
628            let stage = ValidationStage::Validate;
629            let (signers, quorum_size) = if &create_request.schema_id != "governance" {
630                self.get_signers_and_quorum(metadata, stage.clone()).await?
631            } else {
632                let mut hs = HashSet::new();
633                hs.insert(self.own_identifier.clone());
634                (hs, 1)
635            };
636            self.ask_signatures(&subject_id, event_message, signers.clone(), quorum_size)
637                .await?;
638            self.event_validation_events
639                .insert(event_hash.clone(), validation_event);
640            // Make update of the phase the event is going through
641            self.events_to_validate.insert(event_hash, event.clone());
642            self.database
643                .set_taple_request(&request_id, &event_request.clone().try_into()?)
644                .map_err(|error| EventError::DatabaseError(error.to_string()))?;
645            self.database
646                .set_prevalidated_event(&subject_id, event.clone())
647                .map_err(|error| EventError::DatabaseError(error.to_string()))?;
648            if create_request.schema_id != "governance" {
649                self.subjects_by_governance
650                    .entry(create_request.governance_id.clone())
651                    .or_insert_with(HashSet::new)
652                    .insert(subject_id.clone());
653            }
654            self.subjects_completing_event
655                .insert(subject_id, (stage, signers, (quorum_size, 0)));
656            return Ok(request_id);
657        }
658        let subject_id = match &event_request.content {
659            EventRequest::Transfer(tr) => {
660                log::info!("Processing transfer event");
661                tr.subject_id.clone()
662            }
663            EventRequest::EOL(eolr) => {
664                log::info!("Processing EOL event");
665                eolr.subject_id.clone()
666            }
667            EventRequest::Fact(sr) => {
668                log::info!("Processing state event");
669                sr.subject_id.clone()
670            }
671            _ => unreachable!(),
672        };
673        // Comprobamos si tenemos el sujeto
674        let subject = match self.database.get_subject(&subject_id) {
675            Ok(subject) => subject,
676            Err(error) => match error {
677                crate::DbError::EntryNotFound => {
678                    return Err(EventError::SubjectNotFound(subject_id.to_str()))
679                }
680                _ => return Err(EventError::DatabaseError(error.to_string())),
681            },
682        };
683        // Check is subject life has not come to an end
684        if !subject.active {
685            return Err(EventError::SubjectLifeEnd(subject_id.to_str()));
686        }
687        // Chek if we are owner of Subject
688        if subject.keys.is_none() {
689            return Err(EventError::SubjectNotOwned(subject_id.to_str()));
690        }
691        // We obtain the current version of governance
692        let gov_version = self
693            .gov_api
694            .get_governance_version(subject.governance_id.clone(), subject.subject_id.clone())
695            .await
696            .map_err(EventError::GovernanceError)?;
697        match &event_request.content {
698            EventRequest::Transfer(_) | EventRequest::EOL(_) => {
699                // TRANSFER
700                // We must remove the cryptographic material of the current subject and change its public key.
701                // However, the event must be signed with the current key, so it cannot be deleted
702                // immediately. It must therefore be deleted after validation.
703                // These events are neither evaluated nor approved.
704                // It is not necessary to check the governance, as no permissions are required for the transfer.
705                self.process_transfer_or_eol_event(
706                    event_request.clone(),
707                    subject.clone(),
708                    gov_version,
709                )
710                .await?;
711            }
712            EventRequest::Fact(_) => {
713                // Request evaluation signatures, sending request, sn and signature of everything about the subject
714                // Get the list of evaluators
715                let (metadata, stage) = (
716                    Metadata {
717                        namespace: subject.namespace,
718                        subject_id: subject_id.clone(),
719                        governance_id: subject.governance_id.clone(),
720                        governance_version: gov_version,
721                        schema_id: subject.schema_id,
722                    },
723                    ValidationStage::Evaluate,
724                );
725                // Check the invoker can Invoke for this subject
726                if event_request.signature.signer != self.own_identifier
727                    && !self
728                        .gov_api
729                        .get_invoke_info(
730                            metadata.clone(),
731                            ValidationStage::Invoke,
732                            event_request.signature.signer.clone(),
733                        )
734                        .await
735                        .map_err(EventError::GovernanceError)?
736                {
737                    return Err(EventError::InvokePermissionDenied(
738                        event_request.signature.signer.to_str(),
739                        subject_id.to_str(),
740                    ));
741                };
742                let event_preevaluation = EvaluationRequest {
743                    event_request: event_request.clone(),
744                    context: SubjectContext {
745                        governance_id: metadata.governance_id.clone(),
746                        schema_id: metadata.schema_id.clone(),
747                        is_owner: subject.owner == event_request.signature.signer,
748                        state: subject.properties,
749                        // serde_json::to_string(self.virtual_state.get(&subject_id).unwrap())
750                        //     .map_err(|_| EventError::ErrorParsingValue)?, // Must be Some, filled in init function
751                        namespace: metadata.namespace.clone(),
752                    },
753                    gov_version,
754                    sn: subject.sn + 1,
755                    // self.actual_sn.get(&subject_id).unwrap().to_owned() + 1, // Must be Some, filled in init function
756                };
757                let (signers, quorum_size) =
758                    self.get_signers_and_quorum(metadata, stage.clone()).await?;
759                // log::info!(
760                //     "{} PIDIENDO FIRMAS DE EVALUACIƓN {} PARA: {}",
761                //     subject.sn + 1,
762                //     quorum_size,
763                //     subject.subject_id.to_str()
764                // );
765                // log::info!("SIGNERS::::");
766                // for signer in signers.iter() {
767                //     log::warn!("{}", signer.to_str());
768                // }
769                let event_preevaluation_hash =
770                    DigestIdentifier::generate_with_blake3(&event_preevaluation).map_err(|_| {
771                        EventError::CryptoError(String::from(
772                            "Error calculating the hash of the event pre-evaluation",
773                        ))
774                    })?;
775                self.event_pre_evaluations
776                    .insert(event_preevaluation_hash, event_preevaluation.clone());
777                // Add the event to the hashset to not complete two at the same time for the same subject
778                let negative_quorum_size = (signers.len() as u32 - quorum_size) + 1;
779                self.subjects_completing_event.insert(
780                    subject_id.clone(),
781                    (stage, signers.clone(), (quorum_size, negative_quorum_size)),
782                );
783                self.ask_signatures(
784                    &subject_id,
785                    create_evaluator_request(event_preevaluation.clone()),
786                    signers.clone(),
787                    quorum_size,
788                )
789                .await?;
790            }
791            EventRequest::Create(_) => unreachable!(),
792        }
793        self.subjects_by_governance
794            .entry(subject.governance_id.clone())
795            .or_insert_with(HashSet::new)
796            .insert(subject.governance_id);
797        let mut request_data: TapleRequest = event_request.clone().try_into()?;
798        request_data.sn = Some(subject.sn + 1);
799        request_data.subject_id = Some(subject.subject_id.clone());
800        self.database
801            .set_taple_request(&request_id, &request_data)
802            .map_err(|error| EventError::DatabaseError(error.to_string()))?;
803        self.database
804            .set_request(&subject.subject_id, event_request)
805            .map_err(|error| EventError::DatabaseError(error.to_string()))?;
806        Ok(request_id)
807    }
808
809    pub async fn evaluator_signatures(
810        &mut self,
811        evaluator_response: Signed<EvaluationResponse>,
812    ) -> Result<(), EventError> {
813        // Check that the returned hash matches the hash from the pre-evaluation
814        let evaluation_request = match self
815            .event_pre_evaluations
816            .get(&evaluator_response.content.eval_req_hash)
817        {
818            Some(preevaluation_event) => preevaluation_event,
819            None => return Err(EventError::CryptoError(String::from(
820                "The hash of the event pre-evaluation does not match any of the pre-evaluations",
821            ))),
822        };
823
824        let subject_id = match &evaluation_request.event_request.content {
825            // The transfer is not evaluated
826            EventRequest::Transfer(_) => return Err(EventError::NoEvaluationForTransferEvents),
827            EventRequest::EOL(_) => return Err(EventError::NoEvaluationForEOLEvents),
828            EventRequest::Create(_) => {
829                return Err(EventError::EvaluationOrApprovationInCreationEvent)
830            }
831            EventRequest::Fact(state_request) => state_request.subject_id.clone(),
832        };
833        // Look at the status of the event, whether it is under evaluation or not.
834        let Some((ValidationStage::Evaluate, signers, quorum_size)) =
835            self.subjects_completing_event.get(&subject_id)
836        else {
837            return Err(EventError::WrongEventPhase);
838        };
839        let signer = evaluator_response.signature.signer.clone();
840        // Comprobar si el evaluador estĆ” en la lista de evaluadores
841        if !signers.contains(&signer) {
842            return Err(EventError::CryptoError(String::from(
843                "The signer is not in the list of evaluators or we already have the signature",
844            )));
845        }
846        // Check that everything is cryptographically correct
847        evaluator_response
848            .verify()
849            .map_err(|error| EventError::CryptoError(error.to_string()))?;
850        let evaluation_hash = evaluator_response.content.hash_id(self.derivator)?;
851        // Get subject to know if we have it and the subject metadata
852        let subject = self
853            .database
854            .get_subject(&subject_id)
855            .map_err(|error| match error {
856                crate::DbError::EntryNotFound => EventError::SubjectNotFound(subject_id.to_str()),
857                _ => EventError::DatabaseError(error.to_string()),
858            })?;
859        // Check if the governance version matches ours, if not we do not accept it.
860        let governance_version = self
861            .gov_api
862            .get_governance_version(subject.governance_id.clone(), subject.subject_id.clone())
863            .await
864            .map_err(EventError::GovernanceError)?;
865        // Check that the json patch is valid
866        if !hash_match_after_patch(
867            &evaluator_response.content,
868            evaluator_response.content.patch.clone(),
869            subject.properties.clone(),
870        )? {
871            return Err(EventError::CryptoError(
872                "Json patch applied to state hash does not match the new state hash".to_string(),
873            ));
874        }
875        // Save evaluation
876        let signatures_set = match self
877            .event_evaluations
878            .get_mut(&evaluator_response.content.eval_req_hash)
879        {
880            Some(signatures_set) => {
881                insert_or_replace_and_check(
882                    signatures_set,
883                    (
884                        UniqueSignature {
885                            signature: evaluator_response.signature.clone(),
886                        },
887                        evaluator_response.content.eval_success.clone(),
888                        evaluation_hash.clone(),
889                    ),
890                );
891                signatures_set
892            }
893            None => {
894                let mut new_signatures_set = HashSet::new();
895                new_signatures_set.insert((
896                    UniqueSignature {
897                        signature: evaluator_response.signature.clone(),
898                    },
899                    evaluator_response.content.eval_success.clone(),
900                    evaluation_hash.clone(),
901                ));
902                self.event_evaluations.insert(
903                    evaluator_response.content.eval_req_hash.clone(),
904                    new_signatures_set,
905                );
906                self.event_evaluations
907            .get_mut(&evaluator_response.content.eval_req_hash)
908            .expect("Acabamos de insertar el conjunto de firmas, por lo que debe estar presente")
909            }
910        };
911        let (num_signatures_hash_ok, num_signatures_hash_ko) =
912            count_signatures_with_event_content_hash(&signatures_set, &evaluation_hash);
913        let (quorum_size, negative_quorum_size) = quorum_size.to_owned();
914        // Check if we reach Quorum
915        let quorum_reached = {
916            if num_signatures_hash_ok >= quorum_size {
917                Some(true)
918            } else if num_signatures_hash_ko >= negative_quorum_size {
919                Some(false)
920            } else {
921                None
922            }
923        };
924        if quorum_reached.is_none() {
925            let mut new_signers: HashSet<KeyIdentifier> =
926                signers.into_iter().map(|s| s.clone()).collect();
927            new_signers.remove(&signer);
928            self.ask_signatures(
929                &subject_id,
930                create_evaluator_request(evaluation_request.clone()),
931                new_signers.clone(),
932                quorum_size,
933            )
934            .await?;
935            self.subjects_completing_event.insert(
936                subject_id,
937                (
938                    ValidationStage::Evaluate,
939                    new_signers,
940                    (quorum_size, negative_quorum_size),
941                ),
942            );
943            return Ok(()); // We don't reach quorum, we do nothing
944        } else {
945            // If so check that json patch applied to the event stop the signature request and start asking for approves with the complete event with the new obtained in this phase if approves are required, otherwise inform validator.
946            // Check that when applying Json Patch we reach the final state?
947            let evaluator_signatures: HashSet<Signature> = signatures_set
948                .iter()
949                .filter(|(_, acceptance, hash)| {
950                    hash == &evaluation_hash && quorum_reached.as_ref().unwrap() == acceptance
951                })
952                .map(|(signature, _, _)| signature.signature.clone())
953                .collect();
954            // We need to get the derivator from the previous validation proof if any
955            let derivator = match self
956                .database
957                .get_signatures(&subject.subject_id, subject.sn)
958            {
959                Ok((_, proof)) => proof.event_hash.derivator,
960                Err(crate::database::Error::EntryNotFound) => self.derivator,
961                Err(e) => return Err(EventError::DatabaseError(e.to_string())),
962            };
963            let hash_prev_event = DigestIdentifier::from_serializable_borsh(
964                &self
965                    .database
966                    .get_event(&subject.subject_id, subject.sn)
967                    .map_err(|e| EventError::DatabaseError(e.to_string()))?
968                    .content,
969                derivator,
970            )
971            .map_err(|_| {
972                EventError::CryptoError(
973                    "Error calculating the hash of the previous event".to_owned(),
974                )
975            })?;
976            let metadata = Metadata {
977                namespace: subject.namespace.clone(),
978                subject_id: subject_id.clone(),
979                governance_id: subject.governance_id.clone(),
980                governance_version,
981                schema_id: subject.schema_id.clone(),
982            };
983            if evaluator_response.content.appr_required && !evaluator_response.content.eval_success
984            {
985                return Err(EventError::ApprovalRequiredWhenEvalFailed);
986            }
987            // Ask for Approves if necessary, otherwise ask for validations.
988            let (stage, event_message) = if evaluator_response.content.appr_required {
989                let approval_request = ApprovalRequest {
990                    event_request: evaluation_request.event_request.clone(),
991                    sn: evaluation_request.sn,
992                    gov_version: governance_version,
993                    patch: evaluator_response.content.patch,
994                    state_hash: evaluator_response.content.state_hash,
995                    hash_prev_event,
996                    gov_id: subject.governance_id.clone(),
997                };
998                let approval_request_hash = approval_request
999                    .hash_id(DigestDerivator::Blake3_256)
1000                    .map_err(|_| {
1001                    EventError::CryptoError(String::from(
1002                        "Error calculating the hash of the proposal",
1003                    ))
1004                })?;
1005                let subject_keys = subject
1006                    .keys
1007                    .as_ref()
1008                    .expect("Llegados a aquĆ­ tenemos que ser owner");
1009                let subject_signature = Signature::new(
1010                    &approval_request,
1011                    &subject_keys,
1012                    self.derivator,
1013                )
1014                .map_err(|_| {
1015                    EventError::CryptoError(String::from("Error signing the Approval Request"))
1016                })?;
1017                let approval_request = Signed::<ApprovalRequest> {
1018                    content: approval_request,
1019                    signature: subject_signature,
1020                };
1021                // Add to the hashmap to be able to access it when the signatures of the evaluators arrive.
1022                self.approval_eval_signatures
1023                    .insert(approval_request_hash.clone(), evaluator_signatures.clone());
1024                self.approval_requests
1025                    .insert(approval_request_hash, approval_request.clone());
1026                let msg = create_approval_request(approval_request);
1027                // Return TapleMessage directly
1028                (ValidationStage::Approve, msg)
1029            } else {
1030                // No approval required
1031                let gov_version = self
1032                    .gov_api
1033                    .get_governance_version(
1034                        subject.governance_id.clone(),
1035                        subject.subject_id.clone(),
1036                    )
1037                    .await?;
1038                let event = Event {
1039                    subject_id: subject_id.clone(),
1040                    event_request: evaluation_request.event_request.clone(),
1041                    sn: evaluation_request.sn,
1042                    gov_version: governance_version,
1043                    patch: evaluator_response.content.patch,
1044                    state_hash: evaluator_response.content.state_hash,
1045                    eval_success: evaluator_response.content.eval_success,
1046                    appr_required: evaluator_response.content.appr_required,
1047                    approved: true,
1048                    hash_prev_event,
1049                    evaluators: evaluator_signatures,
1050                    approvers: HashSet::new(),
1051                };
1052                let event_hash = event.hash_id(self.derivator)?;
1053                let subject_keys = subject
1054                    .keys
1055                    .as_ref()
1056                    .expect("Llegados a aquĆ­ tenemos que ser owner");
1057                let subject_signature = Signature::new(&event, &subject_keys, self.derivator)
1058                    .map_err(|_| {
1059                        EventError::CryptoError(String::from("Error signing the Event"))
1060                    })?;
1061                let signed_event = Signed::<Event> {
1062                    content: event,
1063                    signature: subject_signature,
1064                };
1065                let validation_event =
1066                    self.create_validation_event(&subject, &signed_event, gov_version)?;
1067                let event_message = create_validator_request(validation_event.clone());
1068                self.event_validation_events
1069                    .insert(event_hash.clone(), validation_event);
1070                self.events_to_validate
1071                    .insert(event_hash, signed_event.clone());
1072                self.database
1073                    .set_prevalidated_event(&subject.subject_id, signed_event)
1074                    .map_err(|error| EventError::DatabaseError(error.to_string()))?;
1075                self.database
1076                    .del_request(&subject.subject_id)
1077                    .map_err(|error| EventError::DatabaseError(error.to_string()))?;
1078                (ValidationStage::Validate, event_message)
1079            };
1080            // Clean HashMaps
1081            self.event_evaluations
1082                .remove(&evaluator_response.content.eval_req_hash);
1083            self.event_pre_evaluations
1084                .remove(&evaluator_response.content.eval_req_hash);
1085            let (signers, quorum_size) =
1086                self.get_signers_and_quorum(metadata, stage.clone()).await?;
1087            self.ask_signatures(&subject_id, event_message, signers.clone(), quorum_size)
1088                .await?;
1089            // Make update of the phase the event is going through
1090            let negative_quorum_size = (signers.len() as u32 - quorum_size) + 1;
1091            self.subjects_completing_event.insert(
1092                subject_id.clone(),
1093                (stage, signers, (quorum_size, negative_quorum_size)),
1094            );
1095        }
1096        Ok(())
1097    }
1098
1099    pub async fn approver_signatures(
1100        &mut self,
1101        approval: Signed<ApprovalResponse>,
1102    ) -> Result<(), EventError> {
1103        // Check the status of the event, if it is in approval or not.
1104        let approval_request = match self.approval_requests.get(&approval.content.appr_req_hash) {
1105            Some(event_proposal) => event_proposal,
1106            None => {
1107                return Err(EventError::CryptoError(String::from(
1108                    "The hash of the event proposal does not match any of the proposals",
1109                )))
1110            }
1111        };
1112        let subject_id = match &approval_request.content.event_request.content {
1113            // The transfer is not approved
1114            EventRequest::Transfer(_) => return Err(EventError::NoApprovalForTransferEvents),
1115            // EOL is not approved
1116            EventRequest::EOL(_) => return Err(EventError::NoApprovalForEOLEvents),
1117            EventRequest::Create(_) => {
1118                return Err(EventError::EvaluationOrApprovationInCreationEvent)
1119            }
1120            EventRequest::Fact(state_request) => state_request.subject_id.clone(),
1121        };
1122        let Some((ValidationStage::Approve, signers, quorum_size)) =
1123            self.subjects_completing_event.get(&subject_id)
1124        else {
1125            return Err(EventError::WrongEventPhase);
1126        };
1127        let signer = approval.signature.signer.clone();
1128        // Check if approver is in the list of approvers
1129        if !signers.contains(&signer) {
1130            return Err(EventError::CryptoError(String::from(
1131                "The signer is not in the list of approvers or we already have his approve",
1132            )));
1133        }
1134        // Check that everything is cryptographically correct
1135        approval
1136            .verify()
1137            .map_err(|error| EventError::CryptoError(error.to_string()))?;
1138        // Get subject to know if we have it and the subject metadata
1139        let subject = self
1140            .database
1141            .get_subject(&subject_id)
1142            .map_err(|error| match error {
1143                crate::DbError::EntryNotFound => EventError::SubjectNotFound(subject_id.to_str()),
1144                _ => EventError::DatabaseError(error.to_string()),
1145            })?;
1146        // Save approval
1147        let approval_set = match self
1148            .event_approvations
1149            .get_mut(&approval.content.appr_req_hash)
1150        {
1151            Some(approval_set) => {
1152                insert_or_replace_and_check(
1153                    approval_set,
1154                    UniqueApproval {
1155                        approval: approval.clone(),
1156                    },
1157                );
1158                approval_set
1159            }
1160            None => {
1161                let mut new_approval_set = HashSet::new();
1162                new_approval_set.insert(UniqueApproval {
1163                    approval: approval.clone(),
1164                });
1165                self.event_approvations
1166                    .insert(approval.content.appr_req_hash.clone(), new_approval_set);
1167                self.event_approvations
1168            .get_mut(&approval.content.appr_req_hash)
1169            .expect("Acabamos de insertar el conjunto de approvals, por lo que debe estar presente")
1170            }
1171        };
1172        // Check if we reach positive or negative Quorum
1173        let num_approvals_with_same_acceptance = approval_set
1174            .iter()
1175            .filter(|unique_approval| {
1176                unique_approval.approval.content.approved == approval.content.approved
1177            })
1178            .count() as u32;
1179        let (quorum_size_now, _) = match approval.content.approved {
1180            true => (quorum_size.0, true),
1181            false => (quorum_size.1, false),
1182        };
1183        if num_approvals_with_same_acceptance < quorum_size_now {
1184            // We did not reach quorum for Approval
1185            let mut new_signers: HashSet<KeyIdentifier> =
1186                signers.into_iter().map(|s| s.clone()).collect();
1187            new_signers.remove(&signer);
1188            self.ask_signatures(
1189                &subject_id,
1190                create_approval_request(approval_request.to_owned()),
1191                signers.clone(),
1192                quorum_size_now,
1193            )
1194            .await?;
1195            // Make update of the phase the event is going through
1196            self.subjects_completing_event.insert(
1197                subject_id,
1198                (
1199                    ValidationStage::Approve,
1200                    new_signers,
1201                    quorum_size.to_owned(),
1202                ),
1203            );
1204            Ok(()) // We don't reach quorum, we do nothing
1205        } else {
1206            let governance_version = self
1207                .gov_api
1208                .get_governance_version(subject.governance_id.clone(), subject.subject_id.clone())
1209                .await
1210                .map_err(EventError::GovernanceError)?;
1211            // If Quorum is reached, we stop asking for approves and start asking for validations with the complete event including the new approves.
1212            let metadata = Metadata {
1213                namespace: subject.namespace.clone(),
1214                subject_id: subject_id.clone(),
1215                governance_id: subject.governance_id.clone(),
1216                governance_version,
1217                schema_id: subject.schema_id.clone(),
1218            };
1219            // We create the final event
1220            let approvals: HashSet<Signature> = approval_set
1221                .iter()
1222                .filter(|unique_approval| {
1223                    unique_approval.approval.content.approved == approval.content.approved
1224                })
1225                .map(|approval| approval.approval.signature.clone())
1226                .collect();
1227            let event_proposal = self
1228                .approval_requests
1229                .get(&approval.content.appr_req_hash)
1230                .unwrap();
1231            let gov_version = self
1232                .gov_api
1233                .get_governance_version(subject.governance_id.clone(), subject.subject_id.clone())
1234                .await?;
1235            let evaluators = self
1236                .approval_eval_signatures
1237                .get(&approval.content.appr_req_hash)
1238                .unwrap()
1239                .to_owned();
1240            let event = Event {
1241                subject_id: subject_id.clone(),
1242                event_request: event_proposal.content.event_request.clone(),
1243                sn: event_proposal.content.sn,
1244                gov_version,
1245                patch: event_proposal.content.patch.clone(),
1246                state_hash: event_proposal.content.state_hash.clone(),
1247                eval_success: true,
1248                appr_required: true,
1249                approved: approval.content.approved,
1250                hash_prev_event: event_proposal.content.hash_prev_event.clone(),
1251                evaluators,
1252                approvers: approvals,
1253            };
1254            let event_hash = event.hash_id(self.derivator)?;
1255            let subject_keys = subject
1256                .keys
1257                .as_ref()
1258                .expect("Llegados a aquĆ­ tenemos que ser owner");
1259            let subject_signature =
1260                Signature::new(&event, &subject_keys, self.derivator).map_err(|_| {
1261                    EventError::CryptoError(String::from(
1262                        "Error signing the Event (Approval stage)",
1263                    ))
1264                })?;
1265            let signed_event = Signed::<Event> {
1266                content: event,
1267                signature: subject_signature,
1268            };
1269            let validation_event =
1270                self.create_validation_event(&subject, &signed_event, gov_version)?;
1271            let event_message = create_validator_request(validation_event.clone());
1272            // Clean HashMaps
1273            self.approval_eval_signatures
1274                .remove(&approval.content.appr_req_hash);
1275            self.approval_requests
1276                .remove(&approval.content.appr_req_hash);
1277            self.event_approvations
1278                .remove(&approval.content.appr_req_hash);
1279            let stage = ValidationStage::Validate;
1280            let (signers, quorum_size) =
1281                self.get_signers_and_quorum(metadata, stage.clone()).await?;
1282            self.ask_signatures(&subject_id, event_message, signers.clone(), quorum_size)
1283                .await?;
1284            self.event_validation_events
1285                .insert(event_hash.clone(), validation_event);
1286            self.events_to_validate
1287                .insert(event_hash, signed_event.clone());
1288            self.database
1289                .set_prevalidated_event(&subject.subject_id, signed_event)
1290                .map_err(|error| EventError::DatabaseError(error.to_string()))?;
1291            self.database
1292                .del_request(&subject.subject_id)
1293                .map_err(|error| EventError::DatabaseError(error.to_string()))?;
1294            // Make update of the phase the event is going through
1295            self.subjects_completing_event
1296                .insert(subject_id, (stage, signers, (quorum_size, 0)));
1297            Ok(())
1298        }
1299    }
1300
1301    pub async fn validation_signatures(
1302        &mut self,
1303        event_hash: DigestIdentifier,
1304        signature: Signature,
1305        governance_version: u64,
1306    ) -> Result<(), EventError> {
1307        // Look at the status of the event, whether it is in validation or not.
1308        let event = match self.events_to_validate.get(&event_hash) {
1309            Some(event) => event,
1310            None => {
1311                return Err(EventError::CryptoError(String::from(
1312                    "The hash of the event does not match any of the events 1",
1313                )));
1314            }
1315        };
1316        let validation_event = self
1317            .event_validation_events
1318            .get(&event_hash)
1319            .expect("Should be");
1320        let subject_id = match &event.content.event_request.content {
1321            EventRequest::Transfer(transfer_request) => transfer_request.subject_id.clone(),
1322            EventRequest::EOL(eol_request) => eol_request.subject_id.clone(),
1323            EventRequest::Create(create_request) => generate_subject_id(
1324                &create_request.namespace,
1325                &create_request.schema_id,
1326                create_request.public_key.to_str(),
1327                create_request.governance_id.to_str(),
1328                event.content.gov_version,
1329                self.derivator,
1330            )?,
1331            EventRequest::Fact(state_request) => state_request.subject_id.clone(),
1332        };
1333        // Get subject to know if we have it and the subject metadata
1334        let subject = match self.database.get_subject(&subject_id) {
1335            Ok(subject) => Some(subject),
1336            Err(error) => match error {
1337                crate::DbError::EntryNotFound => None,
1338                _ => return Err(EventError::DatabaseError(error.to_string())),
1339            },
1340        };
1341        let (our_governance_version, governance_id) = if event.content.sn == 0 && subject.is_none()
1342        {
1343            if let EventRequest::Create(create_request) = &event.content.event_request.content {
1344                if create_request.schema_id == "governance" {
1345                    (0, create_request.governance_id.clone())
1346                } else {
1347                    (
1348                        self.gov_api
1349                            .get_governance_version(
1350                                create_request.governance_id.clone(),
1351                                subject_id.clone(),
1352                            )
1353                            .await
1354                            .map_err(EventError::GovernanceError)?,
1355                        create_request.governance_id.clone(),
1356                    )
1357                }
1358            } else {
1359                return Err(EventError::Event0NotCreate);
1360            }
1361        } else if subject.is_some() && event.content.sn != 0 {
1362            let subject = subject.unwrap();
1363            if subject.schema_id == "governance" {
1364                (subject.sn, subject.subject_id.clone())
1365            } else {
1366                (
1367                    self.gov_api
1368                        .get_governance_version(
1369                            subject.governance_id.clone(),
1370                            subject.subject_id.clone(),
1371                        )
1372                        .await
1373                        .map_err(EventError::GovernanceError)?,
1374                    subject.governance_id,
1375                )
1376            }
1377        } else {
1378            return Err(EventError::SubjectNotFound(subject_id.to_str()));
1379        };
1380        if our_governance_version < governance_version {
1381            // We ignore the validation signature because it is not valid, but we ask the validator who sent it to us for governance.
1382            let msg = request_gov_event(
1383                self.own_identifier.clone(),
1384                governance_id,
1385                our_governance_version + 1,
1386            );
1387            self.message_channel
1388                .tell(MessageTaskCommand::Request(
1389                    None,
1390                    msg,
1391                    vec![signature.signer],
1392                    MessageConfig {
1393                        timeout: 2000,
1394                        replication_factor: 1.0,
1395                    },
1396                ))
1397                .await?;
1398            return Ok(());
1399        } else if our_governance_version > governance_version {
1400            // We ignore the validation signature because it is not valid for us.
1401            return Ok(());
1402        }
1403        // Check phase
1404        let Some((ValidationStage::Validate, signers, quorum_size)) =
1405            self.subjects_completing_event.get(&subject_id)
1406        else {
1407            return Err(EventError::WrongEventPhase);
1408        };
1409        let signer = signature.signer.clone();
1410        // Check if approver is in the list of approvers
1411        if !signers.contains(&signer) {
1412            return Err(EventError::CryptoError(String::from(
1413                "The signer is not in the list of validators or we already have the validation",
1414            )));
1415        }
1416        // Check that everything is cryptographically correct
1417        signature
1418            .verify(&validation_event.proof)
1419            .map_err(|error| EventError::CryptoError(error.to_string()))?;
1420        // Save validation
1421        let validation_set = match self.event_validations.get_mut(&event_hash) {
1422            Some(validation_set) => {
1423                insert_or_replace_and_check(validation_set, UniqueSignature { signature });
1424                validation_set
1425            }
1426            None => {
1427                let mut new_validation_set = HashSet::new();
1428                new_validation_set.insert(UniqueSignature { signature });
1429                self.event_validations
1430                    .insert(event_hash.clone(), new_validation_set);
1431                self.event_validations.get_mut(&event_hash).expect(
1432                    "Acabamos de insertar el conjunto de validations, por lo que debe estar presente",
1433                )
1434            }
1435        };
1436        let quorum_size = quorum_size.to_owned();
1437        // Check if we reach Quorum and if so stop asking for signatures.
1438        if (validation_set.len() as u32) < quorum_size.0 {
1439            let event_message = create_validator_request(validation_event.to_owned());
1440            let mut new_signers: HashSet<KeyIdentifier> =
1441                signers.into_iter().map(|s| s.clone()).collect();
1442            new_signers.remove(&signer);
1443            self.ask_signatures(
1444                &subject_id,
1445                event_message,
1446                new_signers.clone(),
1447                quorum_size.0,
1448            )
1449            .await?;
1450            // Make update of the phase the event is going through
1451            self.subjects_completing_event.insert(
1452                subject_id,
1453                (ValidationStage::Validate, new_signers, quorum_size),
1454            );
1455            Ok(())
1456        } else {
1457            let validation_signatures: HashSet<Signature> = validation_set
1458                .iter()
1459                .map(|unique_signature| unique_signature.signature.clone())
1460                .collect();
1461            // If quorum is reached we send it to the ledger.
1462            // Message to Ledger Are in ask because is we use tell we can accept anotheer event with same sn if the ledger function has not finished and block the subject for validation
1463            if event.content.sn == 0 {
1464                let response = self
1465                    .ledger_sender
1466                    .ask(LedgerCommand::Genesis {
1467                        event: event.clone(),
1468                        signatures: validation_signatures,
1469                        validation_proof: validation_event.proof.clone(),
1470                    })
1471                    .await?;
1472                log::debug!("LEDGER RESPONSE: {:?}", response);
1473            } else {
1474                let response = self
1475                    .ledger_sender
1476                    .ask(LedgerCommand::OwnEvent {
1477                        event: event.clone(),
1478                        signatures: validation_signatures,
1479                        validation_proof: validation_event.proof.clone(),
1480                    })
1481                    .await?;
1482                log::debug!("LEDGER RESPONSE: {:?}", response);
1483            }
1484            self.database
1485                .del_prevalidated_event(&subject_id)
1486                .map_err(|error| EventError::DatabaseError(error.to_string()))?;
1487            self.database
1488                .del_request(&subject_id)
1489                .map_err(|error| EventError::DatabaseError(error.to_string()))?;
1490            self.message_channel
1491                .tell(MessageTaskCommand::Cancel(String::from(format!(
1492                    "{}",
1493                    subject_id.to_str()
1494                ))))
1495                .await
1496                .map_err(EventError::ChannelError)?;
1497            self.events_to_validate.remove(&event_hash);
1498            self.event_validation_events.remove(&event_hash);
1499            self.event_validations.remove(&event_hash);
1500            self.subjects_completing_event.remove(&subject_id);
1501            Ok(())
1502        }
1503    }
1504
1505    pub async fn higher_governance_expected(
1506        &self,
1507        governance_id: DigestIdentifier,
1508        who_asked: KeyIdentifier,
1509    ) -> Result<(), EventError> {
1510        self.message_channel
1511            .tell(MessageTaskCommand::Request(
1512                None,
1513                TapleMessages::LedgerMessages(LedgerCommand::GetLCE {
1514                    who_asked: self.own_identifier.clone(),
1515                    subject_id: governance_id,
1516                }),
1517                vec![who_asked],
1518                MessageConfig {
1519                    timeout: TIMEOUT,
1520                    replication_factor: 1.0,
1521                },
1522            ))
1523            .await
1524            .map_err(EventError::ChannelError)
1525    }
1526
1527    async fn get_signers_and_quorum(
1528        &self,
1529        metadata: Metadata,
1530        stage: ValidationStage,
1531    ) -> Result<(HashSet<KeyIdentifier>, u32), EventError> {
1532        let signers = self
1533            .gov_api
1534            .get_signers(metadata.clone(), stage.clone())
1535            .await
1536            .map_err(EventError::GovernanceError)?;
1537        let quorum_size = self
1538            .gov_api
1539            .get_quorum(metadata, stage)
1540            .await
1541            .map_err(EventError::GovernanceError)?;
1542        Ok((signers, quorum_size))
1543    }
1544
1545    async fn ask_signatures(
1546        &self,
1547        subject_id: &DigestIdentifier,
1548        event_message: TapleMessages,
1549        signers: HashSet<KeyIdentifier>,
1550        quorum_size: u32,
1551    ) -> Result<(), EventError> {
1552        let replication_factor = extend_quorum(quorum_size, signers.len());
1553        self.message_channel
1554            .tell(MessageTaskCommand::Request(
1555                Some(String::from(format!("{}", subject_id.to_str()))),
1556                event_message,
1557                signers.into_iter().collect(),
1558                MessageConfig {
1559                    timeout: TIMEOUT,
1560                    replication_factor,
1561                },
1562            ))
1563            .await
1564            .map_err(EventError::ChannelError)?;
1565        Ok(())
1566    }
1567
1568    fn create_event_prevalidated_no_eval(
1569        &mut self,
1570        event_request: Signed<EventRequest>,
1571        subject: &Subject,
1572        gov_version: u64,
1573    ) -> Result<Signed<Event>, EventError> {
1574        // We need to get the derivator from the previous validation proof if any
1575        let derivator = match self
1576            .database
1577            .get_signatures(&subject.subject_id, subject.sn)
1578        {
1579            Ok((_, proof)) => proof.event_hash.derivator,
1580            Err(crate::database::Error::EntryNotFound) => self.derivator,
1581            Err(e) => return Err(EventError::DatabaseError(e.to_string())),
1582        };
1583        let hash_prev_event = self
1584            .database
1585            .get_event(&subject.subject_id, subject.sn)
1586            .map_err(|e| EventError::DatabaseError(e.to_string()))?
1587            .content
1588            .hash_id(derivator)?;
1589        let event = Event {
1590            subject_id: subject.subject_id.clone(),
1591            event_request,
1592            sn: subject.sn + 1,
1593            gov_version,
1594            patch: ValueWrapper(
1595                serde_json::from_str("[]")
1596                    .map_err(|_| EventError::CryptoError("Error parsing empty json".to_string()))?,
1597            ),
1598            state_hash: subject.properties.hash_id(self.derivator)?,
1599            eval_success: true,
1600            appr_required: false,
1601            approved: true,
1602            hash_prev_event,
1603            evaluators: HashSet::new(),
1604            approvers: HashSet::new(),
1605        };
1606        let event_content_hash = event.hash_id(self.derivator)?;
1607        let subject_keys = subject.keys.as_ref().expect("Somos propietario");
1608        let event_signature =
1609            Signature::new(&event, &subject_keys, self.derivator).map_err(|_| {
1610                EventError::CryptoError(String::from("Error signing the hash of the event content"))
1611            })?;
1612        let event = Signed::<Event> {
1613            content: event,
1614            signature: event_signature,
1615        };
1616        self.events_to_validate
1617            .insert(event_content_hash, event.clone());
1618        self.database
1619            .set_prevalidated_event(&subject.subject_id, event.clone())
1620            .map_err(|error| EventError::DatabaseError(error.to_string()))?;
1621        self.database
1622            .del_request(&subject.subject_id)
1623            .map_err(|error| EventError::DatabaseError(error.to_string()))?;
1624        Ok(event)
1625    }
1626}
1627
1628pub fn extend_quorum(quorum_size: u32, signers_len: usize) -> f64 {
1629    let quorum_extended =
1630        quorum_size + (signers_len as f64 * QUORUM_PORCENTAGE_AMPLIFICATION).ceil() as u32;
1631    quorum_extended as f64 / signers_len as f64
1632}
1633
1634fn count_signatures_with_event_content_hash(
1635    signatures: &HashSet<(UniqueSignature, bool, DigestIdentifier)>,
1636    target_event_content_hash: &DigestIdentifier,
1637) -> (u32, u32) {
1638    let mut ok: u32 = 0;
1639    let mut ko: u32 = 0;
1640    for (_, acceptance, hash) in signatures.iter() {
1641        if hash == target_event_content_hash {
1642            match acceptance {
1643                true => ok += 1,
1644                false => ko += 1,
1645            }
1646        }
1647    }
1648    (ok, ko)
1649}
1650
/// Stores `new_value` in `set`, replacing any element that compares equal to it.
///
/// Returns `true` when an equal element was already present (and has been
/// replaced), `false` when the value is new to the set.
fn insert_or_replace_and_check<T: PartialEq + Eq + Hash>(
    set: &mut HashSet<T>,
    new_value: T,
) -> bool {
    set.replace(new_value).is_some()
}
1659
1660fn hash_match_after_patch(
1661    evaluation: &EvaluationResponse,
1662    json_patch: ValueWrapper,
1663    mut prev_properties: ValueWrapper,
1664) -> Result<bool, EventError> {
1665    if !evaluation.eval_success {
1666        let state_hash_calculated = DigestIdentifier::from_serializable_borsh(
1667            &prev_properties,
1668            evaluation.state_hash.derivator,
1669        )
1670        .map_err(|_| {
1671            EventError::CryptoError(String::from("Error calculating the hash of the state"))
1672        })?;
1673        Ok(state_hash_calculated == evaluation.state_hash)
1674    } else {
1675        let Ok(patch_json) = serde_json::from_value::<Patch>(json_patch.0) else {
1676            return Err(EventError::ErrorParsingJsonString(
1677                "Error Parsing Patch".to_owned(),
1678            ));
1679        };
1680        let Ok(()) = patch(&mut prev_properties.0, &patch_json) else {
1681            return Err(EventError::ErrorApplyingPatch(
1682                "Error applying patch".to_owned(),
1683            ));
1684        };
1685        let state_hash_calculated = DigestIdentifier::from_serializable_borsh(
1686            &prev_properties,
1687            evaluation.state_hash.derivator,
1688        )
1689        .map_err(|_| {
1690            EventError::CryptoError(String::from("Error calculating the hash of the state"))
1691        })?;
1692        Ok(state_hash_calculated == evaluation.state_hash)
1693    }
1694}