Skip to main content

binocular/runtime/
startup.rs

1use crate::infra::channel::Sender;
2use crate::preview::structured_log;
3use crate::preview::structured_log::{parse_line, LogEntry, LogFormat};
4use crate::runtime::config::RunConfig;
5use std::io::{self, BufRead, Read};
6use std::path::PathBuf;
7use std::sync::atomic::AtomicBool;
8use std::sync::Arc;
9
/// Startup mode chosen from the run configuration by
/// `classify_input_mode_with_run_config`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StartupMode {
    /// `headless` is set in the run config; checked before every other flag.
    Headless,
    /// Not headless and a `diff` value is present in the run config.
    InteractiveDirectDiff,
    /// Fallback when none of the other modes apply.
    InteractiveTerminal,
    /// `stdin` is set and log mode does not claim it.
    InteractiveSearchPipe,
    /// Both `log` and `stdin` are set and no log files were supplied.
    InteractiveLogPipe,
    /// `log` is set and at least one log file was supplied.
    InteractiveLogFile,
}
19
/// Boxed, sendable byte source that log lines are read from; in this file it
/// is produced by `take_interactive_log_pipe` and wraps the process's stdin.
pub type LogPipeReader = Box<dyn Read + Send>;
21
/// Input sources gathered before the interactive UI starts, as built by
/// `prepare_interactive_input_with_run_config`.
pub struct PreparedInteractiveInput {
    /// Items read from piped stdin; `Some` only in `InteractiveSearchPipe` mode.
    pub stdin_items: Option<Vec<String>>,
    /// Reader over the piped log stream; `Some` only in `InteractiveLogPipe` mode.
    pub log_pipe: Option<LogPipeReader>,
    /// Copy of the configured log files in `InteractiveLogFile` mode, empty otherwise.
    pub log_files: Vec<PathBuf>,
}
27
28pub fn classify_input_mode_with_run_config(run_config: &RunConfig) -> StartupMode {
29    if run_config.headless {
30        StartupMode::Headless
31    } else if run_config.diff.is_some() {
32        StartupMode::InteractiveDirectDiff
33    } else if run_config.log && !run_config.log_files.is_empty() {
34        StartupMode::InteractiveLogFile
35    } else if run_config.log && run_config.stdin {
36        StartupMode::InteractiveLogPipe
37    } else if run_config.stdin {
38        StartupMode::InteractiveSearchPipe
39    } else {
40        StartupMode::InteractiveTerminal
41    }
42}
43
44pub fn prepare_headless_input_with_run_config(
45    run_config: &RunConfig,
46) -> anyhow::Result<Option<Vec<String>>> {
47    if !run_config.stdin {
48        return Ok(None);
49    }
50
51    let raw = read_stdin_lines()?;
52    Ok(Some(parse_stdin_items(raw, run_config.split.as_deref())))
53}
54
55pub fn prepare_interactive_input_with_run_config(
56    run_config: &RunConfig,
57) -> anyhow::Result<PreparedInteractiveInput> {
58    match classify_input_mode_with_run_config(run_config) {
59        StartupMode::Headless
60        | StartupMode::InteractiveDirectDiff
61        | StartupMode::InteractiveTerminal => Ok(PreparedInteractiveInput {
62            stdin_items: None,
63            log_pipe: None,
64            log_files: vec![],
65        }),
66        StartupMode::InteractiveSearchPipe => Ok(PreparedInteractiveInput {
67            stdin_items: Some(read_interactive_search_items(run_config.split.as_deref())?),
68            log_pipe: None,
69            log_files: vec![],
70        }),
71        StartupMode::InteractiveLogPipe => Ok(PreparedInteractiveInput {
72            stdin_items: None,
73            log_pipe: Some(take_interactive_log_pipe()?),
74            log_files: vec![],
75        }),
76        StartupMode::InteractiveLogFile => Ok(PreparedInteractiveInput {
77            stdin_items: None,
78            log_pipe: None,
79            log_files: run_config.log_files.clone(),
80        }),
81    }
82}
83
84pub fn spawn_log_stdin_reader(
85    pipe: LogPipeReader,
86    tx_log: impl Sender<(String, Vec<LogEntry>)>,
87) -> std::thread::JoinHandle<()> {
88    std::thread::spawn(move || {
89        let mut format: Option<LogFormat> = None;
90        let mut batch: Vec<LogEntry> = Vec::with_capacity(256);
91        let mut last_flush = std::time::Instant::now();
92        const BATCH_SIZE: usize = 500;
93        const FLUSH_INTERVAL: std::time::Duration = std::time::Duration::from_millis(50);
94
95        fn flush(
96            batch: &mut Vec<LogEntry>,
97            tx: &impl Sender<(String, Vec<LogEntry>)>,
98            last_flush: &mut std::time::Instant,
99        ) -> bool {
100            if batch.is_empty() {
101                return true;
102            }
103            let ok = tx
104                .send((
105                    structured_log::STDIN_STREAM_PATH.to_string(),
106                    std::mem::replace(batch, Vec::with_capacity(256)),
107                ))
108                .is_ok();
109            *last_flush = std::time::Instant::now();
110            ok
111        }
112
113        for line in std::io::BufReader::new(pipe).lines() {
114            let Ok(line) = line else { break };
115            let trimmed = line.trim();
116            if trimmed.is_empty() {
117                continue;
118            }
119            if format.is_none() {
120                format = Some(
121                    if serde_json::from_str::<serde_json::Value>(trimmed).is_ok() {
122                        LogFormat::Jsonl
123                    } else {
124                        LogFormat::Logfmt
125                    },
126                );
127            }
128            if let Some(entry) = parse_line(trimmed, format.as_ref().expect("log format set")) {
129                batch.push(entry);
130                if batch.len() >= BATCH_SIZE || last_flush.elapsed() >= FLUSH_INTERVAL {
131                    if !flush(&mut batch, &tx_log, &mut last_flush) {
132                        break; // Receiver gone, app is closing.
133                    }
134                }
135            }
136        }
137
138        let _ = flush(&mut batch, &tx_log, &mut last_flush);
139    })
140}
141
142pub fn spawn_log_file_watchers(
143    files: &[PathBuf],
144    tx_log: impl Sender<(String, Vec<LogEntry>)> + Clone + 'static,
145) {
146    let format = detect_format_from_files(files).unwrap_or(LogFormat::Jsonl);
147    for file in files {
148        let stop = Arc::new(AtomicBool::new(false));
149        structured_log::watcher::spawn_log_watcher(
150            file.display().to_string(),
151            format.clone(),
152            0, // start from beginning (read whole file, then tail)
153            stop,
154            tx_log.clone(),
155        );
156    }
157}
158
159fn detect_format_from_files(files: &[PathBuf]) -> Option<LogFormat> {
160    for file in files {
161        let file = std::fs::File::open(file).ok()?;
162        let reader = std::io::BufReader::new(file);
163        for line in reader.lines() {
164            let Ok(line) = line else { continue };
165            let trimmed = line.trim();
166            if trimmed.is_empty() {
167                continue;
168            }
169            return Some(
170                if serde_json::from_str::<serde_json::Value>(trimmed).is_ok() {
171                    LogFormat::Jsonl
172                } else {
173                    LogFormat::Logfmt
174                },
175            );
176        }
177    }
178    None
179}
180
181fn read_interactive_search_items(split: Option<&str>) -> anyhow::Result<Vec<String>> {
182    let raw = read_piped_stdin_lines_with_tty_restore()?;
183    Ok(parse_stdin_items(raw, split))
184}
185
186#[cfg(unix)]
187fn take_interactive_log_pipe() -> anyhow::Result<LogPipeReader> {
188    let pipe = std::fs::File::open("/dev/stdin")?;
189    restore_stdin_to_real_tty()?;
190    Ok(Box::new(pipe))
191}
192
/// Non-Unix: hands out the process's stdin handle directly; no terminal
/// redirection is performed.
#[cfg(not(unix))]
fn take_interactive_log_pipe() -> anyhow::Result<LogPipeReader> {
    let reader: LogPipeReader = Box::new(std::io::stdin());
    Ok(reader)
}
197
198#[cfg(unix)]
199fn read_piped_stdin_lines_with_tty_restore() -> anyhow::Result<Vec<String>> {
200    let pipe_stdin = std::fs::File::open("/dev/stdin")?;
201    let items = read_lines_from(pipe_stdin);
202    restore_stdin_to_real_tty()?;
203    Ok(items)
204}
205
/// Non-Unix: reads all lines from stdin; no terminal redirection is
/// performed.
#[cfg(not(unix))]
fn read_piped_stdin_lines_with_tty_restore() -> anyhow::Result<Vec<String>> {
    let lines = read_stdin_lines()?;
    Ok(lines)
}
210
211#[cfg(unix)]
212fn restore_stdin_to_real_tty() -> anyhow::Result<()> {
213    use std::os::fd::AsRawFd;
214
215    let tty = open_real_tty()?;
216    let ret = unsafe { libc::dup2(tty.as_raw_fd(), libc::STDIN_FILENO) };
217    if ret == -1 {
218        return Err(std::io::Error::last_os_error().into());
219    }
220    Ok(())
221}
222
223#[cfg(unix)]
224fn open_real_tty() -> anyhow::Result<std::fs::File> {
225    use std::ffi::CStr;
226
227    for fd in [libc::STDOUT_FILENO, libc::STDERR_FILENO] {
228        let name = unsafe { libc::ttyname(fd) };
229        if !name.is_null() {
230            let path = unsafe { CStr::from_ptr(name) };
231            if let Ok(path_str) = path.to_str() {
232                if let Ok(file) = std::fs::File::open(path_str) {
233                    return Ok(file);
234                }
235            }
236        }
237    }
238
239    Ok(std::fs::File::open("/dev/tty")?)
240}
241
242fn read_stdin_lines() -> anyhow::Result<Vec<String>> {
243    Ok(read_lines_from(io::stdin().lock()))
244}
245
/// Reads `reader` to the end and returns its lines without their terminators.
///
/// A trailing `\n` is removed from each line, together with a `\r` directly
/// before it. Bytes that are not valid UTF-8 are replaced with U+FFFD rather
/// than ending the read, so one malformed line (for example a non-UTF-8 file
/// name) does not silently drop everything after it. Reading stops at end of
/// input or at the first I/O error; lines read before the error are kept.
fn read_lines_from(reader: impl Read) -> Vec<String> {
    let mut reader = io::BufReader::new(reader);
    let mut lines = Vec::new();
    let mut raw = Vec::new();

    loop {
        raw.clear();
        match reader.read_until(b'\n', &mut raw) {
            Ok(0) | Err(_) => break,
            Ok(_) => {}
        }

        // Same terminator handling as `BufRead::lines`: drop "\n" or "\r\n",
        // but keep a lone "\r" that is not followed by "\n".
        if raw.last() == Some(&b'\n') {
            raw.pop();
            if raw.last() == Some(&b'\r') {
                raw.pop();
            }
        }

        lines.push(String::from_utf8_lossy(&raw).into_owned());
    }

    lines
}
252
/// Turns raw input lines into items.
///
/// With a delimiter, each line is split on it and every piece becomes its own
/// item; without one, each line is a single item. Items that are empty or
/// contain only whitespace are dropped; kept items are not trimmed.
fn parse_stdin_items(raw: Vec<String>, split: Option<&str>) -> Vec<String> {
    fn has_content(candidate: &str) -> bool {
        !candidate.trim().is_empty()
    }

    let mut items = Vec::new();
    for line in raw {
        if let Some(delim) = split {
            items.extend(
                line.split(delim)
                    .filter(|piece| has_content(piece))
                    .map(str::to_owned),
            );
        } else if has_content(&line) {
            items.push(line);
        }
    }
    items
}
262
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cli::args::OutputFormat;
    use crate::runtime::config::RunConfig;

    /// Baseline configuration with every input source switched off; each test
    /// flips only the flags it cares about.
    fn run_config() -> RunConfig {
        RunConfig {
            headless: false,
            output_format: OutputFormat::Plain,
            output_file: None,
            stdin: false,
            log: false,
            log_files: vec![],
            diff: None,
            preview_command: None,
            preview_delimiter: ":".to_string(),
            split: None,
        }
    }

    #[test]
    fn headless_mode_classification_is_stable() {
        let mut run_config = run_config();
        run_config.headless = true;
        run_config.stdin = true;
        assert_eq!(
            classify_input_mode_with_run_config(&run_config),
            StartupMode::Headless
        );
    }

    #[test]
    fn terminal_mode_is_the_default_classification() {
        assert_eq!(
            classify_input_mode_with_run_config(&run_config()),
            StartupMode::InteractiveTerminal
        );
    }

    #[test]
    fn piped_stdin_search_mode_classification_is_stable() {
        let mut run_config = run_config();
        run_config.stdin = true;
        assert_eq!(
            classify_input_mode_with_run_config(&run_config),
            StartupMode::InteractiveSearchPipe
        );
    }

    #[test]
    fn piped_stdin_log_mode_classification_is_stable() {
        let mut run_config = run_config();
        run_config.stdin = true;
        run_config.log = true;
        assert_eq!(
            classify_input_mode_with_run_config(&run_config),
            StartupMode::InteractiveLogPipe
        );
    }

    #[test]
    fn log_file_mode_classification_is_stable() {
        let mut run_config = run_config();
        run_config.log = true;
        run_config.log_files = vec![std::path::PathBuf::from("app.log")];
        assert_eq!(
            classify_input_mode_with_run_config(&run_config),
            StartupMode::InteractiveLogFile
        );
    }

    #[test]
    fn log_files_take_precedence_over_piped_stdin() {
        let mut run_config = run_config();
        run_config.log = true;
        run_config.stdin = true;
        run_config.log_files = vec![std::path::PathBuf::from("app.log")];
        assert_eq!(
            classify_input_mode_with_run_config(&run_config),
            StartupMode::InteractiveLogFile
        );
    }

    #[test]
    fn log_files_without_log_flag_are_ignored() {
        let mut run_config = run_config();
        run_config.log_files = vec![std::path::PathBuf::from("app.log")];
        assert_eq!(
            classify_input_mode_with_run_config(&run_config),
            StartupMode::InteractiveTerminal
        );
    }

    #[test]
    fn direct_diff_mode_classification_is_stable() {
        let mut run_config = run_config();
        run_config.diff = Some(["left.txt".into(), "right.txt".into()]);
        assert_eq!(
            classify_input_mode_with_run_config(&run_config),
            StartupMode::InteractiveDirectDiff
        );
    }

    #[test]
    fn split_parsing_drops_empty_segments() {
        let items = parse_stdin_items(
            vec!["a,b,,c".to_string(), "  ".to_string(), "d".to_string()],
            Some(","),
        );
        assert_eq!(items, vec!["a", "b", "c", "d"]);
    }

    #[test]
    fn parsing_without_split_keeps_whole_lines() {
        let items = parse_stdin_items(
            vec!["a,b".to_string(), String::new(), " c ".to_string()],
            None,
        );
        assert_eq!(items, vec!["a,b", " c "]);
    }

    #[test]
    fn read_lines_strips_line_terminators() {
        let input: &[u8] = b"one\ntwo\r\nthree";
        assert_eq!(read_lines_from(input), vec!["one", "two", "three"]);
    }
}
334}