1use crate::buffer::Buffer;
2use crate::buffer::BufferError;
3use crate::logsink;
4use crate::logsink::Logsink;
5use crate::logsink::SinkError;
6use crate::parse::parse_lines;
7use crate::signals;
8use crate::signals::SignalError;
9use io::Read;
10use std::io;
11use std::path;
12use std::path::PathBuf;
13use std::process;
14use std::process::ExitStatus;
15use std::sync::atomic::AtomicUsize;
16use std::sync::atomic::Ordering;
17use std::sync::Arc;
18use std::thread;
19use std::time::Duration;
20use std::time::Instant;
21use thiserror::Error;
22
/// Maximum number of buffered bytes of a not-yet-terminated line that
/// `log_append_running` tolerates before it emits a truncated line.
const LONGLINE_CUT: usize = 512;

/// Compile-time switch for the `trace!` macro; `true` enables its output on stderr.
#[allow(unused)]
pub const DO_TRACE: bool = false;

/// Compile-time switch for the `debug!` macro.
///
/// Flip the literal to `true` to enable debug output on its own; it is
/// always enabled whenever `DO_TRACE` is.
#[allow(unused)]
pub const DO_DEBUG: bool = false || DO_TRACE;
30
/// Prints to stderr with `eprintln!` syntax, but only when
/// `logappend::DO_TRACE` is `true`.
#[allow(unused)]
#[macro_export]
macro_rules! trace {
    ($($tokens:tt)*) => {
        if $crate::logappend::DO_TRACE {
            eprintln!($($tokens)*);
        }
    };
}
36
/// Prints to stderr with `eprintln!` syntax, but only when
/// `logappend::DO_DEBUG` is `true`.
#[allow(unused)]
#[macro_export]
macro_rules! debug {
    ($($tokens:tt)*) => {
        if $crate::logappend::DO_DEBUG {
            eprintln!($($tokens)*);
        }
    };
}
42
/// Prints an error message to stderr with `eprintln!` syntax.
///
/// Unlike `trace!` and `debug!` this macro is never compiled out.
#[allow(unused)]
#[macro_export]
macro_rules! error {
    ($($tokens:tt)*) => {
        eprintln!($($tokens)*)
    };
}
48
49#[derive(Debug, Error)]
50pub enum LogAppendError {
51 #[error("IOError({0})")]
52 IO(#[from] io::Error),
53 #[error("SinkError({0})")]
54 Sink(#[from] SinkError),
55 #[error("BufferError({0})")]
56 Buffer(#[from] BufferError),
57 #[error("LogicError({0})")]
58 Logic(String),
59 #[error("SystemSignalError({0})")]
60 SystemSignal(#[from] SignalError),
61}
62
63#[derive(Debug, Error)]
64pub enum BufDropError {
65 #[error("AdvanceError({0})")]
66 Advance(#[from] BufferError),
67}
68
/// Checks that a `BufferError` converts into `BufDropError::Advance` and
/// that the wrapper's display text embeds the inner error's text.
#[test]
fn buf_drop_error() {
    let wrapped = BufDropError::from(BufferError::RpAdv(6, 4, 3));
    let rendered = wrapped.to_string();
    assert_eq!("AdvanceError(ReadPointerAdvanceError(6, 4, 3))", rendered);
}
75
76fn buf_drop_until_newline(buf: &mut Buffer) -> Result<usize, LogAppendError> {
77 let bb = buf.readable();
78 let mut n = bb.len();
79 for (i, b) in bb.iter().enumerate() {
80 if *b == 0xa {
81 n = 1 + i;
82 break;
83 }
84 }
85 buf.advance(n)?;
86 Ok(n)
87}
88
89fn push_lines_to_sinks(lines: &[&[u8]], sinks: &mut [&mut Logsink]) -> Result<(), LogAppendError> {
90 for sink in sinks.iter_mut() {
91 sink.push_lines(&lines)?;
92 }
93 Ok(())
94}
95
96fn output_append_running<INP: Read>(
97 dir: PathBuf,
98 total_max: usize,
99 file_max: usize,
100 mut inp: INP,
101 prefix: &str,
102) -> Result<(), LogAppendError> {
103 let mut sink_all = Logsink::new(
104 dir.clone(),
105 logsink::Filter::All,
106 prefix,
107 total_max,
108 file_max,
109 )?;
110 let mut buf = Buffer::new(1024 * 16);
111 loop {
112 let bmut = buf.writable();
113 let nread = inp.read(bmut)?;
114 if nread > bmut.len() {
115 error!("ERROR read returned more than buffer space");
116 return Err(LogAppendError::Logic(
117 "read returned more than buffer space".into(),
118 ));
119 }
120 buf.adv_wp(nread)?;
121 sink_all.push_data(buf.readable())?;
122 buf.reset();
123 sink_all.flush()?;
124 if nread == 0 {
125 break;
126 }
127 }
128 Ok(())
129}
130
131fn log_append_running<INP: Read>(
132 dir: PathBuf,
133 total_max: usize,
134 file_max: usize,
135 mut inp: INP,
136) -> Result<(), LogAppendError> {
137 let mut sink_all = Logsink::new(
138 dir.clone(),
139 logsink::Filter::All,
140 "all",
141 total_max,
142 file_max,
143 )?;
144 let mut sink_info = Logsink::new(dir, logsink::Filter::Info, "info", total_max, file_max)?;
145 let mut sinks = [&mut sink_all, &mut sink_info];
146 let mut buf = Buffer::new(1024 * 16);
147 loop {
148 if buf.free_len() == 0 {
149 let x = buf_drop_until_newline(&mut buf)?;
150 debug!("[BUF-FULL-DROP {}]", x);
151 }
152 let bmut = buf.writable();
153 let nread = inp.read(bmut)?;
154 if nread > bmut.len() {
155 error!("ERROR read returned more than buffer space");
156 return Err(LogAppendError::Logic(
157 "ERROR read returned more than buffer space".into(),
158 ));
159 }
160 buf.adv_wp(nread)?;
161 trace!("[READ {:5} HAVE {:5}]", nread, buf.len());
162 let (lines, n2) = parse_lines(buf.readable());
163 trace!("[PARSED-LINES {}]", lines.len());
164 push_lines_to_sinks(&lines, &mut sinks)?;
165 buf.advance(n2)?;
166 if buf.len() > LONGLINE_CUT {
167 debug!("[TRUNCATED]");
168 let lines = [buf.readable()[..LONGLINE_CUT].as_ref()];
169 push_lines_to_sinks(&lines, &mut sinks)?;
170 let x = buf_drop_until_newline(&mut buf)?;
171 debug!("[TRUNC-DROP {}]", x);
172 }
173 if nread == 0 {
174 if buf.len() > 0 {
175 let lines = [buf.readable()];
176 push_lines_to_sinks(&lines, &mut sinks)?;
177 }
178 break;
179 }
180 for sink in sinks.iter_mut() {
181 sink.flush()?;
182 }
183 }
184 Ok(())
185}
186
/// Selects how one output stream of the wrapped process is written to disk.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InputStreamType {
    /// Write the raw bytes through unchanged (handled by `output_append_running`).
    FeedThrough,
    /// Split the stream into lines first (handled by `log_append_running`).
    LogSplit,
}
191
192pub fn logappend_wrap(
193 dirname: &str,
194 total_max: usize,
195 file_max: usize,
196 exe: String,
197 args: Vec<String>,
198 stdout_stream_type: InputStreamType,
199 stderr_stream_type: InputStreamType,
200) -> Result<ExitStatus, LogAppendError> {
201 debug!("exe {exe:?}");
202 debug!("args {args:?}");
203 let dir = path::PathBuf::from(dirname);
204
205 signals::init();
206 signals::ignore_signal(libc::SIGINT)?;
207 signals::ignore_signal(libc::SIGTERM).unwrap();
208 signals::ignore_signal(libc::SIGHUP).unwrap();
209
210 let mut proc = process::Command::new(exe)
213 .args(args)
214 .stdout(process::Stdio::piped())
215 .stderr(process::Stdio::piped())
216 .spawn()
217 .unwrap();
218 let chout = proc.stdout.take().unwrap();
219 let cherr = proc.stderr.take().unwrap();
220
221 let count_running = Arc::new(AtomicUsize::new(2));
222
223 let jh1 = thread::Builder::new()
225 .spawn({
226 let count_running = count_running.clone();
227 let dir = dir.to_owned();
228 move || {
229 match stdout_stream_type {
230 InputStreamType::FeedThrough => {
231 output_append_running(dir, total_max, file_max, chout, "stdout").unwrap();
232 }
233 InputStreamType::LogSplit => {
234 log_append_running(dir, total_max, file_max, chout).unwrap();
235 }
236 }
237 count_running.fetch_sub(1, Ordering::SeqCst);
238 }
239 })
240 .unwrap();
241 let jh2 = thread::Builder::new()
242 .spawn({
243 let count_running = count_running.clone();
244 let dir = dir.to_owned();
245 move || {
246 match stderr_stream_type {
247 InputStreamType::FeedThrough => {
248 output_append_running(dir, total_max, file_max, cherr, "stderr").unwrap();
249 }
250 InputStreamType::LogSplit => {
251 log_append_running(dir, total_max, file_max, cherr).unwrap();
252 }
253 }
254 count_running.fetch_sub(1, Ordering::SeqCst);
255 }
256 })
257 .unwrap();
258
259 let recv = signals::receiver();
260 let deadline = Instant::now() + Duration::from_millis(30000);
261 loop {
262 let h = count_running.load(Ordering::SeqCst);
263 trace!("msg loop {h}");
264 let tsnow = Instant::now();
265 if false && tsnow >= deadline {
266 debug!("msg timeout break");
267 break;
268 }
269 let _timeout = deadline - tsnow;
270 let timeout = Duration::from_millis(100);
271 match recv.recv_timeout(timeout) {
272 Ok(e) => {
273 debug!("msg: {e}");
274 }
275 Err(_e) => {
276 if count_running.load(Ordering::SeqCst) == 0 {
277 break;
278 }
279 }
280 }
281 }
282 let ec = proc.wait().unwrap();
283 jh1.join().unwrap();
284 jh2.join().unwrap();
285 Ok(ec)
286}
287
288pub fn play_signals() -> Result<(), LogAppendError> {
290 let count_running = Arc::new(AtomicUsize::new(0));
291 signals::init();
292 if false {
293 signals::set_signal_handler(libc::SIGUSR1).unwrap();
294 }
295 signals::ignore_signal(libc::SIGINT).unwrap();
296 signals::ignore_signal(libc::SIGTERM).unwrap();
297 signals::ignore_signal(libc::SIGHUP).unwrap();
298 let proc = process::Command::new("/bin/bash")
299 .args(&[
300 "-c",
301 "while true; do date; >&2 echo stderr; sleep 0.3; done",
302 ])
303 .stdout(process::Stdio::piped())
304 .stderr(process::Stdio::piped())
305 .spawn()
306 .unwrap();
307 count_running.fetch_add(2, Ordering::SeqCst);
308 let mut chout = proc.stdout.unwrap();
309 let mut cherr = proc.stderr.unwrap();
310 let jh1 = thread::Builder::new()
311 .spawn({
312 let count_running = count_running.clone();
313 move || {
314 let mut buf = vec![0; 256];
315 loop {
316 debug!("thread-1 read");
317 let n = chout.read(&mut buf).unwrap();
318 debug!("thread-1 read done");
319 if n == 0 {
320 debug!("thread-1 break");
321 break;
322 } else {
323 signals::sender()
324 .send(format!(
325 "thread-1 {}",
326 std::str::from_utf8(&buf[..n]).unwrap()
327 ))
328 .unwrap();
329 }
330 }
331 count_running.fetch_sub(1, Ordering::SeqCst);
332 }
333 })
334 .unwrap();
335 let jh2 = thread::Builder::new()
336 .spawn({
337 let count_running = count_running.clone();
338 move || {
339 let mut buf = vec![0; 256];
340 loop {
341 debug!("thread-2 read");
342 let n = cherr.read(&mut buf).unwrap();
343 debug!("thread-2 read done");
344 if n == 0 {
345 debug!("thread-2 break");
346 break;
347 } else {
348 signals::sender()
349 .send(format!(
350 "thread-2 {}",
351 std::str::from_utf8(&buf[..n]).unwrap()
352 ))
353 .unwrap();
354 }
355 }
356 count_running.fetch_sub(1, Ordering::SeqCst);
357 }
358 })
359 .unwrap();
360 if false {
361 thread::sleep(Duration::from_millis(1000));
362 debug!("sending USR1");
363 unsafe { libc::kill(0, libc::SIGUSR1) };
364 debug!("sending USR1 done");
365 }
366 let recv = signals::receiver();
367 let deadline = Instant::now() + Duration::from_millis(30000);
368 loop {
369 let tsnow = Instant::now();
370 if tsnow >= deadline {
371 debug!("msg timeout break");
372 break;
373 }
374 let _timeout = deadline - tsnow;
375 let timeout = Duration::from_millis(100);
376 match recv.recv_timeout(timeout) {
377 Ok(e) => {
378 debug!("msg: {e}");
379 }
380 Err(_e) => {
381 if count_running.load(Ordering::SeqCst) == 0 {
382 break;
383 }
384 }
385 }
386 }
387 debug!("await join handles");
388 jh1.join().unwrap();
389 jh2.join().unwrap();
390 debug!("DONE");
391 Ok(())
392}