1use crate::{NodeId, context::signer::SignatureRequest};
2use bincode::{Decode, Encode};
3use chrono::{DateTime, Utc};
4use futures::channel::mpsc;
5use serde::Serialize;
6use value::Value;
7
8#[derive(derive_more::From, actix::Message, Clone, Debug, Serialize)]
9#[rtype(result = "()")]
10#[serde(tag = "event", content = "data")]
11pub enum Event {
12 FlowStart(FlowStart),
13 FlowError(FlowError),
14 FlowLog(FlowLog),
15 FlowFinish(FlowFinish),
16 NodeStart(NodeStart),
17 NodeOutput(NodeOutput),
18 NodeError(NodeError),
19 NodeLog(NodeLog),
20 NodeFinish(NodeFinish),
21 SignatureRequest(SignatureRequest),
22 ApiInput(ApiInput),
23}
24
25impl Event {
26 pub fn time(&self) -> DateTime<Utc> {
27 match self {
28 Event::FlowStart(e) => e.time,
29 Event::FlowError(e) => e.time,
30 Event::FlowLog(e) => e.time,
31 Event::FlowFinish(e) => e.time,
32 Event::NodeStart(e) => e.time,
33 Event::NodeOutput(e) => e.time,
34 Event::NodeError(e) => e.time,
35 Event::NodeLog(e) => e.time,
36 Event::NodeFinish(e) => e.time,
37 Event::SignatureRequest(e) => e.time,
38 Event::ApiInput(e) => e.time,
39 }
40 }
41}
42
43#[derive(Clone, Copy, Debug, Serialize, Default, Encode, Decode)]
44pub enum LogLevel {
45 Trace,
46 Debug,
47 #[default]
48 Info,
49 Warn,
50 Error,
51}
52
53impl std::fmt::Display for LogLevel {
54 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55 self.serialize(f)
56 }
57}
58
59impl From<tracing::Level> for LogLevel {
60 fn from(value: tracing::Level) -> Self {
61 match value {
62 tracing::Level::TRACE => LogLevel::Trace,
63 tracing::Level::DEBUG => LogLevel::Debug,
64 tracing::Level::INFO => LogLevel::Info,
65 tracing::Level::WARN => LogLevel::Warn,
66 tracing::Level::ERROR => LogLevel::Error,
67 }
68 }
69}
70
71#[derive(actix::Message, Default, Clone, Debug, Serialize)]
72#[rtype(result = "()")]
73pub struct ApiInput {
74 pub time: DateTime<Utc>,
75 pub url: String,
76}
77
78#[derive(actix::Message, Default, Clone, Debug, Serialize)]
79#[rtype(result = "()")]
80pub struct FlowStart {
81 pub time: DateTime<Utc>,
82}
83
84#[derive(actix::Message, Default, Clone, Debug, Serialize)]
85#[rtype(result = "()")]
86pub struct FlowError {
87 pub time: DateTime<Utc>,
88 pub error: String,
89}
90
91#[derive(actix::Message, Default, Clone, Debug, Serialize)]
92#[rtype(result = "()")]
93pub struct FlowLog {
94 pub time: DateTime<Utc>,
95 pub level: LogLevel,
96 pub module: Option<String>,
97 pub content: String,
98}
99
100#[derive(actix::Message, Default, Clone, Debug, Serialize)]
101#[rtype(result = "()")]
102pub struct FlowFinish {
103 pub time: DateTime<Utc>,
104 pub not_run: Vec<NodeId>,
105 pub output: Value,
106}
107
108#[derive(actix::Message, Default, Clone, Debug, Serialize)]
109#[rtype(result = "()")]
110pub struct NodeStart {
111 pub time: DateTime<Utc>,
112 pub node_id: NodeId,
113 pub times: u32,
114 pub input: Value,
115}
116
117#[derive(actix::Message, Default, Clone, Debug, Serialize)]
118#[rtype(result = "()")]
119pub struct NodeOutput {
120 pub time: DateTime<Utc>,
121 pub node_id: NodeId,
122 pub times: u32,
123 pub output: Value,
124}
125
126#[derive(actix::Message, Default, Clone, Debug, Serialize)]
127#[rtype(result = "()")]
128pub struct NodeError {
129 pub time: DateTime<Utc>,
130 pub node_id: NodeId,
131 pub times: u32,
132 pub error: String,
133}
134
135#[derive(actix::Message, Default, Clone, Debug, Serialize)]
136#[rtype(result = "()")]
137pub struct NodeLog {
138 pub time: DateTime<Utc>,
139 pub node_id: NodeId,
140 pub times: u32,
141 pub level: LogLevel,
142 pub module: Option<String>,
143 pub content: String,
144}
145
146#[derive(Encode, Decode, Debug)]
147pub struct NodeLogContent {
148 #[bincode(with_serde)]
149 pub time: DateTime<Utc>,
150 pub level: LogLevel,
151 pub module: Option<String>,
152 pub content: String,
153}
154
155#[derive(Debug, Clone)]
156pub struct NodeLogSender {
157 node_id: NodeId,
158 times: u32,
159 tx: EventSender,
160}
161
162impl NodeLogSender {
163 pub fn new(tx: EventSender, node_id: NodeId, times: u32) -> Self {
164 Self { node_id, times, tx }
165 }
166
167 pub fn send(
168 &self,
169 NodeLogContent {
170 time,
171 level,
172 module,
173 content,
174 }: NodeLogContent,
175 ) -> Result<(), mpsc::TrySendError<Event>> {
176 self.tx.unbounded_send(Event::NodeLog(NodeLog {
177 time,
178 node_id: self.node_id,
179 times: self.times,
180 level,
181 module,
182 content,
183 }))
184 }
185}
186
187#[derive(actix::Message, Default, Clone, Debug, Serialize)]
188#[rtype(result = "()")]
189pub struct NodeFinish {
190 pub time: DateTime<Utc>,
191 pub node_id: NodeId,
192 pub times: u32,
193}
194
195pub fn channel() -> (EventSender, EventReceiver) {
196 futures::channel::mpsc::unbounded()
197}
198pub type EventSender = futures::channel::mpsc::UnboundedSender<Event>;
199pub type EventReceiver = futures::channel::mpsc::UnboundedReceiver<Event>;
200
201pub fn event_channel() -> (EventSender, EventReceiver) {
202 futures::channel::mpsc::unbounded()
203}
204
205pub const DEFAULT_LOG_FILTER: &str = "info,solana_client=debug";
206pub const FLOW_SPAN_NAME: &str = "flow_logs";
207pub const NODE_SPAN_NAME: &str = "node_logs";