Skip to main content

ironflow_engine/
log_sender.rs

//! Live log streaming infrastructure for real-time step output.
//!
//! During workflow execution, shell commands and agent invocations produce
//! output incrementally. This module provides a channel-based mechanism to
//! stream those lines to an external consumer (e.g. the worker's log pusher)
//! so that SSE clients can display output in real time.
//!
//! # Architecture
//!
//! ```text
//! ShellExecutor ──► StepLogSender ──► LogSender (mpsc) ──► LogPusher ──► API
//! AgentExecutor ─┘                                          (worker)
//! ```
//!
//! The [`LogSender`] is an unbounded MPSC channel sender. The
//! [`StepLogSender`] wraps it with step metadata (run ID, step ID, step name)
//! so that executors can emit lines with a simple `emit(stream, line)` call.
//!
//! # Examples
//!
//! ```
//! use ironflow_engine::log_sender::{self, StepLogSender};
//! use ironflow_engine::notify::LogStream;
//! use uuid::Uuid;
//!
//! let (sender, mut receiver) = log_sender::channel();
//! let step_sender = StepLogSender::new(
//!     sender,
//!     Uuid::now_v7(),
//!     Uuid::now_v7(),
//!     "build".to_string(),
//! );
//!
//! step_sender.emit(LogStream::Stdout, "Compiling ironflow v0.1.0");
//!
//! let line = receiver.try_recv().unwrap();
//! assert_eq!(line.line, "Compiling ironflow v0.1.0");
//! assert_eq!(&*line.step_name, "build");
//! ```
40
41use std::sync::Arc;
42
43use chrono::{DateTime, Utc};
44use tokio::sync::mpsc;
45use uuid::Uuid;
46
47use crate::notify::LogStream;
48
49/// A single log line emitted during step execution.
50#[derive(Debug, Clone)]
51pub struct LogLine {
52    /// Run that owns this step.
53    pub run_id: Uuid,
54    /// Step that produced the line.
55    pub step_id: Uuid,
56    /// Human-readable step name (shared across lines from the same step).
57    pub step_name: Arc<str>,
58    /// Output stream (stdout, stderr, or system).
59    pub stream: LogStream,
60    /// The line content.
61    pub line: String,
62    /// When the line was emitted.
63    pub at: DateTime<Utc>,
64}
65
66/// Sender half of the log streaming channel.
67pub type LogSender = mpsc::UnboundedSender<LogLine>;
68
69/// Receiver half of the log streaming channel.
70pub type LogReceiver = mpsc::UnboundedReceiver<LogLine>;
71
72/// Create a new log streaming channel.
73///
74/// Returns a `(sender, receiver)` pair. Clone the sender for each
75/// executor; the receiver is consumed by a single log pusher.
76///
77/// # Examples
78///
79/// ```
80/// use ironflow_engine::log_sender;
81///
82/// let (sender, receiver) = log_sender::channel();
83/// drop(receiver);
84/// ```
85pub fn channel() -> (LogSender, LogReceiver) {
86    mpsc::unbounded_channel()
87}
88
89/// Step-scoped log sender with pre-filled metadata.
90///
91/// Wraps a [`LogSender`] with the run ID, step ID, and step name so that
92/// executors can emit lines without repeating metadata on every call.
93///
94/// # Examples
95///
96/// ```
97/// use ironflow_engine::log_sender::{self, StepLogSender};
98/// use ironflow_engine::notify::LogStream;
99/// use uuid::Uuid;
100///
101/// let (sender, _rx) = log_sender::channel();
102/// let step_sender = StepLogSender::new(
103///     sender,
104///     Uuid::now_v7(),
105///     Uuid::now_v7(),
106///     "deploy".to_string(),
107/// );
108///
109/// step_sender.emit(LogStream::Stderr, "warning: deprecated API");
110/// ```
111#[derive(Clone)]
112pub struct StepLogSender {
113    inner: LogSender,
114    run_id: Uuid,
115    step_id: Uuid,
116    step_name: Arc<str>,
117}
118
119impl StepLogSender {
120    /// Create a new step-scoped sender.
121    pub fn new(inner: LogSender, run_id: Uuid, step_id: Uuid, step_name: String) -> Self {
122        Self {
123            inner,
124            run_id,
125            step_id,
126            step_name: Arc::from(step_name),
127        }
128    }
129
130    /// Emit a log line on the given stream.
131    ///
132    /// Silently drops the line if the receiver has been closed.
133    pub fn emit(&self, stream: LogStream, line: &str) {
134        let _ = self.inner.send(LogLine {
135            run_id: self.run_id,
136            step_id: self.step_id,
137            step_name: self.step_name.clone(),
138            stream,
139            line: line.to_string(),
140            at: Utc::now(),
141        });
142    }
143}
144
#[cfg(test)]
mod tests {
    use super::*;

    /// Open a channel and bind a [`StepLogSender`] with fresh IDs and the
    /// given step name to it, returning both ends.
    fn step_with_receiver(name: &str) -> (StepLogSender, LogReceiver) {
        let (tx, rx) = channel();
        let step = StepLogSender::new(tx, Uuid::now_v7(), Uuid::now_v7(), name.to_string());
        (step, rx)
    }

    /// Lines arrive in emission order with the stream and step name intact.
    #[test]
    fn channel_send_and_receive() {
        let (step, mut rx) = step_with_receiver("build");

        step.emit(LogStream::Stdout, "line 1");
        step.emit(LogStream::Stderr, "error!");

        let first = rx.try_recv().unwrap();
        assert_eq!(first.line, "line 1");
        assert_eq!(first.stream, LogStream::Stdout);
        assert_eq!(&*first.step_name, "build");

        let second = rx.try_recv().unwrap();
        assert_eq!(second.line, "error!");
        assert_eq!(second.stream, LogStream::Stderr);
    }

    /// Emitting into a closed channel must be a silent no-op.
    #[test]
    fn emit_after_receiver_dropped_does_not_panic() {
        let (step, rx) = step_with_receiver("test");
        drop(rx);
        step.emit(LogStream::System, "should not panic");
    }

    /// A cloned sender feeds the same channel as the original.
    #[test]
    fn clone_step_sender_shares_channel() {
        let (step, mut rx) = step_with_receiver("test");
        let twin = step.clone();

        step.emit(LogStream::Stdout, "from original");
        twin.emit(LogStream::Stdout, "from clone");

        let received_first = rx.try_recv().unwrap();
        assert_eq!(received_first.line, "from original");

        let received_second = rx.try_recv().unwrap();
        assert_eq!(received_second.line, "from clone");
    }
}