Skip to main content

ave_core/request/
reboot.rs

1use std::time::Duration;
2
3use async_trait::async_trait;
4use ave_actors::{
5    Actor, ActorError, ActorPath, Handler, Message, NotPersistentActor,
6};
7use ave_common::identity::DigestIdentifier;
8use serde::{Deserialize, Serialize};
9use tracing::{Span, debug, error, info_span};
10
11use crate::model::common::{emit_fail, subject::get_gov_sn};
12
13use super::manager::{RequestManager, RequestManagerMessage};
14
15#[derive(Clone, Debug, Serialize, Deserialize)]
16pub struct Reboot {
17    request_id: DigestIdentifier,
18    governance_id: DigestIdentifier,
19    actual_sn: u64,
20    count: u64,
21    stability_check_interval_secs: u64,
22    stability_check_max_retries: u64,
23}
24
25impl Reboot {
26    pub const fn new(
27        governance_id: DigestIdentifier,
28        request_id: DigestIdentifier,
29        stability_check_interval_secs: u64,
30        stability_check_max_retries: u64,
31    ) -> Self {
32        Self {
33            request_id,
34            governance_id,
35            actual_sn: 0,
36            count: 0,
37            stability_check_interval_secs,
38            stability_check_max_retries,
39        }
40    }
41
42    async fn sleep(
43        &self,
44        ctx: &ave_actors::ActorContext<Self>,
45    ) -> Result<(), ActorError> {
46        let actor = ctx.reference().await?;
47        let request = RebootMessage::Update;
48        let request_id = self.request_id.clone();
49        let governance_id = self.governance_id.clone();
50        let interval_secs = self.stability_check_interval_secs.max(1);
51        tokio::spawn(async move {
52            tokio::time::sleep(Duration::from_secs(interval_secs)).await;
53            if let Err(e) = actor.tell(request).await {
54                error!(
55                    request_id = %request_id,
56                    governance_id = %governance_id,
57                    error = %e,
58                    "Failed to send Update message to Reboot actor"
59                );
60            }
61        });
62
63        Ok(())
64    }
65
66    async fn finish(
67        &self,
68        ctx: &ave_actors::ActorContext<Self>,
69    ) -> Result<(), ActorError> {
70        debug!(
71            request_id = %self.request_id,
72            governance_id = %self.governance_id,
73            count = self.count,
74            "Finishing reboot, notifying parent"
75        );
76
77        let request_actor = match ctx.get_parent::<RequestManager>().await {
78            Ok(actor) => actor,
79            Err(e) => {
80                error!(
81                    request_id = %self.request_id,
82                    governance_id = %self.governance_id,
83                    error = %e,
84                    "Failed to get parent RequestManager"
85                );
86                return Err(e);
87            }
88        };
89
90        if let Err(e) = request_actor
91            .tell(RequestManagerMessage::FinishReboot {
92                request_id: self.request_id.clone(),
93            })
94            .await
95        {
96            error!(
97                request_id = %self.request_id,
98                governance_id = %self.governance_id,
99                error = %e,
100                "Failed to send FinishReboot message to parent"
101            );
102            return Err(e);
103        }
104
105        ctx.stop(None).await;
106        Ok(())
107    }
108}
109
110#[derive(Clone, Debug, Serialize, Deserialize)]
111pub enum RebootMessage {
112    Init,
113    Update,
114}
115
116impl Message for RebootMessage {}
117
118impl NotPersistentActor for Reboot {}
119
120#[async_trait]
121impl Actor for Reboot {
122    type Message = RebootMessage;
123    type Event = ();
124    type Response = ();
125
126    fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
127        parent_span.map_or_else(
128            || info_span!("Reboot"),
129            |parent_span| info_span!(parent: parent_span, "Reboot"),
130        )
131    }
132}
133
134#[async_trait]
135impl Handler<Self> for Reboot {
136    async fn handle_message(
137        &mut self,
138        _sender: ActorPath,
139        msg: RebootMessage,
140        ctx: &mut ave_actors::ActorContext<Self>,
141    ) -> Result<(), ActorError> {
142        match msg {
143            RebootMessage::Init => {
144                match get_gov_sn(ctx, &self.governance_id).await {
145                    Ok(sn) => {
146                        self.actual_sn = sn;
147                        debug!(
148                            msg_type = "Init",
149                            request_id = %self.request_id,
150                            governance_id = %self.governance_id,
151                            sn = sn,
152                            "Reboot initialized with governance sn"
153                        );
154                    }
155                    Err(e) => {
156                        error!(
157                            msg_type = "Init",
158                            request_id = %self.request_id,
159                            governance_id = %self.governance_id,
160                            error = %e,
161                            "Failed to get governance sn"
162                        );
163                        return Err(emit_fail(ctx, e).await);
164                    }
165                };
166
167                if let Err(e) = self.sleep(ctx).await {
168                    error!(
169                        msg_type = "Init",
170                        request_id = %self.request_id,
171                        governance_id = %self.governance_id,
172                        error = %e,
173                        "Failed to schedule sleep"
174                    );
175                    return Err(emit_fail(ctx, e).await);
176                };
177            }
178            RebootMessage::Update => {
179                let actual_sn = self.actual_sn;
180                match get_gov_sn(ctx, &self.governance_id).await {
181                    Ok(sn) => {
182                        self.actual_sn = sn;
183                        debug!(
184                            msg_type = "Update",
185                            request_id = %self.request_id,
186                            governance_id = %self.governance_id,
187                            old_sn = actual_sn,
188                            new_sn = sn,
189                            "Governance sn retrieved"
190                        );
191                    }
192                    Err(e) => {
193                        error!(
194                            msg_type = "Update",
195                            request_id = %self.request_id,
196                            governance_id = %self.governance_id,
197                            error = %e,
198                            "Failed to get governance sn"
199                        );
200                        return Err(emit_fail(ctx, e).await);
201                    }
202                };
203
204                if actual_sn == self.actual_sn {
205                    self.count += 1;
206                    debug!(
207                        msg_type = "Update",
208                        request_id = %self.request_id,
209                        governance_id = %self.governance_id,
210                        sn = actual_sn,
211                        count = self.count,
212                        "Governance sn unchanged, incrementing counter"
213                    );
214                } else {
215                    debug!(
216                        msg_type = "Update",
217                        request_id = %self.request_id,
218                        governance_id = %self.governance_id,
219                        old_sn = actual_sn,
220                        new_sn = self.actual_sn,
221                        count = self.count,
222                        "Governance sn changed"
223                    );
224                }
225
226                if self.count >= self.stability_check_max_retries {
227                    debug!(
228                        msg_type = "Update",
229                        request_id = %self.request_id,
230                        governance_id = %self.governance_id,
231                        count = self.count,
232                        "Max retry count reached, finishing reboot"
233                    );
234                    if let Err(e) = self.finish(ctx).await {
235                        error!(
236                            msg_type = "Update",
237                            request_id = %self.request_id,
238                            governance_id = %self.governance_id,
239                            error = %e,
240                            "Failed to finish reboot"
241                        );
242                        return Err(emit_fail(ctx, e).await);
243                    }
244                } else if let Err(e) = self.sleep(ctx).await {
245                    error!(
246                        msg_type = "Update",
247                        request_id = %self.request_id,
248                        governance_id = %self.governance_id,
249                        count = self.count,
250                        error = %e,
251                        "Failed to schedule sleep"
252                    );
253                    return Err(emit_fail(ctx, e).await);
254                };
255            }
256        };
257
258        Ok(())
259    }
260}