Skip to main content

ave_core/approval/
persist.rs

1use std::sync::Arc;
2
3use crate::{
4    ActorMessage, NetworkMessage,
5    approval::types::VotationType,
6    db::Storable,
7    governance::data::GovernanceData,
8    helpers::network::service::NetworkSender,
9    model::common::{
10        emit_fail,
11        node::{SignTypesNode, UpdateData, get_sign, update_ledger_network},
12        purge_storage,
13        subject::get_metadata,
14    },
15    subject::RequestSubjectData,
16};
17use async_trait::async_trait;
18use ave_actors::{
19    Actor, ActorContext, ActorError, ActorPath, Event, Handler, Message,
20    Response,
21};
22use ave_actors::{LightPersistence, PersistentActor};
23use ave_common::{
24    Namespace, SchemaType,
25    bridge::request::{ApprovalState, ApprovalStateRes},
26    identity::{
27        DigestIdentifier, HashAlgorithm, PublicKey, Signed, hash_borsh,
28    },
29};
30use ave_network::ComunicateInfo;
31use borsh::{BorshDeserialize, BorshSerialize};
32use serde::{Deserialize, Serialize};
33use tracing::{Span, debug, error, info_span, warn};
34
35use super::{
36    Approval, ApprovalMessage, request::ApprovalReq, response::ApprovalRes,
37};
38
/// Persistent actor state for one approver of an approval request.
///
/// Only `request_id`, `version`, `state` and `request` are persisted. The
/// remaining fields are marked `#[serde(skip)]` and are also left out of the
/// manual Borsh implementations in this file, so they only carry meaningful
/// values when the actor is built through `create_initial`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ApprPersist {
    /// Hash algorithm and network sender. `None` after deserialization;
    /// the methods that need them fail with "Helpers are None" in that case.
    #[serde(skip)]
    helpers: Option<(HashAlgorithm, Arc<NetworkSender>)>,
    /// Compared against `node_key` to decide whether a response is delivered
    /// to the local `Approval` actor or sent over the network.
    #[serde(skip)]
    our_key: Arc<PublicKey>,
    /// Subject identifier carried in `SafeState` events and log records.
    #[serde(skip)]
    subject_id: DigestIdentifier,
    /// With `VotationType::AlwaysAccept` a new request is accepted
    /// automatically; otherwise it is stored as `Pending`.
    #[serde(skip)]
    pass_votation: VotationType,
    /// Receiver of network responses and the only accepted sender of
    /// `NetworkRequest` messages.
    #[serde(skip)]
    node_key: PublicKey,
    /// Identifier of the last stored request (empty until one is stored).
    request_id: String,
    /// Version of the last stored request.
    version: u64,
    /// Current approval state of the stored request, if any.
    state: Option<ApprovalState>,
    /// The stored signed approval request, if any.
    request: Option<Signed<ApprovalReq>>,
}
56
57impl BorshSerialize for ApprPersist {
58    fn serialize<W: std::io::Write>(
59        &self,
60        writer: &mut W,
61    ) -> std::io::Result<()> {
62        // Serialize only the fields we want to persist, skipping 'owner'
63        BorshSerialize::serialize(&self.request_id, writer)?;
64        BorshSerialize::serialize(&self.version, writer)?;
65        BorshSerialize::serialize(&self.state, writer)?;
66        BorshSerialize::serialize(&self.request, writer)?;
67
68        Ok(())
69    }
70}
71
72impl BorshDeserialize for ApprPersist {
73    fn deserialize_reader<R: std::io::Read>(
74        reader: &mut R,
75    ) -> std::io::Result<Self> {
76        // Deserialize the persisted fields
77        let request_id = String::deserialize_reader(reader)?;
78        let version = u64::deserialize_reader(reader)?;
79        let state = Option::<ApprovalState>::deserialize_reader(reader)?;
80        let request =
81            Option::<Signed<ApprovalReq>>::deserialize_reader(reader)?;
82
83        let node_key = PublicKey::default();
84        let our_key = Arc::new(PublicKey::default());
85        let pass_votation = VotationType::AlwaysAccept;
86        let subject_id = DigestIdentifier::default();
87
88        Ok(Self {
89            helpers: None,
90            our_key,
91            request_id,
92            version,
93            subject_id,
94            pass_votation,
95            state,
96            request,
97            node_key,
98        })
99    }
100}
101
/// Parameters consumed by `ApprPersist::create_initial` to build a fresh
/// actor; they populate exactly the fields that are never persisted.
pub struct InitApprPersist {
    /// Stored as `ApprPersist::our_key`.
    pub our_key: Arc<PublicKey>,
    /// Stored as `ApprPersist::node_key`.
    pub node_key: PublicKey,
    /// Stored as `ApprPersist::subject_id`.
    pub subject_id: DigestIdentifier,
    /// Stored as `ApprPersist::pass_votation`.
    pub pass_votation: VotationType,
    /// Hash algorithm and network sender, stored as `Some(helpers)`.
    pub helpers: (HashAlgorithm, Arc<NetworkSender>),
}
109
110impl ApprPersist {
111    async fn check_governance(
112        &self,
113        ctx: &mut ActorContext<Self>,
114        governance_id: &DigestIdentifier,
115        gov_version: u64,
116    ) -> Result<Option<String>, ActorError> {
117        let Some((.., network)) = &self.helpers else {
118            return Err(ActorError::FunctionalCritical {
119                description: "Helpers are None".to_owned(),
120            });
121        };
122
123        let metadata = get_metadata(ctx, governance_id).await?;
124        let governance =
125            match GovernanceData::try_from(metadata.properties.clone()) {
126                Ok(gov) => gov,
127                Err(e) => {
128                    error!(
129                        governance_id = %governance_id,
130                        error = %e,
131                        "Failed to convert governance from properties"
132                    );
133                    return Err(ActorError::FunctionalCritical {
134                        description: format!(
135                            "can not convert governance from properties: {}",
136                            e
137                        ),
138                    });
139                }
140            };
141
142        match gov_version.cmp(&governance.version) {
143            std::cmp::Ordering::Equal => {
144                // If it is the same it means that we have the latest version of governance, we are up to date.
145            }
146            std::cmp::Ordering::Greater => {
147                // Me llega una versión mayor a la mía.
148                let data = UpdateData {
149                    sn: metadata.sn,
150                    gov_version: governance.version,
151                    subject_id: governance_id.clone(),
152                    other_node: self.node_key.clone(),
153                };
154                update_ledger_network(data, network.clone()).await?;
155            }
156            std::cmp::Ordering::Less => {
157                return Ok(Some(format!(
158                    "Abort approval, governance update is required by signer: local={}, request={}",
159                    governance.version, gov_version
160                )));
161            }
162        }
163
164        Ok(None)
165    }
166
167    async fn send_signed_response(
168        &self,
169        ctx: &mut ActorContext<Self>,
170        response: ApprovalRes,
171        request: &Signed<ApprovalReq>,
172        request_id: &str,
173        version: u64,
174    ) -> Result<(), ActorError> {
175        let Some((.., network)) = self.helpers.clone() else {
176            return Err(ActorError::FunctionalCritical {
177                description: "Helpers are None".to_owned(),
178            });
179        };
180
181        let sign_type = SignTypesNode::ApprovalRes(Box::new(response.clone()));
182        let signature = get_sign(ctx, sign_type).await?;
183
184        let subject_id = request.content().subject_id.clone();
185        if self.node_key == *self.our_key {
186            let subject_id = ctx.path().parent().key();
187            let approval_actor = ctx
188                .system()
189                .get_actor::<Approval>(&ActorPath::from(&format!(
190                    "/user/request/{}/approval",
191                    subject_id
192                )))
193                .await;
194            if let Ok(approval_actor) = approval_actor {
195                approval_actor
196                    .tell(ApprovalMessage::Response {
197                        approval_res: response,
198                        sender: (*self.our_key).clone(),
199                        signature: Some(signature),
200                    })
201                    .await?;
202            }
203        } else {
204            let signed_response: Signed<ApprovalRes> =
205                Signed::from_parts(response, signature);
206
207            let new_info = ComunicateInfo {
208                receiver: self.node_key.clone(),
209                request_id: request_id.to_string(),
210                version,
211                receiver_actor: format!(
212                    "/user/request/{}/approval/{}",
213                    subject_id, self.our_key
214                ),
215            };
216
217            if let Err(e) = network
218                .send_command(ave_network::CommandHelper::SendMessage {
219                    message: NetworkMessage {
220                        info: new_info,
221                        message: ActorMessage::ApprovalRes {
222                            res: Box::new(signed_response),
223                        },
224                    },
225                })
226                .await
227            {
228                return Err(emit_fail(ctx, e).await);
229            };
230        }
231
232        Ok(())
233    }
234
235    async fn send_response(
236        &self,
237        ctx: &mut ActorContext<Self>,
238        request: &Signed<ApprovalReq>,
239        response: bool,
240        request_id: &str,
241        version: u64,
242    ) -> Result<(), ActorError> {
243        let Some((hash, ..)) = self.helpers.clone() else {
244            return Err(ActorError::FunctionalCritical {
245                description: "Helpers are None".to_owned(),
246            });
247        };
248        let approval_req_hash = hash_borsh(&*hash.hasher(), request.content())
249            .map_err(|e| ActorError::FunctionalCritical {
250                description: format!(
251                    "Can not obtain approval request hash {}",
252                    e
253                ),
254            })?;
255
256        let req_subject_data_hash = hash_borsh(
257            &*hash.hasher(),
258            &RequestSubjectData {
259                subject_id: request.content().subject_id.clone(),
260                governance_id: request.content().subject_id.clone(),
261                sn: request.content().sn,
262                namespace: Namespace::new(),
263                schema_id: SchemaType::Governance,
264                gov_version: request.content().gov_version,
265                signer: request.content().signer.clone(),
266            },
267        )
268        .map_err(|e| ActorError::FunctionalCritical {
269            description: format!("Can not obtain approval request hash {}", e),
270        })?;
271
272        let res = ApprovalRes::Response {
273            approval_req_hash,
274            agrees: response,
275            req_subject_data_hash,
276        };
277        self.send_signed_response(ctx, res, request, request_id, version)
278            .await
279    }
280}
281
/// Messages handled by the `ApprPersist` actor.
#[derive(Debug, Clone)]
pub enum ApprPersistMessage {
    /// Moves a `Pending` approval to `Obsolete`; ignored in any other state.
    MakeObsolete,
    /// Purges the actor's storage.
    PurgeStorage,
    // Message to approve locally
    LocalApproval {
        request_id: DigestIdentifier,
        version: u64,
        approval_req: Signed<ApprovalReq>,
    },
    // Message to request approval from the helper and send the result back there
    NetworkRequest {
        approval_req: Signed<ApprovalReq>,
        info: ComunicateInfo,
        sender: PublicKey,
    },
    /// Queries the stored request; with `Some(state)` the request is only
    /// returned when the stored state equals it.
    GetApproval {
        state: Option<ApprovalState>,
    },
    /// Resolves a `Pending` approval with an explicit decision.
    ChangeResponse {
        response: ApprovalStateRes,
    }, // An approval event must be emittable on demand, not only the automatic one
}
305
306impl Message for ApprPersistMessage {
307    fn is_critical(&self) -> bool {
308        matches!(self, Self::MakeObsolete | Self::PurgeStorage)
309    }
310}
311
/// Events persisted and published by the `ApprPersist` actor.
///
/// The type derives Borsh (de)serialization, so variant and field order are
/// part of the stored format.
#[derive(
    Debug, Clone, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
)]
pub enum ApprPersistEvent {
    /// Replaces the stored approval state.
    ChangeState {
        state: ApprovalState,
    },
    /// Stores a request together with its id, version and state.
    /// `subject_id` travels with the event but is not read back by `apply`.
    SafeState {
        subject_id: DigestIdentifier,
        request_id: String,
        version: u64,
        request: Box<Signed<ApprovalReq>>,
        state: ApprovalState,
    },
}

