1use std::fmt::Debug;
20use std::time::Duration;
21use std::{
22 fmt::Display,
23 path::{Path, PathBuf},
24};
25
26use async_trait::async_trait;
27use event::OtaRequest;
28use futures::stream::BoxStream;
29use futures::TryStreamExt;
30#[cfg(all(feature = "zbus", target_os = "linux"))]
31use ota_handler::{OtaEvent, OtaInProgress, OtaMessage, OtaStatusMessage};
32use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
33use serde::{Deserialize, Serialize};
34use tokio::sync::mpsc;
35use tokio_util::sync::CancellationToken;
36use tracing::{debug, error, info, warn};
37use uuid::Uuid;
38
39#[cfg(test)]
40use mockall::automock;
41
42use crate::controller::actor::Actor;
43use crate::error::DeviceManagerError;
44use crate::ota::rauc::BundleInfo;
45use crate::repository::StateRepository;
46use crate::DeviceManagerOptions;
47
48use self::config::{OtaConfig, Reboot};
49
50pub mod config;
51pub mod event;
52#[cfg(all(feature = "zbus", target_os = "linux"))]
53pub(crate) mod ota_handler;
54#[cfg(test)]
55mod ota_handler_test;
56pub(crate) mod rauc;
57
58#[derive(Debug, Clone, PartialEq, Eq, Default)]
60pub struct DeployProgress {
61 percentage: i32,
62 message: String,
63}
64
65impl Display for DeployProgress {
66 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67 write!(f, "progress {}%: {}", self.percentage, self.message)
68 }
69}
70
71#[derive(Debug, Clone, PartialEq, Eq)]
73pub enum DeployStatus {
74 Progress(DeployProgress),
75 Completed { signal: i32 },
76}
77
78pub type ProgressStream = BoxStream<'static, Result<DeployStatus, DeviceManagerError>>;
80
81#[cfg_attr(test, automock)]
83#[async_trait]
84pub trait SystemUpdate: Send + Sync {
85 async fn install_bundle(&self, source: &str) -> Result<(), DeviceManagerError>;
86 async fn last_error(&self) -> Result<String, DeviceManagerError>;
87 async fn info(&self, bundle: &str) -> Result<BundleInfo, DeviceManagerError>;
88 async fn operation(&self) -> Result<String, DeviceManagerError>;
89 async fn compatible(&self) -> Result<String, DeviceManagerError>;
90 async fn boot_slot(&self) -> Result<String, DeviceManagerError>;
91 async fn receive_completed(&self) -> Result<ProgressStream, DeviceManagerError>;
92 async fn get_primary(&self) -> Result<String, DeviceManagerError>;
93 async fn mark(
94 &self,
95 state: &str,
96 slot_identifier: &str,
97 ) -> Result<(String, String), DeviceManagerError>;
98}
99
100#[derive(thiserror::Error, Debug, Clone, PartialEq)]
104pub enum OtaError {
105 #[error("InvalidRequestError: {0}")]
107 Request(&'static str),
108 #[error("UpdateAlreadyInProgress")]
109 UpdateAlreadyInProgress,
111 #[error("NetworkError: {0}")]
112 Network(String),
114 #[error("IOError: {0}")]
115 Io(String),
117 #[error("InternalError: {0}")]
118 Internal(&'static str),
120 #[error("InvalidBaseImage: {0}")]
121 InvalidBaseImage(String),
123 #[error("SystemRollback: {0}")]
124 SystemRollback(&'static str),
126 #[error("Canceled")]
128 Canceled,
129 #[error("Inconsistent ota state")]
131 InconsistentState,
132}
133
134impl Default for DeployStatus {
135 fn default() -> Self {
136 DeployStatus::Progress(DeployProgress::default())
137 }
138}
139
140const DOWNLOAD_PERC_ROUNDING_STEP: f64 = 10.0;
141const DEPLOY_PERC_ROUNDING_STEP: i32 = 10;
142
143#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
144pub struct PersistentState {
145 pub uuid: Uuid,
146 pub slot: String,
147}
148
149#[derive(Clone, PartialEq, Debug)]
150pub enum OtaStatus {
151 Idle,
153 Init(OtaId),
155 NoPendingOta,
157 Acknowledged(OtaId),
159 Downloading(OtaId, i32),
161 Deploying(OtaId, DeployProgress),
163 Deployed(OtaId),
165 Rebooting(OtaId),
167 Rebooted,
169 Success(OtaId),
171 Error(OtaError, OtaId),
173 Failure(OtaError, Option<OtaId>),
175}
176
177impl OtaStatus {
178 fn is_cancellable(&self) -> bool {
180 match self {
181 OtaStatus::Init(_) | OtaStatus::Acknowledged(_) | OtaStatus::Downloading(_, _) => true,
182 OtaStatus::Idle
183 | OtaStatus::NoPendingOta
184 | OtaStatus::Deploying(_, _)
185 | OtaStatus::Deployed(_)
186 | OtaStatus::Rebooting(_)
187 | OtaStatus::Rebooted
188 | OtaStatus::Success(_)
189 | OtaStatus::Error(_, _)
190 | OtaStatus::Failure(_, _) => false,
191 }
192 }
193
194 fn ota_id(&self) -> Option<OtaId> {
195 match self {
196 OtaStatus::Idle | OtaStatus::NoPendingOta | OtaStatus::Rebooted => None,
197 OtaStatus::Init(id)
198 | OtaStatus::Acknowledged(id)
199 | OtaStatus::Downloading(id, _)
200 | OtaStatus::Deploying(id, _)
201 | OtaStatus::Deployed(id)
202 | OtaStatus::Rebooting(id)
203 | OtaStatus::Success(id)
204 | OtaStatus::Error(_, id) => Some(id.clone()),
205 OtaStatus::Failure(_, id) => id.clone(),
206 }
207 }
208
209 fn as_event(&self) -> Option<OtaEvent> {
211 let mut ota_event = OtaEvent {
212 requestUUID: "".to_string(),
213 status: "".to_string(),
214 statusProgress: 0,
215 statusCode: "".to_string(),
216 message: "".to_string(),
217 };
218
219 match self {
220 OtaStatus::Acknowledged(ota_request) => {
221 ota_event.requestUUID = ota_request.uuid.to_string();
222 ota_event.status = "Acknowledged".to_string();
223 }
224 OtaStatus::Downloading(ota_request, progress) => {
225 ota_event.requestUUID = ota_request.uuid.to_string();
226 ota_event.statusProgress = *progress;
227 ota_event.status = "Downloading".to_string();
228 }
229 OtaStatus::Deploying(ota_request, deploying_progress) => {
230 ota_event.requestUUID = ota_request.uuid.to_string();
231 ota_event.status = "Deploying".to_string();
232 ota_event.statusProgress = deploying_progress.percentage;
233 ota_event.message = deploying_progress.clone().message;
234 }
235 OtaStatus::Deployed(ota_request) => {
236 ota_event.requestUUID = ota_request.uuid.to_string();
237 ota_event.status = "Deployed".to_string();
238 }
239 OtaStatus::Rebooting(ota_request) => {
240 ota_event.requestUUID = ota_request.uuid.to_string();
241 ota_event.status = "Rebooting".to_string()
242 }
243 OtaStatus::Success(ota_request) => {
244 ota_event.requestUUID = ota_request.uuid.to_string();
245 ota_event.status = "Success".to_string();
246 }
247 OtaStatus::Failure(ota_error, Some(ota_request)) => {
248 ota_event.requestUUID = ota_request.uuid.to_string();
249 ota_event.status = "Failure".to_string();
250 let ota_status_message = OtaStatusMessage::from(ota_error);
251 ota_event.statusCode = ota_status_message.status_code;
252 ota_event.message = ota_status_message.message;
253 }
254 OtaStatus::Error(ota_error, ota_request) => {
255 ota_event.status = "Error".to_string();
256 ota_event.requestUUID = ota_request.uuid.to_string();
257 let ota_status_message = OtaStatusMessage::from(ota_error);
258 ota_event.statusCode = ota_status_message.status_code;
259 ota_event.message = ota_status_message.message;
260 }
261 OtaStatus::Idle
262 | OtaStatus::Init(_)
263 | OtaStatus::NoPendingOta
264 | OtaStatus::Rebooted
265 | OtaStatus::Failure(_, None) => return None,
266 }
267
268 if ota_event.requestUUID.is_empty() {
269 error!("Unable to convert ota_event: request_uuid is empty");
270 None
271 } else {
272 Some(ota_event)
273 }
274 }
275}
276
277impl Display for OtaStatus {
278 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
279 match self {
280 OtaStatus::Idle => write!(f, "Idle"),
281 OtaStatus::Init(req) => write!(f, "Init {req}"),
282 OtaStatus::NoPendingOta => write!(f, "NoPendingOta"),
283 OtaStatus::Acknowledged(req) => write!(f, "Acknowledged {req}"),
284 OtaStatus::Downloading(req, progress) => {
285 write!(f, "Downloading {req} progress {progress}")
286 }
287 OtaStatus::Deploying(req, progress) => write!(f, "Deploying {req} {progress}"),
288 OtaStatus::Deployed(req) => write!(f, "Deployed {req}"),
289 OtaStatus::Rebooting(req) => write!(f, "Rebooting {req}"),
290 OtaStatus::Rebooted => write!(f, "Rebooted"),
291 OtaStatus::Success(req) => write!(f, "Success {req}"),
292 OtaStatus::Error(err, req) => write!(f, "Error {req}: {err}"),
293 OtaStatus::Failure(err, req) => {
294 write!(f, "Failure")?;
295
296 if let Some(req) = req {
297 write!(f, " {req}")?;
298 }
299
300 write!(f, ": {err}")
301 }
302 }
303 }
304}
305
306#[derive(Debug, Clone, PartialEq, Eq)]
307pub struct OtaId {
308 pub uuid: Uuid,
309 pub url: String,
310}
311
312impl From<OtaRequest> for OtaId {
313 fn from(value: OtaRequest) -> Self {
314 Self {
315 uuid: value.uuid.0,
316 url: value.url,
317 }
318 }
319}
320
321impl Display for OtaId {
322 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
323 write!(f, "{}", self.uuid)
324 }
325}
326
327pub struct Ota<T, U>
329where
330 T: SystemUpdate,
331 U: StateRepository<PersistentState>,
332{
333 pub system_update: T,
334 pub config: OtaConfig,
335 pub state_repository: U,
336 pub download_file_path: PathBuf,
337 pub ota_status: OtaStatus,
338 pub flag: OtaInProgress,
339 pub publisher_tx: mpsc::Sender<OtaStatus>,
340}
341
342#[async_trait]
343impl<T, U> Actor for Ota<T, U>
344where
345 T: SystemUpdate,
346 U: StateRepository<PersistentState>,
347{
348 type Msg = OtaMessage;
349
350 fn task() -> &'static str {
351 "ota"
352 }
353
354 async fn init(&mut self) -> stable_eyre::Result<()> {
355 if self.state_repository.exists().await {
356 self.ota_status = OtaStatus::Rebooted;
357 }
358
359 let cancel = CancellationToken::new();
361
362 self.handle_ota_update(cancel).await;
364
365 Ok(())
366 }
367
368 async fn handle(&mut self, msg: Self::Msg) -> stable_eyre::Result<()> {
369 if self.ota_status != OtaStatus::Idle {
370 error!("ota request already in progress");
371
372 return Err(OtaError::InconsistentState.into());
373 }
374
375 self.ota_status = OtaStatus::Init(msg.ota_id);
376 self.handle_ota_update(msg.cancel).await;
377
378 Ok(())
379 }
380}
381
382impl<T, U> Ota<T, U>
383where
384 T: SystemUpdate,
385 U: StateRepository<PersistentState>,
386{
387 pub fn new(
388 opts: &DeviceManagerOptions,
389 tx_publisher: mpsc::Sender<OtaStatus>,
390 flag: OtaInProgress,
391 system_update: T,
392 state_repository: U,
393 ) -> Result<Self, DeviceManagerError> {
394 Ok(Ota {
395 system_update,
396 config: opts.ota,
397 state_repository,
398 download_file_path: opts.download_directory.clone(),
399 ota_status: OtaStatus::Idle,
400 flag,
401 publisher_tx: tx_publisher,
402 })
403 }
404
405 pub async fn last_error(&self) -> Result<String, DeviceManagerError> {
406 self.system_update.last_error().await
407 }
408
409 fn get_install_uri(&self, req: &OtaId) -> String {
411 if self.config.streaming {
412 req.url.clone()
413 } else {
414 self.get_update_file_path().to_string_lossy().to_string()
415 }
416 }
417
418 fn get_update_file_path(&self) -> PathBuf {
420 self.download_file_path.join("update.bin")
421 }
422
423 fn start_update(&self, ota_request: OtaId) -> OtaStatus {
425 if self.config.streaming {
426 debug!("streaming image directly to disk");
427
428 OtaStatus::Deploying(ota_request, DeployProgress::default())
429 } else {
430 debug!("downloading image file");
431
432 OtaStatus::Downloading(ota_request, 0)
433 }
434 }
435
436 async fn retry_download(
438 &self,
439 req: &OtaId,
440 ota_path: &Path,
441 ota_file: &str,
442 ) -> Result<String, OtaError> {
443 let client = create_http_client(req)?;
444
445 for i in 1..=5 {
446 debug!(time = i, "downloading ota image");
447
448 let res = wget(&client, req, ota_path, &self.publisher_tx).await;
449
450 match res {
451 Ok(()) => return Ok(ota_file.to_string()),
452 Err(err) => {
453 error!(error = format!("{err:#}"), "couldn't downloading the image");
454
455 if self
456 .publisher_tx
457 .send(OtaStatus::Error(err, req.clone()))
458 .await
459 .is_err()
460 {
461 warn!("ota_status_publisher dropped before send error_status")
462 }
463 }
464 }
465
466 let wait = u64::pow(2, i);
467
468 error!("Next attempt in {wait}s",);
469
470 tokio::time::sleep(tokio::time::Duration::from_secs(wait)).await;
471 }
472
473 Err(OtaError::Internal(
474 "Too many attempts to download the OTA file",
475 ))
476 }
477
478 pub async fn download(&self, ota_request: OtaId) -> OtaStatus {
480 let download_file_path = self.get_update_file_path();
481
482 let Some(download_file_str) = download_file_path.to_str() else {
483 return OtaStatus::Failure(
484 OtaError::Io("Wrong download file path".to_string()),
485 Some(ota_request),
486 );
487 };
488
489 let download_res = self
490 .retry_download(&ota_request, &download_file_path, download_file_str)
491 .await;
492
493 let ota_file = match download_res {
494 Ok(ota_file) => ota_file,
495 Err(err) => {
496 return OtaStatus::Failure(err, Some(ota_request));
497 }
498 };
499
500 let bundle_info = match self.system_update.info(&ota_file).await {
501 Ok(info) => info,
502 Err(err) => {
503 let message = format!(
504 "Unable to get info from ota_file in {}",
505 download_file_path.display()
506 );
507 error!("{message} : {err}");
508
509 return OtaStatus::Failure(OtaError::InvalidBaseImage(message), Some(ota_request));
510 }
511 };
512
513 debug!("bundle info: {:?}", bundle_info);
514
515 let system_image_info = match self.system_update.compatible().await {
516 Ok(info) => info,
517 Err(err) => {
518 let message = "Unable to get info from current deployed image".to_string();
519
520 error!("{message} : {err}");
521
522 return OtaStatus::Failure(OtaError::InvalidBaseImage(message), Some(ota_request));
523 }
524 };
525
526 if bundle_info.compatible != system_image_info {
527 let message = format!(
528 "bundle {} is not compatible with system {system_image_info}",
529 bundle_info.compatible
530 );
531 error!("{message}");
532 return OtaStatus::Failure(
533 OtaError::InvalidBaseImage(message),
534 Some(ota_request.clone()),
535 );
536 }
537
538 OtaStatus::Deploying(ota_request.clone(), DeployProgress::default())
539 }
540
541 pub async fn deploy(&self, ota_request: OtaId) -> OtaStatus {
543 let booted_slot = match self.system_update.boot_slot().await {
544 Ok(slot) => slot,
545 Err(err) => {
546 let message = "Unable to identify the booted slot";
547
548 error!("{message}: {err}");
549
550 return OtaStatus::Failure(OtaError::Internal(message), Some(ota_request.clone()));
551 }
552 };
553
554 let state = PersistentState {
555 uuid: ota_request.uuid,
556 slot: booted_slot,
557 };
558
559 if let Err(error) = self.state_repository.write(&state).await {
560 let message = "Unable to persist ota state".to_string();
561 error!("{message} : {error}");
562 return OtaStatus::Failure(OtaError::Io(message), Some(ota_request));
563 };
564
565 if let Err(error) = self
566 .system_update
567 .install_bundle(&self.get_install_uri(&ota_request))
568 .await
569 {
570 let message = "Unable to install ota image".to_string();
571 error!("{message} : {error}");
572 return OtaStatus::Failure(OtaError::InvalidBaseImage(message), Some(ota_request));
573 }
574
575 if let Err(error) = self.system_update.operation().await {
576 let message = "Unable to get status of ota operation";
577 error!("{message} : {error}");
578 return OtaStatus::Failure(OtaError::Internal(message), Some(ota_request));
579 }
580
581 let stream = self.system_update.receive_completed().await;
582 let stream = match stream {
583 Ok(stream) => stream,
584 Err(err) => {
585 let message = "Unable to get status of ota operation";
586 error!("{message} : {err}");
587 return OtaStatus::Failure(OtaError::Internal(message), Some(ota_request));
588 }
589 };
590
591 let signal = stream
592 .try_fold(
593 DeployStatus::Progress(DeployProgress::default()),
594 |prev_status, status| {
595 let ota_request_cl = ota_request.clone();
596 let status_cl = status.clone();
597
598 async move {
599 let progress = match status {
600 DeployStatus::Completed { .. } => {
601 return Ok(status);
602 }
603 DeployStatus::Progress(progress) => progress,
604 };
605
606 let last_progress_sent = match &prev_status {
607 DeployStatus::Progress(last_progress) => last_progress.percentage,
608 _ => progress.percentage,
609 };
610
611 if (progress.percentage - last_progress_sent) >= DEPLOY_PERC_ROUNDING_STEP {
612 let res = self
613 .publisher_tx
614 .send(OtaStatus::Deploying(ota_request_cl, progress))
615 .await;
616
617 if let Err(err) = res {
618 error!("couldn't send progress update: {err}")
619 }
620 return Ok(status_cl);
621 }
622 Ok(prev_status)
623 }
624 },
625 )
626 .await;
627
628 let signal = match signal {
629 Ok(DeployStatus::Completed { signal }) => signal,
630 Ok(DeployStatus::Progress(_)) => {
631 let message = "No progress completion event received";
632 error!("{message}");
633 return OtaStatus::Failure(OtaError::Internal(message), Some(ota_request));
634 }
635 Err(err) => {
636 let message = "Unable to receive the install completed event";
637 error!("{message} : {err}");
638 return OtaStatus::Failure(OtaError::Internal(message), Some(ota_request));
639 }
640 };
641
642 info!("Completed signal! {:?}", signal);
643
644 match signal {
645 0 => {
646 info!("Update successful");
647
648 OtaStatus::Deployed(ota_request)
649 }
650 _ => {
651 let message = format!("Update failed with signal {signal}");
652
653 match self.last_error().await {
654 Ok(err) => {
655 error!("{message}: {err}");
656 }
657 Err(err) => {
658 error!("{message}: {}", stable_eyre::Report::new(err));
659 }
660 }
661
662 OtaStatus::Failure(OtaError::InvalidBaseImage(message), Some(ota_request))
663 }
664 }
665 }
666
667 pub async fn reboot(&self, ota_request: OtaId) -> OtaStatus {
669 if self
670 .publisher_tx
671 .send(OtaStatus::Rebooting(ota_request.clone()))
672 .await
673 .is_err()
674 {
675 warn!("ota_status_publisher dropped before sending rebooting_status")
676 };
677
678 info!("Rebooting the device");
679
680 if cfg!(test) {
681 return OtaStatus::Rebooted;
682 }
683
684 match self.config.reboot {
685 Reboot::Default => {
686 if let Err(error) = crate::power_management::reboot().await {
687 let message = "Unable to run reboot command";
688
689 error!("{message} : {error}");
690
691 return OtaStatus::Failure(
692 OtaError::Internal(message),
693 Some(ota_request.clone()),
694 );
695 }
696 }
697 Reboot::External => {
698 info!("waiting for next reboot");
699 }
700 }
701
702 OtaStatus::Rebooting(ota_request)
703 }
704
705 async fn wait_reboot(&self, ota_request: OtaId) -> OtaStatus {
709 tokio::time::sleep(Duration::from_secs(30)).await;
710
711 OtaStatus::Rebooting(ota_request)
712 }
713
714 pub async fn check_reboot(&self) -> OtaStatus {
716 if !self.state_repository.exists().await {
717 return OtaStatus::NoPendingOta;
718 }
719
720 info!("Found pending update");
721
722 let ota_state = match self.state_repository.read().await {
723 Ok(state) => state,
724 Err(err) => {
725 let message = "Unable to read pending ota state".to_string();
726 error!("{message} : {}", err);
727 return OtaStatus::Failure(OtaError::Io(message), None);
728 }
729 };
730
731 let request_uuid = ota_state.uuid;
732 let ota_request = OtaId {
733 uuid: request_uuid,
734 url: "".to_string(),
735 };
736
737 if let Err(error) = self.do_pending_ota(&ota_state).await {
738 return OtaStatus::Failure(error, Some(ota_request));
739 }
740
741 OtaStatus::Success(ota_request)
742 }
743
744 pub async fn do_pending_ota(&self, state: &PersistentState) -> Result<(), OtaError> {
745 const GOOD_STATE: &str = "good";
746
747 let booted_slot = self.system_update.boot_slot().await.map_err(|error| {
748 let message = "Unable to identify the booted slot";
749 error!("{message}: {error}");
750 OtaError::Internal(message)
751 })?;
752
753 if state.slot == booted_slot {
754 let message = "Unable to switch slot";
755 error!("{message}");
756 return Err(OtaError::SystemRollback(message));
757 }
758
759 let primary_slot = self.system_update.get_primary().await.map_err(|error| {
760 let message = "Unable to get the current primary slot";
761 error!("{message}: {error}");
762 OtaError::Internal(message)
763 })?;
764
765 let (marked_slot, _) = self
766 .system_update
767 .mark(GOOD_STATE, &primary_slot)
768 .await
769 .map_err(|error| {
770 let message = "Unable to run marking slot operation";
771 error!("{message}: {error}");
772 OtaError::Internal(message)
773 })?;
774
775 if primary_slot != marked_slot {
776 let message = "Unable to mark slot";
777 Err(OtaError::Internal(message))
778 } else {
779 Ok(())
780 }
781 }
782
783 pub async fn next(&mut self) {
784 self.ota_status = match self.ota_status.clone() {
785 OtaStatus::Init(req) => OtaStatus::Acknowledged(req),
786 OtaStatus::Acknowledged(ota_request) => self.start_update(ota_request),
787 OtaStatus::Downloading(ota_request, _) => self.download(ota_request).await,
788 OtaStatus::Deploying(ota_request, _) => self.deploy(ota_request).await,
789 OtaStatus::Deployed(ota_request) => self.reboot(ota_request).await,
790 OtaStatus::Rebooted => self.check_reboot().await,
791 OtaStatus::Rebooting(ota_request) => self.wait_reboot(ota_request).await,
792 OtaStatus::Error(ota_error, ota_request) => {
793 OtaStatus::Failure(ota_error, Some(ota_request))
794 }
795 OtaStatus::Idle
796 | OtaStatus::NoPendingOta
797 | OtaStatus::Success(_)
798 | OtaStatus::Failure(_, _) => OtaStatus::Idle,
799 };
800 }
801
802 pub async fn handle_ota_update(&mut self, cancel: CancellationToken) {
803 let mut check_cancel = true;
804
805 while self.is_ota_in_progress() {
806 info!(status = %self.ota_status, "ota progress");
807
808 self.publish_status(self.ota_status.clone()).await;
809
810 if self.ota_status.is_cancellable() {
811 if cancel.run_until_cancelled(self.next()).await.is_none() {
812 check_cancel = false;
813 info!("OTA update cancelled");
814
815 self.ota_status =
816 OtaStatus::Failure(OtaError::Canceled, self.ota_status.ota_id())
817 }
818
819 continue;
820 } else if check_cancel && cancel.is_cancelled() {
821 check_cancel = false;
823
824 self.publish_status(OtaStatus::Failure(OtaError::Canceled, None))
825 .await;
826 }
827
828 self.next().await;
829 }
830
831 self.publish_status(self.ota_status.clone()).await;
833
834 self.clear().await;
835 }
836
837 async fn clear(&mut self) {
838 if self.state_repository.exists().await {
839 let _ = self.state_repository.clear().await.map_err(|error| {
840 error!("Error clearing the state repository: {:?}", error);
841 });
842 }
843
844 let path = self.get_update_file_path();
845 if path.exists() {
846 if let Err(e) = tokio::fs::remove_file(&path).await {
847 error!("Unable to remove {}: {}", path.display(), e);
848 }
849 }
850
851 self.ota_status = OtaStatus::Idle;
852 }
853
854 async fn publish_status(&self, status: OtaStatus) {
855 if self.publisher_tx.send(status).await.is_err() {
856 error!(
857 "ota publisher disconnected before sending status {}",
858 self.ota_status
859 )
860 }
861 }
862
863 fn is_ota_in_progress(&self) -> bool {
865 let in_progress = self.ota_status != OtaStatus::Idle;
867
868 self.flag.set_in_progress(in_progress);
869
870 in_progress
871 }
872}
873
874fn create_http_client(req: &OtaId) -> Result<reqwest::Client, OtaError> {
876 let mut headers = HeaderMap::new();
877 let name = HeaderName::from_static("x-edgehog-ota-id");
878 match HeaderValue::try_from(req.uuid.to_string()) {
879 Ok(value) => {
880 headers.append(name, value);
881 }
882 Err(error) => {
883 error!(%error, "couldn't set ota-id HTTP header value")
884 }
885 };
886 let tls = edgehog_tls::config().map_err(|error| {
887 error!(%error, "couldn't setup TLS configuration");
888
889 OtaError::Internal("couldn setup TLS configuration")
890 })?;
891 let client = reqwest::Client::builder()
892 .use_preconfigured_tls(tls)
893 .user_agent(concat!(
894 env!("CARGO_PKG_NAME"),
895 "/",
896 env!("CARGO_PKG_VERSION")
897 ))
898 .default_headers(headers)
899 .build()
900 .map_err(|error| {
901 error!(%error,"couldn't build HTTP client");
902
903 OtaError::Internal("couldn't build HTTP client")
904 })?;
905 Ok(client)
906}
907
908pub async fn wget(
909 client: &reqwest::Client,
910 req: &OtaId,
911 file_path: &Path,
912 ota_status_publisher: &mpsc::Sender<OtaStatus>,
913) -> Result<(), OtaError> {
914 use tokio_stream::StreamExt;
915
916 if file_path.exists() {
917 tokio::fs::remove_file(file_path).await.map_err(|err| {
918 error!(
919 "failed to remove old file '{}': {}",
920 file_path.display(),
921 err
922 );
923
924 OtaError::Internal("failed to remove old file")
925 })?;
926 }
927
928 info!(url = req.url, "Downloading");
929
930 let result_response = client.get(&req.url).send().await;
931
932 match result_response {
933 Err(err) => {
934 let message = "Error downloading update".to_string();
935 error!("{message}: {err:?}");
936 Err(OtaError::Network(message))
937 }
938 Ok(response) => {
939 debug!("Writing {}", file_path.display());
940
941 let total_size = response
942 .content_length()
943 .and_then(|size| if size == 0 { None } else { Some(size) })
944 .ok_or_else(|| {
945 OtaError::Network(format!("Unable to get content length from: {}", req.url))
946 })? as f64;
947
948 let mut downloaded: f64 = 0.0;
949 let mut last_percentage_sent = 0.0;
950 let mut stream = response.bytes_stream();
951
952 let mut os_file = tokio::fs::File::create(file_path).await.map_err(|error| {
953 let message = format!("Unable to create ota_file in {file_path:?}");
954 error!("{message} : {error:?}");
955 OtaError::Io(message)
956 })?;
957
958 while let Some(chunk_result) = stream.next().await {
959 let chunk = chunk_result.map_err(|error| {
960 let message = "Unable to parse response".to_string();
961 error!("{message} : {error:?}");
962 OtaError::Network(message)
963 })?;
964
965 if chunk.is_empty() {
966 continue;
967 }
968
969 let mut content = std::io::Cursor::new(&chunk);
970
971 tokio::io::copy(&mut content, &mut os_file)
972 .await
973 .map_err(|error| {
974 let message = format!("Unable to write chunk to ota_file in {file_path:?}");
975 error!("{message} : {error:?}");
976 OtaError::Io(message)
977 })?;
978
979 downloaded += chunk.len() as f64;
980 let progress_percentage = (downloaded / total_size) * 100.0;
981 if progress_percentage == 100.0
982 || (progress_percentage - last_percentage_sent) >= DOWNLOAD_PERC_ROUNDING_STEP
983 {
984 last_percentage_sent = progress_percentage;
985 if ota_status_publisher
986 .send(OtaStatus::Downloading(
987 req.clone(),
988 progress_percentage as i32,
989 ))
990 .await
991 .is_err()
992 {
993 warn!("ota_status_publisher dropped before send downloading_status")
994 }
995 }
996 }
997
998 if total_size == downloaded {
999 Ok(())
1000 } else {
1001 let message = "Unable to download file".to_string();
1002 error!("{message}");
1003 Err(OtaError::Network(message))
1004 }
1005 }
1006 }
1007}
1008
1009#[cfg(test)]
1010mod tests {
1011 use std::io;
1012 use std::path::PathBuf;
1013 use std::time::Duration;
1014
1015 use futures::StreamExt;
1016 use httpmock::prelude::*;
1017 use mockall::{predicate, Sequence};
1018 use pretty_assertions::assert_eq;
1019 use tempdir::TempDir;
1020 use tokio::sync::mpsc;
1021 use tokio_util::sync::CancellationToken;
1022 use uuid::Uuid;
1023
1024 use crate::error::DeviceManagerError;
1025 use crate::ota::ota_handler_test::deploy_status_stream;
1026 use crate::ota::rauc::BundleInfo;
1027 use crate::ota::{create_http_client, wget, Ota, OtaId, OtaStatus, PersistentState};
1028 use crate::ota::{DeployProgress, DeployStatus, MockSystemUpdate, OtaError, SystemUpdate};
1029 use crate::repository::file_state_repository::FileStateError;
1030 use crate::repository::{MockStateRepository, StateRepository};
1031
1032 use super::config::OtaConfig;
1033 use super::ota_handler::OtaInProgress;
1034
1035 fn temp_dir(prefix: &str) -> (TempDir, PathBuf) {
1037 let dir = TempDir::new(&format!("edgehog-{prefix}")).unwrap();
1038 let path = dir.path().to_owned();
1039
1040 (dir, path)
1041 }
1042
1043 impl<T, U> Ota<T, U>
1044 where
1045 T: SystemUpdate,
1046 U: StateRepository<PersistentState>,
1047 {
1048 pub fn mock_new(
1050 system_update: T,
1051 state_repository: U,
1052 publisher_tx: mpsc::Sender<OtaStatus>,
1053 ) -> Self {
1054 Ota {
1055 system_update,
1056 state_repository,
1057 download_file_path: PathBuf::from("/dev/null"),
1058 ota_status: OtaStatus::Idle,
1059 publisher_tx,
1060 flag: OtaInProgress::default(),
1061 config: OtaConfig::default(),
1062 }
1063 }
1064
1065 pub fn mock_new_with_path(
1067 system_update: T,
1068 state_repository: U,
1069 prefix: &str,
1070 publisher_tx: mpsc::Sender<OtaStatus>,
1071 ) -> (Self, TempDir) {
1072 let (dir, path) = temp_dir(prefix);
1073 let mock = Ota {
1074 system_update,
1075 state_repository,
1076 download_file_path: path,
1077 ota_status: OtaStatus::Idle,
1078 publisher_tx,
1079 flag: OtaInProgress::default(),
1080 config: OtaConfig::default(),
1081 };
1082
1083 (mock, dir)
1084 }
1085 }
1086
1087 #[tokio::test]
1088 async fn last_error_ok() {
1089 let mut system_update = MockSystemUpdate::new();
1090 let state_mock = MockStateRepository::<PersistentState>::new();
1091
1092 system_update
1093 .expect_last_error()
1094 .returning(|| Ok("Unable to deploy image".to_string()));
1095
1096 let (tx, _rx) = mpsc::channel(10);
1097
1098 let ota = Ota::mock_new(system_update, state_mock, tx);
1099
1100 let last_error_result = ota.last_error().await;
1101
1102 assert!(last_error_result.is_ok());
1103 assert_eq!("Unable to deploy image", last_error_result.unwrap());
1104 }
1105
1106 #[tokio::test]
1107 async fn last_error_fail() {
1108 let mut system_update = MockSystemUpdate::new();
1109 let state_mock = MockStateRepository::<PersistentState>::new();
1110
1111 system_update.expect_last_error().returning(|| {
1112 Err(DeviceManagerError::Fatal(
1113 "Unable to call last error".to_string(),
1114 ))
1115 });
1116
1117 let (tx, _rx) = mpsc::channel(10);
1118
1119 let ota = Ota::mock_new(system_update, state_mock, tx);
1120
1121 let last_error_result = ota.last_error().await;
1122
1123 assert!(last_error_result.is_err());
1124 assert!(matches!(
1125 last_error_result.err().unwrap(),
1126 DeviceManagerError::Fatal(_)
1127 ))
1128 }
1129
1130 #[tokio::test]
1131 async fn try_to_downloading_success() {
1132 let state_mock = MockStateRepository::<PersistentState>::new();
1133 let system_update = MockSystemUpdate::new();
1134
1135 let (publisher_tx, _publisher_rx) = mpsc::channel(8);
1136 let mut ota = Ota::mock_new(system_update, state_mock, publisher_tx);
1137 let ota_id = OtaId {
1138 uuid: Uuid::new_v4(),
1139 url: String::new(),
1140 };
1141 ota.ota_status = OtaStatus::Acknowledged(ota_id.clone());
1142
1143 ota.next().await;
1144
1145 assert_eq!(ota.ota_status, OtaStatus::Downloading(ota_id.clone(), 0));
1146 }
1147
1148 #[tokio::test]
1149 async fn ota_streaming_success() {
1150 let slot = "A";
1151 let req = OtaId {
1152 uuid: Uuid::new_v4(),
1153 url: "http://example.com/ota.bin".to_string(),
1154 };
1155 let state = PersistentState {
1156 uuid: req.uuid,
1157 slot: slot.to_string(),
1158 };
1159
1160 let mut state_mock = MockStateRepository::<PersistentState>::new();
1161 let mut system_update = MockSystemUpdate::new();
1162 let mut seq = Sequence::new();
1163
1164 system_update
1165 .expect_boot_slot()
1166 .once()
1167 .in_sequence(&mut seq)
1168 .returning(|| Ok(slot.to_string()));
1169
1170 state_mock
1171 .expect_write()
1172 .once()
1173 .with(predicate::eq(state.clone()))
1174 .returning(|_| Ok(()));
1175
1176 system_update
1177 .expect_install_bundle()
1178 .once()
1179 .in_sequence(&mut seq)
1180 .with(predicate::eq(req.url.clone()))
1181 .returning(|_| Ok(()));
1182
1183 system_update
1184 .expect_operation()
1185 .once()
1186 .in_sequence(&mut seq)
1187 .returning(|| Ok("".to_string()));
1188
1189 system_update
1190 .expect_receive_completed()
1191 .once()
1192 .in_sequence(&mut seq)
1193 .returning(|| {
1194 let progress = [
1195 DeployStatus::Progress(DeployProgress {
1196 percentage: 50,
1197 message: "Copy image".to_string(),
1198 }),
1199 DeployStatus::Progress(DeployProgress {
1200 percentage: 100,
1201 message: "Installing is done".to_string(),
1202 }),
1203 DeployStatus::Completed { signal: 0 },
1204 ]
1205 .map(Ok);
1206
1207 Ok(futures::stream::iter(progress).boxed())
1208 });
1209
1210 state_mock
1211 .expect_exists()
1212 .once()
1213 .in_sequence(&mut seq)
1214 .returning(|| true);
1215
1216 state_mock
1217 .expect_read()
1218 .once()
1219 .in_sequence(&mut seq)
1220 .return_once(|| Ok(state));
1221
1222 let new_slot = "B";
1223
1224 system_update
1225 .expect_boot_slot()
1226 .once()
1227 .in_sequence(&mut seq)
1228 .return_once(|| Ok(new_slot.to_string()));
1229
1230 system_update
1231 .expect_get_primary()
1232 .once()
1233 .in_sequence(&mut seq)
1234 .returning(|| Ok(new_slot.to_string()));
1235
1236 system_update
1237 .expect_mark()
1238 .once()
1239 .in_sequence(&mut seq)
1240 .with(predicate::eq("good"), predicate::eq(new_slot))
1241 .return_once(|_, _| Ok((new_slot.to_string(), "".to_string())));
1242
1243 state_mock
1244 .expect_exists()
1245 .once()
1246 .in_sequence(&mut seq)
1247 .returning(|| true);
1248
1249 state_mock
1250 .expect_clear()
1251 .once()
1252 .in_sequence(&mut seq)
1253 .returning(|| Ok(()));
1254
1255 let (tx, mut rx) = mpsc::channel(10);
1256 let mut ota = Ota::mock_new(system_update, state_mock, tx);
1257 ota.config.streaming = true;
1258 ota.ota_status = OtaStatus::Acknowledged(req.clone());
1259
1260 tokio::time::timeout(
1261 Duration::from_secs(2),
1262 ota.handle_ota_update(CancellationToken::new()),
1263 )
1264 .await
1265 .unwrap();
1266
1267 let exp = [
1268 OtaStatus::Acknowledged(req.clone()),
1269 OtaStatus::Deploying(req.clone(), DeployProgress::default()),
1270 OtaStatus::Deploying(
1271 req.clone(),
1272 DeployProgress {
1273 percentage: 50,
1274 message: "Copy image".to_string(),
1275 },
1276 ),
1277 OtaStatus::Deploying(
1278 req.clone(),
1279 DeployProgress {
1280 percentage: 100,
1281 message: "Installing is done".to_string(),
1282 },
1283 ),
1284 OtaStatus::Deployed(req.clone()),
1285 OtaStatus::Rebooting(req.clone()),
1286 OtaStatus::Rebooted,
1287 OtaStatus::Success(OtaId {
1288 uuid: req.uuid,
1289 url: "".to_string(),
1290 }),
1291 OtaStatus::Idle,
1292 ];
1293
1294 let mut received = Vec::new();
1295 tokio::time::timeout(Duration::from_secs(2), rx.recv_many(&mut received, 10))
1296 .await
1297 .unwrap();
1298
1299 assert_eq!(received, exp);
1300
1301 assert!(rx.is_empty());
1302 }
1303
1304 #[tokio::test]
1305 async fn try_to_deploying_fail_ota_request() {
1306 let state_mock = MockStateRepository::<PersistentState>::new();
1307 let mut system_update = MockSystemUpdate::new();
1308
1309 system_update
1310 .expect_info()
1311 .returning(|_: &str| Err(DeviceManagerError::Fatal("Unable to get info".to_string())));
1312
1313 let mut ota_request = OtaId::default();
1314 let binary_content = b"\x80\x02\x03";
1315 let binary_size = binary_content.len();
1316
1317 let server = MockServer::start_async().await;
1318 ota_request.url = server.url("/ota.bin");
1319 let mock_ota_file_request = server
1320 .mock_async(|when, then| {
1321 when.method(GET).path("/ota.bin");
1322 then.status(200)
1323 .header("content-Length", binary_size.to_string())
1324 .body(binary_content);
1325 })
1326 .await;
1327
1328 let (ota_status_publisher, mut ota_status_receiver) = mpsc::channel(1);
1329 let (ota, _dir) = Ota::mock_new_with_path(
1330 system_update,
1331 state_mock,
1332 "fail_ota_request",
1333 ota_status_publisher,
1334 );
1335
1336 let ota_status = ota.download(ota_request).await;
1337 mock_ota_file_request.assert_async().await;
1338
1339 let receive_result = ota_status_receiver.try_recv();
1340 assert!(receive_result.is_ok());
1341 let ota_status_received = receive_result.unwrap();
1342 assert!(matches!(
1343 ota_status_received,
1344 OtaStatus::Downloading(_, 100)
1345 ));
1346
1347 let receive_result = ota_status_receiver.try_recv();
1348 assert!(receive_result.is_err());
1349
1350 assert!(matches!(
1351 ota_status,
1352 OtaStatus::Failure(OtaError::InvalidBaseImage(_), _),
1353 ));
1354 }
1355
1356 #[tokio::test]
1357 async fn try_to_deploying_fail_5_wget() {
1358 let state_mock = MockStateRepository::<PersistentState>::new();
1359 let mut system_update = MockSystemUpdate::new();
1360
1361 system_update.expect_info().returning(|_: &str| {
1362 Ok(BundleInfo {
1363 compatible: "rauc-demo-x86".to_string(),
1364 version: "1".to_string(),
1365 })
1366 });
1367
1368 let server = MockServer::start_async().await;
1369 let mock_ota_file_request = server
1370 .mock_async(|when, then| {
1371 when.method(GET).path("/ota.bin");
1372 then.status(404);
1373 })
1374 .await;
1375
1376 let ota_url = server.url("/ota.bin");
1377 let ota_id = OtaId {
1378 uuid: Uuid::new_v4(),
1379 url: ota_url.clone(),
1380 };
1381
1382 let (publisher_tx, mut publisher_rx) = mpsc::channel(10);
1383 let (ota, _dir) =
1384 Ota::mock_new_with_path(system_update, state_mock, "fail_5_wget", publisher_tx);
1385
1386 tokio::time::pause();
1387
1388 tokio::time::advance(tokio::time::Duration::from_secs(60)).await;
1389
1390 let status = ota.download(ota_id.clone()).await;
1391
1392 let exp = [
1393 OtaStatus::Error(
1394 OtaError::Network(format!("Unable to get content length from: {ota_url}")),
1395 ota_id.clone(),
1396 ),
1397 OtaStatus::Error(
1398 OtaError::Network(format!("Unable to get content length from: {ota_url}")),
1399 ota_id.clone(),
1400 ),
1401 OtaStatus::Error(
1402 OtaError::Network(format!("Unable to get content length from: {ota_url}")),
1403 ota_id.clone(),
1404 ),
1405 OtaStatus::Error(
1406 OtaError::Network(format!("Unable to get content length from: {ota_url}")),
1407 ota_id.clone(),
1408 ),
1409 OtaStatus::Error(
1410 OtaError::Network(format!("Unable to get content length from: {ota_url}")),
1411 ota_id.clone(),
1412 ),
1413 ];
1414
1415 tokio::time::resume();
1417
1418 for status in exp {
1419 let val = tokio::time::timeout(Duration::from_secs(2), publisher_rx.recv())
1420 .await
1421 .unwrap()
1422 .unwrap();
1423
1424 assert_eq!(val, status);
1425 }
1426
1427 assert_eq!(
1428 status,
1429 OtaStatus::Failure(
1430 OtaError::Internal("Too many attempts to download the OTA file"),
1431 Some(ota_id)
1432 )
1433 );
1434
1435 mock_ota_file_request.assert_hits_async(5).await;
1436 }
1437
1438 #[tokio::test]
1439 async fn try_to_deploying_fail_ota_info() {
1440 let state_mock = MockStateRepository::<PersistentState>::new();
1441 let mut system_update = MockSystemUpdate::new();
1442
1443 system_update
1444 .expect_info()
1445 .returning(|_: &str| Err(DeviceManagerError::Fatal("Unable to get info".to_string())));
1446
1447 let mut ota_request = OtaId::default();
1448 let binary_content = b"\x80\x02\x03";
1449 let binary_size = binary_content.len();
1450
1451 let server = MockServer::start_async().await;
1452 ota_request.url = server.url("/ota.bin");
1453 let mock_ota_file_request = server
1454 .mock_async(|when, then| {
1455 when.method(GET).path("/ota.bin");
1456 then.status(200)
1457 .header("content-Length", binary_size.to_string())
1458 .body(binary_content);
1459 })
1460 .await;
1461
1462 let (ota_status_publisher, mut ota_status_receiver) = mpsc::channel(1);
1463 let (ota, _dir) = Ota::mock_new_with_path(
1464 system_update,
1465 state_mock,
1466 "fail_ota_info",
1467 ota_status_publisher,
1468 );
1469
1470 let ota_status = ota.download(ota_request).await;
1471 mock_ota_file_request.assert_async().await;
1472
1473 let receive_result = ota_status_receiver.try_recv();
1474 assert!(receive_result.is_ok());
1475 let ota_status_received = receive_result.unwrap();
1476 assert!(matches!(
1477 ota_status_received,
1478 OtaStatus::Downloading(_, 100)
1479 ));
1480
1481 assert!(matches!(
1482 ota_status,
1483 OtaStatus::Failure(OtaError::InvalidBaseImage(_), _),
1484 ));
1485 }
1486
1487 #[tokio::test]
1488 async fn try_to_deploying_fail_ota_call_compatible() {
1489 let state_mock = MockStateRepository::<PersistentState>::new();
1490 let mut system_update = MockSystemUpdate::new();
1491
1492 system_update.expect_info().returning(|_: &str| {
1493 Ok(BundleInfo {
1494 compatible: "rauc-demo-x86".to_string(),
1495 version: "1".to_string(),
1496 })
1497 });
1498
1499 system_update
1500 .expect_compatible()
1501 .returning(|| Err(DeviceManagerError::Fatal("empty value".to_string())));
1502
1503 let binary_content = b"\x80\x02\x03";
1504 let binary_size = binary_content.len();
1505
1506 let mut ota_request = OtaId::default();
1507 let server = MockServer::start_async().await;
1508 ota_request.url = server.url("/ota.bin");
1509 let mock_ota_file_request = server
1510 .mock_async(|when, then| {
1511 when.method(GET).path("/ota.bin");
1512 then.status(200)
1513 .header("content-Length", binary_size.to_string())
1514 .body(binary_content);
1515 })
1516 .await;
1517
1518 let (ota_status_publisher, mut ota_status_receiver) = mpsc::channel(1);
1519 let (ota, _dir) = Ota::mock_new_with_path(
1520 system_update,
1521 state_mock,
1522 "fail_ota_call_compatible",
1523 ota_status_publisher,
1524 );
1525
1526 let ota_status = ota.download(ota_request).await;
1527 mock_ota_file_request.assert_async().await;
1528
1529 let receive_result = ota_status_receiver.try_recv();
1530 assert!(receive_result.is_ok());
1531 let ota_status_received = receive_result.unwrap();
1532 assert!(matches!(
1533 ota_status_received,
1534 OtaStatus::Downloading(_, 100)
1535 ));
1536
1537 let receive_result = ota_status_receiver.try_recv();
1538 assert!(receive_result.is_err());
1539
1540 assert!(matches!(
1541 ota_status,
1542 OtaStatus::Failure(OtaError::InvalidBaseImage(_), _)
1543 ));
1544 }
1545
1546 #[tokio::test]
1547 async fn try_to_deploying_fail_compatible() {
1548 let state_mock = MockStateRepository::<PersistentState>::new();
1549 let mut system_update = MockSystemUpdate::new();
1550
1551 system_update.expect_info().returning(|_: &str| {
1552 Ok(BundleInfo {
1553 compatible: "rauc-demo-x86".to_string(),
1554 version: "1".to_string(),
1555 })
1556 });
1557
1558 system_update
1559 .expect_compatible()
1560 .returning(|| Ok("rauc-demo-arm".to_string()));
1561
1562 let mut ota_request = OtaId::default();
1563
1564 let binary_content = b"\x80\x02\x03";
1565 let binary_size = binary_content.len();
1566
1567 let server = MockServer::start_async().await;
1568 ota_request.url = server.url("/ota.bin");
1569 let mock_ota_file_request = server
1570 .mock_async(|when, then| {
1571 when.method(GET).path("/ota.bin");
1572 then.status(200)
1573 .header("content-Length", binary_size.to_string())
1574 .body(binary_content);
1575 })
1576 .await;
1577
1578 let (ota_status_publisher, mut ota_status_receiver) = mpsc::channel(1);
1579 let (ota, _dir) = Ota::mock_new_with_path(
1580 system_update,
1581 state_mock,
1582 "fail_compatible",
1583 ota_status_publisher,
1584 );
1585
1586 let ota_status = ota.download(ota_request).await;
1587 mock_ota_file_request.assert_async().await;
1588
1589 let receive_result = ota_status_receiver.try_recv();
1590 assert!(receive_result.is_ok());
1591 let ota_status_received = receive_result.unwrap();
1592 assert!(matches!(
1593 ota_status_received,
1594 OtaStatus::Downloading(_, 100)
1595 ));
1596
1597 let receive_result = ota_status_receiver.try_recv();
1598 assert!(receive_result.is_err());
1599
1600 assert!(matches!(
1601 ota_status,
1602 OtaStatus::Failure(OtaError::InvalidBaseImage(_), _)
1603 ));
1604 }
1605
1606 #[tokio::test]
1607 async fn try_to_deploying_fail_call_boot_slot() {
1608 let mut state_mock = MockStateRepository::<PersistentState>::new();
1609 let mut system_update = MockSystemUpdate::new();
1610 let mut seq = Sequence::new();
1611
1612 system_update
1613 .expect_info()
1614 .once()
1615 .in_sequence(&mut seq)
1616 .returning(|_: &str| {
1617 Ok(BundleInfo {
1618 compatible: "rauc-demo-x86".to_string(),
1619 version: "1".to_string(),
1620 })
1621 });
1622
1623 system_update
1624 .expect_compatible()
1625 .once()
1626 .in_sequence(&mut seq)
1627 .returning(|| Ok("rauc-demo-x86".to_string()));
1628
1629 system_update
1630 .expect_boot_slot()
1631 .once()
1632 .in_sequence(&mut seq)
1633 .returning(|| {
1634 Err(DeviceManagerError::Fatal(
1635 "unable to call boot slot".to_string(),
1636 ))
1637 });
1638
1639 state_mock
1640 .expect_exists()
1641 .once()
1642 .in_sequence(&mut seq)
1643 .returning(|| false);
1644
1645 let binary_content = b"\x80\x02\x03";
1646 let binary_size = binary_content.len();
1647
1648 let server = MockServer::start_async().await;
1649 let mock_ota_file_request = server
1650 .mock_async(|when, then| {
1651 when.method(GET).path("/ota.bin");
1652 then.status(200)
1653 .header("content-Length", binary_size.to_string())
1654 .body(binary_content);
1655 })
1656 .await;
1657
1658 let url = server.url("/ota.bin");
1659 let req = OtaId {
1660 uuid: Uuid::new_v4(),
1661 url,
1662 };
1663
1664 let (ota_status_publisher, mut ota_status_receiver) = mpsc::channel(6);
1665 let (mut ota, _dir) = Ota::mock_new_with_path(
1666 system_update,
1667 state_mock,
1668 "fail_call_boot_slot",
1669 ota_status_publisher,
1670 );
1671
1672 ota.ota_status = OtaStatus::Acknowledged(req.clone());
1673
1674 tokio::time::timeout(
1675 Duration::from_secs(2),
1676 ota.handle_ota_update(CancellationToken::new()),
1677 )
1678 .await
1679 .unwrap();
1680
1681 let expected = [
1682 OtaStatus::Acknowledged(req.clone()),
1683 OtaStatus::Downloading(req.clone(), 0),
1684 OtaStatus::Downloading(req.clone(), 100),
1685 OtaStatus::Deploying(req.clone(), DeployProgress::default()),
1686 OtaStatus::Failure(
1687 OtaError::Internal("Unable to identify the booted slot"),
1688 Some(req),
1689 ),
1690 OtaStatus::Idle,
1691 ];
1692
1693 for exp in expected {
1694 let status = ota_status_receiver.try_recv().unwrap();
1695
1696 assert_eq!(status, exp)
1697 }
1698
1699 assert!(ota_status_receiver.is_empty());
1700
1701 mock_ota_file_request.assert_async().await;
1702 }
1703
1704 #[tokio::test]
1705 async fn try_to_deploying_fail_write_state() {
1706 let mut state_mock = MockStateRepository::<PersistentState>::new();
1707 let mut system_update = MockSystemUpdate::new();
1708 let mut seq = Sequence::new();
1709
1710 system_update
1711 .expect_info()
1712 .once()
1713 .in_sequence(&mut seq)
1714 .returning(|_: &str| {
1715 Ok(BundleInfo {
1716 compatible: "rauc-demo-x86".to_string(),
1717 version: "1".to_string(),
1718 })
1719 });
1720
1721 system_update
1722 .expect_compatible()
1723 .once()
1724 .in_sequence(&mut seq)
1725 .returning(|| Ok("rauc-demo-x86".to_string()));
1726
1727 system_update
1728 .expect_boot_slot()
1729 .once()
1730 .in_sequence(&mut seq)
1731 .returning(|| Ok("A".to_string()));
1732
1733 state_mock
1734 .expect_write()
1735 .once()
1736 .in_sequence(&mut seq)
1737 .returning(|_| {
1738 Err(FileStateError::Write {
1739 path: "/ota.bin".into(),
1740 backtrace: io::Error::new(io::ErrorKind::PermissionDenied, "permission denied"),
1741 })
1742 });
1743
1744 state_mock
1745 .expect_exists()
1746 .once()
1747 .in_sequence(&mut seq)
1748 .returning(|| false);
1749
1750 let binary_content = b"\x80\x02\x03";
1751 let binary_size = binary_content.len();
1752
1753 let server = MockServer::start_async().await;
1754
1755 let mock_ota_file_request = server
1756 .mock_async(|when, then| {
1757 when.method(GET).path("/ota.bin");
1758 then.status(200)
1759 .header("content-length", binary_size.to_string())
1760 .body(binary_content);
1761 })
1762 .await;
1763
1764 let ota_url = server.url("/ota.bin");
1765
1766 let (publisher_tx, mut publisher_rx) = mpsc::channel(10);
1767 let (mut ota, _dir) =
1768 Ota::mock_new_with_path(system_update, state_mock, "fail_write_state", publisher_tx);
1769
1770 let ota_id = OtaId {
1771 uuid: Uuid::new_v4(),
1772 url: ota_url,
1773 };
1774
1775 tracing_subscriber::fmt()
1776 .with_max_level(tracing::Level::DEBUG)
1777 .with_test_writer()
1778 .init();
1779
1780 ota.ota_status = OtaStatus::Acknowledged(ota_id.clone());
1781
1782 tokio::time::timeout(
1783 Duration::from_secs(2),
1784 ota.handle_ota_update(CancellationToken::new()),
1785 )
1786 .await
1787 .unwrap();
1788
1789 let exp = [
1790 OtaStatus::Acknowledged(ota_id.clone()),
1791 OtaStatus::Downloading(ota_id.clone(), 0),
1792 OtaStatus::Downloading(ota_id.clone(), 100),
1793 OtaStatus::Deploying(ota_id.clone(), DeployProgress::default()),
1794 OtaStatus::Failure(
1795 OtaError::Io("Unable to persist ota state".to_string()),
1796 Some(ota_id),
1797 ),
1798 OtaStatus::Idle,
1799 ];
1800
1801 for status in exp {
1802 let val = tokio::time::timeout(Duration::from_secs(2), publisher_rx.recv())
1803 .await
1804 .unwrap()
1805 .unwrap();
1806
1807 assert_eq!(val, status, "got {status}");
1808 }
1809
1810 assert!(publisher_rx.is_empty());
1811
1812 mock_ota_file_request.assert_async().await;
1813 }
1814
1815 #[tokio::test]
1816 async fn try_to_download_success() {
1817 let uuid = Uuid::new_v4();
1818
1819 let state_mock = MockStateRepository::<PersistentState>::new();
1820 let mut system_update = MockSystemUpdate::new();
1821 let mut seq = Sequence::new();
1822
1823 system_update
1824 .expect_info()
1825 .once()
1826 .in_sequence(&mut seq)
1827 .returning(|_: &str| {
1828 Ok(BundleInfo {
1829 compatible: "rauc-demo-x86".to_string(),
1830 version: "1".to_string(),
1831 })
1832 });
1833
1834 system_update
1835 .expect_compatible()
1836 .once()
1837 .in_sequence(&mut seq)
1838 .returning(|| Ok("rauc-demo-x86".to_string()));
1839
1840 let binary_content = b"\x80\x02\x03";
1841 let binary_size = binary_content.len();
1842
1843 let server = MockServer::start_async().await;
1844 let ota_url = server.url("/ota.bin");
1845
1846 let mock_ota_file_request = server
1847 .mock_async(|when, then| {
1848 when.method(GET).path("/ota.bin");
1849 then.status(200)
1850 .header("content-Length", binary_size.to_string())
1851 .body(binary_content);
1852 })
1853 .await;
1854
1855 let (publisher_tx, mut publisher_rx) = mpsc::channel(2);
1856 let (ota, _dir) =
1857 Ota::mock_new_with_path(system_update, state_mock, "deploying_success", publisher_tx);
1858
1859 let ota_id = OtaId { uuid, url: ota_url };
1860
1861 let status = ota.download(ota_id.clone()).await;
1862
1863 let exp = [OtaStatus::Downloading(ota_id.clone(), 100)];
1864
1865 for status in exp {
1866 let val = tokio::time::timeout(Duration::from_secs(2), publisher_rx.recv())
1867 .await
1868 .unwrap()
1869 .unwrap();
1870
1871 assert_eq!(val, status);
1872 }
1873
1874 assert_eq!(
1875 status,
1876 OtaStatus::Deploying(
1877 ota_id,
1878 DeployProgress {
1879 percentage: 0,
1880 message: String::new()
1881 }
1882 )
1883 );
1884
1885 mock_ota_file_request.assert_async().await;
1886 }
1887
1888 #[tokio::test]
1889 async fn try_to_deployed_fail_install_bundle() {
1890 let req = OtaId::default();
1891
1892 let mut state_mock = MockStateRepository::<PersistentState>::new();
1893 let mut system_update = MockSystemUpdate::new();
1894 let mut seq = Sequence::new();
1895
1896 system_update
1897 .expect_boot_slot()
1898 .once()
1899 .in_sequence(&mut seq)
1900 .returning(|| Ok("A".to_string()));
1901
1902 state_mock
1903 .expect_write()
1904 .once()
1905 .in_sequence(&mut seq)
1906 .withf(move |p| p.uuid == req.uuid && p.slot == "A")
1907 .returning(|_| Ok(()));
1908
1909 system_update
1910 .expect_install_bundle()
1911 .once()
1912 .in_sequence(&mut seq)
1913 .returning(|_| Err(DeviceManagerError::Fatal("install fail".to_string())));
1914
1915 let (ota_status_publisher, mut ota_status_receiver) = mpsc::channel(1);
1916 let (ota, _dir) = Ota::mock_new_with_path(
1917 system_update,
1918 state_mock,
1919 "fail_install_bundle",
1920 ota_status_publisher,
1921 );
1922
1923 let ota_status = ota.deploy(req).await;
1924
1925 let receive_result = ota_status_receiver.try_recv();
1926 assert!(receive_result.is_err());
1927
1928 assert!(matches!(
1929 ota_status,
1930 OtaStatus::Failure(OtaError::InvalidBaseImage(_), _)
1931 ));
1932 }
1933
1934 #[tokio::test]
1935 async fn try_to_deployed_fail_operation() {
1936 let req = OtaId::default();
1937
1938 let mut state_mock = MockStateRepository::<PersistentState>::new();
1939 let mut seq = Sequence::new();
1940
1941 state_mock
1942 .expect_write()
1943 .once()
1944 .in_sequence(&mut seq)
1945 .withf(move |p| p.uuid == req.uuid && p.slot == "A")
1946 .returning(|_| Ok(()));
1947
1948 let mut system_update = MockSystemUpdate::new();
1949 let mut seq = Sequence::new();
1950
1951 system_update
1952 .expect_boot_slot()
1953 .once()
1954 .in_sequence(&mut seq)
1955 .returning(|| Ok("A".to_string()));
1956 system_update
1957 .expect_install_bundle()
1958 .once()
1959 .in_sequence(&mut seq)
1960 .returning(|_| Ok(()));
1961 system_update
1962 .expect_operation()
1963 .once()
1964 .in_sequence(&mut seq)
1965 .returning(|| Err(DeviceManagerError::Fatal("operation call fail".to_string())));
1966
1967 let (ota_status_publisher, mut ota_status_receiver) = mpsc::channel(1);
1968 let (ota, _dir) = Ota::mock_new_with_path(
1969 system_update,
1970 state_mock,
1971 "deployed_fail_operation",
1972 ota_status_publisher,
1973 );
1974
1975 let ota_status = ota.deploy(req).await;
1976
1977 let receive_result = ota_status_receiver.try_recv();
1978 assert!(receive_result.is_err());
1979
1980 assert!(matches!(
1981 ota_status,
1982 OtaStatus::Failure(OtaError::Internal(_), _)
1983 ));
1984 }
1985
1986 #[tokio::test]
1987 async fn try_to_deployed_fail_receive_completed() {
1988 let req = OtaId::default();
1989
1990 let mut state_mock = MockStateRepository::<PersistentState>::new();
1991 let mut system_update = MockSystemUpdate::new();
1992 let mut seq = Sequence::new();
1993
1994 system_update
1995 .expect_boot_slot()
1996 .once()
1997 .in_sequence(&mut seq)
1998 .returning(|| Ok("A".to_string()));
1999
2000 state_mock
2001 .expect_write()
2002 .once()
2003 .in_sequence(&mut seq)
2004 .withf(move |p| p.uuid == req.uuid && p.slot == "A")
2005 .returning(|_| Ok(()));
2006
2007 system_update
2008 .expect_install_bundle()
2009 .once()
2010 .in_sequence(&mut seq)
2011 .returning(|_| Ok(()));
2012 system_update
2013 .expect_operation()
2014 .once()
2015 .in_sequence(&mut seq)
2016 .returning(|| Ok("".to_string()));
2017 system_update
2018 .expect_receive_completed()
2019 .once()
2020 .in_sequence(&mut seq)
2021 .returning(|| {
2022 Err(DeviceManagerError::Fatal(
2023 "receive_completed call fail".to_string(),
2024 ))
2025 });
2026
2027 let (ota_status_publisher, mut ota_status_receiver) = mpsc::channel(1);
2028 let (ota, _dir) = Ota::mock_new_with_path(
2029 system_update,
2030 state_mock,
2031 "deployed_fail_receive_completed",
2032 ota_status_publisher,
2033 );
2034
2035 let ota_status = ota.deploy(req).await;
2036
2037 let receive_result = ota_status_receiver.try_recv();
2038 assert!(receive_result.is_err());
2039
2040 assert!(matches!(
2041 ota_status,
2042 OtaStatus::Failure(OtaError::Internal(_), _)
2043 ));
2044 }
2045
2046 #[tokio::test]
2047 async fn try_to_deployed_fail_signal() {
2048 let req = OtaId::default();
2049
2050 let mut state_mock = MockStateRepository::<PersistentState>::new();
2051 let mut seq = Sequence::new();
2052
2053 state_mock
2054 .expect_write()
2055 .once()
2056 .in_sequence(&mut seq)
2057 .withf(move |p| p.uuid == req.uuid && p.slot == "A")
2058 .returning(|_| Ok(()));
2059
2060 let mut system_update = MockSystemUpdate::new();
2061 let mut seq = Sequence::new();
2062
2063 system_update
2064 .expect_boot_slot()
2065 .once()
2066 .in_sequence(&mut seq)
2067 .returning(|| Ok("A".to_string()));
2068 system_update
2069 .expect_install_bundle()
2070 .once()
2071 .in_sequence(&mut seq)
2072 .returning(|_| Ok(()));
2073 system_update
2074 .expect_operation()
2075 .once()
2076 .in_sequence(&mut seq)
2077 .returning(|| Ok("".to_string()));
2078 system_update
2079 .expect_receive_completed()
2080 .once()
2081 .in_sequence(&mut seq)
2082 .returning(|| deploy_status_stream([DeployStatus::Completed { signal: -1 }]));
2083 system_update
2084 .expect_last_error()
2085 .once()
2086 .in_sequence(&mut seq)
2087 .returning(|| Ok("Unable to deploy image".to_string()));
2088
2089 let (ota_status_publisher, _) = mpsc::channel(1);
2090 let (ota, _dir) = Ota::mock_new_with_path(
2091 system_update,
2092 state_mock,
2093 "deployed_fail_signal",
2094 ota_status_publisher,
2095 );
2096
2097 let ota_status = ota.deploy(req).await;
2098
2099 assert!(matches!(
2100 ota_status,
2101 OtaStatus::Failure(OtaError::InvalidBaseImage(_), _)
2102 ));
2103 }
2104
2105 #[tokio::test]
2106 async fn try_to_deployed_success() {
2107 let req = OtaId::default();
2108
2109 let mut state_mock = MockStateRepository::<PersistentState>::new();
2110 let mut seq = Sequence::new();
2111
2112 state_mock
2113 .expect_write()
2114 .once()
2115 .in_sequence(&mut seq)
2116 .withf(move |p| p.uuid == req.uuid && p.slot == "A")
2117 .returning(|_| Ok(()));
2118
2119 let mut system_update = MockSystemUpdate::new();
2120 let mut seq = Sequence::new();
2121
2122 system_update
2123 .expect_boot_slot()
2124 .once()
2125 .in_sequence(&mut seq)
2126 .returning(|| Ok("A".to_string()));
2127 system_update
2128 .expect_install_bundle()
2129 .once()
2130 .in_sequence(&mut seq)
2131 .returning(|_| Ok(()));
2132 system_update
2133 .expect_operation()
2134 .once()
2135 .in_sequence(&mut seq)
2136 .returning(|| Ok("".to_string()));
2137 system_update
2138 .expect_receive_completed()
2139 .once()
2140 .in_sequence(&mut seq)
2141 .returning(|| {
2142 let progress = [
2143 DeployStatus::Progress(DeployProgress {
2144 percentage: 50,
2145 message: "Copy image".to_string(),
2146 }),
2147 DeployStatus::Progress(DeployProgress {
2148 percentage: 100,
2149 message: "Installing is done".to_string(),
2150 }),
2151 DeployStatus::Completed { signal: 0 },
2152 ]
2153 .map(Ok);
2154
2155 Ok(futures::stream::iter(progress).boxed())
2156 });
2157
2158 let (publisher_tx, mut publisher_rx) = mpsc::channel(3);
2159 let (ota, _dir) =
2160 Ota::mock_new_with_path(system_update, state_mock, "deployed_success", publisher_tx);
2161
2162 let status = ota.deploy(req.clone()).await;
2163
2164 let exp = [
2165 OtaStatus::Deploying(
2166 req.clone(),
2167 DeployProgress {
2168 percentage: 50,
2169 message: "Copy image".to_string(),
2170 },
2171 ),
2172 OtaStatus::Deploying(
2173 req.clone(),
2174 DeployProgress {
2175 percentage: 100,
2176 message: "Installing is done".to_string(),
2177 },
2178 ),
2179 ];
2180
2181 for status in exp {
2182 let val = tokio::time::timeout(Duration::from_secs(2), publisher_rx.recv())
2183 .await
2184 .unwrap()
2185 .unwrap();
2186
2187 assert_eq!(val, status);
2188 }
2189
2190 assert_eq!(status, OtaStatus::Deployed(req));
2191 }
2192
2193 #[tokio::test]
2194 async fn try_to_rebooting_success() {
2195 let state_mock = MockStateRepository::<PersistentState>::new();
2196 let system_update = MockSystemUpdate::new();
2197 let ota_request = OtaId::default();
2198
2199 let (ota_status_publisher, mut ota_status_receiver) = mpsc::channel(1);
2200 let ota = Ota::mock_new(system_update, state_mock, ota_status_publisher);
2201 let ota_status = ota.reboot(ota_request).await;
2202
2203 let receive_result = ota_status_receiver.try_recv();
2204 assert!(receive_result.is_ok());
2205 let ota_status_received = receive_result.unwrap();
2206 assert!(matches!(ota_status_received, OtaStatus::Rebooting(_)));
2207
2208 let receive_result = ota_status_receiver.try_recv();
2209 assert!(receive_result.is_err());
2210
2211 assert!(matches!(ota_status, OtaStatus::Rebooted));
2212 }
2213
2214 #[tokio::test]
2215 async fn try_to_success_no_pending_update() {
2216 let mut state_mock = MockStateRepository::<PersistentState>::new();
2217 let system_update = MockSystemUpdate::new();
2218
2219 state_mock.expect_exists().returning(|| false);
2220
2221 let (ota_status_publisher, _ota_status_receiver) = mpsc::channel(1);
2222 let ota = Ota::mock_new(system_update, state_mock, ota_status_publisher);
2223 let ota_status = ota.check_reboot().await;
2224
2225 assert!(matches!(ota_status, OtaStatus::NoPendingOta));
2226 }
2227
2228 #[tokio::test]
2229 async fn try_to_success_fail_read_state() {
2230 let mut state_mock = MockStateRepository::<PersistentState>::new();
2231 let system_update = MockSystemUpdate::new();
2232
2233 state_mock.expect_exists().returning(|| true);
2234 state_mock.expect_read().returning(move || {
2235 Err(FileStateError::Write {
2236 path: "/ota.bin".into(),
2237 backtrace: io::Error::new(io::ErrorKind::PermissionDenied, "permission denied"),
2238 })
2239 });
2240
2241 let (ota_status_publisher, _ota_status_receiver) = mpsc::channel(1);
2242 let ota = Ota::mock_new(system_update, state_mock, ota_status_publisher);
2243 let ota_status = ota.check_reboot().await;
2244
2245 assert!(matches!(ota_status, OtaStatus::Failure(OtaError::Io(_), _)));
2246 }
2247
2248 #[tokio::test]
2249 async fn try_to_success_fail_pending_ota() {
2250 let mut state_mock = MockStateRepository::<PersistentState>::new();
2251 let mut system_update = MockSystemUpdate::new();
2252 let uuid = Uuid::new_v4();
2253 let slot = "A";
2254
2255 state_mock.expect_exists().returning(|| true);
2256 state_mock.expect_read().returning(move || {
2257 Ok(PersistentState {
2258 uuid,
2259 slot: slot.to_owned(),
2260 })
2261 });
2262 state_mock.expect_clear().returning(|| Ok(()));
2263
2264 system_update
2265 .expect_boot_slot()
2266 .returning(|| Ok("A".to_owned()));
2267
2268 let (ota_status_publisher, _ota_status_receiver) = mpsc::channel(1);
2269 let ota = Ota::mock_new(system_update, state_mock, ota_status_publisher);
2270 let ota_status = ota.check_reboot().await;
2271
2272 assert!(matches!(
2273 ota_status,
2274 OtaStatus::Failure(OtaError::SystemRollback(_), _)
2275 ));
2276 }
2277
2278 #[tokio::test]
2279 async fn try_to_success() {
2280 let mut state_mock = MockStateRepository::<PersistentState>::new();
2281 let mut system_update = MockSystemUpdate::new();
2282 let uuid = Uuid::new_v4();
2283 let slot = "A";
2284
2285 state_mock.expect_exists().returning(|| true);
2286 state_mock.expect_read().returning(move || {
2287 Ok(PersistentState {
2288 uuid,
2289 slot: slot.to_owned(),
2290 })
2291 });
2292 state_mock.expect_clear().returning(|| Ok(()));
2293
2294 system_update
2295 .expect_boot_slot()
2296 .returning(|| Ok("B".to_owned()));
2297 system_update
2298 .expect_get_primary()
2299 .returning(|| Ok("rootfs.0".to_owned()));
2300 system_update.expect_mark().returning(|_: &str, _: &str| {
2301 Ok((
2302 "rootfs.0".to_owned(),
2303 "marked slot rootfs.0 as good".to_owned(),
2304 ))
2305 });
2306
2307 let (ota_status_publisher, _ota_status_receiver) = mpsc::channel(1);
2308 let ota = Ota::mock_new(system_update, state_mock, ota_status_publisher);
2309 let ota_status = ota.check_reboot().await;
2310
2311 assert!(matches!(ota_status, OtaStatus::Success(_)));
2312 }
2313
2314 #[tokio::test]
2315 async fn do_pending_ota_fail_boot_slot() {
2316 let mut state_mock = MockStateRepository::<PersistentState>::new();
2317 let uuid = Uuid::new_v4();
2318 let slot = "A";
2319
2320 state_mock.expect_read().returning(move || {
2321 Ok(PersistentState {
2322 uuid,
2323 slot: slot.to_owned(),
2324 })
2325 });
2326
2327 let mut system_update = MockSystemUpdate::new();
2328 system_update.expect_boot_slot().returning(|| {
2329 Err(DeviceManagerError::Fatal(
2330 "unable to call boot slot".to_string(),
2331 ))
2332 });
2333
2334 let (ota_status_publisher, _ota_status_receiver) = mpsc::channel(1);
2335 let ota = Ota::mock_new(system_update, state_mock, ota_status_publisher);
2336
2337 let state = ota.state_repository.read().await.unwrap();
2338 let result = ota.do_pending_ota(&state).await;
2339
2340 assert!(result.is_err());
2341 assert!(matches!(result.err().unwrap(), OtaError::Internal(_),));
2342 }
2343
2344 #[tokio::test]
2345 async fn do_pending_ota_fail_switch_slot() {
2346 let mut state_mock = MockStateRepository::<PersistentState>::new();
2347 let uuid = Uuid::new_v4();
2348 let slot = "A";
2349
2350 state_mock.expect_read().returning(move || {
2351 Ok(PersistentState {
2352 uuid,
2353 slot: slot.to_owned(),
2354 })
2355 });
2356
2357 let mut system_update = MockSystemUpdate::new();
2358 system_update
2359 .expect_boot_slot()
2360 .returning(|| Ok("A".to_owned()));
2361
2362 let (ota_status_publisher, _ota_status_receiver) = mpsc::channel(1);
2363 let ota = Ota::mock_new(system_update, state_mock, ota_status_publisher);
2364
2365 let state = ota.state_repository.read().await.unwrap();
2366 let result = ota.do_pending_ota(&state).await;
2367
2368 assert!(result.is_err());
2369 assert!(matches!(result.err().unwrap(), OtaError::SystemRollback(_),));
2370 }
2371
2372 #[tokio::test]
2373 async fn do_pending_ota_fail_get_primary() {
2374 let mut state_mock = MockStateRepository::<PersistentState>::new();
2375 let uuid = Uuid::new_v4();
2376 let slot = "A";
2377
2378 state_mock.expect_read().returning(move || {
2379 Ok(PersistentState {
2380 uuid,
2381 slot: slot.to_owned(),
2382 })
2383 });
2384
2385 let mut system_update = MockSystemUpdate::new();
2386 system_update
2387 .expect_boot_slot()
2388 .returning(|| Ok("B".to_owned()));
2389 system_update.expect_get_primary().returning(|| {
2390 Err(DeviceManagerError::Fatal(
2391 "unable to call boot slot".to_string(),
2392 ))
2393 });
2394
2395 let (ota_status_publisher, _ota_status_receiver) = mpsc::channel(1);
2396 let ota = Ota::mock_new(system_update, state_mock, ota_status_publisher);
2397 let state = ota.state_repository.read().await.unwrap();
2398 let result = ota.do_pending_ota(&state).await;
2399
2400 assert!(result.is_err());
2401 assert!(matches!(result.err().unwrap(), OtaError::Internal(_),));
2402 }
2403
2404 #[tokio::test]
2405 async fn do_pending_ota_mark_slot_fail() {
2406 let uuid = Uuid::new_v4();
2407 let slot = "A";
2408
2409 let mut state_mock = MockStateRepository::<PersistentState>::new();
2410 state_mock.expect_exists().returning(|| true);
2411 state_mock.expect_read().returning(move || {
2412 Ok(PersistentState {
2413 uuid,
2414 slot: slot.to_owned(),
2415 })
2416 });
2417
2418 state_mock.expect_clear().returning(|| Ok(()));
2419
2420 let mut system_update = MockSystemUpdate::new();
2421 system_update
2422 .expect_boot_slot()
2423 .returning(|| Ok("B".to_owned()));
2424 system_update
2425 .expect_get_primary()
2426 .returning(|| Ok("rootfs.0".to_owned()));
2427 system_update.expect_mark().returning(|_: &str, _: &str| {
2428 Err(DeviceManagerError::Fatal(
2429 "Unable to call mark function".to_string(),
2430 ))
2431 });
2432
2433 let (ota_status_publisher, _ota_status_receiver) = mpsc::channel(1);
2434 let ota = Ota::mock_new(system_update, state_mock, ota_status_publisher);
2435
2436 let state = ota.state_repository.read().await.unwrap();
2437 let result = ota.do_pending_ota(&state).await;
2438 assert!(result.is_err());
2439 assert!(matches!(result.err().unwrap(), OtaError::Internal(_),));
2440 }
2441
2442 #[tokio::test]
2443 async fn do_pending_ota_fail_marked_wrong_slot() {
2444 let state_mock = MockStateRepository::<PersistentState>::new();
2445 let mut system_update = MockSystemUpdate::new();
2446 let mut seq = Sequence::new();
2447
2448 system_update
2449 .expect_boot_slot()
2450 .once()
2451 .in_sequence(&mut seq)
2452 .returning(|| Ok("B".to_owned()));
2453
2454 system_update
2455 .expect_get_primary()
2456 .once()
2457 .in_sequence(&mut seq)
2458 .returning(|| Ok("rootfs.0".to_owned()));
2459
2460 system_update.expect_mark().returning(|_: &str, _: &str| {
2461 Ok((
2462 "rootfs.1".to_owned(),
2463 "marked slot rootfs.1 as good".to_owned(),
2464 ))
2465 });
2466
2467 let (ota_status_publisher, _ota_status_receiver) = mpsc::channel(1);
2468 let ota = Ota::mock_new(system_update, state_mock, ota_status_publisher);
2469
2470 let state = PersistentState {
2471 uuid: Uuid::new_v4(),
2472 slot: "A".to_string(),
2473 };
2474
2475 let err = ota.do_pending_ota(&state).await.unwrap_err();
2476
2477 assert_eq!(err, OtaError::Internal("Unable to mark slot"));
2478 }
2479
2480 #[tokio::test]
2481 async fn do_pending_ota_success() {
2482 let uuid = Uuid::new_v4();
2483 let slot = "A";
2484
2485 let mut state_mock = MockStateRepository::<PersistentState>::new();
2486 state_mock.expect_exists().returning(|| true);
2487 state_mock.expect_read().returning(move || {
2488 Ok(PersistentState {
2489 uuid,
2490 slot: slot.to_owned(),
2491 })
2492 });
2493
2494 state_mock.expect_clear().returning(|| Ok(()));
2495
2496 let mut system_update = MockSystemUpdate::new();
2497 system_update
2498 .expect_boot_slot()
2499 .returning(|| Ok("B".to_owned()));
2500 system_update
2501 .expect_get_primary()
2502 .returning(|| Ok("rootfs.0".to_owned()));
2503 system_update.expect_mark().returning(|_: &str, _: &str| {
2504 Ok((
2505 "rootfs.0".to_owned(),
2506 "marked slot rootfs.0 as good".to_owned(),
2507 ))
2508 });
2509
2510 let (ota_status_publisher, _ota_status_receiver) = mpsc::channel(1);
2511 let ota = Ota::mock_new(system_update, state_mock, ota_status_publisher);
2512
2513 let state = ota.state_repository.read().await.unwrap();
2514 let result = ota.do_pending_ota(&state).await;
2515 assert!(result.is_ok());
2516 }
2517
2518 #[tokio::test]
2519 async fn wget_failed() {
2520 let (_dir, t_dir) = temp_dir("wget_failed");
2521
2522 let server = MockServer::start_async().await;
2523 let hello_mock = server
2524 .mock_async(|when, then| {
2525 when.method(GET);
2526 then.status(500);
2527 })
2528 .await;
2529
2530 let ota_file = t_dir.join("ota,bin");
2531 let (ota_status_publisher, _) = mpsc::channel(1);
2532
2533 let url = server.url("/ota.bin").to_string();
2534 let req = OtaId {
2535 uuid: Uuid::new_v4(),
2536 url,
2537 };
2538
2539 let client = create_http_client(&req).unwrap();
2540 let result = wget(&client, &req, &ota_file, &ota_status_publisher).await;
2541
2542 hello_mock.assert_async().await;
2543 assert!(result.is_err());
2544 assert!(matches!(result.err().unwrap(), OtaError::Network(_),));
2545 }
2546
2547 #[tokio::test]
2548 async fn wget_failed_wrong_content_length() {
2549 let (_dir, t_dir) = temp_dir("wget_failed_wrong_content_length");
2550
2551 let binary_content = b"\x80\x02\x03";
2552
2553 let server = MockServer::start_async().await;
2554 let ota_url = server.url("/ota.bin");
2555 let mock_ota_file_request = server
2556 .mock_async(|when, then| {
2557 when.method(GET).path("/ota.bin");
2558 then.status(200)
2559 .header("content-Length", 0.to_string())
2560 .body(binary_content);
2561 })
2562 .await;
2563
2564 let ota_file = t_dir.join("ota.bin");
2565 let req = OtaId {
2566 uuid: Uuid::new_v4(),
2567 url: ota_url,
2568 };
2569
2570 let (ota_status_publisher, _) = mpsc::channel(1);
2571 let client = create_http_client(&req).unwrap();
2572
2573 let result = wget(&client, &req, &ota_file, &ota_status_publisher).await;
2574
2575 mock_ota_file_request.assert_async().await;
2576 assert!(result.is_err());
2577 assert!(matches!(result.err().unwrap(), OtaError::Network(_),));
2578 }
2579
2580 #[tokio::test]
2581 async fn wget_with_empty_payload() {
2582 let (_dir, t_dir) = temp_dir("wget_with_empty_payload");
2583
2584 let server = MockServer::start_async().await;
2585 let mock_ota_file_request = server
2586 .mock_async(|when, then| {
2587 when.method(GET).path("/ota.bin");
2588 then.status(200).body(b"");
2589 })
2590 .await;
2591
2592 let ota_file = t_dir.join("ota.bin");
2593 let (ota_status_publisher, _) = mpsc::channel(1);
2594
2595 let url = server.url("/ota.bin");
2596 let req = OtaId {
2597 url,
2598 uuid: Uuid::new_v4(),
2599 };
2600
2601 let client = create_http_client(&req).unwrap();
2602
2603 let result = wget(&client, &req, &ota_file, &ota_status_publisher).await;
2604
2605 mock_ota_file_request.assert_async().await;
2606 assert!(result.is_err());
2607 assert!(matches!(result.err().unwrap(), OtaError::Network(_),));
2608 }
2609
2610 #[tokio::test]
2611 async fn wget_success() {
2612 let (_dir, t_dir) = temp_dir("wget_success");
2613
2614 let binary_content = b"\x80\x02\x03";
2615 let binary_size = binary_content.len();
2616
2617 let server = MockServer::start_async().await;
2618 let ota_url = server.url("/ota.bin");
2619 let mock_ota_file_request = server
2620 .mock_async(|when, then| {
2621 when.method(GET).path("/ota.bin");
2622 then.status(200)
2623 .header("content-Length", binary_size.to_string())
2624 .body(binary_content);
2625 })
2626 .await;
2627
2628 let ota_file = t_dir.join("ota.bin");
2629
2630 let (ota_status_publisher, mut ota_status_receiver) = mpsc::channel(1);
2631
2632 let req = OtaId {
2633 uuid: Uuid::new_v4(),
2634 url: ota_url,
2635 };
2636
2637 let client = create_http_client(&req).unwrap();
2638 let result = wget(&client, &req, &ota_file, &ota_status_publisher).await;
2639 mock_ota_file_request.assert_async().await;
2640
2641 let receive_result = ota_status_receiver.try_recv();
2642 assert!(receive_result.is_ok());
2643 let ota_status_received = receive_result.unwrap();
2644 assert!(matches!(
2645 ota_status_received,
2646 OtaStatus::Downloading(_, 100)
2647 ));
2648
2649 let receive_result = ota_status_receiver.try_recv();
2650 assert!(receive_result.is_err());
2651
2652 assert!(result.is_ok());
2653 }
2654}