1use std::time::Duration;
2
3use async_trait::async_trait;
4use ave_actors::{
5 Actor, ActorError, ActorPath, Handler, Message, NotPersistentActor,
6};
7use ave_common::identity::DigestIdentifier;
8use serde::{Deserialize, Serialize};
9use tracing::{Span, debug, error, info_span};
10
11use crate::model::common::{emit_fail, subject::get_gov_sn};
12
13use super::manager::{RequestManager, RequestManagerMessage};
14
15#[derive(Clone, Debug, Serialize, Deserialize)]
16pub struct Reboot {
17 request_id: DigestIdentifier,
18 governance_id: DigestIdentifier,
19 actual_sn: u64,
20 count: u64,
21}
22
23impl Reboot {
24 pub const fn new(
25 governance_id: DigestIdentifier,
26 request_id: DigestIdentifier,
27 ) -> Self {
28 Self {
29 request_id,
30 governance_id,
31 actual_sn: 0,
32 count: 0,
33 }
34 }
35
36 async fn sleep(
37 &self,
38 ctx: &ave_actors::ActorContext<Self>,
39 ) -> Result<(), ActorError> {
40 let actor = ctx.reference().await?;
41 let request = RebootMessage::Update;
42 let request_id = self.request_id.clone();
43 let governance_id = self.governance_id.clone();
44 tokio::spawn(async move {
45 tokio::time::sleep(Duration::from_secs(5)).await;
46 if let Err(e) = actor.tell(request).await {
47 error!(
48 request_id = %request_id,
49 governance_id = %governance_id,
50 error = %e,
51 "Failed to send Update message to Reboot actor"
52 );
53 }
54 });
55
56 Ok(())
57 }
58
59 async fn finish(
60 &self,
61 ctx: &ave_actors::ActorContext<Self>,
62 ) -> Result<(), ActorError> {
63 debug!(
64 request_id = %self.request_id,
65 governance_id = %self.governance_id,
66 count = self.count,
67 "Finishing reboot, notifying parent"
68 );
69
70 let request_actor = match ctx.get_parent::<RequestManager>().await {
71 Ok(actor) => actor,
72 Err(e) => {
73 error!(
74 request_id = %self.request_id,
75 governance_id = %self.governance_id,
76 error = %e,
77 "Failed to get parent RequestManager"
78 );
79 return Err(e);
80 }
81 };
82
83 if let Err(e) = request_actor
84 .tell(RequestManagerMessage::FinishReboot {
85 request_id: self.request_id.clone(),
86 })
87 .await
88 {
89 error!(
90 request_id = %self.request_id,
91 governance_id = %self.governance_id,
92 error = %e,
93 "Failed to send FinishReboot message to parent"
94 );
95 return Err(e);
96 }
97
98 ctx.stop(None).await;
99 Ok(())
100 }
101}
102
103#[derive(Clone, Debug, Serialize, Deserialize)]
104pub enum RebootMessage {
105 Init,
106 Update,
107}
108
109impl Message for RebootMessage {}
110
111impl NotPersistentActor for Reboot {}
112
113#[async_trait]
114impl Actor for Reboot {
115 type Message = RebootMessage;
116 type Event = ();
117 type Response = ();
118
119 fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
120 parent_span.map_or_else(
121 || info_span!("Reboot"),
122 |parent_span| info_span!(parent: parent_span, "Reboot"),
123 )
124 }
125}
126
127#[async_trait]
128impl Handler<Self> for Reboot {
129 async fn handle_message(
130 &mut self,
131 _sender: ActorPath,
132 msg: RebootMessage,
133 ctx: &mut ave_actors::ActorContext<Self>,
134 ) -> Result<(), ActorError> {
135 match msg {
136 RebootMessage::Init => {
137 match get_gov_sn(ctx, &self.governance_id).await {
138 Ok(sn) => {
139 self.actual_sn = sn;
140 debug!(
141 msg_type = "Init",
142 request_id = %self.request_id,
143 governance_id = %self.governance_id,
144 sn = sn,
145 "Reboot initialized with governance sn"
146 );
147 }
148 Err(e) => {
149 error!(
150 msg_type = "Init",
151 request_id = %self.request_id,
152 governance_id = %self.governance_id,
153 error = %e,
154 "Failed to get governance sn"
155 );
156 return Err(emit_fail(ctx, e).await);
157 }
158 };
159
160 if let Err(e) = self.sleep(ctx).await {
161 error!(
162 msg_type = "Init",
163 request_id = %self.request_id,
164 governance_id = %self.governance_id,
165 error = %e,
166 "Failed to schedule sleep"
167 );
168 return Err(emit_fail(ctx, e).await);
169 };
170 }
171 RebootMessage::Update => {
172 let actual_sn = self.actual_sn;
173
174 match get_gov_sn(ctx, &self.governance_id).await {
175 Ok(sn) => {
176 self.actual_sn = sn;
177 debug!(
178 msg_type = "Update",
179 request_id = %self.request_id,
180 governance_id = %self.governance_id,
181 old_sn = actual_sn,
182 new_sn = sn,
183 "Governance sn retrieved"
184 );
185 }
186 Err(e) => {
187 error!(
188 msg_type = "Update",
189 request_id = %self.request_id,
190 governance_id = %self.governance_id,
191 error = %e,
192 "Failed to get governance sn"
193 );
194 return Err(emit_fail(ctx, e).await);
195 }
196 };
197
198 if actual_sn == self.actual_sn {
199 self.count += 1;
200 debug!(
201 msg_type = "Update",
202 request_id = %self.request_id,
203 governance_id = %self.governance_id,
204 sn = actual_sn,
205 count = self.count,
206 "Governance sn unchanged, incrementing counter"
207 );
208 } else {
209 debug!(
210 msg_type = "Update",
211 request_id = %self.request_id,
212 governance_id = %self.governance_id,
213 old_sn = actual_sn,
214 new_sn = self.actual_sn,
215 count = self.count,
216 "Governance sn changed"
217 );
218 }
219
220 if self.count >= 3 {
221 debug!(
222 msg_type = "Update",
223 request_id = %self.request_id,
224 governance_id = %self.governance_id,
225 count = self.count,
226 "Max retry count reached, finishing reboot"
227 );
228 if let Err(e) = self.finish(ctx).await {
229 error!(
230 msg_type = "Update",
231 request_id = %self.request_id,
232 governance_id = %self.governance_id,
233 error = %e,
234 "Failed to finish reboot"
235 );
236 return Err(emit_fail(ctx, e).await);
237 }
238 } else if let Err(e) = self.sleep(ctx).await {
239 error!(
240 msg_type = "Update",
241 request_id = %self.request_id,
242 governance_id = %self.governance_id,
243 count = self.count,
244 error = %e,
245 "Failed to schedule sleep"
246 );
247 return Err(emit_fail(ctx, e).await);
248 };
249 }
250 };
251
252 Ok(())
253 }
254}