ave-core 0.11.0

Averiun Ledger core runtime and node API
Documentation
use std::fmt::Display;

use async_trait::async_trait;
use ave_actors::{
    Actor, ActorContext, ActorError, ActorPath, Event, Handler, Message,
    NotPersistentActor, Response,
};
use ave_common::{
    DataToSink, DataToSinkEvent, identity::TimeStamp, response::SubjectDB,
};
use serde::{Deserialize, Serialize};
use tracing::{Span, debug, error, info_span};

use crate::model::common::emit_fail;

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SinkData {
    pub public_key: String,
}

#[derive(
    Debug, Clone, Serialize, Deserialize, Eq, Ord, PartialEq, PartialOrd,
)]
pub enum SinkTypes {
    Create,
    Fact,
    Transfer,
    Confirm,
    Reject,
    EOL,
    All,
}

impl Display for SinkTypes {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Create => write!(f, "Create"),
            Self::Fact => write!(f, "Fact"),
            Self::Transfer => write!(f, "Transfer"),
            Self::Confirm => write!(f, "Confirm"),
            Self::Reject => write!(f, "Reject"),
            Self::EOL => write!(f, "EOL"),
            Self::All => write!(f, "All"),
        }
    }
}

impl From<&DataToSink> for SinkTypes {
    fn from(value: &DataToSink) -> Self {
        match value.payload {
            DataToSinkEvent::Create { .. } => Self::Create,
            DataToSinkEvent::FactFull { .. }
            | DataToSinkEvent::FactOpaque { .. } => Self::Fact,
            DataToSinkEvent::Transfer { .. } => Self::Transfer,
            DataToSinkEvent::Confirm { .. } => Self::Confirm,
            DataToSinkEvent::Reject { .. } => Self::Reject,
            DataToSinkEvent::Eol { .. } => Self::EOL,
        }
    }
}

impl From<String> for SinkTypes {
    fn from(value: String) -> Self {
        match value.trim() {
            "Create" => Self::Create,
            "Fact" => Self::Fact,
            "Transfer" => Self::Transfer,
            "Confirm" => Self::Confirm,
            "Reject" => Self::Reject,
            "EOL" => Self::EOL,
            _ => Self::All,
        }
    }
}

impl SinkDataMessage {
    pub fn get_subject_schema(&self) -> (String, String) {
        match self {
            Self::UpdateState(metadata) => {
                (metadata.subject_id.to_string(), metadata.schema_id.clone())
            }
            Self::Event { event, .. } => event.get_subject_schema(),
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SinkDataMessage {
    UpdateState(Box<SubjectDB>),
    Event {
        event: Box<DataToSinkEvent>,
        event_request_timestamp: u64,
        event_ledger_timestamp: u64,
    },
}

impl Message for SinkDataMessage {}

impl NotPersistentActor for SinkData {}

#[derive(Debug, Clone)]
pub enum SinkDataResponse {
    None,
}

impl Response for SinkDataResponse {}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SinkDataEvent {
    Event(Box<DataToSink>),
    State(Box<SubjectDB>),
}

impl Event for SinkDataEvent {}

#[async_trait]
impl Actor for SinkData {
    type Event = SinkDataEvent;
    type Message = SinkDataMessage;
    type Response = SinkDataResponse;

    fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
        parent_span.map_or_else(
            || info_span!("SinkData"),
            |parent_span| info_span!(parent: parent_span, "SinkData"),
        )
    }
}

#[async_trait]
impl Handler<Self> for SinkData {
    async fn handle_message(
        &mut self,
        _sender: ActorPath,
        msg: SinkDataMessage,
        ctx: &mut ave_actors::ActorContext<Self>,
    ) -> Result<SinkDataResponse, ActorError> {
        let (subject_id, schema_id) = msg.get_subject_schema();
        let msg_type = match &msg {
            SinkDataMessage::UpdateState(..) => "UpdateState",
            SinkDataMessage::Event { event, .. } => match &**event {
                DataToSinkEvent::Create { .. } => "Create",
                DataToSinkEvent::FactFull { .. } => "FactFull",
                DataToSinkEvent::FactOpaque { .. } => "FactOpaque",
                DataToSinkEvent::Transfer { .. } => "Transfer",
                DataToSinkEvent::Confirm { .. } => "Confirm",
                DataToSinkEvent::Reject { .. } => "Reject",
                DataToSinkEvent::Eol { .. } => "EOL",
            },
        };

        let event = match msg {
            SinkDataMessage::UpdateState(metadata) => {
                SinkDataEvent::State(metadata)
            }
            SinkDataMessage::Event {
                event,
                event_request_timestamp,
                event_ledger_timestamp,
            } => SinkDataEvent::Event(Box::new(DataToSink {
                payload: *event,
                public_key: self.public_key.clone(),
                event_request_timestamp,
                event_ledger_timestamp,
                sink_timestamp: TimeStamp::now().as_nanos(),
            })),
        };

        self.on_event(event, ctx).await;

        debug!(
            msg_type = msg_type,
            subject_id = %subject_id,
            schema_id = %schema_id,
            public_key = %self.public_key,
            "Sink data event processed"
        );

        Ok(SinkDataResponse::None)
    }

    async fn on_event(
        &mut self,
        event: SinkDataEvent,
        ctx: &mut ActorContext<Self>,
    ) {
        let (subject_id, schema_id) = match &event {
            SinkDataEvent::Event(data_to_sink) => {
                data_to_sink.payload.get_subject_schema()
            }
            SinkDataEvent::State(metadata) => {
                (metadata.subject_id.to_string(), metadata.schema_id.clone())
            }
        };
        if let Err(e) = ctx.publish_event(event.clone()).await {
            error!(
                error = %e,
                subject_id = %subject_id,
                schema_id = %schema_id,
                public_key = %self.public_key,
                "Failed to publish sink data event"
            );
            emit_fail(ctx, e).await;
        } else {
            debug!(
                subject_id = %subject_id,
                schema_id = %schema_id,
                "Sink data event published successfully"
            );
        }
    }
}