use crate::buffer::Buffer;
use crate::err::Error;
use crate::logsink;
use crate::logsink::Logsink;
use crate::signals;
use std::io;
use std::io::Read;
use std::io::Stdin;
use std::io::Write;
use std::path;
use std::path::PathBuf;
use std::process;
use std::process::ExitStatus;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std::time::Instant;
#[allow(unused)]
const DO_TRACE: bool = false;
#[allow(unused)]
const DO_DEBUG: bool = false || DO_TRACE;
#[allow(unused)]
macro_rules! trace {
($($arg:tt)*) => { if DO_TRACE { eprintln!($($arg)*) } };
}
#[allow(unused)]
macro_rules! debug {
($($arg:tt)*) => { if DO_DEBUG { eprintln!($($arg)*) } };
}
#[allow(unused)]
macro_rules! error {
($($arg:tt)*) => { if true { eprintln!($($arg)*) } };
}
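/// Upper bound for a buffered line that still has no newline: once the pending
/// data grows past this, its first LONGLINE_CUT bytes are emitted as a truncated
/// line and the rest is dropped up to the next newline.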
const LONGLINE_CUT: usize = 512;
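/// Split `buf` into complete, newline-terminated lines (newline byte excluded).
/// Returns the lines plus the number of bytes consumed, i.e. the offset just past
/// the last newline; a trailing partial line is left for the caller to keep buffered.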
fn parse_lines(buf: &[u8]) -> Result<(Vec<&[u8]>, usize), Error> {
    let mut ret = Vec::new();
    // Start offset of the current (possibly still incomplete) line.
    let mut x = 0;
    for (i, b) in buf.iter().enumerate() {
        if *b == 0xa {
            // Complete line found: emit it without the newline byte.
            ret.push(&buf[x..i]);
            x = 1 + i;
        }
    }
    Ok((ret, x))
}
#[test]
fn test_parse_00() {
let ret = parse_lines(b"line00\nline01\nline02\n").unwrap();
assert_eq!(ret.0, vec![b"line00", b"line01", b"line02"]);
}
#[test]
fn test_parse_01() {
let ret = parse_lines(b"").unwrap();
assert_eq!(ret.0.len(), 0);
}
#[test]
fn test_parse_02() {
let ret = parse_lines(b"line00").unwrap();
assert_eq!(ret.0.len(), 0);
}
#[test]
fn test_parse_03() {
let ret = parse_lines(b"line00\nline01\nline02").unwrap();
assert_eq!(ret.0, vec![b"line00", b"line01"]);
}
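#[test]
fn test_parse_04() {
    // Sanity check for the consumed-byte count (second tuple field): it points
    // just past the last emitted newline so the trailing partial line stays buffered.
    let ret = parse_lines(b"line00\ntail").unwrap();
    assert_eq!(ret.0, vec![b"line00"]);
    assert_eq!(ret.1, 7);
}
/// Discard buffered bytes up to and including the next newline, or the whole
/// readable region if no newline is present. Returns how many bytes were dropped.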
fn buf_drop_until_newline(buf: &mut Buffer) -> Result<usize, Error> {
let bb = buf.readable();
let mut n = bb.len();
for (i, b) in bb.iter().enumerate() {
if *b == 0xa {
n = 1 + i;
break;
}
}
buf.advance(n)?;
Ok(n)
}
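/// Forward one batch of parsed lines to every sink.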
fn push_lines_to_sinks(lines: &[&[u8]], sinks: &mut [&mut Logsink]) -> Result<(), Error> {
for sink in sinks.iter_mut() {
        sink.push_lines(lines)?;
}
Ok(())
}
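/// Core loop for `logappend`: read stdin, split it into lines and append them to
/// the "all" and "info" sinks until EOF, reporting buffer-full and truncation
/// events on stderr.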
fn append_inner(
dir: PathBuf,
total_max: usize,
file_max: usize,
mut stdin: Stdin,
) -> Result<(), Error> {
let mut errout = io::stderr();
let mut sink_all = Logsink::new(
dir.clone(),
logsink::Filter::All,
"all",
total_max,
file_max,
)?;
let mut sink_info = Logsink::new(dir, logsink::Filter::Info, "info", total_max, file_max)?;
let mut sinks = [&mut sink_all, &mut sink_info];
let mut buf = Buffer::new(1024 * 16);
loop {
if buf.free_len() == 0 {
let x = buf_drop_until_newline(&mut buf)?;
write!(&mut errout, "[BUF-FULL-DROP {}]\n", x)?;
}
let bmut = buf.writable();
let nread = stdin.read(bmut)?;
if nread > bmut.len() {
write!(&mut errout, "ERROR read returned more than buffer space\n")?;
return Err(Error::with_msg(
"ERROR read returned more than buffer space",
));
}
buf.adv_wp(nread)?;
if DO_TRACE {
write!(&mut errout, "[READ {:5} HAVE {:5}]\n", nread, buf.len())?;
}
match parse_lines(buf.readable()) {
Ok((lines, n2)) => {
if DO_TRACE {
write!(&mut errout, "[PARSED-LINES {}]\n", lines.len())?;
}
push_lines_to_sinks(&lines, &mut sinks)?;
buf.advance(n2)?;
if buf.len() > LONGLINE_CUT {
write!(&mut errout, "[TRUNCATED]\n")?;
let lines = [buf.readable()[..LONGLINE_CUT].as_ref()];
push_lines_to_sinks(&lines, &mut sinks)?;
let x = buf_drop_until_newline(&mut buf)?;
write!(&mut errout, "[TRUNC-DROP {}]\n", x)?;
}
}
Err(e) => {
write!(&mut errout, "[APPEND-PARSE-ERROR]: {e}\n")?;
return Ok(());
}
}
for sink in sinks.iter_mut() {
sink.flush()?;
}
        if nread == 0 {
            if buf.len() > 0 {
                // EOF with a trailing line that never got its newline: emit and flush it.
                let lines = [buf.readable()];
                push_lines_to_sinks(&lines, &mut sinks)?;
                for sink in sinks.iter_mut() {
                    sink.flush()?;
                }
            }
            break Ok(());
        }
}
}
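/// Public entry point: append stdin line-wise to log sinks under `dirname`.
/// Errors from `append_inner` are logged and propagated. Note: despite the `_mb`
/// parameter names, the values are forwarded without any unit conversion.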
pub fn logappend(
dirname: &str,
total_mb: usize,
file_mb: usize,
stdin: Stdin,
) -> Result<(), Error> {
match append_inner(dirname.into(), total_mb, file_mb, stdin) {
Ok(x) => Ok(x),
Err(e) => {
error!("ERROR from append_inner: {e}");
Err(e)
}
}
}
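/// Append raw, unparsed data from `inp` to a single "stdout" sink until EOF.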
fn output_append_running<INP: Read>(
dir: PathBuf,
total_max: usize,
file_max: usize,
mut inp: INP,
) -> Result<(), Error> {
    let mut sink_all = Logsink::new(dir, logsink::Filter::All, "stdout", total_max, file_max)?;
let mut buf = Buffer::new(1024 * 16);
loop {
let bmut = buf.writable();
let nread = inp.read(bmut)?;
if nread > bmut.len() {
error!("ERROR read returned more than buffer space");
return Err(Error::with_msg(
"ERROR read returned more than buffer space",
));
}
buf.adv_wp(nread)?;
sink_all.push_data(buf.readable())?;
buf.reset();
sink_all.flush()?;
if nread == 0 {
break;
}
}
Ok(())
}
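/// Line-oriented counterpart of `output_append_running`: parse `inp` into lines
/// and append them to the "all" and "info" sinks until EOF, truncating over-long
/// lines at LONGLINE_CUT bytes.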
fn log_append_running<INP: Read>(
dir: PathBuf,
total_max: usize,
file_max: usize,
mut inp: INP,
) -> Result<(), Error> {
let mut sink_all = Logsink::new(
dir.clone(),
logsink::Filter::All,
"all",
total_max,
file_max,
)?;
let mut sink_info = Logsink::new(dir, logsink::Filter::Info, "info", total_max, file_max)?;
let mut sinks = [&mut sink_all, &mut sink_info];
let mut buf = Buffer::new(1024 * 16);
loop {
if buf.free_len() == 0 {
let x = buf_drop_until_newline(&mut buf)?;
debug!("[BUF-FULL-DROP {}]", x);
}
let bmut = buf.writable();
let nread = inp.read(bmut)?;
if nread > bmut.len() {
error!("ERROR read returned more than buffer space");
return Err(Error::with_msg(
"ERROR read returned more than buffer space",
));
}
buf.adv_wp(nread)?;
trace!("[READ {:5} HAVE {:5}]", nread, buf.len());
match parse_lines(buf.readable()) {
Ok((lines, n2)) => {
trace!("[PARSED-LINES {}]", lines.len());
push_lines_to_sinks(&lines, &mut sinks)?;
buf.advance(n2)?;
if buf.len() > LONGLINE_CUT {
debug!("[TRUNCATED]");
                    let lines = [&buf.readable()[..LONGLINE_CUT]];
push_lines_to_sinks(&lines, &mut sinks)?;
let x = buf_drop_until_newline(&mut buf)?;
debug!("[TRUNC-DROP {}]", x);
}
}
Err(e) => {
error!("[APPEND-PARSE-ERROR]: {e}");
return Ok(());
}
}
        if nread == 0 {
            if buf.len() > 0 {
                // EOF with a trailing line that never got its newline: emit it as-is.
                let lines = [buf.readable()];
                push_lines_to_sinks(&lines, &mut sinks)?;
            }
            // Flush before leaving the loop; the per-iteration flush below is skipped by the break.
            for sink in sinks.iter_mut() {
                sink.flush()?;
            }
            break;
        }
for sink in sinks.iter_mut() {
sink.flush()?;
}
}
Ok(())
}
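/// Run `exe` with `args` while capturing its output: stdout is appended raw to a
/// "stdout" sink and stderr is appended line-wise to the "all"/"info" sinks, each
/// in its own capture thread. SIGINT, SIGTERM and SIGHUP are ignored in this
/// process; the child's exit status is returned once both capture threads finish.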
pub fn logappend_wrap(
dirname: &str,
total_max: usize,
file_max: usize,
exe: String,
args: Vec<String>,
) -> Result<ExitStatus, Error> {
debug!("exe {exe:?}");
debug!("args {args:?}");
let dir = path::PathBuf::from(dirname);
signals::init();
signals::ignore_signal(libc::SIGINT).unwrap();
signals::ignore_signal(libc::SIGTERM).unwrap();
signals::ignore_signal(libc::SIGHUP).unwrap();
let mut proc = process::Command::new(exe)
.args(args)
.stdout(process::Stdio::piped())
.stderr(process::Stdio::piped())
.spawn()
.unwrap();
let chout = proc.stdout.take().unwrap();
let cherr = proc.stderr.take().unwrap();
let count_running = Arc::new(AtomicUsize::new(2));
let jh1 = thread::Builder::new()
.spawn({
let count_running = count_running.clone();
let dir = dir.to_owned();
move || {
output_append_running(dir, total_max, file_max, chout).unwrap();
count_running.fetch_sub(1, Ordering::SeqCst);
}
})
.unwrap();
let jh2 = thread::Builder::new()
.spawn({
let count_running = count_running.clone();
let dir = dir.to_owned();
move || {
log_append_running(dir, total_max, file_max, cherr).unwrap();
count_running.fetch_sub(1, Ordering::SeqCst);
}
})
.unwrap();
let recv = signals::receiver();
let deadline = Instant::now() + Duration::from_millis(30000);
loop {
let h = count_running.load(Ordering::SeqCst);
trace!("msg loop {h}");
let tsnow = Instant::now();
if false && tsnow >= deadline {
debug!("msg timeout break");
break;
}
let _timeout = deadline - tsnow;
let timeout = Duration::from_millis(100);
match recv.recv_timeout(timeout) {
Ok(e) => {
debug!("msg: {e}");
}
Err(_e) => {
if count_running.load(Ordering::SeqCst) == 0 {
break;
}
}
}
}
let ec = proc.wait().unwrap();
jh1.join().unwrap();
jh2.join().unwrap();
Ok(ec)
}
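/// Signal-handling playground: spawn a bash loop, forward its stdout/stderr through
/// the signals channel from two reader threads, and print the received messages
/// (when debugging is enabled) until the deadline expires or both readers stop.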
pub fn play_signals() -> Result<(), Error> {
let count_running = Arc::new(AtomicUsize::new(0));
signals::init();
signals::ignore_signal(libc::SIGINT).unwrap();
signals::ignore_signal(libc::SIGTERM).unwrap();
signals::ignore_signal(libc::SIGHUP).unwrap();
let proc = process::Command::new("/bin/bash")
.args(&[
"-c",
"while true; do date; >&2 echo stderr; sleep 0.3; done",
])
.stdout(process::Stdio::piped())
.stderr(process::Stdio::piped())
.spawn()
.unwrap();
count_running.fetch_add(2, Ordering::SeqCst);
let mut chout = proc.stdout.unwrap();
let mut cherr = proc.stderr.unwrap();
let jh1 = thread::Builder::new()
.spawn({
let count_running = count_running.clone();
move || {
let mut buf = vec![0; 256];
loop {
debug!("thread-1 read");
let n = chout.read(&mut buf).unwrap();
debug!("thread-1 read done");
if n == 0 {
debug!("thread-1 break");
break;
} else {
signals::sender()
.send(format!(
"thread-1 {}",
std::str::from_utf8(&buf[..n]).unwrap()
))
.unwrap();
}
}
count_running.fetch_sub(1, Ordering::SeqCst);
}
})
.unwrap();
let jh2 = thread::Builder::new()
.spawn({
let count_running = count_running.clone();
move || {
let mut buf = vec![0; 256];
loop {
debug!("thread-2 read");
let n = cherr.read(&mut buf).unwrap();
debug!("thread-2 read done");
if n == 0 {
debug!("thread-2 break");
break;
} else {
signals::sender()
.send(format!(
"thread-2 {}",
std::str::from_utf8(&buf[..n]).unwrap()
))
.unwrap();
}
}
count_running.fetch_sub(1, Ordering::SeqCst);
}
})
.unwrap();
if false {
thread::sleep(Duration::from_millis(1000));
debug!("sending USR1");
unsafe { libc::kill(0, libc::SIGUSR1) };
debug!("sending USR1 done");
}
let recv = signals::receiver();
let deadline = Instant::now() + Duration::from_millis(30000);
loop {
let tsnow = Instant::now();
if tsnow >= deadline {
debug!("msg timeout break");
break;
}
let _timeout = deadline - tsnow;
let timeout = Duration::from_millis(100);
match recv.recv_timeout(timeout) {
Ok(e) => {
debug!("msg: {e}");
}
Err(_e) => {
if count_running.load(Ordering::SeqCst) == 0 {
break;
}
}
}
}
debug!("await join handles");
jh1.join().unwrap();
jh2.join().unwrap();
debug!("DONE");
Ok(())
}