ockam_api 0.93.0

Ockam's request-response API
use core::sync::atomic::{AtomicBool, Ordering};
use ockam::{route, Address, Context};
use ockam_api::session::replacer::{
    AdditionalSessionReplacer, CurrentInletStatus, ReplacerOutcome, ReplacerOutputKind,
    SessionReplacer,
};
use ockam_core::compat::sync::Arc;
use ockam_core::errcode::{Kind, Origin};
use ockam_core::{async_trait, Any, Error, NeutralMessage, Result, Route, Routed, Worker};
use std::sync::atomic::AtomicU8;
use std::time::Duration;
use tracing::info;

pub struct MockEchoer {
    pub responsive: Arc<AtomicBool>,
    pub drop_every: Arc<AtomicU8>,

    drop_counter: u8,
}

impl MockEchoer {
    pub fn new() -> Self {
        Self {
            responsive: Arc::new(AtomicBool::new(true)),
            drop_every: Arc::new(AtomicU8::new(0)),

            drop_counter: 0,
        }
    }
}

#[ockam::worker]
impl Worker for MockEchoer {
    type Context = Context;
    type Message = Any;

    async fn handle_message(&mut self, ctx: &mut Context, msg: Routed<Any>) -> Result<()> {
        if !self.responsive.load(Ordering::Relaxed) {
            info!("Drop message responsive=false");
            return Ok(());
        }

        let drop_every = self.drop_every.load(Ordering::Relaxed);
        if drop_every != 0 {
            self.drop_counter += 1;

            if self.drop_counter == drop_every {
                info!("Drop message drop_counter={}", drop_every);
                self.drop_counter = 0;
                return Ok(());
            }
        }

        ctx.send(
            msg.return_route().clone(),
            NeutralMessage::from(msg.into_payload()),
        )
        .await?;
        info!("Echo message back");

        Ok(())
    }
}

pub struct MockHop {
    pub responsive: Arc<AtomicBool>,
}

impl MockHop {
    pub fn new() -> Self {
        Self {
            responsive: Arc::new(AtomicBool::new(true)),
        }
    }
}

#[ockam::worker]
impl Worker for MockHop {
    type Context = Context;
    type Message = Any;

    async fn handle_message(&mut self, ctx: &mut Context, msg: Routed<Any>) -> Result<()> {
        if !self.responsive.load(Ordering::Relaxed) {
            info!("Drop Hop message, {}", ctx.primary_address());
            return Ok(());
        }

        info!("Forward Hop message {}", ctx.primary_address());

        let msg = msg.into_local_message();
        let msg = msg.pop_front_onward_route()?;

        ctx.forward(msg).await
    }
}

#[derive(Clone)]
pub struct MockReplacer {
    pub name: String,
    pub create_called: Arc<AtomicBool>,
    pub recreate_called: Arc<AtomicBool>,
    pub close_called: Arc<AtomicBool>,
    pub succeeds: Arc<AtomicBool>,
    pub ping_route: Route,
}

impl Default for MockReplacer {
    fn default() -> Self {
        Self::new("", route![])
    }
}

impl MockReplacer {
    pub fn new(name: &str, ping_route: Route) -> Self {
        Self {
            name: name.to_string(),
            create_called: Arc::new(AtomicBool::new(false)),
            recreate_called: Arc::new(AtomicBool::new(false)),
            close_called: Arc::new(AtomicBool::new(false)),
            succeeds: Arc::new(AtomicBool::new(true)),
            ping_route,
        }
    }

    async fn create_impl(&mut self) -> Result<()> {
        self.create_called.store(true, Ordering::Relaxed);

        info!("MockReplacer {} create called", self.name);
        tokio::time::sleep(Duration::from_millis(500)).await;

        if !self.succeeds.load(Ordering::Relaxed) {
            info!("MockReplacer {} create failed", self.name);
            return Err(Error::new(Origin::Api, Kind::Invalid, ""));
        }

        info!("MockReplacer {} create succeeded", self.name);

        Ok(())
    }

    async fn recreate_impl(&mut self) -> Result<()> {
        self.recreate_called.store(true, Ordering::Relaxed);

        info!("MockReplacer {} recreate called", self.name);

        self.close_impl();
        self.create_impl().await
    }

    fn close_impl(&mut self) {
        self.close_called.store(true, Ordering::Relaxed);

        info!("MockReplacer {} close called", self.name);
    }
}

#[async_trait]
impl SessionReplacer for MockReplacer {
    async fn create(&mut self) -> Result<ReplacerOutcome> {
        self.create_impl().await?;

        Ok(ReplacerOutcome {
            ping_route: self.ping_route.clone(),
            kind: ReplacerOutputKind::Inlet(CurrentInletStatus {
                route: route![],
                worker: Some(Address::from_string("echo")),
            }),
        })
    }

    async fn close(&mut self) {
        self.close_impl()
    }

    async fn on_session_down(&self) {
        info!("MockReplacer {} on_session_down called", self.name);
    }

    async fn on_session_replaced(&self) {
        info!("MockReplacer {} on_session_replaced called", self.name);
    }
}

#[async_trait]
impl AdditionalSessionReplacer for MockReplacer {
    async fn create_additional(&mut self) -> Result<Route> {
        self.create_impl().await?;

        Ok(self.ping_route.clone())
    }

    async fn close_additional(&mut self, _enable_fallback: bool) {
        self.close_impl()
    }
}