1use crate::infra::channel::Sender;
2use crate::preview::structured_log;
3use crate::preview::structured_log::{parse_line, LogEntry, LogFormat};
4use crate::runtime::config::RunConfig;
5use std::io::{self, BufRead, Read};
6use std::path::PathBuf;
7use std::sync::atomic::AtomicBool;
8use std::sync::Arc;
9
/// The way the program was asked to start, as derived from a [`RunConfig`].
///
/// [`classify_input_mode_with_run_config`] decides which variant applies and
/// defines the precedence between the flags mentioned below.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StartupMode {
    /// `headless` is set; this wins over every other flag.
    Headless,
    /// A `diff` value is configured.
    InteractiveDirectDiff,
    /// None of the other flags selected a mode.
    InteractiveTerminal,
    /// `stdin` is set without `log`.
    InteractiveSearchPipe,
    /// `log` and `stdin` are both set and no log files were given.
    InteractiveLogPipe,
    /// `log` is set and at least one log file was given.
    InteractiveLogFile,
}
19
/// Owned, thread-transferable byte source from which piped log lines are read.
pub type LogPipeReader = Box<dyn Read + Send>;
21
22pub struct PreparedInteractiveInput {
23 pub stdin_items: Option<Vec<String>>,
24 pub log_pipe: Option<LogPipeReader>,
25 pub log_files: Vec<PathBuf>,
26}
27
28pub fn classify_input_mode_with_run_config(run_config: &RunConfig) -> StartupMode {
29 if run_config.headless {
30 StartupMode::Headless
31 } else if run_config.diff.is_some() {
32 StartupMode::InteractiveDirectDiff
33 } else if run_config.log && !run_config.log_files.is_empty() {
34 StartupMode::InteractiveLogFile
35 } else if run_config.log && run_config.stdin {
36 StartupMode::InteractiveLogPipe
37 } else if run_config.stdin {
38 StartupMode::InteractiveSearchPipe
39 } else {
40 StartupMode::InteractiveTerminal
41 }
42}
43
44pub fn prepare_headless_input_with_run_config(
45 run_config: &RunConfig,
46) -> anyhow::Result<Option<Vec<String>>> {
47 if !run_config.stdin {
48 return Ok(None);
49 }
50
51 let raw = read_stdin_lines()?;
52 Ok(Some(parse_stdin_items(raw, run_config.split.as_deref())))
53}
54
55pub fn prepare_interactive_input_with_run_config(
56 run_config: &RunConfig,
57) -> anyhow::Result<PreparedInteractiveInput> {
58 match classify_input_mode_with_run_config(run_config) {
59 StartupMode::Headless
60 | StartupMode::InteractiveDirectDiff
61 | StartupMode::InteractiveTerminal => Ok(PreparedInteractiveInput {
62 stdin_items: None,
63 log_pipe: None,
64 log_files: vec![],
65 }),
66 StartupMode::InteractiveSearchPipe => Ok(PreparedInteractiveInput {
67 stdin_items: Some(read_interactive_search_items(run_config.split.as_deref())?),
68 log_pipe: None,
69 log_files: vec![],
70 }),
71 StartupMode::InteractiveLogPipe => Ok(PreparedInteractiveInput {
72 stdin_items: None,
73 log_pipe: Some(take_interactive_log_pipe()?),
74 log_files: vec![],
75 }),
76 StartupMode::InteractiveLogFile => Ok(PreparedInteractiveInput {
77 stdin_items: None,
78 log_pipe: None,
79 log_files: run_config.log_files.clone(),
80 }),
81 }
82}
83
84pub fn spawn_log_stdin_reader(
85 pipe: LogPipeReader,
86 tx_log: impl Sender<(String, Vec<LogEntry>)>,
87) -> std::thread::JoinHandle<()> {
88 std::thread::spawn(move || {
89 let mut format: Option<LogFormat> = None;
90 let mut batch: Vec<LogEntry> = Vec::with_capacity(256);
91 let mut last_flush = std::time::Instant::now();
92 const BATCH_SIZE: usize = 500;
93 const FLUSH_INTERVAL: std::time::Duration = std::time::Duration::from_millis(50);
94
95 fn flush(
96 batch: &mut Vec<LogEntry>,
97 tx: &impl Sender<(String, Vec<LogEntry>)>,
98 last_flush: &mut std::time::Instant,
99 ) -> bool {
100 if batch.is_empty() {
101 return true;
102 }
103 let ok = tx
104 .send((
105 structured_log::STDIN_STREAM_PATH.to_string(),
106 std::mem::replace(batch, Vec::with_capacity(256)),
107 ))
108 .is_ok();
109 *last_flush = std::time::Instant::now();
110 ok
111 }
112
113 for line in std::io::BufReader::new(pipe).lines() {
114 let Ok(line) = line else { break };
115 let trimmed = line.trim();
116 if trimmed.is_empty() {
117 continue;
118 }
119 if format.is_none() {
120 format = Some(
121 if serde_json::from_str::<serde_json::Value>(trimmed).is_ok() {
122 LogFormat::Jsonl
123 } else {
124 LogFormat::Logfmt
125 },
126 );
127 }
128 if let Some(entry) = parse_line(trimmed, format.as_ref().expect("log format set")) {
129 batch.push(entry);
130 if batch.len() >= BATCH_SIZE || last_flush.elapsed() >= FLUSH_INTERVAL {
131 if !flush(&mut batch, &tx_log, &mut last_flush) {
132 break; }
134 }
135 }
136 }
137
138 let _ = flush(&mut batch, &tx_log, &mut last_flush);
139 })
140}
141
142pub fn spawn_log_file_watchers(
143 files: &[PathBuf],
144 tx_log: impl Sender<(String, Vec<LogEntry>)> + Clone + 'static,
145) {
146 let format = detect_format_from_files(files).unwrap_or(LogFormat::Jsonl);
147 for file in files {
148 let stop = Arc::new(AtomicBool::new(false));
149 structured_log::watcher::spawn_log_watcher(
150 file.display().to_string(),
151 format.clone(),
152 0, stop,
154 tx_log.clone(),
155 );
156 }
157}
158
159fn detect_format_from_files(files: &[PathBuf]) -> Option<LogFormat> {
160 for file in files {
161 let file = std::fs::File::open(file).ok()?;
162 let reader = std::io::BufReader::new(file);
163 for line in reader.lines() {
164 let Ok(line) = line else { continue };
165 let trimmed = line.trim();
166 if trimmed.is_empty() {
167 continue;
168 }
169 return Some(if serde_json::from_str::<serde_json::Value>(trimmed).is_ok() {
170 LogFormat::Jsonl
171 } else {
172 LogFormat::Logfmt
173 });
174 }
175 }
176 None
177}
178
179fn read_interactive_search_items(split: Option<&str>) -> anyhow::Result<Vec<String>> {
180 let raw = read_piped_stdin_lines_with_tty_restore()?;
181 Ok(parse_stdin_items(raw, split))
182}
183
184#[cfg(unix)]
185fn take_interactive_log_pipe() -> anyhow::Result<LogPipeReader> {
186 let pipe = std::fs::File::open("/dev/stdin")?;
187 restore_stdin_to_real_tty()?;
188 Ok(Box::new(pipe))
189}
190
/// Non-Unix variant: the process's own stdin serves as the log pipe; no
/// terminal redirection is attempted.
#[cfg(not(unix))]
fn take_interactive_log_pipe() -> anyhow::Result<LogPipeReader> {
    let reader: LogPipeReader = Box::new(std::io::stdin());
    Ok(reader)
}
195
196#[cfg(unix)]
197fn read_piped_stdin_lines_with_tty_restore() -> anyhow::Result<Vec<String>> {
198 let pipe_stdin = std::fs::File::open("/dev/stdin")?;
199 let items = read_lines_from(pipe_stdin);
200 restore_stdin_to_real_tty()?;
201 Ok(items)
202}
203
/// Non-Unix variant: reads stdin directly; no terminal redirection is
/// attempted.
#[cfg(not(unix))]
fn read_piped_stdin_lines_with_tty_restore() -> anyhow::Result<Vec<String>> {
    let lines = read_stdin_lines()?;
    Ok(lines)
}
208
209#[cfg(unix)]
210fn restore_stdin_to_real_tty() -> anyhow::Result<()> {
211 use std::os::fd::AsRawFd;
212
213 let tty = open_real_tty()?;
214 let ret = unsafe { libc::dup2(tty.as_raw_fd(), libc::STDIN_FILENO) };
215 if ret == -1 {
216 return Err(std::io::Error::last_os_error().into());
217 }
218 Ok(())
219}
220
221#[cfg(unix)]
222fn open_real_tty() -> anyhow::Result<std::fs::File> {
223 use std::ffi::CStr;
224
225 for fd in [libc::STDOUT_FILENO, libc::STDERR_FILENO] {
226 let name = unsafe { libc::ttyname(fd) };
227 if !name.is_null() {
228 let path = unsafe { CStr::from_ptr(name) };
229 if let Ok(path_str) = path.to_str() {
230 if let Ok(file) = std::fs::File::open(path_str) {
231 return Ok(file);
232 }
233 }
234 }
235 }
236
237 Ok(std::fs::File::open("/dev/tty")?)
238}
239
240fn read_stdin_lines() -> anyhow::Result<Vec<String>> {
241 Ok(read_lines_from(io::stdin().lock()))
242}
243
/// Reads `reader` to the end and returns its lines without line terminators.
///
/// A trailing `\n` or `\r\n` is stripped from each line. Bytes that are not
/// valid UTF-8 are replaced with U+FFFD instead of ending the read, so one
/// malformed line no longer discards all the input that follows it. Reading
/// stops at end of input or at the first I/O error; lines collected up to
/// that point are returned.
fn read_lines_from(reader: impl Read) -> Vec<String> {
    let mut reader = io::BufReader::new(reader);
    let mut lines = Vec::new();
    let mut buf: Vec<u8> = Vec::new();

    loop {
        buf.clear();
        match reader.read_until(b'\n', &mut buf) {
            // End of input, or an error with no way to report it: stop here.
            Ok(0) | Err(_) => break,
            Ok(_) => {}
        }

        // Mirror `BufRead::lines`: drop "\n" and an immediately preceding "\r".
        if buf.last() == Some(&b'\n') {
            buf.pop();
            if buf.last() == Some(&b'\r') {
                buf.pop();
            }
        }

        lines.push(String::from_utf8_lossy(&buf).into_owned());
    }

    lines
}
250
/// Turns raw input lines into items.
///
/// With a `split` delimiter every line is cut on it and each piece becomes an
/// item; without one every line is a single item. Pieces that are empty or
/// whitespace-only are dropped; kept pieces are not trimmed.
fn parse_stdin_items(raw: Vec<String>, split: Option<&str>) -> Vec<String> {
    let is_blank = |text: &str| text.trim().is_empty();
    let mut items = Vec::new();

    for line in raw {
        match split {
            Some(delim) => items.extend(
                line.split(delim)
                    .filter(|piece| !is_blank(piece))
                    .map(str::to_string),
            ),
            None => {
                if !is_blank(&line) {
                    items.push(line);
                }
            }
        }
    }

    items
}
260
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cli::args::OutputFormat;
    use crate::runtime::config::RunConfig;

    /// Baseline configuration with every input flag switched off; each test
    /// overrides only the fields it cares about.
    fn base_run_config() -> RunConfig {
        RunConfig {
            headless: false,
            output_format: OutputFormat::Plain,
            stdin: false,
            log: false,
            log_files: vec![],
            diff: None,
            preview_command: None,
            preview_delimiter: ":".to_string(),
            split: None,
        }
    }

    #[test]
    fn headless_mode_classification_is_stable() {
        let config = RunConfig {
            headless: true,
            stdin: true,
            ..base_run_config()
        };
        let mode = classify_input_mode_with_run_config(&config);
        assert_eq!(mode, StartupMode::Headless);
    }

    #[test]
    fn piped_stdin_search_mode_classification_is_stable() {
        let config = RunConfig {
            stdin: true,
            ..base_run_config()
        };
        let mode = classify_input_mode_with_run_config(&config);
        assert_eq!(mode, StartupMode::InteractiveSearchPipe);
    }

    #[test]
    fn piped_stdin_log_mode_classification_is_stable() {
        let config = RunConfig {
            stdin: true,
            log: true,
            ..base_run_config()
        };
        let mode = classify_input_mode_with_run_config(&config);
        assert_eq!(mode, StartupMode::InteractiveLogPipe);
    }

    #[test]
    fn direct_diff_mode_classification_is_stable() {
        let config = RunConfig {
            diff: Some(["left.txt".into(), "right.txt".into()]),
            ..base_run_config()
        };
        let mode = classify_input_mode_with_run_config(&config);
        assert_eq!(mode, StartupMode::InteractiveDirectDiff);
    }

    #[test]
    fn split_parsing_drops_empty_segments() {
        let raw = vec!["a,b,,c".to_string(), " ".to_string(), "d".to_string()];
        let items = parse_stdin_items(raw, Some(","));
        assert_eq!(items, vec!["a", "b", "c", "d"]);
    }
}