impl Event for ApprPersistEvent {}
329
/// Replies produced by the `ApprPersist` message handler.
pub enum ApprPersistResponse {
    /// Generic acknowledgement; also returned by `GetApproval` when there is
    /// nothing stored or the stored state does not match the query.
    Ok,
    /// The stored approval request content and its current state.
    Approval {
        request: ApprovalReq,
        state: ApprovalState,
    },
}

impl Response for ApprPersistResponse {}
339
340#[async_trait]
341impl Actor for ApprPersist {
342    type Event = ApprPersistEvent;
343    type Message = ApprPersistMessage;
344    type Response = ApprPersistResponse;
345
346    fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
347        parent_span.map_or_else(
348            || info_span!("ApprPersist"),
349            |parent_span| info_span!(parent: parent_span, "ApprPersist"),
350        )
351    }
352
353    async fn pre_start(
354        &mut self,
355        ctx: &mut ActorContext<Self>,
356    ) -> Result<(), ActorError> {
357        let prefix = ctx.path().parent().key();
358        if let Err(e) = self
359            .init_store("approver", Some(prefix.clone()), false, ctx)
360            .await
361        {
362            error!(
363                error = %e,
364                "Failed to initialize approver store"
365            );
366            return Err(e);
367        }
368        Ok(())
369    }
370}
371
372#[async_trait]
373impl Handler<Self> for ApprPersist {
374    async fn handle_message(
375        &mut self,
376        _sender: ActorPath,
377        msg: ApprPersistMessage,
378        ctx: &mut ActorContext<Self>,
379    ) -> Result<ApprPersistResponse, ActorError> {
380        match msg {
381            ApprPersistMessage::PurgeStorage => {
382                purge_storage(ctx).await?;
383
384                debug!(
385                    msg_type = "PurgeStorage",
386                    subject_id = %self.subject_id,
387                    "Approval storage purged"
388                );
389
390                return Ok(ApprPersistResponse::Ok);
391            }
392            ApprPersistMessage::GetApproval { state } => {
393                let res = if let Some(req) = &self.request
394                    && let Some(req_state) = &self.state
395                {
396                    state.map_or_else(
397                        || ApprPersistResponse::Approval {
398                            request: req.content().clone(),
399                            state: req_state.clone(),
400                        },
401                        |query| {
402                            if &query == req_state {
403                                ApprPersistResponse::Approval {
404                                    request: req.content().clone(),
405                                    state: query,
406                                }
407                            } else {
408                                ApprPersistResponse::Ok
409                            }
410                        },
411                    )
412                } else {
413                    ApprPersistResponse::Ok
414                };
415
416                return Ok(res);
417            }
418            ApprPersistMessage::MakeObsolete => {
419                let state = if let Some(state) = self.state.clone() {
420                    state
421                } else {
422                    return Ok(ApprPersistResponse::Ok);
423                };
424
425                if state == ApprovalState::Pending {
426                    self.on_event(
427                        ApprPersistEvent::ChangeState {
428                            state: ApprovalState::Obsolete,
429                        },
430                        ctx,
431                    )
432                    .await;
433
434                    debug!(
435                        msg_type = "MakeObsolete",
436                        "State changed to obsolete"
437                    );
438                }
439            }
440            ApprPersistMessage::ChangeResponse { response } => {
441                let Some(state) = self.state.clone() else {
442                    warn!(
443                        msg_type = "ChangeResponse",
444                        "Approval state not found"
445                    );
446                    return Err(ActorError::Functional {
447                        description: "Can not get approval state".to_owned(),
448                    });
449                };
450
451                if response == ApprovalStateRes::Obsolete {
452                    warn!(
453                        msg_type = "ChangeResponse",
454                        "Invalid state transition to Obsolete"
455                    );
456                    return Err(ActorError::Functional {
457                        description:
458                            "New state is Obsolete, is an invalid state"
459                                .to_owned(),
460                    });
461                }
462
463                if state == ApprovalState::Pending {
464                    let (response, state) =
465                        if ApprovalStateRes::Accepted == response {
466                            (true, ApprovalState::Accepted)
467                        } else {
468                            (false, ApprovalState::Rejected)
469                        };
470
471                    let Some(approval_req) = self.request.clone() else {
472                        error!(
473                            msg_type = "ChangeResponse",
474                            "Approval request not found"
475                        );
476                        return Err(ActorError::Functional {
477                            description: "Can not get approval request"
478                                .to_owned(),
479                        });
480                    };
481
482                    if let Err(e) = self
483                        .send_response(
484                            ctx,
485                            &approval_req,
486                            response,
487                            &self.request_id.to_string(),
488                            self.version,
489                        )
490                        .await
491                    {
492                        error!(
493                            msg_type = "ChangeResponse",
494                            error = %e,
495                            "Failed to send approval response"
496                        );
497                        return Err(emit_fail(ctx, e).await);
498                    };
499
500                    debug!(
501                        msg_type = "ChangeResponse",
502                        new_state = ?state,
503                        "State changed successfully"
504                    );
505
506                    self.on_event(ApprPersistEvent::ChangeState { state }, ctx)
507                        .await;
508                }
509            }
510            // aprobar si esta por defecto
511            ApprPersistMessage::LocalApproval {
512                request_id,
513                version,
514                approval_req,
515            } => {
516                if request_id.to_string() != self.request_id
517                    || version != self.version
518                {
519                    let state =
520                        if self.pass_votation == VotationType::AlwaysAccept {
521                            if let Err(e) = self
522                                .send_response(
523                                    ctx,
524                                    &approval_req,
525                                    true,
526                                    &request_id.to_string(),
527                                    version,
528                                )
529                                .await
530                            {
531                                error!(
532                                    msg_type = "LocalApproval",
533                                    error = %e,
534                                    "Failed to send approval response"
535                                );
536                                return Err(emit_fail(ctx, e).await);
537                            }
538
539                            ApprovalState::Accepted
540                        } else {
541                            ApprovalState::Pending
542                        };
543
544                    debug!(
545                        msg_type = "LocalApproval",
546                        request_id = %request_id,
547                        version = version,
548                        new_state = ?state,
549                        "New approval request processed"
550                    );
551
552                    self.on_event(
553                        ApprPersistEvent::SafeState {
554                            subject_id: self.subject_id.clone(),
555                            version,
556                            request_id: request_id.to_string(),
557                            request: Box::new(approval_req),
558                            state,
559                        },
560                        ctx,
561                    )
562                    .await;
563                } else if let Some(state) = self.state.clone() {
564                    let response = if state == ApprovalState::Accepted {
565                        true
566                    } else if state == ApprovalState::Rejected {
567                        false
568                    } else {
569                        return Ok(ApprPersistResponse::Ok);
570                    };
571
572                    if let Err(e) = self
573                        .send_response(
574                            ctx,
575                            &approval_req,
576                            response,
577                            &request_id.to_string(),
578                            version,
579                        )
580                        .await
581                    {
582                        error!(
583                            msg_type = "LocalApproval",
584                            error = %e,
585                            "Failed to resend approval response"
586                        );
587                        return Err(emit_fail(ctx, e).await);
588                    }
589
590                    debug!(
591                        msg_type = "LocalApproval",
592                        request_id = %request_id,
593                        version = version,
594                        "Response resent successfully"
595                    );
596                }
597            }
598            ApprPersistMessage::NetworkRequest {
599                approval_req,
600                info,
601                sender,
602            } => {
603                if sender != approval_req.signature().signer
604                    || sender != self.node_key
605                {
606                    warn!(
607                        msg_type = "NetworkRequest",
608                        expected_sender = %self.node_key,
609                        received_sender = %sender,
610                        "Unexpected sender"
611                    );
612                    return Ok(ApprPersistResponse::Ok);
613                }
614
615                if info.request_id != self.request_id
616                    || info.version != self.version
617                {
618                    if let Err(e) = approval_req.verify() {
619                        error!(
620                            msg_type = "NetworkRequest",
621                            error = %e,
622                            "Invalid approval signature"
623                        );
624                        return Err(ActorError::Functional {
625                            description: format!(
626                                "Can not verify signature of request: {}",
627                                e
628                            ),
629                        });
630                    }
631
632                    let governance_check = match self
633                        .check_governance(
634                            ctx,
635                            &approval_req.content().subject_id,
636                            approval_req.content().gov_version,
637                        )
638                        .await
639                    {
640                        Ok(check) => check,
641                        Err(e) => {
642                            warn!(
643                                msg_type = "NetworkRequest",
644                                error = %e,
645                                "Failed to check governance"
646                            );
647                            return Err(emit_fail(ctx, e).await);
648                        }
649                    };
650
651                    if let Some(reason) = governance_check {
652                        if let Err(e) = self
653                            .send_signed_response(
654                                ctx,
655                                ApprovalRes::Abort(reason),
656                                &approval_req,
657                                &info.request_id,
658                                info.version,
659                            )
660                            .await
661                        {
662                            error!(
663                                msg_type = "NetworkRequest",
664                                error = %e,
665                                "Failed to send approval abort response"
666                            );
667                            return Err(emit_fail(ctx, e).await);
668                        }
669
670                        return Ok(ApprPersistResponse::Ok);
671                    }
672
673                    let state =
674                        if self.pass_votation == VotationType::AlwaysAccept {
675                            ApprovalState::Accepted
676                        } else {
677                            ApprovalState::Pending
678                        };
679
680                    self.on_event(
681                        ApprPersistEvent::SafeState {
682                            subject_id: self.subject_id.clone(),
683                            request_id: info.request_id.clone(),
684                            version: info.version,
685                            request: Box::new(approval_req.clone()),
686                            state: state.clone(),
687                        },
688                        ctx,
689                    )
690                    .await;
691
692                    if state == ApprovalState::Accepted
693                        && let Err(e) = self
694                            .send_response(
695                                ctx,
696                                &approval_req,
697                                true,
698                                &info.request_id,
699                                info.version,
700                            )
701                            .await
702                    {
703                        error!(
704                            msg_type = "NetworkRequest",
705                            error = %e,
706                            "Failed to send approval response"
707                        );
708                        return Err(emit_fail(ctx, e).await);
709                    };
710
711                    debug!(
712                        msg_type = "NetworkRequest",
713                        request_id = %info.request_id,
714                        version = info.version,
715                        new_state = ?state,
716                        "Network approval request processed"
717                    );
718                } else if !self.request_id.is_empty() {
719                    let state = if let Some(state) = self.state.clone() {
720                        state
721                    } else {
722                        warn!(
723                            msg_type = "NetworkRequest",
724                            "Approval state not found"
725                        );
726                        let e = ActorError::FunctionalCritical {
727                            description: "Can not get state".to_owned(),
728                        };
729                        return Err(emit_fail(ctx, e).await);
730                    };
731
732                    let response = if ApprovalState::Accepted == state {
733                        true
734                    } else if ApprovalState::Rejected == state {
735                        false
736                    } else {
737                        return Ok(ApprPersistResponse::Ok);
738                    };
739
740                    let approval_req =
741                        if let Some(approval_req) = self.request.clone() {
742                            approval_req
743                        } else {
744                            error!(
745                                msg_type = "NetworkRequest",
746                                "Approval request not found"
747                            );
748                            let e = ActorError::FunctionalCritical {
749                                description: "Can not get approve request"
750                                    .to_owned(),
751                            };
752                            return Err(emit_fail(ctx, e).await);
753                        };
754
755                    if let Err(e) = self
756                        .send_response(
757                            ctx,
758                            &approval_req,
759                            response,
760                            &self.request_id.to_string(),
761                            self.version,
762                        )
763                        .await
764                    {
765                        error!(
766                            msg_type = "NetworkRequest",
767                            error = %e,
768                            "Failed to resend approval response"
769                        );
770                        return Err(emit_fail(ctx, e).await);
771                    };
772
773                    debug!(
774                        msg_type = "NetworkRequest",
775                        request_id = %self.request_id,
776                        version = self.version,
777                        "Response resent successfully"
778                    );
779                }
780            }
781        }
782        Ok(ApprPersistResponse::Ok)
783    }
784
785    async fn on_event(
786        &mut self,
787        event: ApprPersistEvent,
788        ctx: &mut ActorContext<Self>,
789    ) {
790        if let Err(e) = self.persist(&event, ctx).await {
791            error!(error = %e, "Failed to persist event");
792            emit_fail(ctx, e).await;
793        };
794
795        if let Err(e) = ctx.publish_event(event).await {
796            error!(error = %e, "Failed to publish event");
797            emit_fail(ctx, e).await;
798        };
799    }
800}
801
802// Debemos persistir el estado de la petición hasta que se apruebe
803#[async_trait]
804impl PersistentActor for ApprPersist {
805    type Persistence = LightPersistence;
806    type InitParams = InitApprPersist;
807
808    fn update(&mut self, state: Self) {
809        self.request_id = state.request_id;
810        self.version = state.version;
811        self.state = state.state;
812        self.request = state.request;
813    }
814
815    fn create_initial(params: Self::InitParams) -> Self {
816        let Self::InitParams {
817            our_key,
818            node_key,
819            subject_id,
820            pass_votation,
821            helpers,
822        } = params;
823
824        Self {
825            helpers: Some(helpers),
826            node_key,
827            our_key,
828            request_id: String::default(),
829            version: 0,
830            subject_id,
831            pass_votation,
832            state: None,
833            request: None,
834        }
835    }
836
837    fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
838        match event {
839            ApprPersistEvent::ChangeState { state, .. } => {
840                debug!(
841                    event_type = "ChangeState",
842                    new_state = ?state,
843                    "Approval state changed"
844                );
845                self.state = Some(state.clone());
846            }
847            ApprPersistEvent::SafeState {
848                request,
849                state,
850                request_id,
851                version,
852                ..
853            } => {
854                debug!(
855                    event_type = "SafeState",
856                    request_id = %request_id,
857                    version = version,
858                    new_state = ?state,
859                    "Approval state saved"
860                );
861                self.version = *version;
862                self.request_id.clone_from(request_id);
863                self.request = Some(*request.clone());
864                self.state = Some(state.clone());
865            }
866        };
867
868        Ok(())
869    }
870}
871
// Empty implementation: no `Storable` items are overridden for `ApprPersist`.
impl Storable for ApprPersist {}