logappend/
logappend.rs

1use crate::buffer::Buffer;
2use crate::buffer::BufferError;
3use crate::logsink;
4use crate::logsink::Logsink;
5use crate::logsink::SinkError;
6use crate::parse::parse_lines;
7use crate::signals;
8use crate::signals::SignalError;
9use io::Read;
10use std::io;
11use std::path;
12use std::path::PathBuf;
13use std::process;
14use std::process::ExitStatus;
15use std::sync::atomic::AtomicUsize;
16use std::sync::atomic::Ordering;
17use std::sync::Arc;
18use std::thread;
19use std::time::Duration;
20use std::time::Instant;
21use thiserror::Error;
22
23const LONGLINE_CUT: usize = 512;
24
25#[allow(unused)]
26pub const DO_TRACE: bool = false;
27
28#[allow(unused)]
29pub const DO_DEBUG: bool = false || DO_TRACE;
30
31#[allow(unused)]
32#[macro_export]
33macro_rules! trace {
34    ($($arg:tt)*) => { if $crate::logappend::DO_TRACE { eprintln!($($arg)*) } };
35}
36
37#[allow(unused)]
38#[macro_export]
39macro_rules! debug {
40    ($($arg:tt)*) => { if $crate::logappend::DO_DEBUG { eprintln!($($arg)*) } };
41}
42
43#[allow(unused)]
44#[macro_export]
45macro_rules! error {
46    ($($arg:tt)*) => { if true { eprintln!($($arg)*) } };
47}
48
49#[derive(Debug, Error)]
50pub enum LogAppendError {
51    #[error("IOError({0})")]
52    IO(#[from] io::Error),
53    #[error("SinkError({0})")]
54    Sink(#[from] SinkError),
55    #[error("BufferError({0})")]
56    Buffer(#[from] BufferError),
57    #[error("LogicError({0})")]
58    Logic(String),
59    #[error("SystemSignalError({0})")]
60    SystemSignal(#[from] SignalError),
61}
62
63#[derive(Debug, Error)]
64pub enum BufDropError {
65    #[error("AdvanceError({0})")]
66    Advance(#[from] BufferError),
67}
68
69#[test]
70fn buf_drop_error() {
71    let e: Result<(), _> = Err(BufferError::RpAdv(6, 4, 3)).map_err(BufDropError::from);
72    let s = e.unwrap_err().to_string();
73    assert_eq!("AdvanceError(ReadPointerAdvanceError(6, 4, 3))", s);
74}
75
76fn buf_drop_until_newline(buf: &mut Buffer) -> Result<usize, LogAppendError> {
77    let bb = buf.readable();
78    let mut n = bb.len();
79    for (i, b) in bb.iter().enumerate() {
80        if *b == 0xa {
81            n = 1 + i;
82            break;
83        }
84    }
85    buf.advance(n)?;
86    Ok(n)
87}
88
89fn push_lines_to_sinks(lines: &[&[u8]], sinks: &mut [&mut Logsink]) -> Result<(), LogAppendError> {
90    for sink in sinks.iter_mut() {
91        sink.push_lines(&lines)?;
92    }
93    Ok(())
94}
95
96fn output_append_running<INP: Read>(
97    dir: PathBuf,
98    total_max: usize,
99    file_max: usize,
100    mut inp: INP,
101    prefix: &str,
102) -> Result<(), LogAppendError> {
103    let mut sink_all = Logsink::new(
104        dir.clone(),
105        logsink::Filter::All,
106        prefix,
107        total_max,
108        file_max,
109    )?;
110    let mut buf = Buffer::new(1024 * 16);
111    loop {
112        let bmut = buf.writable();
113        let nread = inp.read(bmut)?;
114        if nread > bmut.len() {
115            error!("ERROR read returned more than buffer space");
116            return Err(LogAppendError::Logic(
117                "read returned more than buffer space".into(),
118            ));
119        }
120        buf.adv_wp(nread)?;
121        sink_all.push_data(buf.readable())?;
122        buf.reset();
123        sink_all.flush()?;
124        if nread == 0 {
125            break;
126        }
127    }
128    Ok(())
129}
130
131fn log_append_running<INP: Read>(
132    dir: PathBuf,
133    total_max: usize,
134    file_max: usize,
135    mut inp: INP,
136) -> Result<(), LogAppendError> {
137    let mut sink_all = Logsink::new(
138        dir.clone(),
139        logsink::Filter::All,
140        "all",
141        total_max,
142        file_max,
143    )?;
144    let mut sink_info = Logsink::new(dir, logsink::Filter::Info, "info", total_max, file_max)?;
145    let mut sinks = [&mut sink_all, &mut sink_info];
146    let mut buf = Buffer::new(1024 * 16);
147    loop {
148        if buf.free_len() == 0 {
149            let x = buf_drop_until_newline(&mut buf)?;
150            debug!("[BUF-FULL-DROP {}]", x);
151        }
152        let bmut = buf.writable();
153        let nread = inp.read(bmut)?;
154        if nread > bmut.len() {
155            error!("ERROR read returned more than buffer space");
156            return Err(LogAppendError::Logic(
157                "ERROR read returned more than buffer space".into(),
158            ));
159        }
160        buf.adv_wp(nread)?;
161        trace!("[READ {:5}  HAVE {:5}]", nread, buf.len());
162        let (lines, n2) = parse_lines(buf.readable());
163        trace!("[PARSED-LINES {}]", lines.len());
164        push_lines_to_sinks(&lines, &mut sinks)?;
165        buf.advance(n2)?;
166        if buf.len() > LONGLINE_CUT {
167            debug!("[TRUNCATED]");
168            let lines = [buf.readable()[..LONGLINE_CUT].as_ref()];
169            push_lines_to_sinks(&lines, &mut sinks)?;
170            let x = buf_drop_until_newline(&mut buf)?;
171            debug!("[TRUNC-DROP {}]", x);
172        }
173        if nread == 0 {
174            if buf.len() > 0 {
175                let lines = [buf.readable()];
176                push_lines_to_sinks(&lines, &mut sinks)?;
177            }
178            break;
179        }
180        for sink in sinks.iter_mut() {
181            sink.flush()?;
182        }
183    }
184    Ok(())
185}
186
187pub enum InputStreamType {
188    FeedThrough,
189    LogSplit,
190}
191
192pub fn logappend_wrap(
193    dirname: &str,
194    total_max: usize,
195    file_max: usize,
196    exe: String,
197    args: Vec<String>,
198    stdout_stream_type: InputStreamType,
199    stderr_stream_type: InputStreamType,
200) -> Result<ExitStatus, LogAppendError> {
201    debug!("exe {exe:?}");
202    debug!("args {args:?}");
203    let dir = path::PathBuf::from(dirname);
204
205    signals::init();
206    signals::ignore_signal(libc::SIGINT)?;
207    signals::ignore_signal(libc::SIGTERM).unwrap();
208    signals::ignore_signal(libc::SIGHUP).unwrap();
209
210    // TODO make sure that this chikd process receives our std input.
211
212    let mut proc = process::Command::new(exe)
213        .args(args)
214        .stdout(process::Stdio::piped())
215        .stderr(process::Stdio::piped())
216        .spawn()
217        .unwrap();
218    let chout = proc.stdout.take().unwrap();
219    let cherr = proc.stderr.take().unwrap();
220
221    let count_running = Arc::new(AtomicUsize::new(2));
222
223    // TODO handle possible error from thread
224    let jh1 = thread::Builder::new()
225        .spawn({
226            let count_running = count_running.clone();
227            let dir = dir.to_owned();
228            move || {
229                match stdout_stream_type {
230                    InputStreamType::FeedThrough => {
231                        output_append_running(dir, total_max, file_max, chout, "stdout").unwrap();
232                    }
233                    InputStreamType::LogSplit => {
234                        log_append_running(dir, total_max, file_max, chout).unwrap();
235                    }
236                }
237                count_running.fetch_sub(1, Ordering::SeqCst);
238            }
239        })
240        .unwrap();
241    let jh2 = thread::Builder::new()
242        .spawn({
243            let count_running = count_running.clone();
244            let dir = dir.to_owned();
245            move || {
246                match stderr_stream_type {
247                    InputStreamType::FeedThrough => {
248                        output_append_running(dir, total_max, file_max, cherr, "stderr").unwrap();
249                    }
250                    InputStreamType::LogSplit => {
251                        log_append_running(dir, total_max, file_max, cherr).unwrap();
252                    }
253                }
254                count_running.fetch_sub(1, Ordering::SeqCst);
255            }
256        })
257        .unwrap();
258
259    let recv = signals::receiver();
260    let deadline = Instant::now() + Duration::from_millis(30000);
261    loop {
262        let h = count_running.load(Ordering::SeqCst);
263        trace!("msg loop {h}");
264        let tsnow = Instant::now();
265        if false && tsnow >= deadline {
266            debug!("msg timeout break");
267            break;
268        }
269        let _timeout = deadline - tsnow;
270        let timeout = Duration::from_millis(100);
271        match recv.recv_timeout(timeout) {
272            Ok(e) => {
273                debug!("msg: {e}");
274            }
275            Err(_e) => {
276                if count_running.load(Ordering::SeqCst) == 0 {
277                    break;
278                }
279            }
280        }
281    }
282    let ec = proc.wait().unwrap();
283    jh1.join().unwrap();
284    jh2.join().unwrap();
285    Ok(ec)
286}
287
288// Only as playground.
289pub fn play_signals() -> Result<(), LogAppendError> {
290    let count_running = Arc::new(AtomicUsize::new(0));
291    signals::init();
292    if false {
293        signals::set_signal_handler(libc::SIGUSR1).unwrap();
294    }
295    signals::ignore_signal(libc::SIGINT).unwrap();
296    signals::ignore_signal(libc::SIGTERM).unwrap();
297    signals::ignore_signal(libc::SIGHUP).unwrap();
298    let proc = process::Command::new("/bin/bash")
299        .args(&[
300            "-c",
301            "while true; do date; >&2 echo stderr; sleep 0.3; done",
302        ])
303        .stdout(process::Stdio::piped())
304        .stderr(process::Stdio::piped())
305        .spawn()
306        .unwrap();
307    count_running.fetch_add(2, Ordering::SeqCst);
308    let mut chout = proc.stdout.unwrap();
309    let mut cherr = proc.stderr.unwrap();
310    let jh1 = thread::Builder::new()
311        .spawn({
312            let count_running = count_running.clone();
313            move || {
314                let mut buf = vec![0; 256];
315                loop {
316                    debug!("thread-1 read");
317                    let n = chout.read(&mut buf).unwrap();
318                    debug!("thread-1 read done");
319                    if n == 0 {
320                        debug!("thread-1 break");
321                        break;
322                    } else {
323                        signals::sender()
324                            .send(format!(
325                                "thread-1 {}",
326                                std::str::from_utf8(&buf[..n]).unwrap()
327                            ))
328                            .unwrap();
329                    }
330                }
331                count_running.fetch_sub(1, Ordering::SeqCst);
332            }
333        })
334        .unwrap();
335    let jh2 = thread::Builder::new()
336        .spawn({
337            let count_running = count_running.clone();
338            move || {
339                let mut buf = vec![0; 256];
340                loop {
341                    debug!("thread-2 read");
342                    let n = cherr.read(&mut buf).unwrap();
343                    debug!("thread-2 read done");
344                    if n == 0 {
345                        debug!("thread-2 break");
346                        break;
347                    } else {
348                        signals::sender()
349                            .send(format!(
350                                "thread-2 {}",
351                                std::str::from_utf8(&buf[..n]).unwrap()
352                            ))
353                            .unwrap();
354                    }
355                }
356                count_running.fetch_sub(1, Ordering::SeqCst);
357            }
358        })
359        .unwrap();
360    if false {
361        thread::sleep(Duration::from_millis(1000));
362        debug!("sending USR1");
363        unsafe { libc::kill(0, libc::SIGUSR1) };
364        debug!("sending USR1 done");
365    }
366    let recv = signals::receiver();
367    let deadline = Instant::now() + Duration::from_millis(30000);
368    loop {
369        let tsnow = Instant::now();
370        if tsnow >= deadline {
371            debug!("msg timeout break");
372            break;
373        }
374        let _timeout = deadline - tsnow;
375        let timeout = Duration::from_millis(100);
376        match recv.recv_timeout(timeout) {
377            Ok(e) => {
378                debug!("msg: {e}");
379            }
380            Err(_e) => {
381                if count_running.load(Ordering::SeqCst) == 0 {
382                    break;
383                }
384            }
385        }
386    }
387    debug!("await join handles");
388    jh1.join().unwrap();
389    jh2.join().unwrap();
390    debug!("DONE");
391    Ok(())
392}