// ironflow_engine/log_sender.rs

//! Live log streaming infrastructure for real-time step output.
//!
//! During workflow execution, shell commands and agent invocations produce
//! output incrementally. This module provides a channel-based mechanism to
//! stream those lines to an external consumer (e.g. the worker's log pusher)
//! so that SSE clients can display output in real time.
//!
//! # Architecture
//!
//! ```text
//! ShellExecutor ──► StepLogSender ──► LogSender (mpsc) ──► LogPusher ──► API
//! AgentExecutor ─┘                                          (worker)
//! ```
//!
//! The [`LogSender`] is an unbounded MPSC channel sender. The
//! [`StepLogSender`] wraps it with step metadata (run ID, step ID, step name)
//! so that executors can emit lines with a simple `emit(stream, line)` call.
//!
//! # Examples
//!
//! ```
//! use ironflow_engine::log_sender::{self, StepLogSender};
//! use ironflow_engine::notify::LogStream;
//! use uuid::Uuid;
//!
//! let (sender, mut receiver) = log_sender::channel();
//! let step_sender = StepLogSender::new(
//!     sender,
//!     Uuid::now_v7(),
//!     Uuid::now_v7(),
//!     "build".to_string(),
//! );
//!
//! step_sender.emit(LogStream::Stdout, "Compiling ironflow v0.1.0");
//!
//! let line = receiver.try_recv().unwrap();
//! assert_eq!(line.line, "Compiling ironflow v0.1.0");
//! assert_eq!(&*line.step_name, "build");
//! ```
40
41use std::sync::Arc;
42
43use chrono::{DateTime, Utc};
44use ironflow_core::provider::LogSink;
45use tokio::sync::mpsc;
46use uuid::Uuid;
47
48use crate::notify::LogStream;
49
50/// A single log line emitted during step execution.
51#[derive(Debug, Clone)]
52pub struct LogLine {
53    /// Run that owns this step.
54    pub run_id: Uuid,
55    /// Step that produced the line.
56    pub step_id: Uuid,
57    /// Human-readable step name (shared across lines from the same step).
58    pub step_name: Arc<str>,
59    /// Output stream (stdout, stderr, or system).
60    pub stream: LogStream,
61    /// The line content.
62    pub line: String,
63    /// When the line was emitted.
64    pub at: DateTime<Utc>,
65}
66
67/// Sender half of the log streaming channel.
68pub type LogSender = mpsc::UnboundedSender<LogLine>;
69
70/// Receiver half of the log streaming channel.
71pub type LogReceiver = mpsc::UnboundedReceiver<LogLine>;
72
73/// Create a new log streaming channel.
74///
75/// Returns a `(sender, receiver)` pair. Clone the sender for each
76/// executor; the receiver is consumed by a single log pusher.
77///
78/// # Examples
79///
80/// ```
81/// use ironflow_engine::log_sender;
82///
83/// let (sender, receiver) = log_sender::channel();
84/// drop(receiver);
85/// ```
86pub fn channel() -> (LogSender, LogReceiver) {
87    mpsc::unbounded_channel()
88}
89
90/// Step-scoped log sender with pre-filled metadata.
91///
92/// Wraps a [`LogSender`] with the run ID, step ID, and step name so that
93/// executors can emit lines without repeating metadata on every call.
94///
95/// # Examples
96///
97/// ```
98/// use ironflow_engine::log_sender::{self, StepLogSender};
99/// use ironflow_engine::notify::LogStream;
100/// use uuid::Uuid;
101///
102/// let (sender, _rx) = log_sender::channel();
103/// let step_sender = StepLogSender::new(
104///     sender,
105///     Uuid::now_v7(),
106///     Uuid::now_v7(),
107///     "deploy".to_string(),
108/// );
109///
110/// step_sender.emit(LogStream::Stderr, "warning: deprecated API");
111/// ```
112#[derive(Clone)]
113pub struct StepLogSender {
114    inner: LogSender,
115    run_id: Uuid,
116    step_id: Uuid,
117    step_name: Arc<str>,
118}
119
120impl StepLogSender {
121    /// Create a new step-scoped sender.
122    pub fn new(inner: LogSender, run_id: Uuid, step_id: Uuid, step_name: String) -> Self {
123        Self {
124            inner,
125            run_id,
126            step_id,
127            step_name: Arc::from(step_name),
128        }
129    }
130
131    /// Emit a log line on the given stream.
132    ///
133    /// Silently drops the line if the receiver has been closed.
134    pub fn emit(&self, stream: LogStream, line: &str) {
135        let _ = self.inner.send(LogLine {
136            run_id: self.run_id,
137            step_id: self.step_id,
138            step_name: self.step_name.clone(),
139            stream,
140            line: line.to_string(),
141            at: Utc::now(),
142        });
143    }
144}
145
146impl LogSink for StepLogSender {
147    fn log(&self, stream: &str, line: &str) {
148        let log_stream = match stream {
149            "stderr" => LogStream::Stderr,
150            "system" => LogStream::System,
151            _ => LogStream::Stdout,
152        };
153        self.emit(log_stream, line);
154    }
155}
156
#[cfg(test)]
mod tests {
    use super::*;

