Skip to main content

ave_core/subject/
sinkdata.rs

1use std::fmt::Display;
2
3use async_trait::async_trait;
4use ave_actors::{
5    Actor, ActorContext, ActorError, ActorPath, Event, Handler, Message,
6    NotPersistentActor, Response,
7};
8use ave_common::{DataToSink, DataToSinkEvent, identity::TimeStamp};
9use serde::{Deserialize, Serialize};
10use tracing::{Span, debug, error, info_span};
11
12use crate::{model::common::emit_fail, subject::Metadata};
13
14#[derive(Clone, Debug, Serialize, Deserialize)]
15pub struct SinkData {
16    pub public_key: String,
17}
18
19#[derive(
20    Debug, Clone, Serialize, Deserialize, Eq, Ord, PartialEq, PartialOrd,
21)]
22pub enum SinkTypes {
23    Create,
24    Fact,
25    Transfer,
26    Confirm,
27    Reject,
28    EOL,
29    All,
30}
31
32impl Display for SinkTypes {
33    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34        match self {
35            Self::Create => write!(f, "Create"),
36            Self::Fact => write!(f, "Fact"),
37            Self::Transfer => write!(f, "Transfer"),
38            Self::Confirm => write!(f, "Confirm"),
39            Self::Reject => write!(f, "Reject"),
40            Self::EOL => write!(f, "EOL"),
41            Self::All => write!(f, "All"),
42        }
43    }
44}
45
46impl From<&DataToSink> for SinkTypes {
47    fn from(value: &DataToSink) -> Self {
48        match value.event {
49            DataToSinkEvent::Create { .. } => Self::Create,
50            DataToSinkEvent::Fact { .. } => Self::Fact,
51            DataToSinkEvent::Transfer { .. } => Self::Transfer,
52            DataToSinkEvent::Confirm { .. } => Self::Confirm,
53            DataToSinkEvent::Reject { .. } => Self::Reject,
54            DataToSinkEvent::Eol { .. } => Self::EOL,
55        }
56    }
57}
58
59impl From<String> for SinkTypes {
60    fn from(value: String) -> Self {
61        match value.trim() {
62            "Create" => Self::Create,
63            "Fact" => Self::Fact,
64            "Transfer" => Self::Transfer,
65            "Confirm" => Self::Confirm,
66            "Reject" => Self::Reject,
67            "EOL" => Self::EOL,
68            _ => Self::All,
69        }
70    }
71}
72
73impl SinkDataMessage {
74    pub fn get_subject_schema(&self) -> (String, String) {
75        match self {
76            Self::UpdateState(metadata) => (
77                metadata.subject_id.to_string(),
78                metadata.schema_id.to_string(),
79            ),
80            Self::Event { event, .. } => match &**event {
81                DataToSinkEvent::Create {
82                    subject_id,
83                    schema_id,
84                    ..
85                }
86                | DataToSinkEvent::Fact {
87                    subject_id,
88                    schema_id,
89                    ..
90                }
91                | DataToSinkEvent::Transfer {
92                    subject_id,
93                    schema_id,
94                    ..
95                }
96                | DataToSinkEvent::Confirm {
97                    subject_id,
98                    schema_id,
99                    ..
100                }
101                | DataToSinkEvent::Reject {
102                    subject_id,
103                    schema_id,
104                    ..
105                }
106                | DataToSinkEvent::Eol {
107                    subject_id,
108                    schema_id,
109                    ..
110                } => (subject_id.clone(), schema_id.to_string()),
111            },
112        }
113    }
114}
115
116#[derive(Debug, Clone, Serialize, Deserialize)]
117pub enum SinkDataMessage {
118    UpdateState(Box<Metadata>),
119    Event {
120        event: Box<DataToSinkEvent>,
121        event_request_timestamp: u64,
122        event_ledger_timestamp: u64,
123    },
124}
125
126impl Message for SinkDataMessage {}
127
128impl NotPersistentActor for SinkData {}
129
/// Reply type of the `SinkData` actor.
#[derive(Clone, Debug)]
pub enum SinkDataResponse {
    /// The only reply: the handler returns it for every message.
    None,
}
134
135impl Response for SinkDataResponse {}
136
137#[derive(Debug, Clone, Serialize, Deserialize)]
138pub enum SinkDataEvent {
139    Event(Box<DataToSink>),
140    State(Box<Metadata>),
141}
142
143impl Event for SinkDataEvent {}
144
145#[async_trait]
146impl Actor for SinkData {
147    type Event = SinkDataEvent;
148    type Message = SinkDataMessage;
149    type Response = SinkDataResponse;
150
151    fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
152        parent_span.map_or_else(
153            || info_span!("SinkData"),
154            |parent_span| info_span!(parent: parent_span, "SinkData"),
155        )
156    }
157}
158
159#[async_trait]
160impl Handler<Self> for SinkData {
161    async fn handle_message(
162        &mut self,
163        _sender: ActorPath,
164        msg: SinkDataMessage,
165        ctx: &mut ave_actors::ActorContext<Self>,
166    ) -> Result<SinkDataResponse, ActorError> {
167        let (subject_id, schema_id) = msg.get_subject_schema();
168        let msg_type = match &msg {
169            SinkDataMessage::UpdateState(..) => "UpdateState",
170            SinkDataMessage::Event { event, .. } => match &**event {
171                DataToSinkEvent::Create { .. } => "Create",
172                DataToSinkEvent::Fact { .. } => "Fact",
173                DataToSinkEvent::Transfer { .. } => "Transfer",
174                DataToSinkEvent::Confirm { .. } => "Confirm",
175                DataToSinkEvent::Reject { .. } => "Reject",
176                DataToSinkEvent::Eol { .. } => "EOL",
177            },
178        };
179
180        let event = match msg {
181            SinkDataMessage::UpdateState(metadata) => {
182                SinkDataEvent::State(metadata)
183            }
184            SinkDataMessage::Event {
185                event,
186                event_request_timestamp,
187                event_ledger_timestamp,
188            } => SinkDataEvent::Event(Box::new(DataToSink {
189                event: *event,
190                public_key: self.public_key.clone(),
191                event_request_timestamp,
192                event_ledger_timestamp,
193                sink_timestamp: TimeStamp::now().as_nanos(),
194            })),
195        };
196
197        self.on_event(event, ctx).await;
198
199        debug!(
200            msg_type = msg_type,
201            subject_id = %subject_id,
202            schema_id = %schema_id,
203            public_key = %self.public_key,
204            "Sink data event processed"
205        );
206
207        Ok(SinkDataResponse::None)
208    }
209
210    async fn on_event(
211        &mut self,
212        event: SinkDataEvent,
213        ctx: &mut ActorContext<Self>,
214    ) {
215        let (subject_id, schema_id) = match &event {
216            SinkDataEvent::Event(data_to_sink) => {
217                data_to_sink.event.get_subject_schema()
218            }
219            SinkDataEvent::State(metadata) => (
220                metadata.subject_id.to_string(),
221                metadata.schema_id.to_string(),
222            ),
223        };
224        if let Err(e) = ctx.publish_event(event.clone()).await {
225            error!(
226                error = %e,
227                subject_id = %subject_id,
228                schema_id = %schema_id,
229                public_key = %self.public_key,
230                "Failed to publish sink data event"
231            );
232            emit_fail(ctx, e).await;
233        } else {
234            debug!(
235                subject_id = %subject_id,
236                schema_id = %schema_id,
237                "Sink data event published successfully"
238            );
239        }
240    }
241}