1use std::fmt::Display;
2
3use async_trait::async_trait;
4use ave_actors::{
5 Actor, ActorContext, ActorError, ActorPath, Event, Handler, Message,
6 NotPersistentActor, Response,
7};
8use ave_common::{DataToSink, DataToSinkEvent, identity::TimeStamp};
9use serde::{Deserialize, Serialize};
10use tracing::{Span, debug, error, info_span};
11
12use crate::{model::common::emit_fail, subject::Metadata};
13
14#[derive(Clone, Debug, Serialize, Deserialize)]
15pub struct SinkData {
16 pub public_key: String,
17}
18
19#[derive(
20 Debug, Clone, Serialize, Deserialize, Eq, Ord, PartialEq, PartialOrd,
21)]
22pub enum SinkTypes {
23 Create,
24 Fact,
25 Transfer,
26 Confirm,
27 Reject,
28 EOL,
29 All,
30}
31
32impl Display for SinkTypes {
33 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34 match self {
35 Self::Create => write!(f, "Create"),
36 Self::Fact => write!(f, "Fact"),
37 Self::Transfer => write!(f, "Transfer"),
38 Self::Confirm => write!(f, "Confirm"),
39 Self::Reject => write!(f, "Reject"),
40 Self::EOL => write!(f, "EOL"),
41 Self::All => write!(f, "All"),
42 }
43 }
44}
45
46impl From<&DataToSink> for SinkTypes {
47 fn from(value: &DataToSink) -> Self {
48 match value.event {
49 DataToSinkEvent::Create { .. } => Self::Create,
50 DataToSinkEvent::Fact { .. } => Self::Fact,
51 DataToSinkEvent::Transfer { .. } => Self::Transfer,
52 DataToSinkEvent::Confirm { .. } => Self::Confirm,
53 DataToSinkEvent::Reject { .. } => Self::Reject,
54 DataToSinkEvent::Eol { .. } => Self::EOL,
55 }
56 }
57}
58
59impl From<String> for SinkTypes {
60 fn from(value: String) -> Self {
61 match value.trim() {
62 "Create" => Self::Create,
63 "Fact" => Self::Fact,
64 "Transfer" => Self::Transfer,
65 "Confirm" => Self::Confirm,
66 "Reject" => Self::Reject,
67 "EOL" => Self::EOL,
68 _ => Self::All,
69 }
70 }
71}
72
73impl SinkDataMessage {
74 pub fn get_subject_schema(&self) -> (String, String) {
75 match self {
76 Self::UpdateState(metadata) => (
77 metadata.subject_id.to_string(),
78 metadata.schema_id.to_string(),
79 ),
80 Self::Event { event, .. } => match &**event {
81 DataToSinkEvent::Create {
82 subject_id,
83 schema_id,
84 ..
85 }
86 | DataToSinkEvent::Fact {
87 subject_id,
88 schema_id,
89 ..
90 }
91 | DataToSinkEvent::Transfer {
92 subject_id,
93 schema_id,
94 ..
95 }
96 | DataToSinkEvent::Confirm {
97 subject_id,
98 schema_id,
99 ..
100 }
101 | DataToSinkEvent::Reject {
102 subject_id,
103 schema_id,
104 ..
105 }
106 | DataToSinkEvent::Eol {
107 subject_id,
108 schema_id,
109 ..
110 } => (subject_id.clone(), schema_id.to_string()),
111 },
112 }
113 }
114}
115
116#[derive(Debug, Clone, Serialize, Deserialize)]
117pub enum SinkDataMessage {
118 UpdateState(Box<Metadata>),
119 Event {
120 event: Box<DataToSinkEvent>,
121 event_request_timestamp: u64,
122 event_ledger_timestamp: u64,
123 },
124}
125
126impl Message for SinkDataMessage {}
127
128impl NotPersistentActor for SinkData {}
129
/// Reply type of the [`SinkData`] actor.
///
/// The handler always answers with [`SinkDataResponse::None`]; the reply
/// carries no data.
#[derive(Clone, Debug)]
pub enum SinkDataResponse {
    /// Empty acknowledgement.
    None,
}
134
135impl Response for SinkDataResponse {}
136
137#[derive(Debug, Clone, Serialize, Deserialize)]
138pub enum SinkDataEvent {
139 Event(Box<DataToSink>),
140 State(Box<Metadata>),
141}
142
143impl Event for SinkDataEvent {}
144
145#[async_trait]
146impl Actor for SinkData {
147 type Event = SinkDataEvent;
148 type Message = SinkDataMessage;
149 type Response = SinkDataResponse;
150
151 fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
152 parent_span.map_or_else(
153 || info_span!("SinkData"),
154 |parent_span| info_span!(parent: parent_span, "SinkData"),
155 )
156 }
157}
158
159#[async_trait]
160impl Handler<Self> for SinkData {
161 async fn handle_message(
162 &mut self,
163 _sender: ActorPath,
164 msg: SinkDataMessage,
165 ctx: &mut ave_actors::ActorContext<Self>,
166 ) -> Result<SinkDataResponse, ActorError> {
167 let (subject_id, schema_id) = msg.get_subject_schema();
168 let msg_type = match &msg {
169 SinkDataMessage::UpdateState(..) => "UpdateState",
170 SinkDataMessage::Event { event, .. } => match &**event {
171 DataToSinkEvent::Create { .. } => "Create",
172 DataToSinkEvent::Fact { .. } => "Fact",
173 DataToSinkEvent::Transfer { .. } => "Transfer",
174 DataToSinkEvent::Confirm { .. } => "Confirm",
175 DataToSinkEvent::Reject { .. } => "Reject",
176 DataToSinkEvent::Eol { .. } => "EOL",
177 },
178 };
179
180 let event = match msg {
181 SinkDataMessage::UpdateState(metadata) => {
182 SinkDataEvent::State(metadata)
183 }
184 SinkDataMessage::Event {
185 event,
186 event_request_timestamp,
187 event_ledger_timestamp,
188 } => SinkDataEvent::Event(Box::new(DataToSink {
189 event: *event,
190 public_key: self.public_key.clone(),
191 event_request_timestamp,
192 event_ledger_timestamp,
193 sink_timestamp: TimeStamp::now().as_nanos(),
194 })),
195 };
196
197 self.on_event(event, ctx).await;
198
199 debug!(
200 msg_type = msg_type,
201 subject_id = %subject_id,
202 schema_id = %schema_id,
203 public_key = %self.public_key,
204 "Sink data event processed"
205 );
206
207 Ok(SinkDataResponse::None)
208 }
209
210 async fn on_event(
211 &mut self,
212 event: SinkDataEvent,
213 ctx: &mut ActorContext<Self>,
214 ) {
215 let (subject_id, schema_id) = match &event {
216 SinkDataEvent::Event(data_to_sink) => {
217 data_to_sink.event.get_subject_schema()
218 }
219 SinkDataEvent::State(metadata) => (
220 metadata.subject_id.to_string(),
221 metadata.schema_id.to_string(),
222 ),
223 };
224 if let Err(e) = ctx.publish_event(event.clone()).await {
225 error!(
226 error = %e,
227 subject_id = %subject_id,
228 schema_id = %schema_id,
229 public_key = %self.public_key,
230 "Failed to publish sink data event"
231 );
232 emit_fail(ctx, e).await;
233 } else {
234 debug!(
235 subject_id = %subject_id,
236 schema_id = %schema_id,
237 "Sink data event published successfully"
238 );
239 }
240 }
241}