1use std::sync::Arc;
2
3use crate::{
4 ActorMessage, NetworkMessage,
5 approval::types::VotationType,
6 db::Storable,
7 governance::data::GovernanceData,
8 helpers::network::service::NetworkSender,
9 model::common::{
10 emit_fail,
11 node::{SignTypesNode, UpdateData, get_sign, update_ledger_network},
12 purge_storage,
13 subject::get_metadata,
14 },
15 subject::RequestSubjectData,
16};
17use async_trait::async_trait;
18use ave_actors::{
19 Actor, ActorContext, ActorError, ActorPath, Event, Handler, Message,
20 Response,
21};
22use ave_actors::{LightPersistence, PersistentActor};
23use ave_common::{
24 Namespace, SchemaType,
25 bridge::request::{ApprovalState, ApprovalStateRes},
26 identity::{
27 DigestIdentifier, HashAlgorithm, PublicKey, Signed, hash_borsh,
28 },
29};
30use ave_network::ComunicateInfo;
31use borsh::{BorshDeserialize, BorshSerialize};
32use serde::{Deserialize, Serialize};
33use tracing::{Span, debug, error, info_span, warn};
34
35use super::{
36 Approval, ApprovalMessage, request::ApprovalReq, response::ApprovalRes,
37};
38
39#[derive(Clone, Debug, Serialize, Deserialize)]
40pub struct ApprPersist {
41 #[serde(skip)]
42 helpers: Option<(HashAlgorithm, Arc<NetworkSender>)>,
43 #[serde(skip)]
44 our_key: Arc<PublicKey>,
45 #[serde(skip)]
46 subject_id: DigestIdentifier,
47 #[serde(skip)]
48 pass_votation: VotationType,
49 #[serde(skip)]
50 node_key: PublicKey,
51 request_id: String,
52 version: u64,
53 state: Option<ApprovalState>,
54 request: Option<Signed<ApprovalReq>>,
55}
56
57impl BorshSerialize for ApprPersist {
58 fn serialize<W: std::io::Write>(
59 &self,
60 writer: &mut W,
61 ) -> std::io::Result<()> {
62 BorshSerialize::serialize(&self.request_id, writer)?;
64 BorshSerialize::serialize(&self.version, writer)?;
65 BorshSerialize::serialize(&self.state, writer)?;
66 BorshSerialize::serialize(&self.request, writer)?;
67
68 Ok(())
69 }
70}
71
72impl BorshDeserialize for ApprPersist {
73 fn deserialize_reader<R: std::io::Read>(
74 reader: &mut R,
75 ) -> std::io::Result<Self> {
76 let request_id = String::deserialize_reader(reader)?;
78 let version = u64::deserialize_reader(reader)?;
79 let state = Option::<ApprovalState>::deserialize_reader(reader)?;
80 let request =
81 Option::<Signed<ApprovalReq>>::deserialize_reader(reader)?;
82
83 let node_key = PublicKey::default();
84 let our_key = Arc::new(PublicKey::default());
85 let pass_votation = VotationType::AlwaysAccept;
86 let subject_id = DigestIdentifier::default();
87
88 Ok(Self {
89 helpers: None,
90 our_key,
91 request_id,
92 version,
93 subject_id,
94 pass_votation,
95 state,
96 request,
97 node_key,
98 })
99 }
100}
101
102pub struct InitApprPersist {
103 pub our_key: Arc<PublicKey>,
104 pub node_key: PublicKey,
105 pub subject_id: DigestIdentifier,
106 pub pass_votation: VotationType,
107 pub helpers: (HashAlgorithm, Arc<NetworkSender>),
108}
109
110impl ApprPersist {
111 async fn check_governance(
112 &self,
113 ctx: &mut ActorContext<Self>,
114 governance_id: &DigestIdentifier,
115 gov_version: u64,
116 ) -> Result<Option<String>, ActorError> {
117 let Some((.., network)) = &self.helpers else {
118 return Err(ActorError::FunctionalCritical {
119 description: "Helpers are None".to_owned(),
120 });
121 };
122
123 let metadata = get_metadata(ctx, governance_id).await?;
124 let governance =
125 match GovernanceData::try_from(metadata.properties.clone()) {
126 Ok(gov) => gov,
127 Err(e) => {
128 error!(
129 governance_id = %governance_id,
130 error = %e,
131 "Failed to convert governance from properties"
132 );
133 return Err(ActorError::FunctionalCritical {
134 description: format!(
135 "can not convert governance from properties: {}",
136 e
137 ),
138 });
139 }
140 };
141
142 match gov_version.cmp(&governance.version) {
143 std::cmp::Ordering::Equal => {
144 }
146 std::cmp::Ordering::Greater => {
147 let data = UpdateData {
149 sn: metadata.sn,
150 gov_version: governance.version,
151 subject_id: governance_id.clone(),
152 other_node: self.node_key.clone(),
153 };
154 update_ledger_network(data, network.clone()).await?;
155 }
156 std::cmp::Ordering::Less => {
157 return Ok(Some(format!(
158 "Abort approval, governance update is required by signer: local={}, request={}",
159 governance.version, gov_version
160 )));
161 }
162 }
163
164 Ok(None)
165 }
166
167 async fn send_signed_response(
168 &self,
169 ctx: &mut ActorContext<Self>,
170 response: ApprovalRes,
171 request: &Signed<ApprovalReq>,
172 request_id: &str,
173 version: u64,
174 ) -> Result<(), ActorError> {
175 let Some((.., network)) = self.helpers.clone() else {
176 return Err(ActorError::FunctionalCritical {
177 description: "Helpers are None".to_owned(),
178 });
179 };
180
181 let sign_type = SignTypesNode::ApprovalRes(Box::new(response.clone()));
182 let signature = get_sign(ctx, sign_type).await?;
183
184 let subject_id = request.content().subject_id.clone();
185 if self.node_key == *self.our_key {
186 let subject_id = ctx.path().parent().key();
187 let approval_actor = ctx
188 .system()
189 .get_actor::<Approval>(&ActorPath::from(&format!(
190 "/user/request/{}/approval",
191 subject_id
192 )))
193 .await;
194 if let Ok(approval_actor) = approval_actor {
195 approval_actor
196 .tell(ApprovalMessage::Response {
197 approval_res: response,
198 sender: (*self.our_key).clone(),
199 signature: Some(signature),
200 })
201 .await?;
202 }
203 } else {
204 let signed_response: Signed<ApprovalRes> =
205 Signed::from_parts(response, signature);
206
207 let new_info = ComunicateInfo {
208 receiver: self.node_key.clone(),
209 request_id: request_id.to_string(),
210 version,
211 receiver_actor: format!(
212 "/user/request/{}/approval/{}",
213 subject_id, self.our_key
214 ),
215 };
216
217 if let Err(e) = network
218 .send_command(ave_network::CommandHelper::SendMessage {
219 message: NetworkMessage {
220 info: new_info,
221 message: ActorMessage::ApprovalRes {
222 res: Box::new(signed_response),
223 },
224 },
225 })
226 .await
227 {
228 return Err(emit_fail(ctx, e).await);
229 };
230 }
231
232 Ok(())
233 }
234
235 async fn send_response(
236 &self,
237 ctx: &mut ActorContext<Self>,
238 request: &Signed<ApprovalReq>,
239 response: bool,
240 request_id: &str,
241 version: u64,
242 ) -> Result<(), ActorError> {
243 let Some((hash, ..)) = self.helpers.clone() else {
244 return Err(ActorError::FunctionalCritical {
245 description: "Helpers are None".to_owned(),
246 });
247 };
248 let approval_req_hash = hash_borsh(&*hash.hasher(), request.content())
249 .map_err(|e| ActorError::FunctionalCritical {
250 description: format!(
251 "Can not obtain approval request hash {}",
252 e
253 ),
254 })?;
255
256 let req_subject_data_hash = hash_borsh(
257 &*hash.hasher(),
258 &RequestSubjectData {
259 subject_id: request.content().subject_id.clone(),
260 governance_id: request.content().subject_id.clone(),
261 sn: request.content().sn,
262 namespace: Namespace::new(),
263 schema_id: SchemaType::Governance,
264 gov_version: request.content().gov_version,
265 signer: request.content().signer.clone(),
266 },
267 )
268 .map_err(|e| ActorError::FunctionalCritical {
269 description: format!("Can not obtain approval request hash {}", e),
270 })?;
271
272 let res = ApprovalRes::Response {
273 approval_req_hash,
274 agrees: response,
275 req_subject_data_hash,
276 };
277 self.send_signed_response(ctx, res, request, request_id, version)
278 .await
279 }
280}
281
282#[derive(Debug, Clone)]
283pub enum ApprPersistMessage {
284 MakeObsolete,
285 PurgeStorage,
286 LocalApproval {
288 request_id: DigestIdentifier,
289 version: u64,
290 approval_req: Signed<ApprovalReq>,
291 },
292 NetworkRequest {
294 approval_req: Signed<ApprovalReq>,
295 info: ComunicateInfo,
296 sender: PublicKey,
297 },
298 GetApproval {
299 state: Option<ApprovalState>,
300 },
301 ChangeResponse {
302 response: ApprovalStateRes,
303 }, }
305
306impl Message for ApprPersistMessage {
307 fn is_critical(&self) -> bool {
308 matches!(self, Self::MakeObsolete | Self::PurgeStorage)
309 }
310}
311
312#[derive(
313 Debug, Clone, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
314)]
315pub enum ApprPersistEvent {
316 ChangeState {
317 state: ApprovalState,
318 },
319 SafeState {
320 subject_id: DigestIdentifier,
321 request_id: String,
322 version: u64,
323 request: Box<Signed<ApprovalReq>>,
324 state: ApprovalState,
325 },
326}
327
328impl Event for ApprPersistEvent {}
329
330pub enum ApprPersistResponse {
331 Ok,
332 Approval {
333 request: ApprovalReq,
334 state: ApprovalState,
335 },
336}
337
338impl Response for ApprPersistResponse {}
339
340#[async_trait]
341impl Actor for ApprPersist {
342 type Event = ApprPersistEvent;
343 type Message = ApprPersistMessage;
344 type Response = ApprPersistResponse;
345
346 fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
347 parent_span.map_or_else(
348 || info_span!("ApprPersist"),
349 |parent_span| info_span!(parent: parent_span, "ApprPersist"),
350 )
351 }
352
353 async fn pre_start(
354 &mut self,
355 ctx: &mut ActorContext<Self>,
356 ) -> Result<(), ActorError> {
357 let prefix = ctx.path().parent().key();
358 if let Err(e) = self
359 .init_store("approver", Some(prefix.clone()), false, ctx)
360 .await
361 {
362 error!(
363 error = %e,
364 "Failed to initialize approver store"
365 );
366 return Err(e);
367 }
368 Ok(())
369 }
370}
371
372#[async_trait]
373impl Handler<Self> for ApprPersist {
374 async fn handle_message(
375 &mut self,
376 _sender: ActorPath,
377 msg: ApprPersistMessage,
378 ctx: &mut ActorContext<Self>,
379 ) -> Result<ApprPersistResponse, ActorError> {
380 match msg {
381 ApprPersistMessage::PurgeStorage => {
382 purge_storage(ctx).await?;
383
384 debug!(
385 msg_type = "PurgeStorage",
386 subject_id = %self.subject_id,
387 "Approval storage purged"
388 );
389
390 return Ok(ApprPersistResponse::Ok);
391 }
392 ApprPersistMessage::GetApproval { state } => {
393 let res = if let Some(req) = &self.request
394 && let Some(req_state) = &self.state
395 {
396 state.map_or_else(
397 || ApprPersistResponse::Approval {
398 request: req.content().clone(),
399 state: req_state.clone(),
400 },
401 |query| {
402 if &query == req_state {
403 ApprPersistResponse::Approval {
404 request: req.content().clone(),
405 state: query,
406 }
407 } else {
408 ApprPersistResponse::Ok
409 }
410 },
411 )
412 } else {
413 ApprPersistResponse::Ok
414 };
415
416 return Ok(res);
417 }
418 ApprPersistMessage::MakeObsolete => {
419 let state = if let Some(state) = self.state.clone() {
420 state
421 } else {
422 return Ok(ApprPersistResponse::Ok);
423 };
424
425 if state == ApprovalState::Pending {
426 self.on_event(
427 ApprPersistEvent::ChangeState {
428 state: ApprovalState::Obsolete,
429 },
430 ctx,
431 )
432 .await;
433
434 debug!(
435 msg_type = "MakeObsolete",
436 "State changed to obsolete"
437 );
438 }
439 }
440 ApprPersistMessage::ChangeResponse { response } => {
441 let Some(state) = self.state.clone() else {
442 warn!(
443 msg_type = "ChangeResponse",
444 "Approval state not found"
445 );
446 return Err(ActorError::Functional {
447 description: "Can not get approval state".to_owned(),
448 });
449 };
450
451 if response == ApprovalStateRes::Obsolete {
452 warn!(
453 msg_type = "ChangeResponse",
454 "Invalid state transition to Obsolete"
455 );
456 return Err(ActorError::Functional {
457 description:
458 "New state is Obsolete, is an invalid state"
459 .to_owned(),
460 });
461 }
462
463 if state == ApprovalState::Pending {
464 let (response, state) =
465 if ApprovalStateRes::Accepted == response {
466 (true, ApprovalState::Accepted)
467 } else {
468 (false, ApprovalState::Rejected)
469 };
470
471 let Some(approval_req) = self.request.clone() else {
472 error!(
473 msg_type = "ChangeResponse",
474 "Approval request not found"
475 );
476 return Err(ActorError::Functional {
477 description: "Can not get approval request"
478 .to_owned(),
479 });
480 };
481
482 if let Err(e) = self
483 .send_response(
484 ctx,
485 &approval_req,
486 response,
487 &self.request_id.to_string(),
488 self.version,
489 )
490 .await
491 {
492 error!(
493 msg_type = "ChangeResponse",
494 error = %e,
495 "Failed to send approval response"
496 );
497 return Err(emit_fail(ctx, e).await);
498 };
499
500 debug!(
501 msg_type = "ChangeResponse",
502 new_state = ?state,
503 "State changed successfully"
504 );
505
506 self.on_event(ApprPersistEvent::ChangeState { state }, ctx)
507 .await;
508 }
509 }
510 ApprPersistMessage::LocalApproval {
512 request_id,
513 version,
514 approval_req,
515 } => {
516 if request_id.to_string() != self.request_id
517 || version != self.version
518 {
519 let state =
520 if self.pass_votation == VotationType::AlwaysAccept {
521 if let Err(e) = self
522 .send_response(
523 ctx,
524 &approval_req,
525 true,
526 &request_id.to_string(),
527 version,
528 )
529 .await
530 {
531 error!(
532 msg_type = "LocalApproval",
533 error = %e,
534 "Failed to send approval response"
535 );
536 return Err(emit_fail(ctx, e).await);
537 }
538
539 ApprovalState::Accepted
540 } else {
541 ApprovalState::Pending
542 };
543
544 debug!(
545 msg_type = "LocalApproval",
546 request_id = %request_id,
547 version = version,
548 new_state = ?state,
549 "New approval request processed"
550 );
551
552 self.on_event(
553 ApprPersistEvent::SafeState {
554 subject_id: self.subject_id.clone(),
555 version,
556 request_id: request_id.to_string(),
557 request: Box::new(approval_req),
558 state,
559 },
560 ctx,
561 )
562 .await;
563 } else if let Some(state) = self.state.clone() {
564 let response = if state == ApprovalState::Accepted {
565 true
566 } else if state == ApprovalState::Rejected {
567 false
568 } else {
569 return Ok(ApprPersistResponse::Ok);
570 };
571
572 if let Err(e) = self
573 .send_response(
574 ctx,
575 &approval_req,
576 response,
577 &request_id.to_string(),
578 version,
579 )
580 .await
581 {
582 error!(
583 msg_type = "LocalApproval",
584 error = %e,
585 "Failed to resend approval response"
586 );
587 return Err(emit_fail(ctx, e).await);
588 }
589
590 debug!(
591 msg_type = "LocalApproval",
592 request_id = %request_id,
593 version = version,
594 "Response resent successfully"
595 );
596 }
597 }
598 ApprPersistMessage::NetworkRequest {
599 approval_req,
600 info,
601 sender,
602 } => {
603 if sender != approval_req.signature().signer
604 || sender != self.node_key
605 {
606 warn!(
607 msg_type = "NetworkRequest",
608 expected_sender = %self.node_key,
609 received_sender = %sender,
610 "Unexpected sender"
611 );
612 return Ok(ApprPersistResponse::Ok);
613 }
614
615 if info.request_id != self.request_id
616 || info.version != self.version
617 {
618 if let Err(e) = approval_req.verify() {
619 error!(
620 msg_type = "NetworkRequest",
621 error = %e,
622 "Invalid approval signature"
623 );
624 return Err(ActorError::Functional {
625 description: format!(
626 "Can not verify signature of request: {}",
627 e
628 ),
629 });
630 }
631
632 let governance_check = match self
633 .check_governance(
634 ctx,
635 &approval_req.content().subject_id,
636 approval_req.content().gov_version,
637 )
638 .await
639 {
640 Ok(check) => check,
641 Err(e) => {
642 warn!(
643 msg_type = "NetworkRequest",
644 error = %e,
645 "Failed to check governance"
646 );
647 return Err(emit_fail(ctx, e).await);
648 }
649 };
650
651 if let Some(reason) = governance_check {
652 if let Err(e) = self
653 .send_signed_response(
654 ctx,
655 ApprovalRes::Abort(reason),
656 &approval_req,
657 &info.request_id,
658 info.version,
659 )
660 .await
661 {
662 error!(
663 msg_type = "NetworkRequest",
664 error = %e,
665 "Failed to send approval abort response"
666 );
667 return Err(emit_fail(ctx, e).await);
668 }
669
670 return Ok(ApprPersistResponse::Ok);
671 }
672
673 let state =
674 if self.pass_votation == VotationType::AlwaysAccept {
675 ApprovalState::Accepted
676 } else {
677 ApprovalState::Pending
678 };
679
680 self.on_event(
681 ApprPersistEvent::SafeState {
682 subject_id: self.subject_id.clone(),
683 request_id: info.request_id.clone(),
684 version: info.version,
685 request: Box::new(approval_req.clone()),
686 state: state.clone(),
687 },
688 ctx,
689 )
690 .await;
691
692 if state == ApprovalState::Accepted
693 && let Err(e) = self
694 .send_response(
695 ctx,
696 &approval_req,
697 true,
698 &info.request_id,
699 info.version,
700 )
701 .await
702 {
703 error!(
704 msg_type = "NetworkRequest",
705 error = %e,
706 "Failed to send approval response"
707 );
708 return Err(emit_fail(ctx, e).await);
709 };
710
711 debug!(
712 msg_type = "NetworkRequest",
713 request_id = %info.request_id,
714 version = info.version,
715 new_state = ?state,
716 "Network approval request processed"
717 );
718 } else if !self.request_id.is_empty() {
719 let state = if let Some(state) = self.state.clone() {
720 state
721 } else {
722 warn!(
723 msg_type = "NetworkRequest",
724 "Approval state not found"
725 );
726 let e = ActorError::FunctionalCritical {
727 description: "Can not get state".to_owned(),
728 };
729 return Err(emit_fail(ctx, e).await);
730 };
731
732 let response = if ApprovalState::Accepted == state {
733 true
734 } else if ApprovalState::Rejected == state {
735 false
736 } else {
737 return Ok(ApprPersistResponse::Ok);
738 };
739
740 let approval_req =
741 if let Some(approval_req) = self.request.clone() {
742 approval_req
743 } else {
744 error!(
745 msg_type = "NetworkRequest",
746 "Approval request not found"
747 );
748 let e = ActorError::FunctionalCritical {
749 description: "Can not get approve request"
750 .to_owned(),
751 };
752 return Err(emit_fail(ctx, e).await);
753 };
754
755 if let Err(e) = self
756 .send_response(
757 ctx,
758 &approval_req,
759 response,
760 &self.request_id.to_string(),
761 self.version,
762 )
763 .await
764 {
765 error!(
766 msg_type = "NetworkRequest",
767 error = %e,
768 "Failed to resend approval response"
769 );
770 return Err(emit_fail(ctx, e).await);
771 };
772
773 debug!(
774 msg_type = "NetworkRequest",
775 request_id = %self.request_id,
776 version = self.version,
777 "Response resent successfully"
778 );
779 }
780 }
781 }
782 Ok(ApprPersistResponse::Ok)
783 }
784
785 async fn on_event(
786 &mut self,
787 event: ApprPersistEvent,
788 ctx: &mut ActorContext<Self>,
789 ) {
790 if let Err(e) = self.persist(&event, ctx).await {
791 error!(error = %e, "Failed to persist event");
792 emit_fail(ctx, e).await;
793 };
794
795 if let Err(e) = ctx.publish_event(event).await {
796 error!(error = %e, "Failed to publish event");
797 emit_fail(ctx, e).await;
798 };
799 }
800}
801
802#[async_trait]
804impl PersistentActor for ApprPersist {
805 type Persistence = LightPersistence;
806 type InitParams = InitApprPersist;
807
808 fn update(&mut self, state: Self) {
809 self.request_id = state.request_id;
810 self.version = state.version;
811 self.state = state.state;
812 self.request = state.request;
813 }
814
815 fn create_initial(params: Self::InitParams) -> Self {
816 let Self::InitParams {
817 our_key,
818 node_key,
819 subject_id,
820 pass_votation,
821 helpers,
822 } = params;
823
824 Self {
825 helpers: Some(helpers),
826 node_key,
827 our_key,
828 request_id: String::default(),
829 version: 0,
830 subject_id,
831 pass_votation,
832 state: None,
833 request: None,
834 }
835 }
836
837 fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
838 match event {
839 ApprPersistEvent::ChangeState { state, .. } => {
840 debug!(
841 event_type = "ChangeState",
842 new_state = ?state,
843 "Approval state changed"
844 );
845 self.state = Some(state.clone());
846 }
847 ApprPersistEvent::SafeState {
848 request,
849 state,
850 request_id,
851 version,
852 ..
853 } => {
854 debug!(
855 event_type = "SafeState",
856 request_id = %request_id,
857 version = version,
858 new_state = ?state,
859 "Approval state saved"
860 );
861 self.version = *version;
862 self.request_id.clone_from(request_id);
863 self.request = Some(*request.clone());
864 self.state = Some(state.clone());
865 }
866 };
867
868 Ok(())
869 }
870}
871
872impl Storable for ApprPersist {}