flow_lib/
flow_run_events.rs

1use crate::{NodeId, context::signer::SignatureRequest};
2use bincode::{Decode, Encode};
3use chrono::{DateTime, Utc};
4use futures::channel::mpsc;
5use serde::Serialize;
6use value::Value;
7
8#[derive(derive_more::From, actix::Message, Clone, Debug, Serialize)]
9#[rtype(result = "()")]
10#[serde(tag = "event", content = "data")]
11pub enum Event {
12    FlowStart(FlowStart),
13    FlowError(FlowError),
14    FlowLog(FlowLog),
15    FlowFinish(FlowFinish),
16    NodeStart(NodeStart),
17    NodeOutput(NodeOutput),
18    NodeError(NodeError),
19    NodeLog(NodeLog),
20    NodeFinish(NodeFinish),
21    SignatureRequest(SignatureRequest),
22    ApiInput(ApiInput),
23}
24
25impl Event {
26    pub fn time(&self) -> DateTime<Utc> {
27        match self {
28            Event::FlowStart(e) => e.time,
29            Event::FlowError(e) => e.time,
30            Event::FlowLog(e) => e.time,
31            Event::FlowFinish(e) => e.time,
32            Event::NodeStart(e) => e.time,
33            Event::NodeOutput(e) => e.time,
34            Event::NodeError(e) => e.time,
35            Event::NodeLog(e) => e.time,
36            Event::NodeFinish(e) => e.time,
37            Event::SignatureRequest(e) => e.time,
38            Event::ApiInput(e) => e.time,
39        }
40    }
41}
42
43#[derive(Clone, Copy, Debug, Serialize, Default, Encode, Decode)]
44pub enum LogLevel {
45    Trace,
46    Debug,
47    #[default]
48    Info,
49    Warn,
50    Error,
51}
52
53impl std::fmt::Display for LogLevel {
54    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55        self.serialize(f)
56    }
57}
58
59impl From<tracing::Level> for LogLevel {
60    fn from(value: tracing::Level) -> Self {
61        match value {
62            tracing::Level::TRACE => LogLevel::Trace,
63            tracing::Level::DEBUG => LogLevel::Debug,
64            tracing::Level::INFO => LogLevel::Info,
65            tracing::Level::WARN => LogLevel::Warn,
66            tracing::Level::ERROR => LogLevel::Error,
67        }
68    }
69}
70
71#[derive(actix::Message, Default, Clone, Debug, Serialize)]
72#[rtype(result = "()")]
73pub struct ApiInput {
74    pub time: DateTime<Utc>,
75    pub url: String,
76}
77
78#[derive(actix::Message, Default, Clone, Debug, Serialize)]
79#[rtype(result = "()")]
80pub struct FlowStart {
81    pub time: DateTime<Utc>,
82}
83
84#[derive(actix::Message, Default, Clone, Debug, Serialize)]
85#[rtype(result = "()")]
86pub struct FlowError {
87    pub time: DateTime<Utc>,
88    pub error: String,
89}
90
91#[derive(actix::Message, Default, Clone, Debug, Serialize)]
92#[rtype(result = "()")]
93pub struct FlowLog {
94    pub time: DateTime<Utc>,
95    pub level: LogLevel,
96    pub module: Option<String>,
97    pub content: String,
98}
99
100#[derive(actix::Message, Default, Clone, Debug, Serialize)]
101#[rtype(result = "()")]
102pub struct FlowFinish {
103    pub time: DateTime<Utc>,
104    pub not_run: Vec<NodeId>,
105    pub output: Value,
106}
107
108#[derive(actix::Message, Default, Clone, Debug, Serialize)]
109#[rtype(result = "()")]
110pub struct NodeStart {
111    pub time: DateTime<Utc>,
112    pub node_id: NodeId,
113    pub times: u32,
114    pub input: Value,
115}
116
117#[derive(actix::Message, Default, Clone, Debug, Serialize)]
118#[rtype(result = "()")]
119pub struct NodeOutput {
120    pub time: DateTime<Utc>,
121    pub node_id: NodeId,
122    pub times: u32,
123    pub output: Value,
124}
125
126#[derive(actix::Message, Default, Clone, Debug, Serialize)]
127#[rtype(result = "()")]
128pub struct NodeError {
129    pub time: DateTime<Utc>,
130    pub node_id: NodeId,
131    pub times: u32,
132    pub error: String,
133}
134
135#[derive(actix::Message, Default, Clone, Debug, Serialize)]
136#[rtype(result = "()")]
137pub struct NodeLog {
138    pub time: DateTime<Utc>,
139    pub node_id: NodeId,
140    pub times: u32,
141    pub level: LogLevel,
142    pub module: Option<String>,
143    pub content: String,
144}
145
146#[derive(Encode, Decode, Debug)]
147pub struct NodeLogContent {
148    #[bincode(with_serde)]
149    pub time: DateTime<Utc>,
150    pub level: LogLevel,
151    pub module: Option<String>,
152    pub content: String,
153}
154
155#[derive(Debug, Clone)]
156pub struct NodeLogSender {
157    node_id: NodeId,
158    times: u32,
159    tx: EventSender,
160}
161
162impl NodeLogSender {
163    pub fn new(tx: EventSender, node_id: NodeId, times: u32) -> Self {
164        Self { node_id, times, tx }
165    }
166
167    pub fn send(
168        &self,
169        NodeLogContent {
170            time,
171            level,
172            module,
173            content,
174        }: NodeLogContent,
175    ) -> Result<(), mpsc::TrySendError<Event>> {
176        self.tx.unbounded_send(Event::NodeLog(NodeLog {
177            time,
178            node_id: self.node_id,
179            times: self.times,
180            level,
181            module,
182            content,
183        }))
184    }
185}
186
187#[derive(actix::Message, Default, Clone, Debug, Serialize)]
188#[rtype(result = "()")]
189pub struct NodeFinish {
190    pub time: DateTime<Utc>,
191    pub node_id: NodeId,
192    pub times: u32,
193}
194
195pub fn channel() -> (EventSender, EventReceiver) {
196    futures::channel::mpsc::unbounded()
197}
198pub type EventSender = futures::channel::mpsc::UnboundedSender<Event>;
199pub type EventReceiver = futures::channel::mpsc::UnboundedReceiver<Event>;
200
201pub fn event_channel() -> (EventSender, EventReceiver) {
202    futures::channel::mpsc::unbounded()
203}
204
205pub const DEFAULT_LOG_FILTER: &str = "info,solana_client=debug";
206pub const FLOW_SPAN_NAME: &str = "flow_logs";
207pub const NODE_SPAN_NAME: &str = "node_logs";