kaish_kernel/scheduler/stderr_stream.rs
1//! Kernel-level stderr stream for real-time error output from pipeline stages.
2//!
3//! In bash, stderr from ALL pipeline stages streams to the terminal simultaneously.
4//! kaish achieves this with `StderrStream` — a cloneable, Send+Sync handle backed
5//! by an unbounded mpsc channel. Pipeline stages and external commands write to it
6//! concurrently; the kernel drains it to the actual stderr sink.
7//!
8//! The channel carries raw bytes (`Vec<u8>`), not strings. This avoids UTF-8
9//! corruption when multi-byte characters are split across chunk boundaries in
10//! external command stderr reads. Lossy decode happens once at the kernel drain
11//! site — the presentation boundary.
12//!
13//! ```text
14//! Stage 1 ──┐
15//! Stage 2 ──┼──▶ StderrStream (mpsc, bytes) ──▶ drain ──▶ from_utf8_lossy ──▶ output
16//! Stage 3 ──┘
17//! ```
18
19use tokio::sync::mpsc;
20
21/// Cloneable handle to the kernel's stderr output stream.
22///
23/// Multiple pipeline stages can write concurrently. The receiver side
24/// is drained by the kernel after each statement (or by a background task).
25///
26/// Uses `UnboundedSender` which is `Clone + Send + Sync` — safe across
27/// `tokio::spawn` boundaries without `Arc<Mutex<..>>`.
28#[derive(Clone, Debug)]
29pub struct StderrStream {
30 sender: mpsc::UnboundedSender<Vec<u8>>,
31}
32
33/// Receiving end of the stderr stream.
34///
35/// Owned by the kernel. Call `drain_lossy()` to collect all pending bytes
36/// as a UTF-8 string (with lossy decode at this boundary).
37pub struct StderrReceiver {
38 receiver: mpsc::UnboundedReceiver<Vec<u8>>,
39}
40
41/// Create a new stderr stream pair.
42pub fn stderr_stream() -> (StderrStream, StderrReceiver) {
43 let (sender, receiver) = mpsc::unbounded_channel();
44 (
45 StderrStream { sender },
46 StderrReceiver { receiver },
47 )
48}
49
50impl StderrStream {
51 /// Write raw bytes to the stderr stream.
52 ///
53 /// Non-blocking. If the receiver has been dropped, the data is silently
54 /// discarded (same as writing to a closed pipe).
55 pub fn write(&self, data: &[u8]) {
56 if !data.is_empty() {
57 // Ignore send errors — receiver dropped means nobody is listening
58 let _ = self.sender.send(data.to_vec());
59 }
60 }
61
62 /// Write a UTF-8 string to the stderr stream.
63 ///
64 /// Convenience for builtins that produce string stderr.
65 pub fn write_str(&self, msg: &str) {
66 self.write(msg.as_bytes());
67 }
68}
69
70impl StderrReceiver {
71 /// Drain all pending bytes and decode as UTF-8 (lossy).
72 ///
73 /// This is the single presentation boundary where bytes become a String.
74 /// Multi-byte characters that were split across chunks are reassembled
75 /// here because all chunks are concatenated before decoding.
76 ///
77 /// Returns an empty string if no messages are pending.
78 /// Non-blocking — returns immediately with whatever is available.
79 pub fn drain_lossy(&mut self) -> String {
80 let mut buf = Vec::new();
81 while let Ok(chunk) = self.receiver.try_recv() {
82 buf.extend_from_slice(&chunk);
83 }
84 if buf.is_empty() {
85 String::new()
86 } else {
87 String::from_utf8_lossy(&buf).into_owned()
88 }
89 }
90}