Skip to main content

ave_core/subject/
sinkdata.rs

1use std::fmt::Display;
2
3use async_trait::async_trait;
4use ave_actors::{
5    Actor, ActorContext, ActorError, ActorPath, Event, Handler, Message,
6    NotPersistentActor, Response,
7};
8use ave_common::{
9    DataToSink, DataToSinkEvent, identity::TimeStamp, response::SubjectDB,
10};
11use serde::{Deserialize, Serialize};
12use tracing::{Span, debug, error, info_span};
13
14use crate::model::common::emit_fail;
15
16#[derive(Clone, Debug, Serialize, Deserialize)]
17pub struct SinkData {
18    pub public_key: String,
19}
20
21#[derive(
22    Debug, Clone, Serialize, Deserialize, Eq, Ord, PartialEq, PartialOrd,
23)]
24pub enum SinkTypes {
25    Create,
26    Fact,
27    Transfer,
28    Confirm,
29    Reject,
30    EOL,
31    All,
32}
33
34impl Display for SinkTypes {
35    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36        match self {
37            Self::Create => write!(f, "Create"),
38            Self::Fact => write!(f, "Fact"),
39            Self::Transfer => write!(f, "Transfer"),
40            Self::Confirm => write!(f, "Confirm"),
41            Self::Reject => write!(f, "Reject"),
42            Self::EOL => write!(f, "EOL"),
43            Self::All => write!(f, "All"),
44        }
45    }
46}
47
48impl From<&DataToSink> for SinkTypes {
49    fn from(value: &DataToSink) -> Self {
50        match value.payload {
51            DataToSinkEvent::Create { .. } => Self::Create,
52            DataToSinkEvent::FactFull { .. }
53            | DataToSinkEvent::FactOpaque { .. } => Self::Fact,
54            DataToSinkEvent::Transfer { .. } => Self::Transfer,
55            DataToSinkEvent::Confirm { .. } => Self::Confirm,
56            DataToSinkEvent::Reject { .. } => Self::Reject,
57            DataToSinkEvent::Eol { .. } => Self::EOL,
58        }
59    }
60}
61
62impl From<String> for SinkTypes {
63    fn from(value: String) -> Self {
64        match value.trim() {
65            "Create" => Self::Create,
66            "Fact" => Self::Fact,
67            "Transfer" => Self::Transfer,
68            "Confirm" => Self::Confirm,
69            "Reject" => Self::Reject,
70            "EOL" => Self::EOL,
71            _ => Self::All,
72        }
73    }
74}
75
76impl SinkDataMessage {
77    pub fn get_subject_schema(&self) -> (String, String) {
78        match self {
79            Self::UpdateState(metadata) => {
80                (metadata.subject_id.to_string(), metadata.schema_id.clone())
81            }
82            Self::Event { event, .. } => event.get_subject_schema(),
83        }
84    }
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize)]
88pub enum SinkDataMessage {
89    UpdateState(Box<SubjectDB>),
90    Event {
91        event: Box<DataToSinkEvent>,
92        event_request_timestamp: u64,
93        event_ledger_timestamp: u64,
94    },
95}
96
97impl Message for SinkDataMessage {
98    fn is_critical(&self) -> bool {
99        true
100    }
101}
102
103impl NotPersistentActor for SinkData {}
104
/// Reply type of the `SinkData` actor.
#[derive(Clone, Debug)]
pub enum SinkDataResponse {
    /// The only reply: the message was handled and there is nothing to
    /// return to the sender.
    None,
}
109
110impl Response for SinkDataResponse {}
111
112#[derive(Debug, Clone, Serialize, Deserialize)]
113pub enum SinkDataEvent {
114    Event(Box<DataToSink>),
115    State(Box<SubjectDB>),
116}
117
118impl Event for SinkDataEvent {}
119
120#[async_trait]
121impl Actor for SinkData {
122    type Event = SinkDataEvent;
123    type Message = SinkDataMessage;
124    type Response = SinkDataResponse;
125
126    fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
127        parent_span.map_or_else(
128            || info_span!("SinkData"),
129            |parent_span| info_span!(parent: parent_span, "SinkData"),
130        )
131    }
132}
133
134#[async_trait]
135impl Handler<Self> for SinkData {
136    async fn handle_message(
137        &mut self,
138        _sender: ActorPath,
139        msg: SinkDataMessage,
140        ctx: &mut ave_actors::ActorContext<Self>,
141    ) -> Result<SinkDataResponse, ActorError> {
142        let (subject_id, schema_id) = msg.get_subject_schema();
143        let msg_type = match &msg {
144            SinkDataMessage::UpdateState(..) => "UpdateState",
145            SinkDataMessage::Event { event, .. } => match &**event {
146                DataToSinkEvent::Create { .. } => "Create",
147                DataToSinkEvent::FactFull { .. } => "FactFull",
148                DataToSinkEvent::FactOpaque { .. } => "FactOpaque",
149                DataToSinkEvent::Transfer { .. } => "Transfer",
150                DataToSinkEvent::Confirm { .. } => "Confirm",
151                DataToSinkEvent::Reject { .. } => "Reject",
152                DataToSinkEvent::Eol { .. } => "EOL",
153            },
154        };
155
156        let event = match msg {
157            SinkDataMessage::UpdateState(metadata) => {
158                SinkDataEvent::State(metadata)
159            }
160            SinkDataMessage::Event {
161                event,
162                event_request_timestamp,
163                event_ledger_timestamp,
164            } => SinkDataEvent::Event(Box::new(DataToSink {
165                payload: *event,
166                public_key: self.public_key.clone(),
167                event_request_timestamp,
168                event_ledger_timestamp,
169                sink_timestamp: TimeStamp::now().as_nanos(),
170            })),
171        };
172
173        self.on_event(event, ctx).await;
174
175        debug!(
176            msg_type = msg_type,
177            subject_id = %subject_id,
178            schema_id = %schema_id,
179            public_key = %self.public_key,
180            "Sink data event processed"
181        );
182
183        Ok(SinkDataResponse::None)
184    }
185
186    async fn on_event(
187        &mut self,
188        event: SinkDataEvent,
189        ctx: &mut ActorContext<Self>,
190    ) {
191        let (subject_id, schema_id) = match &event {
192            SinkDataEvent::Event(data_to_sink) => {
193                data_to_sink.payload.get_subject_schema()
194            }
195            SinkDataEvent::State(metadata) => {
196                (metadata.subject_id.to_string(), metadata.schema_id.clone())
197            }
198        };
199        if let Err(e) = ctx.publish_event(event.clone()).await {
200            error!(
201                error = %e,
202                subject_id = %subject_id,
203                schema_id = %schema_id,
204                public_key = %self.public_key,
205                "Failed to publish sink data event"
206            );
207            emit_fail(ctx, e).await;
208        } else {
209            debug!(
210                subject_id = %subject_id,
211                schema_id = %schema_id,
212                "Sink data event published successfully"
213            );
214        }
215    }
216}