Skip to main content

binocular/runtime/
startup.rs

1use crate::infra::channel::Sender;
2use crate::preview::structured_log;
3use crate::preview::structured_log::{parse_line, LogEntry, LogFormat};
4use crate::runtime::config::RunConfig;
5use std::io::{self, BufRead, Read};
6use std::path::PathBuf;
7use std::sync::atomic::AtomicBool;
8use std::sync::Arc;
9
10#[derive(Debug, Clone, Copy, PartialEq, Eq)]
11pub enum StartupMode {
12    Headless,
13    InteractiveDirectDiff,
14    InteractiveTerminal,
15    InteractiveSearchPipe,
16    InteractiveLogPipe,
17    InteractiveLogFile,
18}
19
20pub type LogPipeReader = Box<dyn Read + Send>;
21
22pub struct PreparedInteractiveInput {
23    pub stdin_items: Option<Vec<String>>,
24    pub log_pipe: Option<LogPipeReader>,
25    pub log_files: Vec<PathBuf>,
26}
27
28pub fn classify_input_mode_with_run_config(run_config: &RunConfig) -> StartupMode {
29    if run_config.headless {
30        StartupMode::Headless
31    } else if run_config.diff.is_some() {
32        StartupMode::InteractiveDirectDiff
33    } else if run_config.log && !run_config.log_files.is_empty() {
34        StartupMode::InteractiveLogFile
35    } else if run_config.log && run_config.stdin {
36        StartupMode::InteractiveLogPipe
37    } else if run_config.stdin {
38        StartupMode::InteractiveSearchPipe
39    } else {
40        StartupMode::InteractiveTerminal
41    }
42}
43
44pub fn prepare_headless_input_with_run_config(
45    run_config: &RunConfig,
46) -> anyhow::Result<Option<Vec<String>>> {
47    if !run_config.stdin {
48        return Ok(None);
49    }
50
51    let raw = read_stdin_lines()?;
52    Ok(Some(parse_stdin_items(raw, run_config.split.as_deref())))
53}
54
55pub fn prepare_interactive_input_with_run_config(
56    run_config: &RunConfig,
57) -> anyhow::Result<PreparedInteractiveInput> {
58    match classify_input_mode_with_run_config(run_config) {
59        StartupMode::Headless
60        | StartupMode::InteractiveDirectDiff
61        | StartupMode::InteractiveTerminal => Ok(PreparedInteractiveInput {
62            stdin_items: None,
63            log_pipe: None,
64            log_files: vec![],
65        }),
66        StartupMode::InteractiveSearchPipe => Ok(PreparedInteractiveInput {
67            stdin_items: Some(read_interactive_search_items(run_config.split.as_deref())?),
68            log_pipe: None,
69            log_files: vec![],
70        }),
71        StartupMode::InteractiveLogPipe => Ok(PreparedInteractiveInput {
72            stdin_items: None,
73            log_pipe: Some(take_interactive_log_pipe()?),
74            log_files: vec![],
75        }),
76        StartupMode::InteractiveLogFile => Ok(PreparedInteractiveInput {
77            stdin_items: None,
78            log_pipe: None,
79            log_files: run_config.log_files.clone(),
80        }),
81    }
82}
83
84pub fn spawn_log_stdin_reader(
85    pipe: LogPipeReader,
86    tx_log: impl Sender<(String, Vec<LogEntry>)>,
87) -> std::thread::JoinHandle<()> {
88    std::thread::spawn(move || {
89        let mut format: Option<LogFormat> = None;
90        let mut batch: Vec<LogEntry> = Vec::with_capacity(256);
91        let mut last_flush = std::time::Instant::now();
92        const BATCH_SIZE: usize = 500;
93        const FLUSH_INTERVAL: std::time::Duration = std::time::Duration::from_millis(50);
94
95        fn flush(
96            batch: &mut Vec<LogEntry>,
97            tx: &impl Sender<(String, Vec<LogEntry>)>,
98            last_flush: &mut std::time::Instant,
99        ) -> bool {
100            if batch.is_empty() {
101                return true;
102            }
103            let ok = tx
104                .send((
105                    structured_log::STDIN_STREAM_PATH.to_string(),
106                    std::mem::replace(batch, Vec::with_capacity(256)),
107                ))
108                .is_ok();
109            *last_flush = std::time::Instant::now();
110            ok
111        }
112
113        for line in std::io::BufReader::new(pipe).lines() {
114            let Ok(line) = line else { break };
115            let trimmed = line.trim();
116            if trimmed.is_empty() {
117                continue;
118            }
119            if format.is_none() {
120                format = Some(
121                    if serde_json::from_str::<serde_json::Value>(trimmed).is_ok() {
122                        LogFormat::Jsonl
123                    } else {
124                        LogFormat::Logfmt
125                    },
126                );
127            }
128            if let Some(entry) = parse_line(trimmed, format.as_ref().expect("log format set")) {
129                batch.push(entry);
130                if batch.len() >= BATCH_SIZE || last_flush.elapsed() >= FLUSH_INTERVAL {
131                    if !flush(&mut batch, &tx_log, &mut last_flush) {
132                        break; // Receiver gone, app is closing.
133                    }
134                }
135            }
136        }
137
138        let _ = flush(&mut batch, &tx_log, &mut last_flush);
139    })
140}
141
142pub fn spawn_log_file_watchers(
143    files: &[PathBuf],
144    tx_log: impl Sender<(String, Vec<LogEntry>)> + Clone + 'static,
145) {
146    let format = detect_format_from_files(files).unwrap_or(LogFormat::Jsonl);
147    for file in files {
148        let stop = Arc::new(AtomicBool::new(false));
149        structured_log::watcher::spawn_log_watcher(
150            file.display().to_string(),
151            format.clone(),
152            0, // start from beginning (read whole file, then tail)
153            stop,
154            tx_log.clone(),
155        );
156    }
157}
158
159fn detect_format_from_files(files: &[PathBuf]) -> Option<LogFormat> {
160    for file in files {
161        let file = std::fs::File::open(file).ok()?;
162        let reader = std::io::BufReader::new(file);
163        for line in reader.lines() {
164            let Ok(line) = line else { continue };
165            let trimmed = line.trim();
166            if trimmed.is_empty() {
167                continue;
168            }
169            return Some(if serde_json::from_str::<serde_json::Value>(trimmed).is_ok() {
170                LogFormat::Jsonl
171            } else {
172                LogFormat::Logfmt
173            });
174        }
175    }
176    None
177}
178
179fn read_interactive_search_items(split: Option<&str>) -> anyhow::Result<Vec<String>> {
180    let raw = read_piped_stdin_lines_with_tty_restore()?;
181    Ok(parse_stdin_items(raw, split))
182}
183
184#[cfg(unix)]
185fn take_interactive_log_pipe() -> anyhow::Result<LogPipeReader> {
186    let pipe = std::fs::File::open("/dev/stdin")?;
187    restore_stdin_to_real_tty()?;
188    Ok(Box::new(pipe))
189}
190
191#[cfg(not(unix))]
192fn take_interactive_log_pipe() -> anyhow::Result<LogPipeReader> {
193    Ok(Box::new(std::io::stdin()))
194}
195
196#[cfg(unix)]
197fn read_piped_stdin_lines_with_tty_restore() -> anyhow::Result<Vec<String>> {
198    let pipe_stdin = std::fs::File::open("/dev/stdin")?;
199    let items = read_lines_from(pipe_stdin);
200    restore_stdin_to_real_tty()?;
201    Ok(items)
202}
203
204#[cfg(not(unix))]
205fn read_piped_stdin_lines_with_tty_restore() -> anyhow::Result<Vec<String>> {
206    read_stdin_lines()
207}
208
209#[cfg(unix)]
210fn restore_stdin_to_real_tty() -> anyhow::Result<()> {
211    use std::os::fd::AsRawFd;
212
213    let tty = open_real_tty()?;
214    let ret = unsafe { libc::dup2(tty.as_raw_fd(), libc::STDIN_FILENO) };
215    if ret == -1 {
216        return Err(std::io::Error::last_os_error().into());
217    }
218    Ok(())
219}
220
221#[cfg(unix)]
222fn open_real_tty() -> anyhow::Result<std::fs::File> {
223    use std::ffi::CStr;
224
225    for fd in [libc::STDOUT_FILENO, libc::STDERR_FILENO] {
226        let name = unsafe { libc::ttyname(fd) };
227        if !name.is_null() {
228            let path = unsafe { CStr::from_ptr(name) };
229            if let Ok(path_str) = path.to_str() {
230                if let Ok(file) = std::fs::File::open(path_str) {
231                    return Ok(file);
232                }
233            }
234        }
235    }
236
237    Ok(std::fs::File::open("/dev/tty")?)
238}
239
240fn read_stdin_lines() -> anyhow::Result<Vec<String>> {
241    Ok(read_lines_from(io::stdin().lock()))
242}
243
244fn read_lines_from(reader: impl Read) -> Vec<String> {
245    io::BufReader::new(reader)
246        .lines()
247        .map_while(Result::ok)
248        .collect()
249}
250
251fn parse_stdin_items(raw: Vec<String>, split: Option<&str>) -> Vec<String> {
252    raw.into_iter()
253        .flat_map(|line| match split {
254            Some(delim) => line.split(delim).map(str::to_string).collect::<Vec<_>>(),
255            None => vec![line],
256        })
257        .filter(|s| !s.trim().is_empty())
258        .collect()
259}
260
261#[cfg(test)]
262mod tests {
263    use super::*;
264    use crate::cli::args::OutputFormat;
265    use crate::runtime::config::RunConfig;
266
267    fn run_config() -> RunConfig {
268        RunConfig {
269            headless: false,
270            output_format: OutputFormat::Plain,
271            stdin: false,
272            log: false,
273            log_files: vec![],
274            diff: None,
275            preview_command: None,
276            preview_delimiter: ":".to_string(),
277            split: None,
278        }
279    }
280
281    #[test]
282    fn headless_mode_classification_is_stable() {
283        let mut run_config = run_config();
284        run_config.headless = true;
285        run_config.stdin = true;
286        assert_eq!(
287            classify_input_mode_with_run_config(&run_config),
288            StartupMode::Headless
289        );
290    }
291
292    #[test]
293    fn piped_stdin_search_mode_classification_is_stable() {
294        let mut run_config = run_config();
295        run_config.stdin = true;
296        assert_eq!(
297            classify_input_mode_with_run_config(&run_config),
298            StartupMode::InteractiveSearchPipe
299        );
300    }
301
302    #[test]
303    fn piped_stdin_log_mode_classification_is_stable() {
304        let mut run_config = run_config();
305        run_config.stdin = true;
306        run_config.log = true;
307        assert_eq!(
308            classify_input_mode_with_run_config(&run_config),
309            StartupMode::InteractiveLogPipe
310        );
311    }
312
313    #[test]
314    fn direct_diff_mode_classification_is_stable() {
315        let mut run_config = run_config();
316        run_config.diff = Some(["left.txt".into(), "right.txt".into()]);
317        assert_eq!(
318            classify_input_mode_with_run_config(&run_config),
319            StartupMode::InteractiveDirectDiff
320        );
321    }
322
323    #[test]
324    fn split_parsing_drops_empty_segments() {
325        let items = parse_stdin_items(
326            vec!["a,b,,c".to_string(), "  ".to_string(), "d".to_string()],
327            Some(","),
328        );
329        assert_eq!(items, vec!["a", "b", "c", "d"]);
330    }
331}