Skip to main content

rusty_ts/
pipeline.rs

1//! The stdin → format → stdout line pipeline.
2//!
3//! Per `spec.md` FR-001, FR-002, FR-005, FR-006, FR-007, FR-010, FR-011 and
4//! `plan.md` HINT-004 (broken-pipe-on-stdout = clean exit 0).
5//!
6//! Two pipeline shapes are exported:
7//!
8//! - `run_prefix` — the canonical prefix-each-line pipeline. Handles
9//!   absolute timestamps (default) and elapsed modes (`-i`, `-s`).
10//! - `run_relative` — the `-r` rewriter pipeline that converts in-line
11//!   timestamps to relative form via `relative::RelativeRewriter`.
12//!
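//! Both pipelines read from a `BufRead` line-by-line, write to a `Write`, and
//! treat broken-pipe-on-stdout as a clean EOF (exit 0). `run_prefix` output is
//! byte-faithful; `run_relative` decodes each line as lossy UTF-8 before
//! rewriting, so invalid UTF-8 bytes in its input are emitted as U+FFFD.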
15
16use crate::relative::RelativeRewriter;
17use crate::time::clock::Clock;
18use crate::time::format;
19use crate::time::tz::TimezoneSource;
20use chrono::{DateTime, Utc};
21use std::io::{BufRead, ErrorKind, Write};
22
/// Chooses which time value `run_prefix` renders in front of each line.
///
/// The variants carry no data, so the type also derives `PartialEq`/`Eq`
/// to let callers and tests compare selections directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PrefixSource {
    /// Absolute wall-clock time (FR-001, FR-003 default).
    Absolute,
    /// Time elapsed since the previous input line (FR-005, `-i`). For the
    /// first line the anchor is program start, matching moreutils.
    SincePreviousLine,
    /// Time elapsed since program start (FR-006, `-s`).
    SinceProgramStart,
}
34
35/// Configuration for `run_prefix`.
36pub struct PrefixConfig<'a> {
37    /// strftime format spec.
38    pub format: &'a str,
39    /// Timezone source resolved per FR-017/018/019.
40    pub tz: &'a TimezoneSource,
41    /// Time source for absolute mode and the elapsed anchors.
42    pub clock: &'a dyn Clock,
43    /// Which prefix value to render.
44    pub source: PrefixSource,
45}
46
47/// Configuration for `run_relative`.
48pub struct RelativeConfig<'a> {
49    /// Compiled rewriter (Default-subset or Strict full-set).
50    pub rewriter: &'a RelativeRewriter,
51    /// Reference instant for relative computations (typically `clock.now()`
52    /// at startup; for snapshot tests, a `Fixed` clock pins this).
53    pub reference: DateTime<Utc>,
54}
55
56/// Run the prefix-each-line pipeline. Returns `ExitCode` semantics: 0 on
57/// clean stdin EOF, non-zero on IO error (excluding broken-pipe-on-stdout
58/// which is treated as clean exit per HINT-004).
59pub fn run_prefix<R: BufRead, W: Write>(
60    mut reader: R,
61    mut writer: W,
62    cfg: &PrefixConfig<'_>,
63) -> std::io::Result<()> {
64    let program_start = cfg.clock.now();
65    let mut previous_line_at = program_start;
66    let mut line = Vec::with_capacity(256);
67
68    loop {
69        line.clear();
70        match reader.read_until(b'\n', &mut line) {
71            Ok(0) => return Ok(()), // clean EOF
72            Ok(_) => {
73                let now = cfg.clock.now();
74                let prefix = match cfg.source {
75                    PrefixSource::Absolute => format::format_with(cfg.format, now, cfg.tz),
76                    PrefixSource::SincePreviousLine => {
77                        let elapsed = (now - previous_line_at).to_std().unwrap_or_default();
78                        previous_line_at = now;
79                        render_elapsed(cfg.format, elapsed)
80                    }
81                    PrefixSource::SinceProgramStart => {
82                        let elapsed = (now - program_start).to_std().unwrap_or_default();
83                        render_elapsed(cfg.format, elapsed)
84                    }
85                };
86
87                if let Err(err) = writer
88                    .write_all(prefix.as_bytes())
89                    .and_then(|_| writer.write_all(b" "))
90                    .and_then(|_| writer.write_all(&line))
91                    .and_then(|_| writer.flush())
92                {
93                    if err.kind() == ErrorKind::BrokenPipe {
94                        return Ok(());
95                    }
96                    return Err(err);
97                }
98            }
99            Err(err) => {
100                if err.kind() == ErrorKind::BrokenPipe {
101                    return Ok(());
102                }
103                return Err(err);
104            }
105        }
106    }
107}
108
109/// Run the `-r` relative-mode pipeline. Each line is passed through the
110/// rewriter; recognized timestamps become relative form, everything else
111/// passes through unchanged.
112pub fn run_relative<R: BufRead, W: Write>(
113    mut reader: R,
114    mut writer: W,
115    cfg: &RelativeConfig<'_>,
116) -> std::io::Result<()> {
117    let mut line = Vec::with_capacity(256);
118    loop {
119        line.clear();
120        match reader.read_until(b'\n', &mut line) {
121            Ok(0) => return Ok(()),
122            Ok(_) => {
123                // Lossy UTF-8 conversion is acceptable here: timestamps in
124                // `-r` mode are ASCII-only, and the surrounding payload is
125                // passed through. Non-UTF-8 bytes in payload become U+FFFD
126                // in the rewrite path; this is the documented compatibility
127                // boundary for `-r` specifically (per FR-009 / FR-025).
128                let text = String::from_utf8_lossy(&line);
129                let rewritten = cfg.rewriter.rewrite(&text, cfg.reference);
130
131                if let Err(err) = writer
132                    .write_all(rewritten.as_bytes())
133                    .and_then(|_| writer.flush())
134                {
135                    if err.kind() == ErrorKind::BrokenPipe {
136                        return Ok(());
137                    }
138                    return Err(err);
139                }
140            }
141            Err(err) => {
142                if err.kind() == ErrorKind::BrokenPipe {
143                    return Ok(());
144                }
145                return Err(err);
146            }
147        }
148    }
149}
150
151/// Render an elapsed `Duration` through the format string. moreutils ts
152/// treats the format string as time-of-day when rendering elapsed durations;
153/// e.g., `ts -i '%H:%M:%S'` shows `00:00:03` for a 3-second elapsed window.
154/// We do the same by constructing a `DateTime<Utc>` from epoch + elapsed and
155/// rendering it.
156fn render_elapsed(spec: &str, elapsed: std::time::Duration) -> String {
157    let secs = elapsed.as_secs() as i64;
158    let nsecs = elapsed.subsec_nanos();
159    let synthetic = chrono::DateTime::<Utc>::from_timestamp(secs, nsecs).unwrap_or_else(|| {
160        // 64-bit timestamp out of range — vanishingly unlikely for elapsed
161        // duration. Fallback to zero.
162        chrono::DateTime::<Utc>::from_timestamp(0, 0).expect("epoch is in range")
163    });
164    format::format_with(spec, synthetic, &TimezoneSource::Utc)
165}
166
#[cfg(test)]
mod tests {
    use super::*;
    use crate::time::clock::Fixed;
    use chrono::TimeZone;
    use std::io::Cursor;

    /// Clock frozen at 2026-05-22 14:30:45 UTC so rendered prefixes are
    /// deterministic.
    fn fixed_clock() -> Fixed {
        Fixed::new(Utc.with_ymd_and_hms(2026, 5, 22, 14, 30, 45).unwrap())
    }

    /// Feeds `input` through `run_prefix` using the frozen clock and a UTC
    /// timezone source, and returns the raw bytes that were written.
    fn prefixed(input: &[u8], format: &str, source: PrefixSource) -> Vec<u8> {
        let clock = fixed_clock();
        let tz = TimezoneSource::Utc;
        let cfg = PrefixConfig {
            format,
            tz: &tz,
            clock: &clock,
            source,
        };
        let mut out = Vec::new();
        run_prefix(Cursor::new(input), &mut out, &cfg).expect("ok");
        out
    }

    #[test]
    fn absolute_default_format() {
        let out = prefixed(
            b"hello\nworld\n",
            format::DEFAULT_FORMAT,
            PrefixSource::Absolute,
        );
        let s = String::from_utf8(out).expect("utf-8");
        assert_eq!(s, "May 22 14:30:45 hello\nMay 22 14:30:45 world\n");
    }

    #[test]
    fn since_program_start_renders_zero_on_first_line() {
        // The clock never advances, so elapsed-since-start is exactly zero.
        let out = prefixed(b"a\n", "%H:%M:%S", PrefixSource::SinceProgramStart);
        let s = String::from_utf8(out).expect("utf-8");
        assert!(
            s.starts_with("00:00:00 "),
            "expected elapsed-zero prefix; got {s:?}",
        );
    }

    #[test]
    fn empty_stdin_produces_no_output() {
        let out = prefixed(b"", format::DEFAULT_FORMAT, PrefixSource::Absolute);
        assert!(out.is_empty(), "expected no output; got {:?}", out);
    }

    #[test]
    fn partial_final_line_is_emitted_without_added_newline() {
        let out = prefixed(
            b"incomplete",
            format::DEFAULT_FORMAT,
            PrefixSource::Absolute,
        );
        let s = String::from_utf8(out).expect("utf-8");
        assert_eq!(s, "May 22 14:30:45 incomplete");
        assert!(!s.ends_with('\n'));
    }

    #[test]
    fn binary_payload_passes_through() {
        // Two lines, the first carrying a non-UTF-8 byte (0xFF). The prefix
        // is ASCII; the payload bytes must be forwarded verbatim.
        let out = prefixed(
            b"hello\xff\nworld\n",
            format::DEFAULT_FORMAT,
            PrefixSource::Absolute,
        );
        assert!(
            out.contains(&0xFF),
            "expected 0xFF byte to pass through; got {:?}",
            out,
        );
    }
}