Skip to main content

kaish_kernel/scheduler/
stderr_stream.rs

1//! Kernel-level stderr stream for real-time error output from pipeline stages.
2//!
3//! In bash, stderr from ALL pipeline stages streams to the terminal simultaneously.
4//! kaish achieves this with `StderrStream` — a cloneable, Send+Sync handle backed
5//! by an unbounded mpsc channel. Pipeline stages and external commands write to it
6//! concurrently; the kernel drains it to the actual stderr sink.
7//!
8//! The channel carries raw bytes (`Vec<u8>`), not strings. This avoids UTF-8
9//! corruption when multi-byte characters are split across chunk boundaries in
10//! external command stderr reads. Lossy decode happens once at the kernel drain
11//! site — the presentation boundary.
12//!
13//! ```text
14//!   Stage 1 ──┐
15//!   Stage 2 ──┼──▶ StderrStream (mpsc, bytes) ──▶ drain ──▶ from_utf8_lossy ──▶ output
16//!   Stage 3 ──┘
17//! ```
18
19use tokio::sync::mpsc;
20
21/// Cloneable handle to the kernel's stderr output stream.
22///
23/// Multiple pipeline stages can write concurrently. The receiver side
24/// is drained by the kernel after each statement (or by a background task).
25///
26/// Uses `UnboundedSender` which is `Clone + Send + Sync` — safe across
27/// `tokio::spawn` boundaries without `Arc<Mutex<..>>`.
28#[derive(Clone, Debug)]
29pub struct StderrStream {
30    sender: mpsc::UnboundedSender<Vec<u8>>,
31}
32
33/// Receiving end of the stderr stream.
34///
35/// Owned by the kernel. Call `drain_lossy()` to collect all pending bytes
36/// as a UTF-8 string (with lossy decode at this boundary).
37pub struct StderrReceiver {
38    receiver: mpsc::UnboundedReceiver<Vec<u8>>,
39}
40
41/// Create a new stderr stream pair.
42pub fn stderr_stream() -> (StderrStream, StderrReceiver) {
43    let (sender, receiver) = mpsc::unbounded_channel();
44    (
45        StderrStream { sender },
46        StderrReceiver { receiver },
47    )
48}
49
50impl StderrStream {
51    /// Write raw bytes to the stderr stream.
52    ///
53    /// Non-blocking. If the receiver has been dropped, the data is silently
54    /// discarded (same as writing to a closed pipe).
55    pub fn write(&self, data: &[u8]) {
56        if !data.is_empty() {
57            // Ignore send errors — receiver dropped means nobody is listening
58            let _ = self.sender.send(data.to_vec());
59        }
60    }
61
62    /// Write a UTF-8 string to the stderr stream.
63    ///
64    /// Convenience for builtins that produce string stderr.
65    pub fn write_str(&self, msg: &str) {
66        self.write(msg.as_bytes());
67    }
68}
69
70impl StderrReceiver {
71    /// Drain all pending bytes and decode as UTF-8 (lossy).
72    ///
73    /// This is the single presentation boundary where bytes become a String.
74    /// Multi-byte characters that were split across chunks are reassembled
75    /// here because all chunks are concatenated before decoding.
76    ///
77    /// Returns an empty string if no messages are pending.
78    /// Non-blocking — returns immediately with whatever is available.
79    pub fn drain_lossy(&mut self) -> String {
80        let mut buf = Vec::new();
81        while let Ok(chunk) = self.receiver.try_recv() {
82            buf.extend_from_slice(&chunk);
83        }
84        if buf.is_empty() {
85            String::new()
86        } else {
87            String::from_utf8_lossy(&buf).into_owned()
88        }
89    }
90}