1use std::time::Duration;
2
3use async_trait::async_trait;
4use ave_actors::{
5 Actor, ActorError, ActorPath, Handler, Message, NotPersistentActor,
6};
7use ave_common::identity::DigestIdentifier;
8use serde::{Deserialize, Serialize};
9use tracing::{Span, debug, error, info_span};
10
11use crate::model::common::{emit_fail, subject::get_gov_sn};
12
13use super::manager::{RequestManager, RequestManagerMessage};
14
15#[derive(Clone, Debug, Serialize, Deserialize)]
16pub struct Reboot {
17 request_id: DigestIdentifier,
18 governance_id: DigestIdentifier,
19 actual_sn: u64,
20 count: u64,
21 stability_check_interval_secs: u64,
22 stability_check_max_retries: u64,
23}
24
25impl Reboot {
26 pub const fn new(
27 governance_id: DigestIdentifier,
28 request_id: DigestIdentifier,
29 stability_check_interval_secs: u64,
30 stability_check_max_retries: u64,
31 ) -> Self {
32 Self {
33 request_id,
34 governance_id,
35 actual_sn: 0,
36 count: 0,
37 stability_check_interval_secs,
38 stability_check_max_retries,
39 }
40 }
41
42 async fn sleep(
43 &self,
44 ctx: &ave_actors::ActorContext<Self>,
45 ) -> Result<(), ActorError> {
46 let actor = ctx.reference().await?;
47 let request = RebootMessage::Update;
48 let request_id = self.request_id.clone();
49 let governance_id = self.governance_id.clone();
50 let interval_secs = self.stability_check_interval_secs.max(1);
51 tokio::spawn(async move {
52 tokio::time::sleep(Duration::from_secs(interval_secs)).await;
53 if let Err(e) = actor.tell(request).await {
54 error!(
55 request_id = %request_id,
56 governance_id = %governance_id,
57 error = %e,
58 "Failed to send Update message to Reboot actor"
59 );
60 }
61 });
62
63 Ok(())
64 }
65
66 async fn finish(
67 &self,
68 ctx: &ave_actors::ActorContext<Self>,
69 ) -> Result<(), ActorError> {
70 debug!(
71 request_id = %self.request_id,
72 governance_id = %self.governance_id,
73 count = self.count,
74 "Finishing reboot, notifying parent"
75 );
76
77 let request_actor = match ctx.get_parent::<RequestManager>().await {
78 Ok(actor) => actor,
79 Err(e) => {
80 error!(
81 request_id = %self.request_id,
82 governance_id = %self.governance_id,
83 error = %e,
84 "Failed to get parent RequestManager"
85 );
86 return Err(e);
87 }
88 };
89
90 if let Err(e) = request_actor
91 .tell(RequestManagerMessage::FinishReboot {
92 request_id: self.request_id.clone(),
93 })
94 .await
95 {
96 error!(
97 request_id = %self.request_id,
98 governance_id = %self.governance_id,
99 error = %e,
100 "Failed to send FinishReboot message to parent"
101 );
102 return Err(e);
103 }
104
105 ctx.stop(None).await;
106 Ok(())
107 }
108}
109
110#[derive(Clone, Debug, Serialize, Deserialize)]
111pub enum RebootMessage {
112 Init,
113 Update,
114}
115
116impl Message for RebootMessage {}
117
118impl NotPersistentActor for Reboot {}
119
120#[async_trait]
121impl Actor for Reboot {
122 type Message = RebootMessage;
123 type Event = ();
124 type Response = ();
125
126 fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
127 parent_span.map_or_else(
128 || info_span!("Reboot"),
129 |parent_span| info_span!(parent: parent_span, "Reboot"),
130 )
131 }
132}
133
134#[async_trait]
135impl Handler<Self> for Reboot {
136 async fn handle_message(
137 &mut self,
138 _sender: ActorPath,
139 msg: RebootMessage,
140 ctx: &mut ave_actors::ActorContext<Self>,
141 ) -> Result<(), ActorError> {
142 match msg {
143 RebootMessage::Init => {
144 match get_gov_sn(ctx, &self.governance_id).await {
145 Ok(sn) => {
146 self.actual_sn = sn;
147 debug!(
148 msg_type = "Init",
149 request_id = %self.request_id,
150 governance_id = %self.governance_id,
151 sn = sn,
152 "Reboot initialized with governance sn"
153 );
154 }
155 Err(e) => {
156 error!(
157 msg_type = "Init",
158 request_id = %self.request_id,
159 governance_id = %self.governance_id,
160 error = %e,
161 "Failed to get governance sn"
162 );
163 return Err(emit_fail(ctx, e).await);
164 }
165 };
166
167 if let Err(e) = self.sleep(ctx).await {
168 error!(
169 msg_type = "Init",
170 request_id = %self.request_id,
171 governance_id = %self.governance_id,
172 error = %e,
173 "Failed to schedule sleep"
174 );
175 return Err(emit_fail(ctx, e).await);
176 };
177 }
178 RebootMessage::Update => {
179 let actual_sn = self.actual_sn;
180 match get_gov_sn(ctx, &self.governance_id).await {
181 Ok(sn) => {
182 self.actual_sn = sn;
183 debug!(
184 msg_type = "Update",
185 request_id = %self.request_id,
186 governance_id = %self.governance_id,
187 old_sn = actual_sn,
188 new_sn = sn,
189 "Governance sn retrieved"
190 );
191 }
192 Err(e) => {
193 error!(
194 msg_type = "Update",
195 request_id = %self.request_id,
196 governance_id = %self.governance_id,
197 error = %e,
198 "Failed to get governance sn"
199 );
200 return Err(emit_fail(ctx, e).await);
201 }
202 };
203
204 if actual_sn == self.actual_sn {
205 self.count += 1;
206 debug!(
207 msg_type = "Update",
208 request_id = %self.request_id,
209 governance_id = %self.governance_id,
210 sn = actual_sn,
211 count = self.count,
212 "Governance sn unchanged, incrementing counter"
213 );
214 } else {
215 debug!(
216 msg_type = "Update",
217 request_id = %self.request_id,
218 governance_id = %self.governance_id,
219 old_sn = actual_sn,
220 new_sn = self.actual_sn,
221 count = self.count,
222 "Governance sn changed"
223 );
224 }
225
226 if self.count >= self.stability_check_max_retries {
227 debug!(
228 msg_type = "Update",
229 request_id = %self.request_id,
230 governance_id = %self.governance_id,
231 count = self.count,
232 "Max retry count reached, finishing reboot"
233 );
234 if let Err(e) = self.finish(ctx).await {
235 error!(
236 msg_type = "Update",
237 request_id = %self.request_id,
238 governance_id = %self.governance_id,
239 error = %e,
240 "Failed to finish reboot"
241 );
242 return Err(emit_fail(ctx, e).await);
243 }
244 } else if let Err(e) = self.sleep(ctx).await {
245 error!(
246 msg_type = "Update",
247 request_id = %self.request_id,
248 governance_id = %self.governance_id,
249 count = self.count,
250 error = %e,
251 "Failed to schedule sleep"
252 );
253 return Err(emit_fail(ctx, e).await);
254 };
255 }
256 };
257
258 Ok(())
259 }
260}