1use std::sync::Arc;
42
43use chrono::{DateTime, Utc};
44use ironflow_core::provider::LogSink;
45use tokio::sync::mpsc;
46use uuid::Uuid;
47
48use crate::notify::LogStream;
49
50#[derive(Debug, Clone)]
52pub struct LogLine {
53 pub run_id: Uuid,
55 pub step_id: Uuid,
57 pub step_name: Arc<str>,
59 pub stream: LogStream,
61 pub line: String,
63 pub at: DateTime<Utc>,
65}
66
67pub type LogSender = mpsc::UnboundedSender<LogLine>;
69
70pub type LogReceiver = mpsc::UnboundedReceiver<LogLine>;
72
73pub fn channel() -> (LogSender, LogReceiver) {
87 mpsc::unbounded_channel()
88}
89
90#[derive(Clone)]
113pub struct StepLogSender {
114 inner: LogSender,
115 run_id: Uuid,
116 step_id: Uuid,
117 step_name: Arc<str>,
118}
119
120impl StepLogSender {
121 pub fn new(inner: LogSender, run_id: Uuid, step_id: Uuid, step_name: String) -> Self {
123 Self {
124 inner,
125 run_id,
126 step_id,
127 step_name: Arc::from(step_name),
128 }
129 }
130
131 pub fn emit(&self, stream: LogStream, line: &str) {
135 let _ = self.inner.send(LogLine {
136 run_id: self.run_id,
137 step_id: self.step_id,
138 step_name: self.step_name.clone(),
139 stream,
140 line: line.to_string(),
141 at: Utc::now(),
142 });
143 }
144}
145
146impl LogSink for StepLogSender {
147 fn log(&self, stream: &str, line: &str) {
148 let log_stream = match stream {
149 "stderr" => LogStream::Stderr,
150 "system" => LogStream::System,
151 _ => LogStream::Stdout,
152 };
153 self.emit(log_stream, line);
154 }
155}
156
157#[cfg(test)]
158mod tests {
159 use super::*;
160
161 #[test]
162 fn channel_send_and_receive() {
163 let (sender, mut receiver) = channel();
164 let step = StepLogSender::new(sender, Uuid::now_v7(), Uuid::now_v7(), "build".to_string());
165
166 step.emit(LogStream::Stdout, "line 1");
167 step.emit(LogStream::Stderr, "error!");
168
169 let l1 = receiver.try_recv().unwrap();
170 assert_eq!(l1.line, "line 1");
171 assert_eq!(l1.stream, LogStream::Stdout);
172 assert_eq!(&*l1.step_name, "build");
173
174 let l2 = receiver.try_recv().unwrap();
175 assert_eq!(l2.line, "error!");
176 assert_eq!(l2.stream, LogStream::Stderr);
177 }
178
179 #[test]
180 fn emit_after_receiver_dropped_does_not_panic() {
181 let (sender, receiver) = channel();
182 let step = StepLogSender::new(sender, Uuid::now_v7(), Uuid::now_v7(), "test".to_string());
183 drop(receiver);
184 step.emit(LogStream::System, "should not panic");
185 }
186
187 #[test]
188 fn clone_step_sender_shares_channel() {
189 let (sender, mut receiver) = channel();
190 let step = StepLogSender::new(sender, Uuid::now_v7(), Uuid::now_v7(), "test".to_string());
191
192 let cloned = step.clone();
193 step.emit(LogStream::Stdout, "from original");
194 cloned.emit(LogStream::Stdout, "from clone");
195
196 assert_eq!(receiver.try_recv().unwrap().line, "from original");
197 assert_eq!(receiver.try_recv().unwrap().line, "from clone");
198 }
199
200 #[test]
201 fn log_sink_maps_stdout() {
202 let (sender, mut receiver) = channel();
203 let step = StepLogSender::new(sender, Uuid::now_v7(), Uuid::now_v7(), "test".to_string());
204
205 LogSink::log(&step, "stdout", "hello");
206
207 let line = receiver.try_recv().unwrap();
208 assert_eq!(line.stream, LogStream::Stdout);
209 assert_eq!(line.line, "hello");
210 }
211
212 #[test]
213 fn log_sink_maps_stderr() {
214 let (sender, mut receiver) = channel();
215 let step = StepLogSender::new(sender, Uuid::now_v7(), Uuid::now_v7(), "test".to_string());
216
217 LogSink::log(&step, "stderr", "oops");
218
219 let line = receiver.try_recv().unwrap();
220 assert_eq!(line.stream, LogStream::Stderr);
221 assert_eq!(line.line, "oops");
222 }
223
224 #[test]
225 fn log_sink_maps_system() {
226 let (sender, mut receiver) = channel();
227 let step = StepLogSender::new(sender, Uuid::now_v7(), Uuid::now_v7(), "test".to_string());
228
229 LogSink::log(&step, "system", "info");
230
231 let line = receiver.try_recv().unwrap();
232 assert_eq!(line.stream, LogStream::System);
233 assert_eq!(line.line, "info");
234 }
235
236 #[test]
237 fn log_sink_unknown_stream_defaults_to_stdout() {
238 let (sender, mut receiver) = channel();
239 let step = StepLogSender::new(sender, Uuid::now_v7(), Uuid::now_v7(), "test".to_string());
240
241 LogSink::log(&step, "unknown", "data");
242
243 let line = receiver.try_recv().unwrap();
244 assert_eq!(line.stream, LogStream::Stdout);
245 }
246
247 #[test]
248 fn log_sink_after_receiver_dropped_does_not_panic() {
249 let (sender, receiver) = channel();
250 let step = StepLogSender::new(sender, Uuid::now_v7(), Uuid::now_v7(), "test".to_string());
251 drop(receiver);
252 LogSink::log(&step, "stdout", "should not panic");
253 }
254}