Skip to main content

ave_core/request/
reboot.rs

1use std::time::Duration;
2
3use async_trait::async_trait;
4use ave_actors::{
5    Actor, ActorError, ActorPath, Handler, Message, NotPersistentActor,
6};
7use ave_common::identity::DigestIdentifier;
8use serde::{Deserialize, Serialize};
9use tracing::{Span, debug, error, info_span};
10
11use crate::model::common::{emit_fail, subject::get_gov_sn};
12
13use super::manager::{RequestManager, RequestManagerMessage};
14
15#[derive(Clone, Debug, Serialize, Deserialize)]
16pub struct Reboot {
17    request_id: DigestIdentifier,
18    governance_id: DigestIdentifier,
19    actual_sn: u64,
20    count: u64,
21}
22
23impl Reboot {
24    pub const fn new(
25        governance_id: DigestIdentifier,
26        request_id: DigestIdentifier,
27    ) -> Self {
28        Self {
29            request_id,
30            governance_id,
31            actual_sn: 0,
32            count: 0,
33        }
34    }
35
36    async fn sleep(
37        &self,
38        ctx: &ave_actors::ActorContext<Self>,
39    ) -> Result<(), ActorError> {
40        let actor = ctx.reference().await?;
41        let request = RebootMessage::Update;
42        let request_id = self.request_id.clone();
43        let governance_id = self.governance_id.clone();
44        tokio::spawn(async move {
45            tokio::time::sleep(Duration::from_secs(5)).await;
46            if let Err(e) = actor.tell(request).await {
47                error!(
48                    request_id = %request_id,
49                    governance_id = %governance_id,
50                    error = %e,
51                    "Failed to send Update message to Reboot actor"
52                );
53            }
54        });
55
56        Ok(())
57    }
58
59    async fn finish(
60        &self,
61        ctx: &ave_actors::ActorContext<Self>,
62    ) -> Result<(), ActorError> {
63        debug!(
64            request_id = %self.request_id,
65            governance_id = %self.governance_id,
66            count = self.count,
67            "Finishing reboot, notifying parent"
68        );
69
70        let request_actor = match ctx.get_parent::<RequestManager>().await {
71            Ok(actor) => actor,
72            Err(e) => {
73                error!(
74                    request_id = %self.request_id,
75                    governance_id = %self.governance_id,
76                    error = %e,
77                    "Failed to get parent RequestManager"
78                );
79                return Err(e);
80            }
81        };
82
83        if let Err(e) = request_actor
84            .tell(RequestManagerMessage::FinishReboot {
85                request_id: self.request_id.clone(),
86            })
87            .await
88        {
89            error!(
90                request_id = %self.request_id,
91                governance_id = %self.governance_id,
92                error = %e,
93                "Failed to send FinishReboot message to parent"
94            );
95            return Err(e);
96        }
97
98        ctx.stop(None).await;
99        Ok(())
100    }
101}
102
103#[derive(Clone, Debug, Serialize, Deserialize)]
104pub enum RebootMessage {
105    Init,
106    Update,
107}
108
109impl Message for RebootMessage {}
110
111impl NotPersistentActor for Reboot {}
112
113#[async_trait]
114impl Actor for Reboot {
115    type Message = RebootMessage;
116    type Event = ();
117    type Response = ();
118
119    fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
120        parent_span.map_or_else(
121            || info_span!("Reboot"),
122            |parent_span| info_span!(parent: parent_span, "Reboot"),
123        )
124    }
125}
126
127#[async_trait]
128impl Handler<Self> for Reboot {
129    async fn handle_message(
130        &mut self,
131        _sender: ActorPath,
132        msg: RebootMessage,
133        ctx: &mut ave_actors::ActorContext<Self>,
134    ) -> Result<(), ActorError> {
135        match msg {
136            RebootMessage::Init => {
137                match get_gov_sn(ctx, &self.governance_id).await {
138                    Ok(sn) => {
139                        self.actual_sn = sn;
140                        debug!(
141                            msg_type = "Init",
142                            request_id = %self.request_id,
143                            governance_id = %self.governance_id,
144                            sn = sn,
145                            "Reboot initialized with governance sn"
146                        );
147                    }
148                    Err(e) => {
149                        error!(
150                            msg_type = "Init",
151                            request_id = %self.request_id,
152                            governance_id = %self.governance_id,
153                            error = %e,
154                            "Failed to get governance sn"
155                        );
156                        return Err(emit_fail(ctx, e).await);
157                    }
158                };
159
160                if let Err(e) = self.sleep(ctx).await {
161                    error!(
162                        msg_type = "Init",
163                        request_id = %self.request_id,
164                        governance_id = %self.governance_id,
165                        error = %e,
166                        "Failed to schedule sleep"
167                    );
168                    return Err(emit_fail(ctx, e).await);
169                };
170            }
171            RebootMessage::Update => {
172                let actual_sn = self.actual_sn;
173
174                match get_gov_sn(ctx, &self.governance_id).await {
175                    Ok(sn) => {
176                        self.actual_sn = sn;
177                        debug!(
178                            msg_type = "Update",
179                            request_id = %self.request_id,
180                            governance_id = %self.governance_id,
181                            old_sn = actual_sn,
182                            new_sn = sn,
183                            "Governance sn retrieved"
184                        );
185                    }
186                    Err(e) => {
187                        error!(
188                            msg_type = "Update",
189                            request_id = %self.request_id,
190                            governance_id = %self.governance_id,
191                            error = %e,
192                            "Failed to get governance sn"
193                        );
194                        return Err(emit_fail(ctx, e).await);
195                    }
196                };
197
198                if actual_sn == self.actual_sn {
199                    self.count += 1;
200                    debug!(
201                        msg_type = "Update",
202                        request_id = %self.request_id,
203                        governance_id = %self.governance_id,
204                        sn = actual_sn,
205                        count = self.count,
206                        "Governance sn unchanged, incrementing counter"
207                    );
208                } else {
209                    debug!(
210                        msg_type = "Update",
211                        request_id = %self.request_id,
212                        governance_id = %self.governance_id,
213                        old_sn = actual_sn,
214                        new_sn = self.actual_sn,
215                        count = self.count,
216                        "Governance sn changed"
217                    );
218                }
219
220                if self.count >= 3 {
221                    debug!(
222                        msg_type = "Update",
223                        request_id = %self.request_id,
224                        governance_id = %self.governance_id,
225                        count = self.count,
226                        "Max retry count reached, finishing reboot"
227                    );
228                    if let Err(e) = self.finish(ctx).await {
229                        error!(
230                            msg_type = "Update",
231                            request_id = %self.request_id,
232                            governance_id = %self.governance_id,
233                            error = %e,
234                            "Failed to finish reboot"
235                        );
236                        return Err(emit_fail(ctx, e).await);
237                    }
238                } else if let Err(e) = self.sleep(ctx).await {
239                    error!(
240                        msg_type = "Update",
241                        request_id = %self.request_id,
242                        governance_id = %self.governance_id,
243                        count = self.count,
244                        error = %e,
245                        "Failed to schedule sleep"
246                    );
247                    return Err(emit_fail(ctx, e).await);
248                };
249            }
250        };
251
252        Ok(())
253    }
254}