Skip to main content

edgehog_device_runtime/ota/
mod.rs

1// This file is part of Edgehog.
2//
3// Copyright 2022 - 2025 SECO Mind Srl
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9//    http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16//
17// SPDX-License-Identifier: Apache-2.0
18
19use std::fmt::Debug;
20use std::time::Duration;
21use std::{
22    fmt::Display,
23    path::{Path, PathBuf},
24};
25
26use async_trait::async_trait;
27use event::OtaRequest;
28use futures::stream::BoxStream;
29use futures::TryStreamExt;
30#[cfg(all(feature = "zbus", target_os = "linux"))]
31use ota_handler::{OtaEvent, OtaInProgress, OtaMessage, OtaStatusMessage};
32use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
33use serde::{Deserialize, Serialize};
34use tokio::sync::mpsc;
35use tokio_util::sync::CancellationToken;
36use tracing::{debug, error, info, warn};
37use uuid::Uuid;
38
39#[cfg(test)]
40use mockall::automock;
41
42use crate::controller::actor::Actor;
43use crate::error::DeviceManagerError;
44use crate::ota::rauc::BundleInfo;
45use crate::repository::StateRepository;
46use crate::DeviceManagerOptions;
47
48use self::config::{OtaConfig, Reboot};
49
50pub mod config;
51pub mod event;
52#[cfg(all(feature = "zbus", target_os = "linux"))]
53pub(crate) mod ota_handler;
54#[cfg(test)]
55mod ota_handler_test;
56pub(crate) mod rauc;
57
58/// Provides deploying progress information.
59#[derive(Debug, Clone, PartialEq, Eq, Default)]
60pub struct DeployProgress {
61    percentage: i32,
62    message: String,
63}
64
65impl Display for DeployProgress {
66    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67        write!(f, "progress {}%: {}", self.percentage, self.message)
68    }
69}
70
71/// Provides the status of the deployment.
72#[derive(Debug, Clone, PartialEq, Eq)]
73pub enum DeployStatus {
74    Progress(DeployProgress),
75    Completed { signal: i32 },
76}
77
78/// Stream of the [`DeployStatus`] events
79pub type ProgressStream = BoxStream<'static, Result<DeployStatus, DeviceManagerError>>;
80
81/// A **trait** required for all SystemUpdate handlers that want to update a system.
82#[cfg_attr(test, automock)]
83#[async_trait]
84pub trait SystemUpdate: Send + Sync {
85    async fn install_bundle(&self, source: &str) -> Result<(), DeviceManagerError>;
86    async fn last_error(&self) -> Result<String, DeviceManagerError>;
87    async fn info(&self, bundle: &str) -> Result<BundleInfo, DeviceManagerError>;
88    async fn operation(&self) -> Result<String, DeviceManagerError>;
89    async fn compatible(&self) -> Result<String, DeviceManagerError>;
90    async fn boot_slot(&self) -> Result<String, DeviceManagerError>;
91    async fn receive_completed(&self) -> Result<ProgressStream, DeviceManagerError>;
92    async fn get_primary(&self) -> Result<String, DeviceManagerError>;
93    async fn mark(
94        &self,
95        state: &str,
96        slot_identifier: &str,
97    ) -> Result<(String, String), DeviceManagerError>;
98}
99
100/// Edgehog OTA error.
101///
102/// Possible errors returned by OTA.
103#[derive(thiserror::Error, Debug, Clone, PartialEq)]
104pub enum OtaError {
105    /// Invalid OTA update request received
106    #[error("InvalidRequestError: {0}")]
107    Request(&'static str),
108    #[error("UpdateAlreadyInProgress")]
109    /// Attempted to perform OTA operation while there is another one already active*/
110    UpdateAlreadyInProgress,
111    #[error("NetworkError: {0}")]
112    /// A generic network error occurred
113    Network(String),
114    #[error("IOError: {0}")]
115    /// A filesystem error occurred
116    Io(String),
117    #[error("InternalError: {0}")]
118    /// An Internal error occurred during OTA procedure
119    Internal(&'static str),
120    #[error("InvalidBaseImage: {0}")]
121    /// Invalid OTA image received
122    InvalidBaseImage(String),
123    #[error("SystemRollback: {0}")]
124    /// The OTA procedure boot on the wrong partition
125    SystemRollback(&'static str),
126    /// OTA update aborted by Edgehog half way during the procedure
127    #[error("Canceled")]
128    Canceled,
129    /// Attempted to start OTA operation while ota status is different from Idle
130    #[error("Inconsistent ota state")]
131    InconsistentState,
132}
133
134impl Default for DeployStatus {
135    fn default() -> Self {
136        DeployStatus::Progress(DeployProgress::default())
137    }
138}
139
140const DOWNLOAD_PERC_ROUNDING_STEP: f64 = 10.0;
141const DEPLOY_PERC_ROUNDING_STEP: i32 = 10;
142
143#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
144pub struct PersistentState {
145    pub uuid: Uuid,
146    pub slot: String,
147}
148
149#[derive(Clone, PartialEq, Debug)]
150pub enum OtaStatus {
151    /// The device is waiting an OTA event
152    Idle,
153    /// The device initializing the OTA procedure
154    Init(OtaId),
155    /// The device didn't has an OTA procedure pending
156    NoPendingOta,
157    /// The device received a valid OTA Request
158    Acknowledged(OtaId),
159    /// The device is in downloading process, the i32 identify the progress percentage
160    Downloading(OtaId, i32),
161    /// The device is in the process of deploying the update
162    Deploying(OtaId, DeployProgress),
163    /// The device deployed the update
164    Deployed(OtaId),
165    /// The device is in the process of rebooting
166    Rebooting(OtaId),
167    /// The device was rebooted
168    Rebooted,
169    /// The update procedure succeeded.
170    Success(OtaId),
171    /// An error happened during the update procedure.
172    Error(OtaError, OtaId),
173    /// The update procedure failed.
174    Failure(OtaError, Option<OtaId>),
175}
176
177impl OtaStatus {
178    // Checks if the OTA is cancellable
179    fn is_cancellable(&self) -> bool {
180        match self {
181            OtaStatus::Init(_) | OtaStatus::Acknowledged(_) | OtaStatus::Downloading(_, _) => true,
182            OtaStatus::Idle
183            | OtaStatus::NoPendingOta
184            | OtaStatus::Deploying(_, _)
185            | OtaStatus::Deployed(_)
186            | OtaStatus::Rebooting(_)
187            | OtaStatus::Rebooted
188            | OtaStatus::Success(_)
189            | OtaStatus::Error(_, _)
190            | OtaStatus::Failure(_, _) => false,
191        }
192    }
193
194    fn ota_id(&self) -> Option<OtaId> {
195        match self {
196            OtaStatus::Idle | OtaStatus::NoPendingOta | OtaStatus::Rebooted => None,
197            OtaStatus::Init(id)
198            | OtaStatus::Acknowledged(id)
199            | OtaStatus::Downloading(id, _)
200            | OtaStatus::Deploying(id, _)
201            | OtaStatus::Deployed(id)
202            | OtaStatus::Rebooting(id)
203            | OtaStatus::Success(id)
204            | OtaStatus::Error(_, id) => Some(id.clone()),
205            OtaStatus::Failure(_, id) => id.clone(),
206        }
207    }
208
209    /// Converts the status into an event
210    fn as_event(&self) -> Option<OtaEvent> {
211        let mut ota_event = OtaEvent {
212            requestUUID: "".to_string(),
213            status: "".to_string(),
214            statusProgress: 0,
215            statusCode: "".to_string(),
216            message: "".to_string(),
217        };
218
219        match self {
220            OtaStatus::Acknowledged(ota_request) => {
221                ota_event.requestUUID = ota_request.uuid.to_string();
222                ota_event.status = "Acknowledged".to_string();
223            }
224            OtaStatus::Downloading(ota_request, progress) => {
225                ota_event.requestUUID = ota_request.uuid.to_string();
226                ota_event.statusProgress = *progress;
227                ota_event.status = "Downloading".to_string();
228            }
229            OtaStatus::Deploying(ota_request, deploying_progress) => {
230                ota_event.requestUUID = ota_request.uuid.to_string();
231                ota_event.status = "Deploying".to_string();
232                ota_event.statusProgress = deploying_progress.percentage;
233                ota_event.message = deploying_progress.clone().message;
234            }
235            OtaStatus::Deployed(ota_request) => {
236                ota_event.requestUUID = ota_request.uuid.to_string();
237                ota_event.status = "Deployed".to_string();
238            }
239            OtaStatus::Rebooting(ota_request) => {
240                ota_event.requestUUID = ota_request.uuid.to_string();
241                ota_event.status = "Rebooting".to_string()
242            }
243            OtaStatus::Success(ota_request) => {
244                ota_event.requestUUID = ota_request.uuid.to_string();
245                ota_event.status = "Success".to_string();
246            }
247            OtaStatus::Failure(ota_error, Some(ota_request)) => {
248                ota_event.requestUUID = ota_request.uuid.to_string();
249                ota_event.status = "Failure".to_string();
250                let ota_status_message = OtaStatusMessage::from(ota_error);
251                ota_event.statusCode = ota_status_message.status_code;
252                ota_event.message = ota_status_message.message;
253            }
254            OtaStatus::Error(ota_error, ota_request) => {
255                ota_event.status = "Error".to_string();
256                ota_event.requestUUID = ota_request.uuid.to_string();
257                let ota_status_message = OtaStatusMessage::from(ota_error);
258                ota_event.statusCode = ota_status_message.status_code;
259                ota_event.message = ota_status_message.message;
260            }
261            OtaStatus::Idle
262            | OtaStatus::Init(_)
263            | OtaStatus::NoPendingOta
264            | OtaStatus::Rebooted
265            | OtaStatus::Failure(_, None) => return None,
266        }
267
268        if ota_event.requestUUID.is_empty() {
269            error!("Unable to convert ota_event: request_uuid is empty");
270            None
271        } else {
272            Some(ota_event)
273        }
274    }
275}
276
277impl Display for OtaStatus {
278    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
279        match self {
280            OtaStatus::Idle => write!(f, "Idle"),
281            OtaStatus::Init(req) => write!(f, "Init {req}"),
282            OtaStatus::NoPendingOta => write!(f, "NoPendingOta"),
283            OtaStatus::Acknowledged(req) => write!(f, "Acknowledged {req}"),
284            OtaStatus::Downloading(req, progress) => {
285                write!(f, "Downloading {req} progress {progress}")
286            }
287            OtaStatus::Deploying(req, progress) => write!(f, "Deploying {req} {progress}"),
288            OtaStatus::Deployed(req) => write!(f, "Deployed {req}"),
289            OtaStatus::Rebooting(req) => write!(f, "Rebooting {req}"),
290            OtaStatus::Rebooted => write!(f, "Rebooted"),
291            OtaStatus::Success(req) => write!(f, "Success {req}"),
292            OtaStatus::Error(err, req) => write!(f, "Error {req}: {err}"),
293            OtaStatus::Failure(err, req) => {
294                write!(f, "Failure")?;
295
296                if let Some(req) = req {
297                    write!(f, " {req}")?;
298                }
299
300                write!(f, ": {err}")
301            }
302        }
303    }
304}
305
306#[derive(Debug, Clone, PartialEq, Eq)]
307pub struct OtaId {
308    pub uuid: Uuid,
309    pub url: String,
310}
311
312impl From<OtaRequest> for OtaId {
313    fn from(value: OtaRequest) -> Self {
314        Self {
315            uuid: value.uuid.0,
316            url: value.url,
317        }
318    }
319}
320
321impl Display for OtaId {
322    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
323        write!(f, "{}", self.uuid)
324    }
325}
326
327/// Provides OTA resource accessibility only by talking with it.
328pub struct Ota<T, U>
329where
330    T: SystemUpdate,
331    U: StateRepository<PersistentState>,
332{
333    pub system_update: T,
334    pub config: OtaConfig,
335    pub state_repository: U,
336    pub download_file_path: PathBuf,
337    pub ota_status: OtaStatus,
338    pub flag: OtaInProgress,
339    pub publisher_tx: mpsc::Sender<OtaStatus>,
340}
341
342#[async_trait]
343impl<T, U> Actor for Ota<T, U>
344where
345    T: SystemUpdate,
346    U: StateRepository<PersistentState>,
347{
348    type Msg = OtaMessage;
349
350    fn task() -> &'static str {
351        "ota"
352    }
353
354    async fn init(&mut self) -> stable_eyre::Result<()> {
355        if self.state_repository.exists().await {
356            self.ota_status = OtaStatus::Rebooted;
357        }
358
359        // Not cancellable after a reboot, this is just a placeholder
360        let cancel = CancellationToken::new();
361
362        // Always run the publish and cleanup
363        self.handle_ota_update(cancel).await;
364
365        Ok(())
366    }
367
368    async fn handle(&mut self, msg: Self::Msg) -> stable_eyre::Result<()> {
369        if self.ota_status != OtaStatus::Idle {
370            error!("ota request already in progress");
371
372            return Err(OtaError::InconsistentState.into());
373        }
374
375        self.ota_status = OtaStatus::Init(msg.ota_id);
376        self.handle_ota_update(msg.cancel).await;
377
378        Ok(())
379    }
380}
381
382impl<T, U> Ota<T, U>
383where
384    T: SystemUpdate,
385    U: StateRepository<PersistentState>,
386{
387    pub fn new(
388        opts: &DeviceManagerOptions,
389        tx_publisher: mpsc::Sender<OtaStatus>,
390        flag: OtaInProgress,
391        system_update: T,
392        state_repository: U,
393    ) -> Result<Self, DeviceManagerError> {
394        Ok(Ota {
395            system_update,
396            config: opts.ota,
397            state_repository,
398            download_file_path: opts.download_directory.clone(),
399            ota_status: OtaStatus::Idle,
400            flag,
401            publisher_tx: tx_publisher,
402        })
403    }
404
405    pub async fn last_error(&self) -> Result<String, DeviceManagerError> {
406        self.system_update.last_error().await
407    }
408
409    /// Path or URL to the bundle that should be installed
410    fn get_install_uri(&self, req: &OtaId) -> String {
411        if self.config.streaming {
412            req.url.clone()
413        } else {
414            self.get_update_file_path().to_string_lossy().to_string()
415        }
416    }
417
418    /// Returns the path of the downloaded image.
419    fn get_update_file_path(&self) -> PathBuf {
420        self.download_file_path.join("update.bin")
421    }
422
423    /// Called after the ota request has been Acknowledged.
424    fn start_update(&self, ota_request: OtaId) -> OtaStatus {
425        if self.config.streaming {
426            debug!("streaming image directly to disk");
427
428            OtaStatus::Deploying(ota_request, DeployProgress::default())
429        } else {
430            debug!("downloading image file");
431
432            OtaStatus::Downloading(ota_request, 0)
433        }
434    }
435
436    // Retries the download 5 times
437    async fn retry_download(
438        &self,
439        req: &OtaId,
440        ota_path: &Path,
441        ota_file: &str,
442    ) -> Result<String, OtaError> {
443        let client = create_http_client(req)?;
444
445        for i in 1..=5 {
446            debug!(time = i, "downloading ota image");
447
448            let res = wget(&client, req, ota_path, &self.publisher_tx).await;
449
450            match res {
451                Ok(()) => return Ok(ota_file.to_string()),
452                Err(err) => {
453                    error!(error = format!("{err:#}"), "couldn't downloading the image");
454
455                    if self
456                        .publisher_tx
457                        .send(OtaStatus::Error(err, req.clone()))
458                        .await
459                        .is_err()
460                    {
461                        warn!("ota_status_publisher dropped before send error_status")
462                    }
463                }
464            }
465
466            let wait = u64::pow(2, i);
467
468            error!("Next attempt in {wait}s",);
469
470            tokio::time::sleep(tokio::time::Duration::from_secs(wait)).await;
471        }
472
473        Err(OtaError::Internal(
474            "Too many attempts to download the OTA file",
475        ))
476    }
477
478    /// Handle the transition to the deploying status.
479    pub async fn download(&self, ota_request: OtaId) -> OtaStatus {
480        let download_file_path = self.get_update_file_path();
481
482        let Some(download_file_str) = download_file_path.to_str() else {
483            return OtaStatus::Failure(
484                OtaError::Io("Wrong download file path".to_string()),
485                Some(ota_request),
486            );
487        };
488
489        let download_res = self
490            .retry_download(&ota_request, &download_file_path, download_file_str)
491            .await;
492
493        let ota_file = match download_res {
494            Ok(ota_file) => ota_file,
495            Err(err) => {
496                return OtaStatus::Failure(err, Some(ota_request));
497            }
498        };
499
500        let bundle_info = match self.system_update.info(&ota_file).await {
501            Ok(info) => info,
502            Err(err) => {
503                let message = format!(
504                    "Unable to get info from ota_file in {}",
505                    download_file_path.display()
506                );
507                error!("{message} : {err}");
508
509                return OtaStatus::Failure(OtaError::InvalidBaseImage(message), Some(ota_request));
510            }
511        };
512
513        debug!("bundle info: {:?}", bundle_info);
514
515        let system_image_info = match self.system_update.compatible().await {
516            Ok(info) => info,
517            Err(err) => {
518                let message = "Unable to get info from current deployed image".to_string();
519
520                error!("{message} : {err}");
521
522                return OtaStatus::Failure(OtaError::InvalidBaseImage(message), Some(ota_request));
523            }
524        };
525
526        if bundle_info.compatible != system_image_info {
527            let message = format!(
528                "bundle {} is not compatible with system {system_image_info}",
529                bundle_info.compatible
530            );
531            error!("{message}");
532            return OtaStatus::Failure(
533                OtaError::InvalidBaseImage(message),
534                Some(ota_request.clone()),
535            );
536        }
537
538        OtaStatus::Deploying(ota_request.clone(), DeployProgress::default())
539    }
540
541    /// Handle the transition to the deployed status.
542    pub async fn deploy(&self, ota_request: OtaId) -> OtaStatus {
543        let booted_slot = match self.system_update.boot_slot().await {
544            Ok(slot) => slot,
545            Err(err) => {
546                let message = "Unable to identify the booted slot";
547
548                error!("{message}: {err}");
549
550                return OtaStatus::Failure(OtaError::Internal(message), Some(ota_request.clone()));
551            }
552        };
553
554        let state = PersistentState {
555            uuid: ota_request.uuid,
556            slot: booted_slot,
557        };
558
559        if let Err(error) = self.state_repository.write(&state).await {
560            let message = "Unable to persist ota state".to_string();
561            error!("{message} : {error}");
562            return OtaStatus::Failure(OtaError::Io(message), Some(ota_request));
563        };
564
565        if let Err(error) = self
566            .system_update
567            .install_bundle(&self.get_install_uri(&ota_request))
568            .await
569        {
570            let message = "Unable to install ota image".to_string();
571            error!("{message} : {error}");
572            return OtaStatus::Failure(OtaError::InvalidBaseImage(message), Some(ota_request));
573        }
574
575        if let Err(error) = self.system_update.operation().await {
576            let message = "Unable to get status of ota operation";
577            error!("{message} : {error}");
578            return OtaStatus::Failure(OtaError::Internal(message), Some(ota_request));
579        }
580
581        let stream = self.system_update.receive_completed().await;
582        let stream = match stream {
583            Ok(stream) => stream,
584            Err(err) => {
585                let message = "Unable to get status of ota operation";
586                error!("{message} : {err}");
587                return OtaStatus::Failure(OtaError::Internal(message), Some(ota_request));
588            }
589        };
590
591        let signal = stream
592            .try_fold(
593                DeployStatus::Progress(DeployProgress::default()),
594                |prev_status, status| {
595                    let ota_request_cl = ota_request.clone();
596                    let status_cl = status.clone();
597
598                    async move {
599                        let progress = match status {
600                            DeployStatus::Completed { .. } => {
601                                return Ok(status);
602                            }
603                            DeployStatus::Progress(progress) => progress,
604                        };
605
606                        let last_progress_sent = match &prev_status {
607                            DeployStatus::Progress(last_progress) => last_progress.percentage,
608                            _ => progress.percentage,
609                        };
610
611                        if (progress.percentage - last_progress_sent) >= DEPLOY_PERC_ROUNDING_STEP {
612                            let res = self
613                                .publisher_tx
614                                .send(OtaStatus::Deploying(ota_request_cl, progress))
615                                .await;
616
617                            if let Err(err) = res {
618                                error!("couldn't send progress update: {err}")
619                            }
620                            return Ok(status_cl);
621                        }
622                        Ok(prev_status)
623                    }
624                },
625            )
626            .await;
627
628        let signal = match signal {
629            Ok(DeployStatus::Completed { signal }) => signal,
630            Ok(DeployStatus::Progress(_)) => {
631                let message = "No progress completion event received";
632                error!("{message}");
633                return OtaStatus::Failure(OtaError::Internal(message), Some(ota_request));
634            }
635            Err(err) => {
636                let message = "Unable to receive the install completed event";
637                error!("{message} : {err}");
638                return OtaStatus::Failure(OtaError::Internal(message), Some(ota_request));
639            }
640        };
641
642        info!("Completed signal! {:?}", signal);
643
644        match signal {
645            0 => {
646                info!("Update successful");
647
648                OtaStatus::Deployed(ota_request)
649            }
650            _ => {
651                let message = format!("Update failed with signal {signal}");
652
653                match self.last_error().await {
654                    Ok(err) => {
655                        error!("{message}: {err}");
656                    }
657                    Err(err) => {
658                        error!("{message}: {}", stable_eyre::Report::new(err));
659                    }
660                }
661
662                OtaStatus::Failure(OtaError::InvalidBaseImage(message), Some(ota_request))
663            }
664        }
665    }
666
667    /// Handle the transition to rebooting status.
668    pub async fn reboot(&self, ota_request: OtaId) -> OtaStatus {
669        if self
670            .publisher_tx
671            .send(OtaStatus::Rebooting(ota_request.clone()))
672            .await
673            .is_err()
674        {
675            warn!("ota_status_publisher dropped before sending rebooting_status")
676        };
677
678        info!("Rebooting the device");
679
680        if cfg!(test) {
681            return OtaStatus::Rebooted;
682        }
683
684        match self.config.reboot {
685            Reboot::Default => {
686                if let Err(error) = crate::power_management::reboot().await {
687                    let message = "Unable to run reboot command";
688
689                    error!("{message} : {error}");
690
691                    return OtaStatus::Failure(
692                        OtaError::Internal(message),
693                        Some(ota_request.clone()),
694                    );
695                }
696            }
697            Reboot::External => {
698                info!("waiting for next reboot");
699            }
700        }
701
702        OtaStatus::Rebooting(ota_request)
703    }
704
705    /// Handle the rebooting status
706    ///
707    /// This will loop till the reboot is reached
708    async fn wait_reboot(&self, ota_request: OtaId) -> OtaStatus {
709        tokio::time::sleep(Duration::from_secs(30)).await;
710
711        OtaStatus::Rebooting(ota_request)
712    }
713
714    /// Handle the transition to success status.
715    pub async fn check_reboot(&self) -> OtaStatus {
716        if !self.state_repository.exists().await {
717            return OtaStatus::NoPendingOta;
718        }
719
720        info!("Found pending update");
721
722        let ota_state = match self.state_repository.read().await {
723            Ok(state) => state,
724            Err(err) => {
725                let message = "Unable to read pending ota state".to_string();
726                error!("{message} : {}", err);
727                return OtaStatus::Failure(OtaError::Io(message), None);
728            }
729        };
730
731        let request_uuid = ota_state.uuid;
732        let ota_request = OtaId {
733            uuid: request_uuid,
734            url: "".to_string(),
735        };
736
737        if let Err(error) = self.do_pending_ota(&ota_state).await {
738            return OtaStatus::Failure(error, Some(ota_request));
739        }
740
741        OtaStatus::Success(ota_request)
742    }
743
744    pub async fn do_pending_ota(&self, state: &PersistentState) -> Result<(), OtaError> {
745        const GOOD_STATE: &str = "good";
746
747        let booted_slot = self.system_update.boot_slot().await.map_err(|error| {
748            let message = "Unable to identify the booted slot";
749            error!("{message}: {error}");
750            OtaError::Internal(message)
751        })?;
752
753        if state.slot == booted_slot {
754            let message = "Unable to switch slot";
755            error!("{message}");
756            return Err(OtaError::SystemRollback(message));
757        }
758
759        let primary_slot = self.system_update.get_primary().await.map_err(|error| {
760            let message = "Unable to get the current primary slot";
761            error!("{message}: {error}");
762            OtaError::Internal(message)
763        })?;
764
765        let (marked_slot, _) = self
766            .system_update
767            .mark(GOOD_STATE, &primary_slot)
768            .await
769            .map_err(|error| {
770                let message = "Unable to run marking slot operation";
771                error!("{message}: {error}");
772                OtaError::Internal(message)
773            })?;
774
775        if primary_slot != marked_slot {
776            let message = "Unable to mark slot";
777            Err(OtaError::Internal(message))
778        } else {
779            Ok(())
780        }
781    }
782
783    pub async fn next(&mut self) {
784        self.ota_status = match self.ota_status.clone() {
785            OtaStatus::Init(req) => OtaStatus::Acknowledged(req),
786            OtaStatus::Acknowledged(ota_request) => self.start_update(ota_request),
787            OtaStatus::Downloading(ota_request, _) => self.download(ota_request).await,
788            OtaStatus::Deploying(ota_request, _) => self.deploy(ota_request).await,
789            OtaStatus::Deployed(ota_request) => self.reboot(ota_request).await,
790            OtaStatus::Rebooted => self.check_reboot().await,
791            OtaStatus::Rebooting(ota_request) => self.wait_reboot(ota_request).await,
792            OtaStatus::Error(ota_error, ota_request) => {
793                OtaStatus::Failure(ota_error, Some(ota_request))
794            }
795            OtaStatus::Idle
796            | OtaStatus::NoPendingOta
797            | OtaStatus::Success(_)
798            | OtaStatus::Failure(_, _) => OtaStatus::Idle,
799        };
800    }
801
802    pub async fn handle_ota_update(&mut self, cancel: CancellationToken) {
803        let mut check_cancel = true;
804
805        while self.is_ota_in_progress() {
806            info!(status = %self.ota_status, "ota progress");
807
808            self.publish_status(self.ota_status.clone()).await;
809
810            if self.ota_status.is_cancellable() {
811                if cancel.run_until_cancelled(self.next()).await.is_none() {
812                    check_cancel = false;
813                    info!("OTA update cancelled");
814
815                    self.ota_status =
816                        OtaStatus::Failure(OtaError::Canceled, self.ota_status.ota_id())
817                }
818
819                continue;
820            } else if check_cancel && cancel.is_cancelled() {
821                // Not cancellable
822                check_cancel = false;
823
824                self.publish_status(OtaStatus::Failure(OtaError::Canceled, None))
825                    .await;
826            }
827
828            self.next().await;
829        }
830
831        // Publish the final status
832        self.publish_status(self.ota_status.clone()).await;
833
834        self.clear().await;
835    }
836
837    async fn clear(&mut self) {
838        if self.state_repository.exists().await {
839            let _ = self.state_repository.clear().await.map_err(|error| {
840                error!("Error clearing the state repository: {:?}", error);
841            });
842        }
843
844        let path = self.get_update_file_path();
845        if path.exists() {
846            if let Err(e) = tokio::fs::remove_file(&path).await {
847                error!("Unable to remove {}: {}", path.display(), e);
848            }
849        }
850
851        self.ota_status = OtaStatus::Idle;
852    }
853
854    async fn publish_status(&self, status: OtaStatus) {
855        if self.publisher_tx.send(status).await.is_err() {
856            error!(
857                "ota publisher disconnected before sending status {}",
858                self.ota_status
859            )
860        }
861    }
862
863    /// Check if the OTA status is in progress
864    fn is_ota_in_progress(&self) -> bool {
865        // The OTA is always considered in progress until the Idle state
866        let in_progress = self.ota_status != OtaStatus::Idle;
867
868        self.flag.set_in_progress(in_progress);
869
870        in_progress
871    }
872}
873
874/// Create the http client
875fn create_http_client(req: &OtaId) -> Result<reqwest::Client, OtaError> {
876    let mut headers = HeaderMap::new();
877    let name = HeaderName::from_static("x-edgehog-ota-id");
878    match HeaderValue::try_from(req.uuid.to_string()) {
879        Ok(value) => {
880            headers.append(name, value);
881        }
882        Err(error) => {
883            error!(%error, "couldn't set ota-id HTTP header value")
884        }
885    };
886    let tls = edgehog_tls::config().map_err(|error| {
887        error!(%error, "couldn't setup TLS configuration");
888
889        OtaError::Internal("couldn setup TLS configuration")
890    })?;
891    let client = reqwest::Client::builder()
892        .use_preconfigured_tls(tls)
893        .user_agent(concat!(
894            env!("CARGO_PKG_NAME"),
895            "/",
896            env!("CARGO_PKG_VERSION")
897        ))
898        .default_headers(headers)
899        .build()
900        .map_err(|error| {
901            error!(%error,"couldn't build HTTP client");
902
903            OtaError::Internal("couldn't build HTTP client")
904        })?;
905    Ok(client)
906}
907
908pub async fn wget(
909    client: &reqwest::Client,
910    req: &OtaId,
911    file_path: &Path,
912    ota_status_publisher: &mpsc::Sender<OtaStatus>,
913) -> Result<(), OtaError> {
914    use tokio_stream::StreamExt;
915
916    if file_path.exists() {
917        tokio::fs::remove_file(file_path).await.map_err(|err| {
918            error!(
919                "failed to remove old file '{}': {}",
920                file_path.display(),
921                err
922            );
923
924            OtaError::Internal("failed to remove old file")
925        })?;
926    }
927
928    info!(url = req.url, "Downloading");
929
930    let result_response = client.get(&req.url).send().await;
931
932    match result_response {
933        Err(err) => {
934            let message = "Error downloading update".to_string();
935            error!("{message}: {err:?}");
936            Err(OtaError::Network(message))
937        }
938        Ok(response) => {
939            debug!("Writing {}", file_path.display());
940
941            let total_size = response
942                .content_length()
943                .and_then(|size| if size == 0 { None } else { Some(size) })
944                .ok_or_else(|| {
945                    OtaError::Network(format!("Unable to get content length from: {}", req.url))
946                })? as f64;
947
948            let mut downloaded: f64 = 0.0;
949            let mut last_percentage_sent = 0.0;
950            let mut stream = response.bytes_stream();
951
952            let mut os_file = tokio::fs::File::create(file_path).await.map_err(|error| {
953                let message = format!("Unable to create ota_file in {file_path:?}");
954                error!("{message} : {error:?}");
955                OtaError::Io(message)
956            })?;
957
958            while let Some(chunk_result) = stream.next().await {
959                let chunk = chunk_result.map_err(|error| {
960                    let message = "Unable to parse response".to_string();
961                    error!("{message} : {error:?}");
962                    OtaError::Network(message)
963                })?;
964
965                if chunk.is_empty() {
966                    continue;
967                }
968
969                let mut content = std::io::Cursor::new(&chunk);
970
971                tokio::io::copy(&mut content, &mut os_file)
972                    .await
973                    .map_err(|error| {
974                        let message = format!("Unable to write chunk to ota_file in {file_path:?}");
975                        error!("{message} : {error:?}");
976                        OtaError::Io(message)
977                    })?;
978
979                downloaded += chunk.len() as f64;
980                let progress_percentage = (downloaded / total_size) * 100.0;
981                if progress_percentage == 100.0
982                    || (progress_percentage - last_percentage_sent) >= DOWNLOAD_PERC_ROUNDING_STEP
983                {
984                    last_percentage_sent = progress_percentage;
985                    if ota_status_publisher
986                        .send(OtaStatus::Downloading(
987                            req.clone(),
988                            progress_percentage as i32,
989                        ))
990                        .await
991                        .is_err()
992                    {
993                        warn!("ota_status_publisher dropped before send downloading_status")
994                    }
995                }
996            }
997
998            if total_size == downloaded {
999                Ok(())
1000            } else {
1001                let message = "Unable to download file".to_string();
1002                error!("{message}");
1003                Err(OtaError::Network(message))
1004            }
1005        }
1006    }
1007}
1008
1009#[cfg(test)]
1010mod tests {
1011    use std::io;
1012    use std::path::PathBuf;
1013    use std::time::Duration;
1014
1015    use futures::StreamExt;
1016    use httpmock::prelude::*;
1017    use mockall::{predicate, Sequence};
1018    use pretty_assertions::assert_eq;
1019    use tempdir::TempDir;
1020    use tokio::sync::mpsc;
1021    use tokio_util::sync::CancellationToken;
1022    use uuid::Uuid;
1023
1024    use crate::error::DeviceManagerError;
1025    use crate::ota::ota_handler_test::deploy_status_stream;
1026    use crate::ota::rauc::BundleInfo;
1027    use crate::ota::{create_http_client, wget, Ota, OtaId, OtaStatus, PersistentState};
1028    use crate::ota::{DeployProgress, DeployStatus, MockSystemUpdate, OtaError, SystemUpdate};
1029    use crate::repository::file_state_repository::FileStateError;
1030    use crate::repository::{MockStateRepository, StateRepository};
1031
1032    use super::config::OtaConfig;
1033    use super::ota_handler::OtaInProgress;
1034
1035    /// Creates a temporary directory that will be deleted when the returned TempDir is dropped.
1036    fn temp_dir(prefix: &str) -> (TempDir, PathBuf) {
1037        let dir = TempDir::new(&format!("edgehog-{prefix}")).unwrap();
1038        let path = dir.path().to_owned();
1039
1040        (dir, path)
1041    }
1042
1043    impl<T, U> Ota<T, U>
1044    where
1045        T: SystemUpdate,
1046        U: StateRepository<PersistentState>,
1047    {
1048        /// Create the mock with a non existent download path
1049        pub fn mock_new(
1050            system_update: T,
1051            state_repository: U,
1052            publisher_tx: mpsc::Sender<OtaStatus>,
1053        ) -> Self {
1054            Ota {
1055                system_update,
1056                state_repository,
1057                download_file_path: PathBuf::from("/dev/null"),
1058                ota_status: OtaStatus::Idle,
1059                publisher_tx,
1060                flag: OtaInProgress::default(),
1061                config: OtaConfig::default(),
1062            }
1063        }
1064
1065        /// Create the mock with a usable download path
1066        pub fn mock_new_with_path(
1067            system_update: T,
1068            state_repository: U,
1069            prefix: &str,
1070            publisher_tx: mpsc::Sender<OtaStatus>,
1071        ) -> (Self, TempDir) {
1072            let (dir, path) = temp_dir(prefix);
1073            let mock = Ota {
1074                system_update,
1075                state_repository,
1076                download_file_path: path,
1077                ota_status: OtaStatus::Idle,
1078                publisher_tx,
1079                flag: OtaInProgress::default(),
1080                config: OtaConfig::default(),
1081            };
1082
1083            (mock, dir)
1084        }
1085    }
1086
1087    #[tokio::test]
1088    async fn last_error_ok() {
1089        let mut system_update = MockSystemUpdate::new();
1090        let state_mock = MockStateRepository::<PersistentState>::new();
1091
1092        system_update
1093            .expect_last_error()
1094            .returning(|| Ok("Unable to deploy image".to_string()));
1095
1096        let (tx, _rx) = mpsc::channel(10);
1097
1098        let ota = Ota::mock_new(system_update, state_mock, tx);
1099
1100        let last_error_result = ota.last_error().await;
1101
1102        assert!(last_error_result.is_ok());
1103        assert_eq!("Unable to deploy image", last_error_result.unwrap());
1104    }
1105
1106    #[tokio::test]
1107    async fn last_error_fail() {
1108        let mut system_update = MockSystemUpdate::new();
1109        let state_mock = MockStateRepository::<PersistentState>::new();
1110
1111        system_update.expect_last_error().returning(|| {
1112            Err(DeviceManagerError::Fatal(
1113                "Unable to call last error".to_string(),
1114            ))
1115        });
1116
1117        let (tx, _rx) = mpsc::channel(10);
1118
1119        let ota = Ota::mock_new(system_update, state_mock, tx);
1120
1121        let last_error_result = ota.last_error().await;
1122
1123        assert!(last_error_result.is_err());
1124        assert!(matches!(
1125            last_error_result.err().unwrap(),
1126            DeviceManagerError::Fatal(_)
1127        ))
1128    }
1129
1130    #[tokio::test]
1131    async fn try_to_downloading_success() {
1132        let state_mock = MockStateRepository::<PersistentState>::new();
1133        let system_update = MockSystemUpdate::new();
1134
1135        let (publisher_tx, _publisher_rx) = mpsc::channel(8);
1136        let mut ota = Ota::mock_new(system_update, state_mock, publisher_tx);
1137        let ota_id = OtaId {
1138            uuid: Uuid::new_v4(),
1139            url: String::new(),
1140        };
1141        ota.ota_status = OtaStatus::Acknowledged(ota_id.clone());
1142
1143        ota.next().await;
1144
1145        assert_eq!(ota.ota_status, OtaStatus::Downloading(ota_id.clone(), 0));
1146    }
1147
1148    #[tokio::test]
1149    async fn ota_streaming_success() {
1150        let slot = "A";
1151        let req = OtaId {
1152            uuid: Uuid::new_v4(),
1153            url: "http://example.com/ota.bin".to_string(),
1154        };
1155        let state = PersistentState {
1156            uuid: req.uuid,
1157            slot: slot.to_string(),
1158        };
1159
1160        let mut state_mock = MockStateRepository::<PersistentState>::new();
1161        let mut system_update = MockSystemUpdate::new();
1162        let mut seq = Sequence::new();
1163
1164        system_update
1165            .expect_boot_slot()
1166            .once()
1167            .in_sequence(&mut seq)
1168            .returning(|| Ok(slot.to_string()));
1169
1170        state_mock
1171            .expect_write()
1172            .once()
1173            .with(predicate::eq(state.clone()))
1174            .returning(|_| Ok(()));
1175
1176        system_update
1177            .expect_install_bundle()
1178            .once()
1179            .in_sequence(&mut seq)
1180            .with(predicate::eq(req.url.clone()))
1181            .returning(|_| Ok(()));
1182
1183        system_update
1184            .expect_operation()
1185            .once()
1186            .in_sequence(&mut seq)
1187            .returning(|| Ok("".to_string()));
1188
1189        system_update
1190            .expect_receive_completed()
1191            .once()
1192            .in_sequence(&mut seq)
1193            .returning(|| {
1194                let progress = [
1195                    DeployStatus::Progress(DeployProgress {
1196                        percentage: 50,
1197                        message: "Copy image".to_string(),
1198                    }),
1199                    DeployStatus::Progress(DeployProgress {
1200                        percentage: 100,
1201                        message: "Installing is done".to_string(),
1202                    }),
1203                    DeployStatus::Completed { signal: 0 },
1204                ]
1205                .map(Ok);
1206
1207                Ok(futures::stream::iter(progress).boxed())
1208            });
1209
1210        state_mock
1211            .expect_exists()
1212            .once()
1213            .in_sequence(&mut seq)
1214            .returning(|| true);
1215
1216        state_mock
1217            .expect_read()
1218            .once()
1219            .in_sequence(&mut seq)
1220            .return_once(|| Ok(state));
1221
1222        let new_slot = "B";
1223
1224        system_update
1225            .expect_boot_slot()
1226            .once()
1227            .in_sequence(&mut seq)
1228            .return_once(|| Ok(new_slot.to_string()));
1229
1230        system_update
1231            .expect_get_primary()
1232            .once()
1233            .in_sequence(&mut seq)
1234            .returning(|| Ok(new_slot.to_string()));
1235
1236        system_update
1237            .expect_mark()
1238            .once()
1239            .in_sequence(&mut seq)
1240            .with(predicate::eq("good"), predicate::eq(new_slot))
1241            .return_once(|_, _| Ok((new_slot.to_string(), "".to_string())));
1242
1243        state_mock
1244            .expect_exists()
1245            .once()
1246            .in_sequence(&mut seq)
1247            .returning(|| true);
1248
1249        state_mock
1250            .expect_clear()
1251            .once()
1252            .in_sequence(&mut seq)
1253            .returning(|| Ok(()));
1254
1255        let (tx, mut rx) = mpsc::channel(10);
1256        let mut ota = Ota::mock_new(system_update, state_mock, tx);
1257        ota.config.streaming = true;
1258        ota.ota_status = OtaStatus::Acknowledged(req.clone());
1259
1260        tokio::time::timeout(
1261            Duration::from_secs(2),
1262            ota.handle_ota_update(CancellationToken::new()),
1263        )
1264        .await
1265        .unwrap();
1266
1267        let exp = [
1268            OtaStatus::Acknowledged(req.clone()),
1269            OtaStatus::Deploying(req.clone(), DeployProgress::default()),
1270            OtaStatus::Deploying(
1271                req.clone(),
1272                DeployProgress {
1273                    percentage: 50,
1274                    message: "Copy image".to_string(),
1275                },
1276            ),
1277            OtaStatus::Deploying(
1278                req.clone(),
1279                DeployProgress {
1280                    percentage: 100,
1281                    message: "Installing is done".to_string(),
1282                },
1283            ),
1284            OtaStatus::Deployed(req.clone()),
1285            OtaStatus::Rebooting(req.clone()),
1286            OtaStatus::Rebooted,
1287            OtaStatus::Success(OtaId {
1288                uuid: req.uuid,
1289                url: "".to_string(),
1290            }),
1291            OtaStatus::Idle,
1292        ];
1293
1294        let mut received = Vec::new();
1295        tokio::time::timeout(Duration::from_secs(2), rx.recv_many(&mut received, 10))
1296            .await
1297            .unwrap();
1298
1299        assert_eq!(received, exp);
1300
1301        assert!(rx.is_empty());
1302    }
1303
1304    #[tokio::test]
1305    async fn try_to_deploying_fail_ota_request() {
1306        let state_mock = MockStateRepository::<PersistentState>::new();
1307        let mut system_update = MockSystemUpdate::new();
1308
1309        system_update
1310            .expect_info()
1311            .returning(|_: &str| Err(DeviceManagerError::Fatal("Unable to get info".to_string())));
1312
1313        let mut ota_request = OtaId::default();
1314        let binary_content = b"\x80\x02\x03";
1315        let binary_size = binary_content.len();
1316
1317        let server = MockServer::start_async().await;
1318        ota_request.url = server.url("/ota.bin");
1319        let mock_ota_file_request = server
1320            .mock_async(|when, then| {
1321                when.method(GET).path("/ota.bin");
1322                then.status(200)
1323                    .header("content-Length", binary_size.to_string())
1324                    .body(binary_content);
1325            })
1326            .await;
1327
1328        let (ota_status_publisher, mut ota_status_receiver) = mpsc::channel(1);
1329        let (ota, _dir) = Ota::mock_new_with_path(
1330            system_update,
1331            state_mock,
1332            "fail_ota_request",
1333            ota_status_publisher,
1334        );
1335
1336        let ota_status = ota.download(ota_request).await;
1337        mock_ota_file_request.assert_async().await;
1338
1339        let receive_result = ota_status_receiver.try_recv();
1340        assert!(receive_result.is_ok());
1341        let ota_status_received = receive_result.unwrap();
1342        assert!(matches!(
1343            ota_status_received,
1344            OtaStatus::Downloading(_, 100)
1345        ));
1346
1347        let receive_result = ota_status_receiver.try_recv();
1348        assert!(receive_result.is_err());
1349
1350        assert!(matches!(
1351            ota_status,
1352            OtaStatus::Failure(OtaError::InvalidBaseImage(_), _),
1353        ));
1354    }
1355
1356    #[tokio::test]
1357    async fn try_to_deploying_fail_5_wget() {
1358        let state_mock = MockStateRepository::<PersistentState>::new();
1359        let mut system_update = MockSystemUpdate::new();
1360
1361        system_update.expect_info().returning(|_: &str| {
1362            Ok(BundleInfo {
1363                compatible: "rauc-demo-x86".to_string(),
1364                version: "1".to_string(),
1365            })
1366        });
1367
1368        let server = MockServer::start_async().await;
1369        let mock_ota_file_request = server
1370            .mock_async(|when, then| {
1371                when.method(GET).path("/ota.bin");
1372                then.status(404);
1373            })
1374            .await;
1375
1376        let ota_url = server.url("/ota.bin");
1377        let ota_id = OtaId {
1378            uuid: Uuid::new_v4(),
1379            url: ota_url.clone(),
1380        };
1381
1382        let (publisher_tx, mut publisher_rx) = mpsc::channel(10);
1383        let (ota, _dir) =
1384            Ota::mock_new_with_path(system_update, state_mock, "fail_5_wget", publisher_tx);
1385
1386        tokio::time::pause();
1387
1388        tokio::time::advance(tokio::time::Duration::from_secs(60)).await;
1389
1390        let status = ota.download(ota_id.clone()).await;
1391
1392        let exp = [
1393            OtaStatus::Error(
1394                OtaError::Network(format!("Unable to get content length from: {ota_url}")),
1395                ota_id.clone(),
1396            ),
1397            OtaStatus::Error(
1398                OtaError::Network(format!("Unable to get content length from: {ota_url}")),
1399                ota_id.clone(),
1400            ),
1401            OtaStatus::Error(
1402                OtaError::Network(format!("Unable to get content length from: {ota_url}")),
1403                ota_id.clone(),
1404            ),
1405            OtaStatus::Error(
1406                OtaError::Network(format!("Unable to get content length from: {ota_url}")),
1407                ota_id.clone(),
1408            ),
1409            OtaStatus::Error(
1410                OtaError::Network(format!("Unable to get content length from: {ota_url}")),
1411                ota_id.clone(),
1412            ),
1413        ];
1414
1415        // for the timeout to work
1416        tokio::time::resume();
1417
1418        for status in exp {
1419            let val = tokio::time::timeout(Duration::from_secs(2), publisher_rx.recv())
1420                .await
1421                .unwrap()
1422                .unwrap();
1423
1424            assert_eq!(val, status);
1425        }
1426
1427        assert_eq!(
1428            status,
1429            OtaStatus::Failure(
1430                OtaError::Internal("Too many attempts to download the OTA file"),
1431                Some(ota_id)
1432            )
1433        );
1434
1435        mock_ota_file_request.assert_hits_async(5).await;
1436    }
1437
1438    #[tokio::test]
1439    async fn try_to_deploying_fail_ota_info() {
1440        let state_mock = MockStateRepository::<PersistentState>::new();
1441        let mut system_update = MockSystemUpdate::new();
1442
1443        system_update
1444            .expect_info()
1445            .returning(|_: &str| Err(DeviceManagerError::Fatal("Unable to get info".to_string())));
1446
1447        let mut ota_request = OtaId::default();
1448        let binary_content = b"\x80\x02\x03";
1449        let binary_size = binary_content.len();
1450
1451        let server = MockServer::start_async().await;
1452        ota_request.url = server.url("/ota.bin");
1453        let mock_ota_file_request = server
1454            .mock_async(|when, then| {
1455                when.method(GET).path("/ota.bin");
1456                then.status(200)
1457                    .header("content-Length", binary_size.to_string())
1458                    .body(binary_content);
1459            })
1460            .await;
1461
1462        let (ota_status_publisher, mut ota_status_receiver) = mpsc::channel(1);
1463        let (ota, _dir) = Ota::mock_new_with_path(
1464            system_update,
1465            state_mock,
1466            "fail_ota_info",
1467            ota_status_publisher,
1468        );
1469
1470        let ota_status = ota.download(ota_request).await;
1471        mock_ota_file_request.assert_async().await;
1472
1473        let receive_result = ota_status_receiver.try_recv();
1474        assert!(receive_result.is_ok());
1475        let ota_status_received = receive_result.unwrap();
1476        assert!(matches!(
1477            ota_status_received,
1478            OtaStatus::Downloading(_, 100)
1479        ));
1480
1481        assert!(matches!(
1482            ota_status,
1483            OtaStatus::Failure(OtaError::InvalidBaseImage(_), _),
1484        ));
1485    }
1486
1487    #[tokio::test]
1488    async fn try_to_deploying_fail_ota_call_compatible() {
1489        let state_mock = MockStateRepository::<PersistentState>::new();
1490        let mut system_update = MockSystemUpdate::new();
1491
1492        system_update.expect_info().returning(|_: &str| {
1493            Ok(BundleInfo {
1494                compatible: "rauc-demo-x86".to_string(),
1495                version: "1".to_string(),
1496            })
1497        });
1498
1499        system_update
1500            .expect_compatible()
1501            .returning(|| Err(DeviceManagerError::Fatal("empty value".to_string())));
1502
1503        let binary_content = b"\x80\x02\x03";
1504        let binary_size = binary_content.len();
1505
1506        let mut ota_request = OtaId::default();
1507        let server = MockServer::start_async().await;
1508        ota_request.url = server.url("/ota.bin");
1509        let mock_ota_file_request = server
1510            .mock_async(|when, then| {
1511                when.method(GET).path("/ota.bin");
1512                then.status(200)
1513                    .header("content-Length", binary_size.to_string())
1514                    .body(binary_content);
1515            })
1516            .await;
1517
1518        let (ota_status_publisher, mut ota_status_receiver) = mpsc::channel(1);
1519        let (ota, _dir) = Ota::mock_new_with_path(
1520            system_update,
1521            state_mock,
1522            "fail_ota_call_compatible",
1523            ota_status_publisher,
1524        );
1525
1526        let ota_status = ota.download(ota_request).await;
1527        mock_ota_file_request.assert_async().await;
1528
1529        let receive_result = ota_status_receiver.try_recv();
1530        assert!(receive_result.is_ok());
1531        let ota_status_received = receive_result.unwrap();
1532        assert!(matches!(
1533            ota_status_received,
1534            OtaStatus::Downloading(_, 100)
1535        ));
1536
1537        let receive_result = ota_status_receiver.try_recv();
1538        assert!(receive_result.is_err());
1539
1540        assert!(matches!(
1541            ota_status,
1542            OtaStatus::Failure(OtaError::InvalidBaseImage(_), _)
1543        ));
1544    }
1545
1546    #[tokio::test]
1547    async fn try_to_deploying_fail_compatible() {
1548        let state_mock = MockStateRepository::<PersistentState>::new();
1549        let mut system_update = MockSystemUpdate::new();
1550
1551        system_update.expect_info().returning(|_: &str| {
1552            Ok(BundleInfo {
1553                compatible: "rauc-demo-x86".to_string(),
1554                version: "1".to_string(),
1555            })
1556        });
1557
1558        system_update
1559            .expect_compatible()
1560            .returning(|| Ok("rauc-demo-arm".to_string()));
1561
1562        let mut ota_request = OtaId::default();
1563
1564        let binary_content = b"\x80\x02\x03";
1565        let binary_size = binary_content.len();
1566
1567        let server = MockServer::start_async().await;
1568        ota_request.url = server.url("/ota.bin");
1569        let mock_ota_file_request = server
1570            .mock_async(|when, then| {
1571                when.method(GET).path("/ota.bin");
1572                then.status(200)
1573                    .header("content-Length", binary_size.to_string())
1574                    .body(binary_content);
1575            })
1576            .await;
1577
1578        let (ota_status_publisher, mut ota_status_receiver) = mpsc::channel(1);
1579        let (ota, _dir) = Ota::mock_new_with_path(
1580            system_update,
1581            state_mock,
1582            "fail_compatible",
1583            ota_status_publisher,
1584        );
1585
1586        let ota_status = ota.download(ota_request).await;
1587        mock_ota_file_request.assert_async().await;
1588
1589        let receive_result = ota_status_receiver.try_recv();
1590        assert!(receive_result.is_ok());
1591        let ota_status_received = receive_result.unwrap();
1592        assert!(matches!(
1593            ota_status_received,
1594            OtaStatus::Downloading(_, 100)
1595        ));
1596
1597        let receive_result = ota_status_receiver.try_recv();
1598        assert!(receive_result.is_err());
1599
1600        assert!(matches!(
1601            ota_status,
1602            OtaStatus::Failure(OtaError::InvalidBaseImage(_), _)
1603        ));
1604    }
1605
1606    #[tokio::test]
1607    async fn try_to_deploying_fail_call_boot_slot() {
1608        let mut state_mock = MockStateRepository::<PersistentState>::new();
1609        let mut system_update = MockSystemUpdate::new();
1610        let mut seq = Sequence::new();
1611
1612        system_update
1613            .expect_info()
1614            .once()
1615            .in_sequence(&mut seq)
1616            .returning(|_: &str| {
1617                Ok(BundleInfo {
1618                    compatible: "rauc-demo-x86".to_string(),
1619                    version: "1".to_string(),
1620                })
1621            });
1622
1623        system_update
1624            .expect_compatible()
1625            .once()
1626            .in_sequence(&mut seq)
1627            .returning(|| Ok("rauc-demo-x86".to_string()));
1628
1629        system_update
1630            .expect_boot_slot()
1631            .once()
1632            .in_sequence(&mut seq)
1633            .returning(|| {
1634                Err(DeviceManagerError::Fatal(
1635                    "unable to call boot slot".to_string(),
1636                ))
1637            });
1638
1639        state_mock
1640            .expect_exists()
1641            .once()
1642            .in_sequence(&mut seq)
1643            .returning(|| false);
1644
1645        let binary_content = b"\x80\x02\x03";
1646        let binary_size = binary_content.len();
1647
1648        let server = MockServer::start_async().await;
1649        let mock_ota_file_request = server
1650            .mock_async(|when, then| {
1651                when.method(GET).path("/ota.bin");
1652                then.status(200)
1653                    .header("content-Length", binary_size.to_string())
1654                    .body(binary_content);
1655            })
1656            .await;
1657
1658        let url = server.url("/ota.bin");
1659        let req = OtaId {
1660            uuid: Uuid::new_v4(),
1661            url,
1662        };
1663
1664        let (ota_status_publisher, mut ota_status_receiver) = mpsc::channel(6);
1665        let (mut ota, _dir) = Ota::mock_new_with_path(
1666            system_update,
1667            state_mock,
1668            "fail_call_boot_slot",
1669            ota_status_publisher,
1670        );
1671
1672        ota.ota_status = OtaStatus::Acknowledged(req.clone());
1673
1674        tokio::time::timeout(
1675            Duration::from_secs(2),
1676            ota.handle_ota_update(CancellationToken::new()),
1677        )
1678        .await
1679        .unwrap();
1680
1681        let expected = [
1682            OtaStatus::Acknowledged(req.clone()),
1683            OtaStatus::Downloading(req.clone(), 0),
1684            OtaStatus::Downloading(req.clone(), 100),
1685            OtaStatus::Deploying(req.clone(), DeployProgress::default()),
1686            OtaStatus::Failure(
1687                OtaError::Internal("Unable to identify the booted slot"),
1688                Some(req),
1689            ),
1690            OtaStatus::Idle,
1691        ];
1692
1693        for exp in expected {
1694            let status = ota_status_receiver.try_recv().unwrap();
1695
1696            assert_eq!(status, exp)
1697        }
1698
1699        assert!(ota_status_receiver.is_empty());
1700
1701        mock_ota_file_request.assert_async().await;
1702    }
1703
1704    #[tokio::test]
1705    async fn try_to_deploying_fail_write_state() {
1706        let mut state_mock = MockStateRepository::<PersistentState>::new();
1707        let mut system_update = MockSystemUpdate::new();
1708        let mut seq = Sequence::new();
1709
1710        system_update
1711            .expect_info()
1712            .once()
1713            .in_sequence(&mut seq)
1714            .returning(|_: &str| {
1715                Ok(BundleInfo {
1716                    compatible: "rauc-demo-x86".to_string(),
1717                    version: "1".to_string(),
1718                })
1719            });
1720
1721        system_update
1722            .expect_compatible()
1723            .once()
1724            .in_sequence(&mut seq)
1725            .returning(|| Ok("rauc-demo-x86".to_string()));
1726
1727        system_update
1728            .expect_boot_slot()
1729            .once()
1730            .in_sequence(&mut seq)
1731            .returning(|| Ok("A".to_string()));
1732
1733        state_mock
1734            .expect_write()
1735            .once()
1736            .in_sequence(&mut seq)
1737            .returning(|_| {
1738                Err(FileStateError::Write {
1739                    path: "/ota.bin".into(),
1740                    backtrace: io::Error::new(io::ErrorKind::PermissionDenied, "permission denied"),
1741                })
1742            });
1743
1744        state_mock
1745            .expect_exists()
1746            .once()
1747            .in_sequence(&mut seq)
1748            .returning(|| false);
1749
1750        let binary_content = b"\x80\x02\x03";
1751        let binary_size = binary_content.len();
1752
1753        let server = MockServer::start_async().await;
1754
1755        let mock_ota_file_request = server
1756            .mock_async(|when, then| {
1757                when.method(GET).path("/ota.bin");
1758                then.status(200)
1759                    .header("content-length", binary_size.to_string())
1760                    .body(binary_content);
1761            })
1762            .await;
1763
1764        let ota_url = server.url("/ota.bin");
1765
1766        let (publisher_tx, mut publisher_rx) = mpsc::channel(10);
1767        let (mut ota, _dir) =
1768            Ota::mock_new_with_path(system_update, state_mock, "fail_write_state", publisher_tx);
1769
1770        let ota_id = OtaId {
1771            uuid: Uuid::new_v4(),
1772            url: ota_url,
1773        };
1774
1775        tracing_subscriber::fmt()
1776            .with_max_level(tracing::Level::DEBUG)
1777            .with_test_writer()
1778            .init();
1779
1780        ota.ota_status = OtaStatus::Acknowledged(ota_id.clone());
1781
1782        tokio::time::timeout(
1783            Duration::from_secs(2),
1784            ota.handle_ota_update(CancellationToken::new()),
1785        )
1786        .await
1787        .unwrap();
1788
1789        let exp = [
1790            OtaStatus::Acknowledged(ota_id.clone()),
1791            OtaStatus::Downloading(ota_id.clone(), 0),
1792            OtaStatus::Downloading(ota_id.clone(), 100),
1793            OtaStatus::Deploying(ota_id.clone(), DeployProgress::default()),
1794            OtaStatus::Failure(
1795                OtaError::Io("Unable to persist ota state".to_string()),
1796                Some(ota_id),
1797            ),
1798            OtaStatus::Idle,
1799        ];
1800
1801        for status in exp {
1802            let val = tokio::time::timeout(Duration::from_secs(2), publisher_rx.recv())
1803                .await
1804                .unwrap()
1805                .unwrap();
1806
1807            assert_eq!(val, status, "got {status}");
1808        }
1809
1810        assert!(publisher_rx.is_empty());
1811
1812        mock_ota_file_request.assert_async().await;
1813    }
1814
1815    #[tokio::test]
1816    async fn try_to_download_success() {
1817        let uuid = Uuid::new_v4();
1818
1819        let state_mock = MockStateRepository::<PersistentState>::new();
1820        let mut system_update = MockSystemUpdate::new();
1821        let mut seq = Sequence::new();
1822
1823        system_update
1824            .expect_info()
1825            .once()
1826            .in_sequence(&mut seq)
1827            .returning(|_: &str| {
1828                Ok(BundleInfo {
1829                    compatible: "rauc-demo-x86".to_string(),
1830                    version: "1".to_string(),
1831                })
1832            });
1833
1834        system_update
1835            .expect_compatible()
1836            .once()
1837            .in_sequence(&mut seq)
1838            .returning(|| Ok("rauc-demo-x86".to_string()));
1839
1840        let binary_content = b"\x80\x02\x03";
1841        let binary_size = binary_content.len();
1842
1843        let server = MockServer::start_async().await;
1844        let ota_url = server.url("/ota.bin");
1845
1846        let mock_ota_file_request = server
1847            .mock_async(|when, then| {
1848                when.method(GET).path("/ota.bin");
1849                then.status(200)
1850                    .header("content-Length", binary_size.to_string())
1851                    .body(binary_content);
1852            })
1853            .await;
1854
1855        let (publisher_tx, mut publisher_rx) = mpsc::channel(2);
1856        let (ota, _dir) =
1857            Ota::mock_new_with_path(system_update, state_mock, "deploying_success", publisher_tx);
1858
1859        let ota_id = OtaId { uuid, url: ota_url };
1860
1861        let status = ota.download(ota_id.clone()).await;
1862
1863        let exp = [OtaStatus::Downloading(ota_id.clone(), 100)];
1864
1865        for status in exp {
1866            let val = tokio::time::timeout(Duration::from_secs(2), publisher_rx.recv())
1867                .await
1868                .unwrap()
1869                .unwrap();
1870
1871            assert_eq!(val, status);
1872        }
1873
1874        assert_eq!(
1875            status,
1876            OtaStatus::Deploying(
1877                ota_id,
1878                DeployProgress {
1879                    percentage: 0,
1880                    message: String::new()
1881                }
1882            )
1883        );
1884
1885        mock_ota_file_request.assert_async().await;
1886    }
1887
1888    #[tokio::test]
1889    async fn try_to_deployed_fail_install_bundle() {
1890        let req = OtaId::default();
1891
1892        let mut state_mock = MockStateRepository::<PersistentState>::new();
1893        let mut system_update = MockSystemUpdate::new();
1894        let mut seq = Sequence::new();
1895
1896        system_update
1897            .expect_boot_slot()
1898            .once()
1899            .in_sequence(&mut seq)
1900            .returning(|| Ok("A".to_string()));
1901
1902        state_mock
1903            .expect_write()
1904            .once()
1905            .in_sequence(&mut seq)
1906            .withf(move |p| p.uuid == req.uuid && p.slot == "A")
1907            .returning(|_| Ok(()));
1908
1909        system_update
1910            .expect_install_bundle()
1911            .once()
1912            .in_sequence(&mut seq)
1913            .returning(|_| Err(DeviceManagerError::Fatal("install fail".to_string())));
1914
1915        let (ota_status_publisher, mut ota_status_receiver) = mpsc::channel(1);
1916        let (ota, _dir) = Ota::mock_new_with_path(
1917            system_update,
1918            state_mock,
1919            "fail_install_bundle",
1920            ota_status_publisher,
1921        );
1922
1923        let ota_status = ota.deploy(req).await;
1924
1925        let receive_result = ota_status_receiver.try_recv();
1926        assert!(receive_result.is_err());
1927
1928        assert!(matches!(
1929            ota_status,
1930            OtaStatus::Failure(OtaError::InvalidBaseImage(_), _)
1931        ));
1932    }
1933
1934    #[tokio::test]
1935    async fn try_to_deployed_fail_operation() {
1936        let req = OtaId::default();
1937
1938        let mut state_mock = MockStateRepository::<PersistentState>::new();
1939        let mut seq = Sequence::new();
1940
1941        state_mock
1942            .expect_write()
1943            .once()
1944            .in_sequence(&mut seq)
1945            .withf(move |p| p.uuid == req.uuid && p.slot == "A")
1946            .returning(|_| Ok(()));
1947
1948        let mut system_update = MockSystemUpdate::new();
1949        let mut seq = Sequence::new();
1950
1951        system_update
1952            .expect_boot_slot()
1953            .once()
1954            .in_sequence(&mut seq)
1955            .returning(|| Ok("A".to_string()));
1956        system_update
1957            .expect_install_bundle()
1958            .once()
1959            .in_sequence(&mut seq)
1960            .returning(|_| Ok(()));
1961        system_update
1962            .expect_operation()
1963            .once()
1964            .in_sequence(&mut seq)
1965            .returning(|| Err(DeviceManagerError::Fatal("operation call fail".to_string())));
1966
1967        let (ota_status_publisher, mut ota_status_receiver) = mpsc::channel(1);
1968        let (ota, _dir) = Ota::mock_new_with_path(
1969            system_update,
1970            state_mock,
1971            "deployed_fail_operation",
1972            ota_status_publisher,
1973        );
1974
1975        let ota_status = ota.deploy(req).await;
1976
1977        let receive_result = ota_status_receiver.try_recv();
1978        assert!(receive_result.is_err());
1979
1980        assert!(matches!(
1981            ota_status,
1982            OtaStatus::Failure(OtaError::Internal(_), _)
1983        ));
1984    }
1985
1986    #[tokio::test]
1987    async fn try_to_deployed_fail_receive_completed() {
1988        let req = OtaId::default();
1989
1990        let mut state_mock = MockStateRepository::<PersistentState>::new();
1991        let mut system_update = MockSystemUpdate::new();
1992        let mut seq = Sequence::new();
1993
1994        system_update
1995            .expect_boot_slot()
1996            .once()
1997            .in_sequence(&mut seq)
1998            .returning(|| Ok("A".to_string()));
1999
2000        state_mock
2001            .expect_write()
2002            .once()
2003            .in_sequence(&mut seq)
2004            .withf(move |p| p.uuid == req.uuid && p.slot == "A")
2005            .returning(|_| Ok(()));
2006
2007        system_update
2008            .expect_install_bundle()
2009            .once()
2010            .in_sequence(&mut seq)
2011            .returning(|_| Ok(()));
2012        system_update
2013            .expect_operation()
2014            .once()
2015            .in_sequence(&mut seq)
2016            .returning(|| Ok("".to_string()));
2017        system_update
2018            .expect_receive_completed()
2019            .once()
2020            .in_sequence(&mut seq)
2021            .returning(|| {
2022                Err(DeviceManagerError::Fatal(
2023                    "receive_completed call fail".to_string(),
2024                ))
2025            });
2026
2027        let (ota_status_publisher, mut ota_status_receiver) = mpsc::channel(1);
2028        let (ota, _dir) = Ota::mock_new_with_path(
2029            system_update,
2030            state_mock,
2031            "deployed_fail_receive_completed",
2032            ota_status_publisher,
2033        );
2034
2035        let ota_status = ota.deploy(req).await;
2036
2037        let receive_result = ota_status_receiver.try_recv();
2038        assert!(receive_result.is_err());
2039
2040        assert!(matches!(
2041            ota_status,
2042            OtaStatus::Failure(OtaError::Internal(_), _)
2043        ));
2044    }
2045
2046    #[tokio::test]
2047    async fn try_to_deployed_fail_signal() {
2048        let req = OtaId::default();
2049
2050        let mut state_mock = MockStateRepository::<PersistentState>::new();
2051        let mut seq = Sequence::new();
2052
2053        state_mock
2054            .expect_write()
2055            .once()
2056            .in_sequence(&mut seq)
2057            .withf(move |p| p.uuid == req.uuid && p.slot == "A")
2058            .returning(|_| Ok(()));
2059
2060        let mut system_update = MockSystemUpdate::new();
2061        let mut seq = Sequence::new();
2062
2063        system_update
2064            .expect_boot_slot()
2065            .once()
2066            .in_sequence(&mut seq)
2067            .returning(|| Ok("A".to_string()));
2068        system_update
2069            .expect_install_bundle()
2070            .once()
2071            .in_sequence(&mut seq)
2072            .returning(|_| Ok(()));
2073        system_update
2074            .expect_operation()
2075            .once()
2076            .in_sequence(&mut seq)
2077            .returning(|| Ok("".to_string()));
2078        system_update
2079            .expect_receive_completed()
2080            .once()
2081            .in_sequence(&mut seq)
2082            .returning(|| deploy_status_stream([DeployStatus::Completed { signal: -1 }]));
2083        system_update
2084            .expect_last_error()
2085            .once()
2086            .in_sequence(&mut seq)
2087            .returning(|| Ok("Unable to deploy image".to_string()));
2088
2089        let (ota_status_publisher, _) = mpsc::channel(1);
2090        let (ota, _dir) = Ota::mock_new_with_path(
2091            system_update,
2092            state_mock,
2093            "deployed_fail_signal",
2094            ota_status_publisher,
2095        );
2096
2097        let ota_status = ota.deploy(req).await;
2098
2099        assert!(matches!(
2100            ota_status,
2101            OtaStatus::Failure(OtaError::InvalidBaseImage(_), _)
2102        ));
2103    }
2104
2105    #[tokio::test]
2106    async fn try_to_deployed_success() {
2107        let req = OtaId::default();
2108
2109        let mut state_mock = MockStateRepository::<PersistentState>::new();
2110        let mut seq = Sequence::new();
2111
2112        state_mock
2113            .expect_write()
2114            .once()
2115            .in_sequence(&mut seq)
2116            .withf(move |p| p.uuid == req.uuid && p.slot == "A")
2117            .returning(|_| Ok(()));
2118
2119        let mut system_update = MockSystemUpdate::new();
2120        let mut seq = Sequence::new();
2121
2122        system_update
2123            .expect_boot_slot()
2124            .once()
2125            .in_sequence(&mut seq)
2126            .returning(|| Ok("A".to_string()));
2127        system_update
2128            .expect_install_bundle()
2129            .once()
2130            .in_sequence(&mut seq)
2131            .returning(|_| Ok(()));
2132        system_update
2133            .expect_operation()
2134            .once()
2135            .in_sequence(&mut seq)
2136            .returning(|| Ok("".to_string()));
2137        system_update
2138            .expect_receive_completed()
2139            .once()
2140            .in_sequence(&mut seq)
2141            .returning(|| {
2142                let progress = [
2143                    DeployStatus::Progress(DeployProgress {
2144                        percentage: 50,
2145                        message: "Copy image".to_string(),
2146                    }),
2147                    DeployStatus::Progress(DeployProgress {
2148                        percentage: 100,
2149                        message: "Installing is done".to_string(),
2150                    }),
2151                    DeployStatus::Completed { signal: 0 },
2152                ]
2153                .map(Ok);
2154
2155                Ok(futures::stream::iter(progress).boxed())
2156            });
2157
2158        let (publisher_tx, mut publisher_rx) = mpsc::channel(3);
2159        let (ota, _dir) =
2160            Ota::mock_new_with_path(system_update, state_mock, "deployed_success", publisher_tx);
2161
2162        let status = ota.deploy(req.clone()).await;
2163
2164        let exp = [
2165            OtaStatus::Deploying(
2166                req.clone(),
2167                DeployProgress {
2168                    percentage: 50,
2169                    message: "Copy image".to_string(),
2170                },
2171            ),
2172            OtaStatus::Deploying(
2173                req.clone(),
2174                DeployProgress {
2175                    percentage: 100,
2176                    message: "Installing is done".to_string(),
2177                },
2178            ),
2179        ];
2180
2181        for status in exp {
2182            let val = tokio::time::timeout(Duration::from_secs(2), publisher_rx.recv())
2183                .await
2184                .unwrap()
2185                .unwrap();
2186
2187            assert_eq!(val, status);
2188        }
2189
2190        assert_eq!(status, OtaStatus::Deployed(req));
2191    }
2192
2193    #[tokio::test]
2194    async fn try_to_rebooting_success() {
2195        let state_mock = MockStateRepository::<PersistentState>::new();
2196        let system_update = MockSystemUpdate::new();
2197        let ota_request = OtaId::default();
2198
2199        let (ota_status_publisher, mut ota_status_receiver) = mpsc::channel(1);
2200        let ota = Ota::mock_new(system_update, state_mock, ota_status_publisher);
2201        let ota_status = ota.reboot(ota_request).await;
2202
2203        let receive_result = ota_status_receiver.try_recv();
2204        assert!(receive_result.is_ok());
2205        let ota_status_received = receive_result.unwrap();
2206        assert!(matches!(ota_status_received, OtaStatus::Rebooting(_)));
2207
2208        let receive_result = ota_status_receiver.try_recv();
2209        assert!(receive_result.is_err());
2210
2211        assert!(matches!(ota_status, OtaStatus::Rebooted));
2212    }
2213
2214    #[tokio::test]
2215    async fn try_to_success_no_pending_update() {
2216        let mut state_mock = MockStateRepository::<PersistentState>::new();
2217        let system_update = MockSystemUpdate::new();
2218
2219        state_mock.expect_exists().returning(|| false);
2220
2221        let (ota_status_publisher, _ota_status_receiver) = mpsc::channel(1);
2222        let ota = Ota::mock_new(system_update, state_mock, ota_status_publisher);
2223        let ota_status = ota.check_reboot().await;
2224
2225        assert!(matches!(ota_status, OtaStatus::NoPendingOta));
2226    }
2227
2228    #[tokio::test]
2229    async fn try_to_success_fail_read_state() {
2230        let mut state_mock = MockStateRepository::<PersistentState>::new();
2231        let system_update = MockSystemUpdate::new();
2232
2233        state_mock.expect_exists().returning(|| true);
2234        state_mock.expect_read().returning(move || {
2235            Err(FileStateError::Write {
2236                path: "/ota.bin".into(),
2237                backtrace: io::Error::new(io::ErrorKind::PermissionDenied, "permission denied"),
2238            })
2239        });
2240
2241        let (ota_status_publisher, _ota_status_receiver) = mpsc::channel(1);
2242        let ota = Ota::mock_new(system_update, state_mock, ota_status_publisher);
2243        let ota_status = ota.check_reboot().await;
2244
2245        assert!(matches!(ota_status, OtaStatus::Failure(OtaError::Io(_), _)));
2246    }
2247
2248    #[tokio::test]
2249    async fn try_to_success_fail_pending_ota() {
2250        let mut state_mock = MockStateRepository::<PersistentState>::new();
2251        let mut system_update = MockSystemUpdate::new();
2252        let uuid = Uuid::new_v4();
2253        let slot = "A";
2254
2255        state_mock.expect_exists().returning(|| true);
2256        state_mock.expect_read().returning(move || {
2257            Ok(PersistentState {
2258                uuid,
2259                slot: slot.to_owned(),
2260            })
2261        });
2262        state_mock.expect_clear().returning(|| Ok(()));
2263
2264        system_update
2265            .expect_boot_slot()
2266            .returning(|| Ok("A".to_owned()));
2267
2268        let (ota_status_publisher, _ota_status_receiver) = mpsc::channel(1);
2269        let ota = Ota::mock_new(system_update, state_mock, ota_status_publisher);
2270        let ota_status = ota.check_reboot().await;
2271
2272        assert!(matches!(
2273            ota_status,
2274            OtaStatus::Failure(OtaError::SystemRollback(_), _)
2275        ));
2276    }
2277
2278    #[tokio::test]
2279    async fn try_to_success() {
2280        let mut state_mock = MockStateRepository::<PersistentState>::new();
2281        let mut system_update = MockSystemUpdate::new();
2282        let uuid = Uuid::new_v4();
2283        let slot = "A";
2284
2285        state_mock.expect_exists().returning(|| true);
2286        state_mock.expect_read().returning(move || {
2287            Ok(PersistentState {
2288                uuid,
2289                slot: slot.to_owned(),
2290            })
2291        });
2292        state_mock.expect_clear().returning(|| Ok(()));
2293
2294        system_update
2295            .expect_boot_slot()
2296            .returning(|| Ok("B".to_owned()));
2297        system_update
2298            .expect_get_primary()
2299            .returning(|| Ok("rootfs.0".to_owned()));
2300        system_update.expect_mark().returning(|_: &str, _: &str| {
2301            Ok((
2302                "rootfs.0".to_owned(),
2303                "marked slot rootfs.0 as good".to_owned(),
2304            ))
2305        });
2306
2307        let (ota_status_publisher, _ota_status_receiver) = mpsc::channel(1);
2308        let ota = Ota::mock_new(system_update, state_mock, ota_status_publisher);
2309        let ota_status = ota.check_reboot().await;
2310
2311        assert!(matches!(ota_status, OtaStatus::Success(_)));
2312    }
2313
2314    #[tokio::test]
2315    async fn do_pending_ota_fail_boot_slot() {
2316        let mut state_mock = MockStateRepository::<PersistentState>::new();
2317        let uuid = Uuid::new_v4();
2318        let slot = "A";
2319
2320        state_mock.expect_read().returning(move || {
2321            Ok(PersistentState {
2322                uuid,
2323                slot: slot.to_owned(),
2324            })
2325        });
2326
2327        let mut system_update = MockSystemUpdate::new();
2328        system_update.expect_boot_slot().returning(|| {
2329            Err(DeviceManagerError::Fatal(
2330                "unable to call boot slot".to_string(),
2331            ))
2332        });
2333
2334        let (ota_status_publisher, _ota_status_receiver) = mpsc::channel(1);
2335        let ota = Ota::mock_new(system_update, state_mock, ota_status_publisher);
2336
2337        let state = ota.state_repository.read().await.unwrap();
2338        let result = ota.do_pending_ota(&state).await;
2339
2340        assert!(result.is_err());
2341        assert!(matches!(result.err().unwrap(), OtaError::Internal(_),));
2342    }
2343
2344    #[tokio::test]
2345    async fn do_pending_ota_fail_switch_slot() {
2346        let mut state_mock = MockStateRepository::<PersistentState>::new();
2347        let uuid = Uuid::new_v4();
2348        let slot = "A";
2349
2350        state_mock.expect_read().returning(move || {
2351            Ok(PersistentState {
2352                uuid,
2353                slot: slot.to_owned(),
2354            })
2355        });
2356
2357        let mut system_update = MockSystemUpdate::new();
2358        system_update
2359            .expect_boot_slot()
2360            .returning(|| Ok("A".to_owned()));
2361
2362        let (ota_status_publisher, _ota_status_receiver) = mpsc::channel(1);
2363        let ota = Ota::mock_new(system_update, state_mock, ota_status_publisher);
2364
2365        let state = ota.state_repository.read().await.unwrap();
2366        let result = ota.do_pending_ota(&state).await;
2367
2368        assert!(result.is_err());
2369        assert!(matches!(result.err().unwrap(), OtaError::SystemRollback(_),));
2370    }
2371
2372    #[tokio::test]
2373    async fn do_pending_ota_fail_get_primary() {
2374        let mut state_mock = MockStateRepository::<PersistentState>::new();
2375        let uuid = Uuid::new_v4();
2376        let slot = "A";
2377
2378        state_mock.expect_read().returning(move || {
2379            Ok(PersistentState {
2380                uuid,
2381                slot: slot.to_owned(),
2382            })
2383        });
2384
2385        let mut system_update = MockSystemUpdate::new();
2386        system_update
2387            .expect_boot_slot()
2388            .returning(|| Ok("B".to_owned()));
2389        system_update.expect_get_primary().returning(|| {
2390            Err(DeviceManagerError::Fatal(
2391                "unable to call boot slot".to_string(),
2392            ))
2393        });
2394
2395        let (ota_status_publisher, _ota_status_receiver) = mpsc::channel(1);
2396        let ota = Ota::mock_new(system_update, state_mock, ota_status_publisher);
2397        let state = ota.state_repository.read().await.unwrap();
2398        let result = ota.do_pending_ota(&state).await;
2399
2400        assert!(result.is_err());
2401        assert!(matches!(result.err().unwrap(), OtaError::Internal(_),));
2402    }
2403
2404    #[tokio::test]
2405    async fn do_pending_ota_mark_slot_fail() {
2406        let uuid = Uuid::new_v4();
2407        let slot = "A";
2408
2409        let mut state_mock = MockStateRepository::<PersistentState>::new();
2410        state_mock.expect_exists().returning(|| true);
2411        state_mock.expect_read().returning(move || {
2412            Ok(PersistentState {
2413                uuid,
2414                slot: slot.to_owned(),
2415            })
2416        });
2417
2418        state_mock.expect_clear().returning(|| Ok(()));
2419
2420        let mut system_update = MockSystemUpdate::new();
2421        system_update
2422            .expect_boot_slot()
2423            .returning(|| Ok("B".to_owned()));
2424        system_update
2425            .expect_get_primary()
2426            .returning(|| Ok("rootfs.0".to_owned()));
2427        system_update.expect_mark().returning(|_: &str, _: &str| {
2428            Err(DeviceManagerError::Fatal(
2429                "Unable to call mark function".to_string(),
2430            ))
2431        });
2432
2433        let (ota_status_publisher, _ota_status_receiver) = mpsc::channel(1);
2434        let ota = Ota::mock_new(system_update, state_mock, ota_status_publisher);
2435
2436        let state = ota.state_repository.read().await.unwrap();
2437        let result = ota.do_pending_ota(&state).await;
2438        assert!(result.is_err());
2439        assert!(matches!(result.err().unwrap(), OtaError::Internal(_),));
2440    }
2441
2442    #[tokio::test]
2443    async fn do_pending_ota_fail_marked_wrong_slot() {
2444        let state_mock = MockStateRepository::<PersistentState>::new();
2445        let mut system_update = MockSystemUpdate::new();
2446        let mut seq = Sequence::new();
2447
2448        system_update
2449            .expect_boot_slot()
2450            .once()
2451            .in_sequence(&mut seq)
2452            .returning(|| Ok("B".to_owned()));
2453
2454        system_update
2455            .expect_get_primary()
2456            .once()
2457            .in_sequence(&mut seq)
2458            .returning(|| Ok("rootfs.0".to_owned()));
2459
2460        system_update.expect_mark().returning(|_: &str, _: &str| {
2461            Ok((
2462                "rootfs.1".to_owned(),
2463                "marked slot rootfs.1 as good".to_owned(),
2464            ))
2465        });
2466
2467        let (ota_status_publisher, _ota_status_receiver) = mpsc::channel(1);
2468        let ota = Ota::mock_new(system_update, state_mock, ota_status_publisher);
2469
2470        let state = PersistentState {
2471            uuid: Uuid::new_v4(),
2472            slot: "A".to_string(),
2473        };
2474
2475        let err = ota.do_pending_ota(&state).await.unwrap_err();
2476
2477        assert_eq!(err, OtaError::Internal("Unable to mark slot"));
2478    }
2479
2480    #[tokio::test]
2481    async fn do_pending_ota_success() {
2482        let uuid = Uuid::new_v4();
2483        let slot = "A";
2484
2485        let mut state_mock = MockStateRepository::<PersistentState>::new();
2486        state_mock.expect_exists().returning(|| true);
2487        state_mock.expect_read().returning(move || {
2488            Ok(PersistentState {
2489                uuid,
2490                slot: slot.to_owned(),
2491            })
2492        });
2493
2494        state_mock.expect_clear().returning(|| Ok(()));
2495
2496        let mut system_update = MockSystemUpdate::new();
2497        system_update
2498            .expect_boot_slot()
2499            .returning(|| Ok("B".to_owned()));
2500        system_update
2501            .expect_get_primary()
2502            .returning(|| Ok("rootfs.0".to_owned()));
2503        system_update.expect_mark().returning(|_: &str, _: &str| {
2504            Ok((
2505                "rootfs.0".to_owned(),
2506                "marked slot rootfs.0 as good".to_owned(),
2507            ))
2508        });
2509
2510        let (ota_status_publisher, _ota_status_receiver) = mpsc::channel(1);
2511        let ota = Ota::mock_new(system_update, state_mock, ota_status_publisher);
2512
2513        let state = ota.state_repository.read().await.unwrap();
2514        let result = ota.do_pending_ota(&state).await;
2515        assert!(result.is_ok());
2516    }
2517
2518    #[tokio::test]
2519    async fn wget_failed() {
2520        let (_dir, t_dir) = temp_dir("wget_failed");
2521
2522        let server = MockServer::start_async().await;
2523        let hello_mock = server
2524            .mock_async(|when, then| {
2525                when.method(GET);
2526                then.status(500);
2527            })
2528            .await;
2529
2530        let ota_file = t_dir.join("ota,bin");
2531        let (ota_status_publisher, _) = mpsc::channel(1);
2532
2533        let url = server.url("/ota.bin").to_string();
2534        let req = OtaId {
2535            uuid: Uuid::new_v4(),
2536            url,
2537        };
2538
2539        let client = create_http_client(&req).unwrap();
2540        let result = wget(&client, &req, &ota_file, &ota_status_publisher).await;
2541
2542        hello_mock.assert_async().await;
2543        assert!(result.is_err());
2544        assert!(matches!(result.err().unwrap(), OtaError::Network(_),));
2545    }
2546
2547    #[tokio::test]
2548    async fn wget_failed_wrong_content_length() {
2549        let (_dir, t_dir) = temp_dir("wget_failed_wrong_content_length");
2550
2551        let binary_content = b"\x80\x02\x03";
2552
2553        let server = MockServer::start_async().await;
2554        let ota_url = server.url("/ota.bin");
2555        let mock_ota_file_request = server
2556            .mock_async(|when, then| {
2557                when.method(GET).path("/ota.bin");
2558                then.status(200)
2559                    .header("content-Length", 0.to_string())
2560                    .body(binary_content);
2561            })
2562            .await;
2563
2564        let ota_file = t_dir.join("ota.bin");
2565        let req = OtaId {
2566            uuid: Uuid::new_v4(),
2567            url: ota_url,
2568        };
2569
2570        let (ota_status_publisher, _) = mpsc::channel(1);
2571        let client = create_http_client(&req).unwrap();
2572
2573        let result = wget(&client, &req, &ota_file, &ota_status_publisher).await;
2574
2575        mock_ota_file_request.assert_async().await;
2576        assert!(result.is_err());
2577        assert!(matches!(result.err().unwrap(), OtaError::Network(_),));
2578    }
2579
2580    #[tokio::test]
2581    async fn wget_with_empty_payload() {
2582        let (_dir, t_dir) = temp_dir("wget_with_empty_payload");
2583
2584        let server = MockServer::start_async().await;
2585        let mock_ota_file_request = server
2586            .mock_async(|when, then| {
2587                when.method(GET).path("/ota.bin");
2588                then.status(200).body(b"");
2589            })
2590            .await;
2591
2592        let ota_file = t_dir.join("ota.bin");
2593        let (ota_status_publisher, _) = mpsc::channel(1);
2594
2595        let url = server.url("/ota.bin");
2596        let req = OtaId {
2597            url,
2598            uuid: Uuid::new_v4(),
2599        };
2600
2601        let client = create_http_client(&req).unwrap();
2602
2603        let result = wget(&client, &req, &ota_file, &ota_status_publisher).await;
2604
2605        mock_ota_file_request.assert_async().await;
2606        assert!(result.is_err());
2607        assert!(matches!(result.err().unwrap(), OtaError::Network(_),));
2608    }
2609
2610    #[tokio::test]
2611    async fn wget_success() {
2612        let (_dir, t_dir) = temp_dir("wget_success");
2613
2614        let binary_content = b"\x80\x02\x03";
2615        let binary_size = binary_content.len();
2616
2617        let server = MockServer::start_async().await;
2618        let ota_url = server.url("/ota.bin");
2619        let mock_ota_file_request = server
2620            .mock_async(|when, then| {
2621                when.method(GET).path("/ota.bin");
2622                then.status(200)
2623                    .header("content-Length", binary_size.to_string())
2624                    .body(binary_content);
2625            })
2626            .await;
2627
2628        let ota_file = t_dir.join("ota.bin");
2629
2630        let (ota_status_publisher, mut ota_status_receiver) = mpsc::channel(1);
2631
2632        let req = OtaId {
2633            uuid: Uuid::new_v4(),
2634            url: ota_url,
2635        };
2636
2637        let client = create_http_client(&req).unwrap();
2638        let result = wget(&client, &req, &ota_file, &ota_status_publisher).await;
2639        mock_ota_file_request.assert_async().await;
2640
2641        let receive_result = ota_status_receiver.try_recv();
2642        assert!(receive_result.is_ok());
2643        let ota_status_received = receive_result.unwrap();
2644        assert!(matches!(
2645            ota_status_received,
2646            OtaStatus::Downloading(_, 100)
2647        ));
2648
2649        let receive_result = ota_status_receiver.try_recv();
2650        assert!(receive_result.is_err());
2651
2652        assert!(result.is_ok());
2653    }
2654}