use std::fs::{read_dir, remove_file, File};
use std::io::{self, prelude::*, Read, SeekFrom};
use std::path::{Path, PathBuf};
use log::error;
use rev_buf_reader::RevBufReader;
use snap::write::FrameEncoder;
use crate::error::Error;
pub fn get_log_path(task_id: usize, path: &Path) -> PathBuf {
let task_log_dir = path.join("task_logs");
task_log_dir.join(format!("{task_id}.log"))
}
pub fn create_log_file_handles(task_id: usize, path: &Path) -> Result<(File, File), Error> {
let log_path = get_log_path(task_id, path);
let stdout_handle = File::create(&log_path)
.map_err(|err| Error::IoPathError(log_path, "getting stdout handle", err))?;
let stderr_handle = stdout_handle
.try_clone()
.map_err(|err| Error::IoError("cloning stderr handle".to_string(), err))?;
Ok((stdout_handle, stderr_handle))
}
pub fn get_log_file_handle(task_id: usize, path: &Path) -> Result<File, Error> {
let path = get_log_path(task_id, path);
let handle = File::open(&path)
.map_err(|err| Error::IoPathError(path, "getting log file handle", err))?;
Ok(handle)
}
pub fn clean_log_handles(task_id: usize, path: &Path) {
let path = get_log_path(task_id, path);
if path.exists() {
if let Err(err) = remove_file(path) {
error!("Failed to remove stdout file for task {task_id} with error {err:?}");
};
}
}
pub fn read_and_compress_log_file(
task_id: usize,
path: &Path,
lines: Option<usize>,
) -> Result<(Vec<u8>, bool), Error> {
let mut file = get_log_file_handle(task_id, path)?;
let mut content = Vec::new();
let mut output_complete = true;
if let Some(lines) = lines {
output_complete = seek_to_last_lines(&mut file, lines)?;
}
{
let mut compressor = FrameEncoder::new(&mut content);
io::copy(&mut file, &mut compressor)
.map_err(|err| Error::IoError("compressing log output".to_string(), err))?;
}
Ok((content, output_complete))
}
pub fn read_last_log_file_lines(
task_id: usize,
path: &Path,
lines: usize,
) -> Result<String, Error> {
let mut file = get_log_file_handle(task_id, path)?;
Ok(read_last_lines(&mut file, lines))
}
pub fn reset_task_log_directory(path: &Path) -> Result<(), Error> {
let task_log_dir = path.join("task_logs");
let files = read_dir(&task_log_dir)
.map_err(|err| Error::IoPathError(task_log_dir, "reading task log files", err))?;
for file in files.flatten() {
if let Err(err) = remove_file(file.path()) {
error!("Failed to delete log file: {err}");
}
}
Ok(())
}
#[allow(clippy::needless_collect)]
pub fn read_last_lines(file: &mut File, amount: usize) -> String {
let reader = RevBufReader::new(file);
let lines: Vec<String> = reader
.lines()
.take(amount)
.map(|line| line.unwrap_or_else(|_| "Pueue: Failed to read line.".to_string()))
.collect();
lines.into_iter().rev().collect::<Vec<String>>().join("\n")
}
pub fn seek_to_last_lines(file: &mut File, amount: usize) -> Result<bool, Error> {
let mut reader = RevBufReader::new(file);
let start_position = reader
.get_mut()
.seek(SeekFrom::Current(0))
.map_err(|err| Error::IoError("seeking to start of file".to_string(), err))?;
let start_position: i64 = start_position.try_into().map_err(|_| {
Error::Generic("Failed to convert start cursor position to i64".to_string())
})?;
let mut total_read_bytes: i64 = 0;
let mut found_lines = 0;
'outer: loop {
let mut buffer = vec![0; 4096];
let read_bytes = reader
.read(&mut buffer)
.map_err(|err| Error::IoError("reading next log chunk".to_string(), err))?;
if read_bytes == 0 {
return Ok(true);
}
for byte in buffer[0..read_bytes].iter().rev() {
total_read_bytes += 1;
if *byte != b'\n' {
continue;
}
found_lines += 1;
if found_lines != amount + 1 {
continue;
}
let distance_to_file_start = start_position - total_read_bytes + 1;
let distance_to_file_start: u64 = distance_to_file_start.try_into().unwrap_or(0);
if distance_to_file_start < start_position.try_into().unwrap() {
let file = reader.get_mut();
file.seek(SeekFrom::Start(distance_to_file_start))
.map_err(|err| {
Error::IoError("seeking to correct position".to_string(), err)
})?;
}
break 'outer;
}
}
Ok(false)
}