1use crate::infra::channel::Sender;
2use crate::preview::structured_log;
3use crate::preview::structured_log::{parse_line, LogEntry, LogFormat};
4use crate::runtime::config::RunConfig;
5use std::io::{self, BufRead, Read};
6use std::path::PathBuf;
7use std::sync::atomic::AtomicBool;
8use std::sync::Arc;
9
/// The way the application was launched, as decided by
/// `classify_input_mode_with_run_config`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StartupMode {
    /// Non-interactive run (`headless` flag set).
    Headless,
    /// Interactive run with a `diff` configured.
    InteractiveDirectDiff,
    /// Interactive run with no piped input and no log sources.
    InteractiveTerminal,
    /// Interactive run whose items come from piped stdin.
    InteractiveSearchPipe,
    /// Interactive log view fed from piped stdin.
    InteractiveLogPipe,
    /// Interactive log view fed from one or more log files.
    InteractiveLogFile,
}
19
/// Owned, sendable byte source that log lines are read from; it is handed to
/// the reader thread started by `spawn_log_stdin_reader`.
pub type LogPipeReader = Box<dyn Read + Send + 'static>;
21
22pub struct PreparedInteractiveInput {
23 pub stdin_items: Option<Vec<String>>,
24 pub log_pipe: Option<LogPipeReader>,
25 pub log_files: Vec<PathBuf>,
26}
27
28pub fn classify_input_mode_with_run_config(run_config: &RunConfig) -> StartupMode {
29 if run_config.headless {
30 StartupMode::Headless
31 } else if run_config.diff.is_some() {
32 StartupMode::InteractiveDirectDiff
33 } else if run_config.log && !run_config.log_files.is_empty() {
34 StartupMode::InteractiveLogFile
35 } else if run_config.log && run_config.stdin {
36 StartupMode::InteractiveLogPipe
37 } else if run_config.stdin {
38 StartupMode::InteractiveSearchPipe
39 } else {
40 StartupMode::InteractiveTerminal
41 }
42}
43
44pub fn prepare_headless_input_with_run_config(
45 run_config: &RunConfig,
46) -> anyhow::Result<Option<Vec<String>>> {
47 if !run_config.stdin {
48 return Ok(None);
49 }
50
51 let raw = read_stdin_lines()?;
52 Ok(Some(parse_stdin_items(raw, run_config.split.as_deref())))
53}
54
55pub fn prepare_interactive_input_with_run_config(
56 run_config: &RunConfig,
57) -> anyhow::Result<PreparedInteractiveInput> {
58 match classify_input_mode_with_run_config(run_config) {
59 StartupMode::Headless
60 | StartupMode::InteractiveDirectDiff
61 | StartupMode::InteractiveTerminal => Ok(PreparedInteractiveInput {
62 stdin_items: None,
63 log_pipe: None,
64 log_files: vec![],
65 }),
66 StartupMode::InteractiveSearchPipe => Ok(PreparedInteractiveInput {
67 stdin_items: Some(read_interactive_search_items(run_config.split.as_deref())?),
68 log_pipe: None,
69 log_files: vec![],
70 }),
71 StartupMode::InteractiveLogPipe => Ok(PreparedInteractiveInput {
72 stdin_items: None,
73 log_pipe: Some(take_interactive_log_pipe()?),
74 log_files: vec![],
75 }),
76 StartupMode::InteractiveLogFile => Ok(PreparedInteractiveInput {
77 stdin_items: None,
78 log_pipe: None,
79 log_files: run_config.log_files.clone(),
80 }),
81 }
82}
83
84pub fn spawn_log_stdin_reader(
85 pipe: LogPipeReader,
86 tx_log: impl Sender<(String, Vec<LogEntry>)>,
87) -> std::thread::JoinHandle<()> {
88 std::thread::spawn(move || {
89 let mut format: Option<LogFormat> = None;
90 let mut batch: Vec<LogEntry> = Vec::with_capacity(256);
91 let mut last_flush = std::time::Instant::now();
92 const BATCH_SIZE: usize = 500;
93 const FLUSH_INTERVAL: std::time::Duration = std::time::Duration::from_millis(50);
94
95 fn flush(
96 batch: &mut Vec<LogEntry>,
97 tx: &impl Sender<(String, Vec<LogEntry>)>,
98 last_flush: &mut std::time::Instant,
99 ) -> bool {
100 if batch.is_empty() {
101 return true;
102 }
103 let ok = tx
104 .send((
105 structured_log::STDIN_STREAM_PATH.to_string(),
106 std::mem::replace(batch, Vec::with_capacity(256)),
107 ))
108 .is_ok();
109 *last_flush = std::time::Instant::now();
110 ok
111 }
112
113 for line in std::io::BufReader::new(pipe).lines() {
114 let Ok(line) = line else { break };
115 let trimmed = line.trim();
116 if trimmed.is_empty() {
117 continue;
118 }
119 if format.is_none() {
120 format = Some(
121 if serde_json::from_str::<serde_json::Value>(trimmed).is_ok() {
122 LogFormat::Jsonl
123 } else {
124 LogFormat::Logfmt
125 },
126 );
127 }
128 if let Some(entry) = parse_line(trimmed, format.as_ref().expect("log format set")) {
129 batch.push(entry);
130 if batch.len() >= BATCH_SIZE || last_flush.elapsed() >= FLUSH_INTERVAL {
131 if !flush(&mut batch, &tx_log, &mut last_flush) {
132 break; }
134 }
135 }
136 }
137
138 let _ = flush(&mut batch, &tx_log, &mut last_flush);
139 })
140}
141
142pub fn spawn_log_file_watchers(
143 files: &[PathBuf],
144 tx_log: impl Sender<(String, Vec<LogEntry>)> + Clone + 'static,
145) {
146 let format = detect_format_from_files(files).unwrap_or(LogFormat::Jsonl);
147 for file in files {
148 let stop = Arc::new(AtomicBool::new(false));
149 structured_log::watcher::spawn_log_watcher(
150 file.display().to_string(),
151 format.clone(),
152 0, stop,
154 tx_log.clone(),
155 );
156 }
157}
158
159fn detect_format_from_files(files: &[PathBuf]) -> Option<LogFormat> {
160 for file in files {
161 let file = std::fs::File::open(file).ok()?;
162 let reader = std::io::BufReader::new(file);
163 for line in reader.lines() {
164 let Ok(line) = line else { continue };
165 let trimmed = line.trim();
166 if trimmed.is_empty() {
167 continue;
168 }
169 return Some(
170 if serde_json::from_str::<serde_json::Value>(trimmed).is_ok() {
171 LogFormat::Jsonl
172 } else {
173 LogFormat::Logfmt
174 },
175 );
176 }
177 }
178 None
179}
180
181fn read_interactive_search_items(split: Option<&str>) -> anyhow::Result<Vec<String>> {
182 let raw = read_piped_stdin_lines_with_tty_restore()?;
183 Ok(parse_stdin_items(raw, split))
184}
185
186#[cfg(unix)]
187fn take_interactive_log_pipe() -> anyhow::Result<LogPipeReader> {
188 let pipe = std::fs::File::open("/dev/stdin")?;
189 restore_stdin_to_real_tty()?;
190 Ok(Box::new(pipe))
191}
192
/// Non-Unix variant: returns the process's standard input as the log stream,
/// without any terminal redirection.
#[cfg(not(unix))]
fn take_interactive_log_pipe() -> anyhow::Result<LogPipeReader> {
    let reader: LogPipeReader = Box::new(std::io::stdin());
    Ok(reader)
}
197
198#[cfg(unix)]
199fn read_piped_stdin_lines_with_tty_restore() -> anyhow::Result<Vec<String>> {
200 let pipe_stdin = std::fs::File::open("/dev/stdin")?;
201 let items = read_lines_from(pipe_stdin);
202 restore_stdin_to_real_tty()?;
203 Ok(items)
204}
205
/// Non-Unix variant: reads all lines from standard input; no terminal
/// redirection is performed.
#[cfg(not(unix))]
fn read_piped_stdin_lines_with_tty_restore() -> anyhow::Result<Vec<String>> {
    let lines = read_stdin_lines()?;
    Ok(lines)
}
210
211#[cfg(unix)]
212fn restore_stdin_to_real_tty() -> anyhow::Result<()> {
213 use std::os::fd::AsRawFd;
214
215 let tty = open_real_tty()?;
216 let ret = unsafe { libc::dup2(tty.as_raw_fd(), libc::STDIN_FILENO) };
217 if ret == -1 {
218 return Err(std::io::Error::last_os_error().into());
219 }
220 Ok(())
221}
222
223#[cfg(unix)]
224fn open_real_tty() -> anyhow::Result<std::fs::File> {
225 use std::ffi::CStr;
226
227 for fd in [libc::STDOUT_FILENO, libc::STDERR_FILENO] {
228 let name = unsafe { libc::ttyname(fd) };
229 if !name.is_null() {
230 let path = unsafe { CStr::from_ptr(name) };
231 if let Ok(path_str) = path.to_str() {
232 if let Ok(file) = std::fs::File::open(path_str) {
233 return Ok(file);
234 }
235 }
236 }
237 }
238
239 Ok(std::fs::File::open("/dev/tty")?)
240}
241
242fn read_stdin_lines() -> anyhow::Result<Vec<String>> {
243 Ok(read_lines_from(io::stdin().lock()))
244}
245
/// Collects the lines of `reader`, without their line terminators.
///
/// Reading stops silently at the first error (including a line that is not
/// valid UTF-8); the lines read before it are returned.
fn read_lines_from(reader: impl Read) -> Vec<String> {
    let mut collected = Vec::new();
    for line in io::BufReader::new(reader).lines() {
        match line {
            Ok(text) => collected.push(text),
            Err(_) => break,
        }
    }
    collected
}
252
/// Turns raw input lines into items.
///
/// With `split` set, every line is split on that delimiter and each piece
/// becomes an item; without it, each line is one item. Items that are empty
/// or whitespace-only are dropped. Kept items are returned untrimmed and in
/// input order.
fn parse_stdin_items(raw: Vec<String>, split: Option<&str>) -> Vec<String> {
    /// True when `piece` contains something other than whitespace.
    fn has_content(piece: &str) -> bool {
        !piece.trim().is_empty()
    }

    let mut items = Vec::new();
    for line in raw {
        if let Some(delim) = split {
            items.extend(
                line.split(delim)
                    .filter(|piece| has_content(piece))
                    .map(str::to_string),
            );
        } else if has_content(&line) {
            items.push(line);
        }
    }
    items
}
262
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cli::args::OutputFormat;
    use crate::runtime::config::RunConfig;

    /// Builds a configuration with every input-related switch off, then lets
    /// `adjust` flip the ones a test cares about.
    fn config_with(adjust: impl FnOnce(&mut RunConfig)) -> RunConfig {
        let mut config = RunConfig {
            headless: false,
            output_format: OutputFormat::Plain,
            output_file: None,
            stdin: false,
            log: false,
            log_files: vec![],
            diff: None,
            preview_command: None,
            preview_delimiter: ":".to_string(),
            split: None,
        };
        adjust(&mut config);
        config
    }

    /// Asserts that `config` is classified as `expected`.
    fn assert_mode(config: &RunConfig, expected: StartupMode) {
        assert_eq!(classify_input_mode_with_run_config(config), expected);
    }

    #[test]
    fn headless_mode_classification_is_stable() {
        let config = config_with(|c| {
            c.headless = true;
            c.stdin = true;
        });
        assert_mode(&config, StartupMode::Headless);
    }

    #[test]
    fn piped_stdin_search_mode_classification_is_stable() {
        let config = config_with(|c| c.stdin = true);
        assert_mode(&config, StartupMode::InteractiveSearchPipe);
    }

    #[test]
    fn piped_stdin_log_mode_classification_is_stable() {
        let config = config_with(|c| {
            c.stdin = true;
            c.log = true;
        });
        assert_mode(&config, StartupMode::InteractiveLogPipe);
    }

    #[test]
    fn direct_diff_mode_classification_is_stable() {
        let config = config_with(|c| {
            c.diff = Some(["left.txt".into(), "right.txt".into()]);
        });
        assert_mode(&config, StartupMode::InteractiveDirectDiff);
    }

    #[test]
    fn split_parsing_drops_empty_segments() {
        let raw = vec!["a,b,,c".to_string(), " ".to_string(), "d".to_string()];
        let items = parse_stdin_items(raw, Some(","));
        assert_eq!(items, vec!["a", "b", "c", "d"]);
    }
}