Skip to main content

ave_core/update/
mod.rs

1use std::{collections::HashSet, sync::Arc};
2
3use ave_actors::{
4    Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
5    NotPersistentActor,
6};
7
8use async_trait::async_trait;
9use ave_common::identity::{DigestIdentifier, PublicKey};
10use network::ComunicateInfo;
11use serde::{Deserialize, Serialize};
12use tracing::{Span, debug, error, info_span};
13use updater::{Updater, UpdaterMessage};
14
15use crate::{
16    NetworkMessage,
17    helpers::network::{ActorMessage, service::NetworkSender},
18    model::common::emit_fail,
19    request::manager::{RequestManager, RequestManagerMessage},
20};
21
22pub mod updater;
23
24#[derive(Clone, Debug, Serialize, Deserialize)]
25pub enum UpdateType {
26    Auth,
27    Request {
28        subject_id: DigestIdentifier,
29        id: DigestIdentifier,
30    },
31}
32
33pub struct UpdateNew {
34    pub subject_id: DigestIdentifier,
35    pub witnesses: HashSet<PublicKey>,
36    pub update_type: UpdateType,
37    pub network: Arc<NetworkSender>,
38    pub our_sn: Option<u64>,
39}
40
41#[derive(Clone, Debug)]
42pub struct Update {
43    subject_id: DigestIdentifier,
44    witnesses: HashSet<PublicKey>,
45    better: Option<(u64, PublicKey)>,
46    our_sn: Option<u64>,
47    update_type: UpdateType,
48    network: Arc<NetworkSender>,
49}
50
51impl Update {
52    pub fn new(data: UpdateNew) -> Self {
53        Self {
54            network: data.network,
55            subject_id: data.subject_id,
56            witnesses: data.witnesses,
57            update_type: data.update_type,
58            our_sn: data.our_sn,
59            better: None,
60        }
61    }
62
63    pub fn update_better(&mut self, sn: u64, sender: PublicKey) {
64        match self.better {
65            Some((better_sn, ..)) => {
66                if sn > better_sn {
67                    self.better = Some((sn, sender))
68                }
69            }
70            None => {
71                self.better = Some((sn, sender));
72            }
73        }
74    }
75
76    fn check_witness(&mut self, witness: PublicKey) -> bool {
77        self.witnesses.remove(&witness)
78    }
79
80    async fn create_updates(
81        &self,
82        ctx: &mut ActorContext<Self>,
83    ) -> Result<(), ActorError> {
84        for witness in self.witnesses.clone() {
85            let updater = Updater::new(witness.clone(), self.network.clone());
86            let child = ctx.create_child(&witness.to_string(), updater).await?;
87            let message = UpdaterMessage::NetworkLastSn {
88                subject_id: self.subject_id.clone(),
89                node_key: witness,
90            };
91
92            child.tell(message).await?;
93        }
94        Ok(())
95    }
96}
97
98#[derive(Debug, Clone)]
99pub enum UpdateMessage {
100    Run,
101    Response { sender: PublicKey, sn: u64 },
102}
103
104impl Message for UpdateMessage {}
105
106#[async_trait]
107impl Actor for Update {
108    type Event = ();
109    type Message = UpdateMessage;
110    type Response = ();
111
112    fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
113        parent_span.map_or_else(
114            || info_span!("Update", id),
115            |parent_span| info_span!(parent: parent_span, "Update", id),
116        )
117    }
118}
119
120impl NotPersistentActor for Update {}
121
122#[async_trait]
123impl Handler<Self> for Update {
124    async fn handle_message(
125        &mut self,
126        _sender: ActorPath,
127        msg: UpdateMessage,
128        ctx: &mut ActorContext<Self>,
129    ) -> Result<(), ActorError> {
130        match msg {
131            UpdateMessage::Run => {
132                if let Err(e) = self.create_updates(ctx).await {
133                    error!(
134                        msg_type = "Run",
135                        error = %e,
136                        "Failed to create updates"
137                    );
138                    return Err(emit_fail(ctx, e).await);
139                } else {
140                    debug!(
141                        msg_type = "Run",
142                        witnesses_count = self.witnesses.len(),
143                        "Updates created successfully"
144                    );
145                }
146            }
147            UpdateMessage::Response { sender, sn } => {
148                if self.check_witness(sender.clone()) {
149                    self.update_better(sn, sender);
150
151                    if self.witnesses.is_empty() {
152                        if let Some((.., better_node)) = self.better.clone() {
153                            let info = ComunicateInfo {
154                                receiver: better_node.clone(),
155                                request_id: String::default(),
156                                version: 0,
157                                receiver_actor: format!(
158                                    "/user/node/distributor_{}",
159                                    self.subject_id
160                                ),
161                            };
162
163                            if let Err(e) = self
164                                    .network
165                                    .send_command(
166                                        network::CommandHelper::SendMessage {
167                                            message: NetworkMessage {
168                                                info: info.clone(),
169                                                message: ActorMessage::DistributionLedgerReq {
170                                                    actual_sn: self.our_sn,
171                                                    subject_id: self.subject_id.clone(),
172                                                },
173                                            },
174                                        },
175                                    )
176                                    .await
177                                {
178                                    error!(
179                                        msg_type = "Response",
180                                        error = %e,
181                                        node = %better_node,
182                                        "Failed to send request to network"
183                                    );
184                                    return Err(emit_fail(ctx, e).await);
185                                } else {
186                                    debug!(
187                                        msg_type = "Response",
188                                        node = %info.receiver,
189                                        subject_id = %self.subject_id,
190                                        "Request sent to better node"
191                                    );
192                                }
193                        }
194
195                        if let UpdateType::Request { id, subject_id } =
196                            &self.update_type
197                        {
198                            let request_path = ActorPath::from(format!(
199                                "/user/request/{}",
200                                subject_id
201                            ));
202                            match ctx
203                                .system()
204                                .get_actor::<RequestManager>(&request_path)
205                                .await
206                            {
207                                Ok(request_actor) => {
208                                    let request = if self.better.is_none() {
209                                        RequestManagerMessage::FinishReboot {
210                                            request_id: id.clone(),
211                                        }
212                                    } else {
213                                        RequestManagerMessage::RebootWait {
214                                            request_id: id.clone(),
215                                            governance_id: self
216                                                .subject_id
217                                                .clone(),
218                                        }
219                                    };
220
221                                    if let Err(e) =
222                                        request_actor.tell(request).await
223                                    {
224                                        error!(
225                                            msg_type = "Response",
226                                            error = %e,
227                                            subject_id = %self.subject_id,
228                                            "Failed to send response to request actor"
229                                        );
230                                        return Err(emit_fail(ctx, e).await);
231                                    }
232                                }
233                                Err(e) => {
234                                    error!(
235                                        msg_type = "Response",
236                                        path = %request_path,
237                                        subject_id = %self.subject_id,
238                                        "Request actor not found"
239                                    );
240                                    return Err(emit_fail(ctx, e).await);
241                                }
242                            };
243                        };
244
245                        debug!(
246                            msg_type = "Response",
247                            subject_id = %self.subject_id,
248                            has_better = self.better.is_some(),
249                            "All witnesses responded, update complete"
250                        );
251
252                        ctx.stop(None).await;
253                    }
254                }
255            }
256        };
257
258        Ok(())
259    }
260
261    async fn on_child_fault(
262        &mut self,
263        error: ActorError,
264        ctx: &mut ActorContext<Self>,
265    ) -> ChildAction {
266        error!(
267            subject_id = %self.subject_id,
268            update_type = ?self.update_type,
269            error = %error,
270            "Child fault in update actor"
271        );
272        emit_fail(ctx, error).await;
273        ChildAction::Stop
274    }
275}