1use std::fmt::Display;
2
3use async_trait::async_trait;
4use ave_actors::{
5 Actor, ActorContext, ActorError, ActorPath, Event, Handler, Message,
6 NotPersistentActor, Response,
7};
8use ave_common::{
9 DataToSink, DataToSinkEvent, identity::TimeStamp, response::SubjectDB,
10};
11use serde::{Deserialize, Serialize};
12use tracing::{Span, debug, error, info_span};
13
14use crate::model::common::emit_fail;
15
16#[derive(Clone, Debug, Serialize, Deserialize)]
17pub struct SinkData {
18 pub public_key: String,
19}
20
21#[derive(
22 Debug, Clone, Serialize, Deserialize, Eq, Ord, PartialEq, PartialOrd,
23)]
24pub enum SinkTypes {
25 Create,
26 Fact,
27 Transfer,
28 Confirm,
29 Reject,
30 EOL,
31 All,
32}
33
34impl Display for SinkTypes {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 match self {
37 Self::Create => write!(f, "Create"),
38 Self::Fact => write!(f, "Fact"),
39 Self::Transfer => write!(f, "Transfer"),
40 Self::Confirm => write!(f, "Confirm"),
41 Self::Reject => write!(f, "Reject"),
42 Self::EOL => write!(f, "EOL"),
43 Self::All => write!(f, "All"),
44 }
45 }
46}
47
48impl From<&DataToSink> for SinkTypes {
49 fn from(value: &DataToSink) -> Self {
50 match value.payload {
51 DataToSinkEvent::Create { .. } => Self::Create,
52 DataToSinkEvent::FactFull { .. }
53 | DataToSinkEvent::FactOpaque { .. } => Self::Fact,
54 DataToSinkEvent::Transfer { .. } => Self::Transfer,
55 DataToSinkEvent::Confirm { .. } => Self::Confirm,
56 DataToSinkEvent::Reject { .. } => Self::Reject,
57 DataToSinkEvent::Eol { .. } => Self::EOL,
58 }
59 }
60}
61
62impl From<String> for SinkTypes {
63 fn from(value: String) -> Self {
64 match value.trim() {
65 "Create" => Self::Create,
66 "Fact" => Self::Fact,
67 "Transfer" => Self::Transfer,
68 "Confirm" => Self::Confirm,
69 "Reject" => Self::Reject,
70 "EOL" => Self::EOL,
71 _ => Self::All,
72 }
73 }
74}
75
76impl SinkDataMessage {
77 pub fn get_subject_schema(&self) -> (String, String) {
78 match self {
79 Self::UpdateState(metadata) => {
80 (metadata.subject_id.to_string(), metadata.schema_id.clone())
81 }
82 Self::Event { event, .. } => event.get_subject_schema(),
83 }
84 }
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize)]
88pub enum SinkDataMessage {
89 UpdateState(Box<SubjectDB>),
90 Event {
91 event: Box<DataToSinkEvent>,
92 event_request_timestamp: u64,
93 event_ledger_timestamp: u64,
94 },
95}
96
97impl Message for SinkDataMessage {}
98
99impl NotPersistentActor for SinkData {}
100
101#[derive(Debug, Clone)]
102pub enum SinkDataResponse {
103 None,
104}
105
106impl Response for SinkDataResponse {}
107
108#[derive(Debug, Clone, Serialize, Deserialize)]
109pub enum SinkDataEvent {
110 Event(Box<DataToSink>),
111 State(Box<SubjectDB>),
112}
113
114impl Event for SinkDataEvent {}
115
116#[async_trait]
117impl Actor for SinkData {
118 type Event = SinkDataEvent;
119 type Message = SinkDataMessage;
120 type Response = SinkDataResponse;
121
122 fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
123 parent_span.map_or_else(
124 || info_span!("SinkData"),
125 |parent_span| info_span!(parent: parent_span, "SinkData"),
126 )
127 }
128}
129
130#[async_trait]
131impl Handler<Self> for SinkData {
132 async fn handle_message(
133 &mut self,
134 _sender: ActorPath,
135 msg: SinkDataMessage,
136 ctx: &mut ave_actors::ActorContext<Self>,
137 ) -> Result<SinkDataResponse, ActorError> {
138 let (subject_id, schema_id) = msg.get_subject_schema();
139 let msg_type = match &msg {
140 SinkDataMessage::UpdateState(..) => "UpdateState",
141 SinkDataMessage::Event { event, .. } => match &**event {
142 DataToSinkEvent::Create { .. } => "Create",
143 DataToSinkEvent::FactFull { .. } => "FactFull",
144 DataToSinkEvent::FactOpaque { .. } => "FactOpaque",
145 DataToSinkEvent::Transfer { .. } => "Transfer",
146 DataToSinkEvent::Confirm { .. } => "Confirm",
147 DataToSinkEvent::Reject { .. } => "Reject",
148 DataToSinkEvent::Eol { .. } => "EOL",
149 },
150 };
151
152 let event = match msg {
153 SinkDataMessage::UpdateState(metadata) => {
154 SinkDataEvent::State(metadata)
155 }
156 SinkDataMessage::Event {
157 event,
158 event_request_timestamp,
159 event_ledger_timestamp,
160 } => SinkDataEvent::Event(Box::new(DataToSink {
161 payload: *event,
162 public_key: self.public_key.clone(),
163 event_request_timestamp,
164 event_ledger_timestamp,
165 sink_timestamp: TimeStamp::now().as_nanos(),
166 })),
167 };
168
169 self.on_event(event, ctx).await;
170
171 debug!(
172 msg_type = msg_type,
173 subject_id = %subject_id,
174 schema_id = %schema_id,
175 public_key = %self.public_key,
176 "Sink data event processed"
177 );
178
179 Ok(SinkDataResponse::None)
180 }
181
182 async fn on_event(
183 &mut self,
184 event: SinkDataEvent,
185 ctx: &mut ActorContext<Self>,
186 ) {
187 let (subject_id, schema_id) = match &event {
188 SinkDataEvent::Event(data_to_sink) => {
189 data_to_sink.payload.get_subject_schema()
190 }
191 SinkDataEvent::State(metadata) => {
192 (metadata.subject_id.to_string(), metadata.schema_id.clone())
193 }
194 };
195 if let Err(e) = ctx.publish_event(event.clone()).await {
196 error!(
197 error = %e,
198 subject_id = %subject_id,
199 schema_id = %schema_id,
200 public_key = %self.public_key,
201 "Failed to publish sink data event"
202 );
203 emit_fail(ctx, e).await;
204 } else {
205 debug!(
206 subject_id = %subject_id,
207 schema_id = %schema_id,
208 "Sink data event published successfully"
209 );
210 }
211 }
212}