    /// Open a channel and wrap its sender in a `StepLogSender` that carries
    /// freshly generated IDs and the given step name.
    fn step_with_receiver(name: &str) -> (StepLogSender, LogReceiver) {
        let (tx, rx) = channel();
        let step = StepLogSender::new(tx, Uuid::now_v7(), Uuid::now_v7(), name.to_string());
        (step, rx)
    }

    #[test]
    fn channel_send_and_receive() {
        let (step, mut rx) = step_with_receiver("build");

        step.emit(LogStream::Stdout, "line 1");
        step.emit(LogStream::Stderr, "error!");

        let first = rx.try_recv().unwrap();
        assert_eq!(first.line, "line 1");
        assert_eq!(first.stream, LogStream::Stdout);
        assert_eq!(&*first.step_name, "build");

        let second = rx.try_recv().unwrap();
        assert_eq!(second.line, "error!");
        assert_eq!(second.stream, LogStream::Stderr);
    }

    #[test]
    fn emit_after_receiver_dropped_does_not_panic() {
        let (step, rx) = step_with_receiver("test");
        drop(rx);
        step.emit(LogStream::System, "should not panic");
    }

    #[test]
    fn clone_step_sender_shares_channel() {
        let (step, mut rx) = step_with_receiver("test");
        let twin = step.clone();

        step.emit(LogStream::Stdout, "from original");
        twin.emit(LogStream::Stdout, "from clone");

        // Both handles feed the same receiver, in send order.
        assert_eq!(rx.try_recv().unwrap().line, "from original");
        assert_eq!(rx.try_recv().unwrap().line, "from clone");
    }

    #[test]
    fn log_sink_maps_stdout() {
        let (step, mut rx) = step_with_receiver("test");

        step.log("stdout", "hello");

        let received = rx.try_recv().unwrap();
        assert_eq!(received.stream, LogStream::Stdout);
        assert_eq!(received.line, "hello");
    }

    #[test]
    fn log_sink_maps_stderr() {
        let (step, mut rx) = step_with_receiver("test");

        step.log("stderr", "oops");

        let received = rx.try_recv().unwrap();
        assert_eq!(received.stream, LogStream::Stderr);
        assert_eq!(received.line, "oops");
    }

    #[test]
    fn log_sink_maps_system() {
        let (step, mut rx) = step_with_receiver("test");

        step.log("system", "info");

        let received = rx.try_recv().unwrap();
        assert_eq!(received.stream, LogStream::System);
        assert_eq!(received.line, "info");
    }

    #[test]
    fn log_sink_unknown_stream_defaults_to_stdout() {
        let (step, mut rx) = step_with_receiver("test");

        step.log("unknown", "data");

        let received = rx.try_recv().unwrap();
        assert_eq!(received.stream, LogStream::Stdout);
    }

    #[test]
    fn log_sink_after_receiver_dropped_does_not_panic() {
        let (step, rx) = step_with_receiver("test");
        drop(rx);
        step.log("stdout", "should not panic");
    }
}