ironflow_engine/log_sender.rs
1//! Live log streaming infrastructure for real-time step output.
2//!
3//! During workflow execution, shell commands and agent invocations produce
4//! output incrementally. This module provides a channel-based mechanism to
5//! stream those lines to an external consumer (e.g. the worker's log pusher)
6//! so that SSE clients can display output in real time.
7//!
8//! # Architecture
9//!
10//! ```text
11//! ShellExecutor ──► StepLogSender ──► LogSender (mpsc) ──► LogPusher ──► API
12//! AgentExecutor ─┘ (worker)
13//! ```
14//!
15//! The [`LogSender`] is an unbounded MPSC channel sender. The
16//! [`StepLogSender`] wraps it with step metadata (run ID, step ID, step name)
17//! so that executors can emit lines with a simple `emit(stream, line)` call.
18//!
19//! # Examples
20//!
21//! ```
22//! use ironflow_engine::log_sender::{self, StepLogSender};
23//! use ironflow_engine::notify::LogStream;
24//! use uuid::Uuid;
25//!
26//! let (sender, mut receiver) = log_sender::channel();
27//! let step_sender = StepLogSender::new(
28//! sender,
29//! Uuid::now_v7(),
30//! Uuid::now_v7(),
31//! "build".to_string(),
32//! );
33//!
34//! step_sender.emit(LogStream::Stdout, "Compiling ironflow v0.1.0");
35//!
36//! let line = receiver.try_recv().unwrap();
37//! assert_eq!(line.line, "Compiling ironflow v0.1.0");
38//! assert_eq!(&*line.step_name, "build");
39//! ```
40
41use std::sync::Arc;
42
43use chrono::{DateTime, Utc};
44use tokio::sync::mpsc;
45use uuid::Uuid;
46
47use crate::notify::LogStream;
48
49/// A single log line emitted during step execution.
50#[derive(Debug, Clone)]
51pub struct LogLine {
52 /// Run that owns this step.
53 pub run_id: Uuid,
54 /// Step that produced the line.
55 pub step_id: Uuid,
56 /// Human-readable step name (shared across lines from the same step).
57 pub step_name: Arc<str>,
58 /// Output stream (stdout, stderr, or system).
59 pub stream: LogStream,
60 /// The line content.
61 pub line: String,
62 /// When the line was emitted.
63 pub at: DateTime<Utc>,
64}
65
66/// Sender half of the log streaming channel.
67pub type LogSender = mpsc::UnboundedSender<LogLine>;
68
69/// Receiver half of the log streaming channel.
70pub type LogReceiver = mpsc::UnboundedReceiver<LogLine>;
71
72/// Create a new log streaming channel.
73///
74/// Returns a `(sender, receiver)` pair. Clone the sender for each
75/// executor; the receiver is consumed by a single log pusher.
76///
77/// # Examples
78///
79/// ```
80/// use ironflow_engine::log_sender;
81///
82/// let (sender, receiver) = log_sender::channel();
83/// drop(receiver);
84/// ```
85pub fn channel() -> (LogSender, LogReceiver) {
86 mpsc::unbounded_channel()
87}
88
89/// Step-scoped log sender with pre-filled metadata.
90///
91/// Wraps a [`LogSender`] with the run ID, step ID, and step name so that
92/// executors can emit lines without repeating metadata on every call.
93///
94/// # Examples
95///
96/// ```
97/// use ironflow_engine::log_sender::{self, StepLogSender};
98/// use ironflow_engine::notify::LogStream;
99/// use uuid::Uuid;
100///
101/// let (sender, _rx) = log_sender::channel();
102/// let step_sender = StepLogSender::new(
103/// sender,
104/// Uuid::now_v7(),
105/// Uuid::now_v7(),
106/// "deploy".to_string(),
107/// );
108///
109/// step_sender.emit(LogStream::Stderr, "warning: deprecated API");
110/// ```
111#[derive(Clone)]
112pub struct StepLogSender {
113 inner: LogSender,
114 run_id: Uuid,
115 step_id: Uuid,
116 step_name: Arc<str>,
117}
118
119impl StepLogSender {
120 /// Create a new step-scoped sender.
121 pub fn new(inner: LogSender, run_id: Uuid, step_id: Uuid, step_name: String) -> Self {
122 Self {
123 inner,
124 run_id,
125 step_id,
126 step_name: Arc::from(step_name),
127 }
128 }
129
130 /// Emit a log line on the given stream.
131 ///
132 /// Silently drops the line if the receiver has been closed.
133 pub fn emit(&self, stream: LogStream, line: &str) {
134 let _ = self.inner.send(LogLine {
135 run_id: self.run_id,
136 step_id: self.step_id,
137 step_name: self.step_name.clone(),
138 stream,
139 line: line.to_string(),
140 at: Utc::now(),
141 });
142 }
143}
144
145#[cfg(test)]
146mod tests {
147 use super::*;
148
149 #[test]
150 fn channel_send_and_receive() {
151 let (sender, mut receiver) = channel();
152 let step = StepLogSender::new(sender, Uuid::now_v7(), Uuid::now_v7(), "build".to_string());
153
154 step.emit(LogStream::Stdout, "line 1");
155 step.emit(LogStream::Stderr, "error!");
156
157 let l1 = receiver.try_recv().unwrap();
158 assert_eq!(l1.line, "line 1");
159 assert_eq!(l1.stream, LogStream::Stdout);
160 assert_eq!(&*l1.step_name, "build");
161
162 let l2 = receiver.try_recv().unwrap();
163 assert_eq!(l2.line, "error!");
164 assert_eq!(l2.stream, LogStream::Stderr);
165 }
166
167 #[test]
168 fn emit_after_receiver_dropped_does_not_panic() {
169 let (sender, receiver) = channel();
170 let step = StepLogSender::new(sender, Uuid::now_v7(), Uuid::now_v7(), "test".to_string());
171 drop(receiver);
172 step.emit(LogStream::System, "should not panic");
173 }
174
175 #[test]
176 fn clone_step_sender_shares_channel() {
177 let (sender, mut receiver) = channel();
178 let step = StepLogSender::new(sender, Uuid::now_v7(), Uuid::now_v7(), "test".to_string());
179
180 let cloned = step.clone();
181 step.emit(LogStream::Stdout, "from original");
182 cloned.emit(LogStream::Stdout, "from clone");
183
184 assert_eq!(receiver.try_recv().unwrap().line, "from original");
185 assert_eq!(receiver.try_recv().unwrap().line, "from clone");
186 }
187}