Skip to main content

ave_core/subject/
sinkdata.rs

1use std::fmt::Display;
2
3use async_trait::async_trait;
4use ave_actors::{
5    Actor, ActorContext, ActorError, ActorPath, Event, Handler, Message,
6    NotPersistentActor, Response,
7};
8use ave_common::{
9    DataToSink, DataToSinkEvent, identity::TimeStamp, response::SubjectDB,
10};
11use serde::{Deserialize, Serialize};
12use tracing::{Span, debug, error, info_span};
13
14use crate::model::common::emit_fail;
15
16#[derive(Clone, Debug, Serialize, Deserialize)]
17pub struct SinkData {
18    pub public_key: String,
19}
20
21#[derive(
22    Debug, Clone, Serialize, Deserialize, Eq, Ord, PartialEq, PartialOrd,
23)]
24pub enum SinkTypes {
25    Create,
26    Fact,
27    Transfer,
28    Confirm,
29    Reject,
30    EOL,
31    All,
32}
33
34impl Display for SinkTypes {
35    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36        match self {
37            Self::Create => write!(f, "Create"),
38            Self::Fact => write!(f, "Fact"),
39            Self::Transfer => write!(f, "Transfer"),
40            Self::Confirm => write!(f, "Confirm"),
41            Self::Reject => write!(f, "Reject"),
42            Self::EOL => write!(f, "EOL"),
43            Self::All => write!(f, "All"),
44        }
45    }
46}
47
48impl From<&DataToSink> for SinkTypes {
49    fn from(value: &DataToSink) -> Self {
50        match value.payload {
51            DataToSinkEvent::Create { .. } => Self::Create,
52            DataToSinkEvent::FactFull { .. }
53            | DataToSinkEvent::FactOpaque { .. } => Self::Fact,
54            DataToSinkEvent::Transfer { .. } => Self::Transfer,
55            DataToSinkEvent::Confirm { .. } => Self::Confirm,
56            DataToSinkEvent::Reject { .. } => Self::Reject,
57            DataToSinkEvent::Eol { .. } => Self::EOL,
58        }
59    }
60}
61
62impl From<String> for SinkTypes {
63    fn from(value: String) -> Self {
64        match value.trim() {
65            "Create" => Self::Create,
66            "Fact" => Self::Fact,
67            "Transfer" => Self::Transfer,
68            "Confirm" => Self::Confirm,
69            "Reject" => Self::Reject,
70            "EOL" => Self::EOL,
71            _ => Self::All,
72        }
73    }
74}
75
76impl SinkDataMessage {
77    pub fn get_subject_schema(&self) -> (String, String) {
78        match self {
79            Self::UpdateState(metadata) => {
80                (metadata.subject_id.to_string(), metadata.schema_id.clone())
81            }
82            Self::Event { event, .. } => event.get_subject_schema(),
83        }
84    }
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize)]
88pub enum SinkDataMessage {
89    UpdateState(Box<SubjectDB>),
90    Event {
91        event: Box<DataToSinkEvent>,
92        event_request_timestamp: u64,
93        event_ledger_timestamp: u64,
94    },
95}
96
97impl Message for SinkDataMessage {}
98
99impl NotPersistentActor for SinkData {}
100
101#[derive(Debug, Clone)]
102pub enum SinkDataResponse {
103    None,
104}
105
106impl Response for SinkDataResponse {}
107
108#[derive(Debug, Clone, Serialize, Deserialize)]
109pub enum SinkDataEvent {
110    Event(Box<DataToSink>),
111    State(Box<SubjectDB>),
112}
113
114impl Event for SinkDataEvent {}
115
116#[async_trait]
117impl Actor for SinkData {
118    type Event = SinkDataEvent;
119    type Message = SinkDataMessage;
120    type Response = SinkDataResponse;
121
122    fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
123        parent_span.map_or_else(
124            || info_span!("SinkData"),
125            |parent_span| info_span!(parent: parent_span, "SinkData"),
126        )
127    }
128}
129
130#[async_trait]
131impl Handler<Self> for SinkData {
132    async fn handle_message(
133        &mut self,
134        _sender: ActorPath,
135        msg: SinkDataMessage,
136        ctx: &mut ave_actors::ActorContext<Self>,
137    ) -> Result<SinkDataResponse, ActorError> {
138        let (subject_id, schema_id) = msg.get_subject_schema();
139        let msg_type = match &msg {
140            SinkDataMessage::UpdateState(..) => "UpdateState",
141            SinkDataMessage::Event { event, .. } => match &**event {
142                DataToSinkEvent::Create { .. } => "Create",
143                DataToSinkEvent::FactFull { .. } => "FactFull",
144                DataToSinkEvent::FactOpaque { .. } => "FactOpaque",
145                DataToSinkEvent::Transfer { .. } => "Transfer",
146                DataToSinkEvent::Confirm { .. } => "Confirm",
147                DataToSinkEvent::Reject { .. } => "Reject",
148                DataToSinkEvent::Eol { .. } => "EOL",
149            },
150        };
151
152        let event = match msg {
153            SinkDataMessage::UpdateState(metadata) => {
154                SinkDataEvent::State(metadata)
155            }
156            SinkDataMessage::Event {
157                event,
158                event_request_timestamp,
159                event_ledger_timestamp,
160            } => SinkDataEvent::Event(Box::new(DataToSink {
161                payload: *event,
162                public_key: self.public_key.clone(),
163                event_request_timestamp,
164                event_ledger_timestamp,
165                sink_timestamp: TimeStamp::now().as_nanos(),
166            })),
167        };
168
169        self.on_event(event, ctx).await;
170
171        debug!(
172            msg_type = msg_type,
173            subject_id = %subject_id,
174            schema_id = %schema_id,
175            public_key = %self.public_key,
176            "Sink data event processed"
177        );
178
179        Ok(SinkDataResponse::None)
180    }
181
182    async fn on_event(
183        &mut self,
184        event: SinkDataEvent,
185        ctx: &mut ActorContext<Self>,
186    ) {
187        let (subject_id, schema_id) = match &event {
188            SinkDataEvent::Event(data_to_sink) => {
189                data_to_sink.payload.get_subject_schema()
190            }
191            SinkDataEvent::State(metadata) => {
192                (metadata.subject_id.to_string(), metadata.schema_id.clone())
193            }
194        };
195        if let Err(e) = ctx.publish_event(event.clone()).await {
196            error!(
197                error = %e,
198                subject_id = %subject_id,
199                schema_id = %schema_id,
200                public_key = %self.public_key,
201                "Failed to publish sink data event"
202            );
203            emit_fail(ctx, e).await;
204        } else {
205            debug!(
206                subject_id = %subject_id,
207                schema_id = %schema_id,
208                "Sink data event published successfully"
209            );
210        }
211    }
212}