1use std::fmt::Display;
2
3use async_trait::async_trait;
4use ave_actors::{
5 Actor, ActorContext, ActorError, ActorPath, Event, Handler, Message,
6 NotPersistentActor, Response,
7};
8use ave_common::{
9 DataToSink, DataToSinkEvent, identity::TimeStamp, response::SubjectDB,
10};
11use serde::{Deserialize, Serialize};
12use tracing::{Span, debug, error, info_span};
13
14use crate::model::common::emit_fail;
15
16#[derive(Clone, Debug, Serialize, Deserialize)]
17pub struct SinkData {
18 pub public_key: String,
19}
20
21#[derive(
22 Debug, Clone, Serialize, Deserialize, Eq, Ord, PartialEq, PartialOrd,
23)]
24pub enum SinkTypes {
25 Create,
26 Fact,
27 Transfer,
28 Confirm,
29 Reject,
30 EOL,
31 All,
32}
33
34impl Display for SinkTypes {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 match self {
37 Self::Create => write!(f, "Create"),
38 Self::Fact => write!(f, "Fact"),
39 Self::Transfer => write!(f, "Transfer"),
40 Self::Confirm => write!(f, "Confirm"),
41 Self::Reject => write!(f, "Reject"),
42 Self::EOL => write!(f, "EOL"),
43 Self::All => write!(f, "All"),
44 }
45 }
46}
47
48impl From<&DataToSink> for SinkTypes {
49 fn from(value: &DataToSink) -> Self {
50 match value.payload {
51 DataToSinkEvent::Create { .. } => Self::Create,
52 DataToSinkEvent::FactFull { .. }
53 | DataToSinkEvent::FactOpaque { .. } => Self::Fact,
54 DataToSinkEvent::Transfer { .. } => Self::Transfer,
55 DataToSinkEvent::Confirm { .. } => Self::Confirm,
56 DataToSinkEvent::Reject { .. } => Self::Reject,
57 DataToSinkEvent::Eol { .. } => Self::EOL,
58 }
59 }
60}
61
62impl From<String> for SinkTypes {
63 fn from(value: String) -> Self {
64 match value.trim() {
65 "Create" => Self::Create,
66 "Fact" => Self::Fact,
67 "Transfer" => Self::Transfer,
68 "Confirm" => Self::Confirm,
69 "Reject" => Self::Reject,
70 "EOL" => Self::EOL,
71 _ => Self::All,
72 }
73 }
74}
75
76impl SinkDataMessage {
77 pub fn get_subject_schema(&self) -> (String, String) {
78 match self {
79 Self::UpdateState(metadata) => {
80 (metadata.subject_id.to_string(), metadata.schema_id.clone())
81 }
82 Self::Event { event, .. } => event.get_subject_schema(),
83 }
84 }
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize)]
88pub enum SinkDataMessage {
89 UpdateState(Box<SubjectDB>),
90 Event {
91 event: Box<DataToSinkEvent>,
92 event_request_timestamp: u64,
93 event_ledger_timestamp: u64,
94 },
95}
96
97impl Message for SinkDataMessage {
98 fn is_critical(&self) -> bool {
99 true
100 }
101}
102
103impl NotPersistentActor for SinkData {}
104
105#[derive(Debug, Clone)]
106pub enum SinkDataResponse {
107 None,
108}
109
110impl Response for SinkDataResponse {}
111
112#[derive(Debug, Clone, Serialize, Deserialize)]
113pub enum SinkDataEvent {
114 Event(Box<DataToSink>),
115 State(Box<SubjectDB>),
116}
117
118impl Event for SinkDataEvent {}
119
120#[async_trait]
121impl Actor for SinkData {
122 type Event = SinkDataEvent;
123 type Message = SinkDataMessage;
124 type Response = SinkDataResponse;
125
126 fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
127 parent_span.map_or_else(
128 || info_span!("SinkData"),
129 |parent_span| info_span!(parent: parent_span, "SinkData"),
130 )
131 }
132}
133
134#[async_trait]
135impl Handler<Self> for SinkData {
136 async fn handle_message(
137 &mut self,
138 _sender: ActorPath,
139 msg: SinkDataMessage,
140 ctx: &mut ave_actors::ActorContext<Self>,
141 ) -> Result<SinkDataResponse, ActorError> {
142 let (subject_id, schema_id) = msg.get_subject_schema();
143 let msg_type = match &msg {
144 SinkDataMessage::UpdateState(..) => "UpdateState",
145 SinkDataMessage::Event { event, .. } => match &**event {
146 DataToSinkEvent::Create { .. } => "Create",
147 DataToSinkEvent::FactFull { .. } => "FactFull",
148 DataToSinkEvent::FactOpaque { .. } => "FactOpaque",
149 DataToSinkEvent::Transfer { .. } => "Transfer",
150 DataToSinkEvent::Confirm { .. } => "Confirm",
151 DataToSinkEvent::Reject { .. } => "Reject",
152 DataToSinkEvent::Eol { .. } => "EOL",
153 },
154 };
155
156 let event = match msg {
157 SinkDataMessage::UpdateState(metadata) => {
158 SinkDataEvent::State(metadata)
159 }
160 SinkDataMessage::Event {
161 event,
162 event_request_timestamp,
163 event_ledger_timestamp,
164 } => SinkDataEvent::Event(Box::new(DataToSink {
165 payload: *event,
166 public_key: self.public_key.clone(),
167 event_request_timestamp,
168 event_ledger_timestamp,
169 sink_timestamp: TimeStamp::now().as_nanos(),
170 })),
171 };
172
173 self.on_event(event, ctx).await;
174
175 debug!(
176 msg_type = msg_type,
177 subject_id = %subject_id,
178 schema_id = %schema_id,
179 public_key = %self.public_key,
180 "Sink data event processed"
181 );
182
183 Ok(SinkDataResponse::None)
184 }
185
186 async fn on_event(
187 &mut self,
188 event: SinkDataEvent,
189 ctx: &mut ActorContext<Self>,
190 ) {
191 let (subject_id, schema_id) = match &event {
192 SinkDataEvent::Event(data_to_sink) => {
193 data_to_sink.payload.get_subject_schema()
194 }
195 SinkDataEvent::State(metadata) => {
196 (metadata.subject_id.to_string(), metadata.schema_id.clone())
197 }
198 };
199 if let Err(e) = ctx.publish_event(event.clone()).await {
200 error!(
201 error = %e,
202 subject_id = %subject_id,
203 schema_id = %schema_id,
204 public_key = %self.public_key,
205 "Failed to publish sink data event"
206 );
207 emit_fail(ctx, e).await;
208 } else {
209 debug!(
210 subject_id = %subject_id,
211 schema_id = %schema_id,
212 "Sink data event published successfully"
213 );
214 }
215 }
216}