ave-core 0.11.0

Averiun Ledger core runtime and node API
Documentation
use std::num::NonZeroUsize;

use async_trait::async_trait;
use ave_actors::{
    Actor, ActorContext, ActorError, ActorPath, Event, Handler, Message,
    NotPersistentActor, Response,
};
use ave_common::{
    identity::DigestIdentifier,
    response::{RequestInfo, RequestInfoExtend, RequestState},
};
use borsh::{BorshDeserialize, BorshSerialize};
use lru::LruCache;
use serde::{Deserialize, Serialize};
use tracing::{Span, debug, error, info_span, warn};

#[derive(Clone, Debug)]
pub struct RequestTracking {
    cache: LruCache<DigestIdentifier, RequestInfo>,
}

impl RequestTracking {
    pub fn new(size: usize) -> Self {
        let size = if size == 0 { 100 } else { size };

        Self {
            cache: LruCache::new(NonZeroUsize::new(size).expect("size > 0")),
        }
    }
}

impl NotPersistentActor for RequestTracking {}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum RequestTrackingMessage {
    UpdateState {
        request_id: DigestIdentifier,
        state: RequestState,
    },
    UpdateVersion {
        request_id: DigestIdentifier,
        version: u64,
    },
    AllRequests,
    SearchRequest(DigestIdentifier),
}

#[derive(Debug, Clone)]
pub enum RequestTrackingResponse {
    Ok,
    AllInfo(Vec<RequestInfoExtend>),
    Info(RequestInfo),
    NotFound,
}

impl Response for RequestTrackingResponse {}

impl Message for RequestTrackingMessage {}

#[async_trait]
impl Actor for RequestTracking {
    type Message = RequestTrackingMessage;
    type Event = RequestTrackingEvent;
    type Response = RequestTrackingResponse;

    fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
        parent_span.map_or_else(
            || info_span!("RequestTracking"),
            |parent_span| info_span!(parent: parent_span, "RequestTracking"),
        )
    }
}

#[derive(
    Debug, Clone, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
)]
pub struct RequestTrackingEvent {
    pub request_id: String,
    pub subject_id: String,
    pub sn: Option<u64>,
    pub error: String,
    pub who: String,
    pub abort_type: String,
}

impl Event for RequestTrackingEvent {}

#[async_trait]
impl Handler<Self> for RequestTracking {
    async fn handle_message(
        &mut self,
        _sender: ActorPath,
        msg: RequestTrackingMessage,
        ctx: &mut ave_actors::ActorContext<Self>,
    ) -> Result<RequestTrackingResponse, ActorError> {
        match msg {
            RequestTrackingMessage::AllRequests => {
                let count = self.cache.len();
                debug!(
                    msg_type = "AllRequests",
                    requests_count = count,
                    "Retrieving all tracked requests"
                );
                Ok(RequestTrackingResponse::AllInfo(
                    self.cache
                        .iter()
                        .map(|x| RequestInfoExtend {
                            request_id: x.0.to_string(),
                            state: x.1.state.clone(),
                            version: x.1.version,
                        })
                        .collect(),
                ))
            }
            RequestTrackingMessage::UpdateState { request_id, state } => {
                if let Some(info) = self.cache.get_mut(&request_id) {
                    let old_state = info.state.clone();
                    info.state = state.clone();
                    debug!(
                        msg_type = "UpdateState",
                        request_id = %request_id,
                        old_state = ?old_state,
                        new_state = ?state,
                        "Request state updated"
                    );
                } else {
                    self.cache.put(
                        request_id.clone(),
                        RequestInfo {
                            state: state.clone(),
                            version: 0,
                        },
                    );

                    debug!(
                        msg_type = "UpdateState",
                        request_id = %request_id,
                        state = ?state,
                        "New request tracked"
                    );
                };

                let event = match state {
                    RequestState::Invalid {
                        subject_id,
                        who,
                        sn,
                        error,
                    } => Some(RequestTrackingEvent {
                        request_id: request_id.to_string(),
                        abort_type: "Invalid".to_string(),
                        error,
                        sn,
                        subject_id,
                        who,
                    }),
                    RequestState::Abort {
                        subject_id,
                        who,
                        sn,
                        error,
                    } => Some(RequestTrackingEvent {
                        request_id: request_id.to_string(),
                        abort_type: "Abort".to_string(),
                        error,
                        sn,
                        subject_id,
                        who,
                    }),
                    _ => None,
                };

                if let Some(event) = event {
                    self.on_event(event, ctx).await;
                }

                Ok(RequestTrackingResponse::Ok)
            }
            RequestTrackingMessage::UpdateVersion {
                request_id,
                version,
            } => {
                if let Some(info) = self.cache.get_mut(&request_id) {
                    let old_version = info.version;
                    info.version = version;
                    debug!(
                        msg_type = "UpdateVersion",
                        request_id = %request_id,
                        old_version = old_version,
                        new_version = version,
                        "Request version updated"
                    );
                } else {
                    warn!(
                        msg_type = "UpdateVersion",
                        request_id = %request_id,
                        version = version,
                        "Request not found in cache"
                    );
                };

                Ok(RequestTrackingResponse::Ok)
            }
            RequestTrackingMessage::SearchRequest(request_id) => {
                self.cache.get(&request_id).map_or_else(
                    || {
                        debug!(
                            msg_type = "SearchRequest",
                            request_id = %request_id,
                            "Request not found in cache"
                        );
                        Ok(RequestTrackingResponse::NotFound)
                    },
                    |info| {
                        debug!(
                            msg_type = "SearchRequest",
                            request_id = %request_id,
                            state = ?info.state,
                            version = info.version,
                            "Request found in cache"
                        );
                        Ok(RequestTrackingResponse::Info(info.clone()))
                    },
                )
            }
        }
    }

    async fn on_event(
        &mut self,
        event: RequestTrackingEvent,
        ctx: &mut ActorContext<Self>,
    ) {
        if let Err(e) = ctx.publish_event(event).await {
            error!(
                error = %e,
                "Failed to publish event"
            );
            ctx.system().crash_system();
        };
    }
}