watchlog 1.230.0

A command-line utility to help you see how a log is moving.
use std::io::Write as _;

const ANSII_SAVE: &[u8] = b"\x1B[s";
const ANSII_DIM: &[u8] = b"\x1B[2m";
const ANSII_LINE_CLEAR: &[u8] = b"\x1B[2K";
const ANSII_LINE_CLEAR_END: &[u8] = b"\x1B[K";
const ANSII_RESTORE: &[u8] = b"\x1B[u";
const ANSII_MOVE_UP: &[u8] = b"\x1B[A";
const ANSII_MOVE_COL_1: &[u8] = b"\x1B[G";

fn maybe_duration<'d, D: serde::Deserializer<'d>>(d: D)
	-> Result<Option<Option<std::time::Duration>>, D::Error>
{
	struct V;

	impl<'v> serde::de::Visitor<'v> for V {
		type Value = Option<std::time::Duration>;

		fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
			formatter.write_str("expecting duration or \"never\"")
		}

		fn visit_str<E: serde::de::Error>(self, s: &str) -> Result<Self::Value, E> {
			if s == "never" {
				Ok(None)
			} else {
				humanize_rs::duration::parse(s)
					.map(Some)
					.map_err(|_| serde::de::Error::invalid_value(serde::de::Unexpected::Str(s), &self))
			}
		}
	}

	d.deserialize_str(V).map(Some)
}

fn write_elapsed(
	mut buf: impl std::io::Write,
	duration: std::time::Duration)
-> std::io::Result<()>
{
	write!(buf, "Last output ")?;
	write_duration(&mut buf, duration)?;
	write!(&mut buf, " ago.")
}

fn write_duration(
	mut buf: impl std::io::Write,
	duration: std::time::Duration)
-> std::io::Result<()>
{
	let mut sec = duration.as_secs();
	if duration.as_secs() >= 24*3600 {
		let days = sec / 3600 / 24;
		sec -= days * 24*3600;
		write!(buf, "{}d ", days)?;
	}
	if duration.as_secs() >= 3600 {
		let hours = sec / 3600;
		sec -= hours * 3600;
		write!(buf, "{:2}h ", hours)?;
	}
	if duration.as_secs() >= 60 {
		let min = sec / 60;
		sec -= min * 60;
		write!(buf, "{:2}min ", min)?;
	}
	write!(buf, "{:2}s", sec)
}

#[test]
fn test_format_duration() {
	let mut buf = Vec::new();

	buf.clear();
	write_duration(&mut buf, std::time::Duration::from_secs(0)).unwrap();
	assert_eq!(std::str::from_utf8(&buf).unwrap(), " 0s");

	buf.clear();
	write_duration(&mut buf, std::time::Duration::from_secs(1)).unwrap();
	assert_eq!(std::str::from_utf8(&buf).unwrap(), " 1s");

	buf.clear();
	write_duration(&mut buf, std::time::Duration::from_secs(59)).unwrap();
	assert_eq!(std::str::from_utf8(&buf).unwrap(), "59s");

	buf.clear();
	write_duration(&mut buf, std::time::Duration::from_secs(60)).unwrap();
	assert_eq!(std::str::from_utf8(&buf).unwrap(), " 1min  0s");

	buf.clear();
	write_duration(&mut buf, std::time::Duration::from_secs(61)).unwrap();
	assert_eq!(std::str::from_utf8(&buf).unwrap(), " 1min  1s");

	buf.clear();
	write_duration(&mut buf, std::time::Duration::from_secs(119)).unwrap();
	assert_eq!(std::str::from_utf8(&buf).unwrap(), " 1min 59s");

	buf.clear();
	write_duration(&mut buf, std::time::Duration::from_secs(3662)).unwrap();
	assert_eq!(std::str::from_utf8(&buf).unwrap(), " 1h  1min  2s");

	buf.clear();
	write_duration(&mut buf, std::time::Duration::from_secs(76*3600 + 18)).unwrap();
	assert_eq!(std::str::from_utf8(&buf).unwrap(), "3d  4h  0min 18s");
}

#[derive(Default,Debug,serde_derive::Deserialize)]
#[serde(deny_unknown_fields)]
#[serde(rename_all="kebab-case")]
struct Config {
	/// Enable watchlog when there is no output for this long.
	#[serde(default)]
	#[serde(with="serde_humanize_rs")]
	delay: Option<std::time::Duration>,

	/// Insert a timestamp into the output after a delay of at least this long.
	///
	/// The literal string "never" can be used to disable this feature.
	///
	/// Note that the timestamp is never inserted after breaks which occur mid-line.
	#[serde(default)]
	#[serde(deserialize_with="maybe_duration")]
	permanent_delay: Option<Option<std::time::Duration>>,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
	assert_eq!(std::env::args().len(), 1, "No argument are accepted. Pipe into stdin.");

	let paths = standard_paths::default_paths!()
		.locate_all(
			standard_paths::LocationType::AppConfigLocation,
			"config.scfg",
			standard_paths::LocateOption::LocateFile)?;

	let mut cfg = Config::default();
	for path in paths.into_iter().flatten() {
		let Config{delay, permanent_delay} = simple_config::from_file(path)?;
		cfg.delay = cfg.delay.or(delay);
		cfg.permanent_delay = cfg.permanent_delay.or(permanent_delay);
	}

	let Config{delay, permanent_delay} = cfg;
	let delay = delay.unwrap_or(std::time::Duration::from_secs(3));
	let permanent_delay = permanent_delay.unwrap_or(Some(std::time::Duration::from_secs(10)));

	// Note that we can't use Rust's stdin framework because it does internal buffering which is invisible to us. This means that we have no way to check if data is ready. Poll may say that nothing is ready because Rust has buffered it for us but we can't check that buffer without risk of blocking. Therefore we just lock stdin, take it's fd, and use that directly.
	let stdin = std::io::stdin();
	let stdin = stdin.lock();
	let stdin_fd = std::os::unix::io::AsRawFd::as_raw_fd(&stdin);

	let stdout = std::io::stdout();
	let mut stdout = stdout.lock();

	let mut pollfd = [
		nix::poll::PollFd::new(&stdin, nix::poll::PollFlags::POLLIN),
	];

	let mut buf = std::io::Cursor::new([0; 16*1024]);

	loop {
		let pos: usize = buf.position().try_into().unwrap();
		let size = nix::unistd::read(stdin_fd, &mut buf.get_mut()[pos..])?;
		let pos = pos + size;
		stdout.write_all(&buf.get_ref()[..pos])?;
		if size == 0 {
			return Ok(()) // EOF
		}
		stdout.flush()?;
		let new_line = buf.get_ref()[pos-1] == b'\n';
		buf.set_position(0);
		let last_read = std::time::Instant::now();

		let mut target_time = delay;

		nix::poll::poll(&mut pollfd, target_time.as_millis() as i32)?;
		if pollfd[0].revents() == Some(nix::poll::PollFlags::POLLIN) ||
			pollfd[0].revents() == Some(nix::poll::PollFlags::POLLHUP) {
			continue
		} else if pollfd[0].revents() != Some(nix::poll::PollFlags::empty()) {
			return Err(string_error::into_err(
				format!("Unexpected events in poll {:?}", pollfd[0].revents())))
		}

		buf.write_all(ANSII_SAVE).unwrap();
		buf.write_all(ANSII_DIM).unwrap();
		if !new_line {
			buf.write_all("\n".as_bytes()).unwrap();
		}
		write_elapsed(&mut buf, last_read.elapsed())?;
		let pos: usize = buf.position().try_into().unwrap();
		stdout.write_all(&buf.get_ref()[..pos])?;
		stdout.flush()?;
		buf.set_position(0);

		loop {
			target_time += std::time::Duration::from_secs(1);
			let elapsed = last_read.elapsed();
			let wait = target_time - elapsed.min(target_time);

			nix::poll::poll(&mut pollfd, wait.as_millis() as i32)?;
			if pollfd[0].revents() == Some(nix::poll::PollFlags::POLLIN) ||
				pollfd[0].revents() == Some(nix::poll::PollFlags::POLLHUP) {
				if permanent_delay.is_some()
					&& new_line
					&& last_read.elapsed() > permanent_delay.unwrap()
				{
					buf.write_all(ANSII_MOVE_COL_1).unwrap();
					buf.write_all(ANSII_LINE_CLEAR_END).unwrap();
					write!(&mut buf,
						"\t{}\n",
						chrono::offset::Local::now().format("%F %R:%S"))?;
				} else {
					buf.write_all(ANSII_LINE_CLEAR).unwrap();
				}
				buf.write_all(ANSII_RESTORE).unwrap();
				if !new_line {
					buf.write_all(ANSII_MOVE_UP).unwrap();
					buf.write_all(ANSII_LINE_CLEAR_END).unwrap();
				}
				break
			} else if pollfd[0].revents() != Some(nix::poll::PollFlags::empty()) {
				return Err(string_error::into_err(
					format!("Unexpected events in poll {:?}", pollfd[0].revents())))
			}

			buf.write_all(ANSII_MOVE_COL_1).unwrap();
			buf.write_all(ANSII_LINE_CLEAR_END).unwrap();
			write_elapsed(&mut buf, last_read.elapsed())?;
			let pos: usize = buf.position().try_into().unwrap();
			stdout.write_all(&buf.get_ref()[..pos])?;
			stdout.flush()?;
			buf.set_position(0);
		}
	}
}