jtracing 0.1.7

Tracing utilites.
#[allow(unused)]
use {
    crate::{trace_top_dir, writeln_str_file},
    anyhow::{Error, Result},
    jlogger::{jdebug, jerror, jinfo, jwarn, JloggerBuilder},
    log::{debug, error, info, warn, LevelFilter},
    std::path::Path,
    tokio::{
        fs::File,
        io::{AsyncBufRead, AsyncBufReadExt, AsyncReadExt, BufReader},
        sync::mpsc::{self, Receiver, Sender},
        task::JoinHandle,
    },
};

pub struct TraceLog {
    thread_handler: Option<JoinHandle<Result<()>>>,
    receiver: Receiver<String>,
    ctrl_sender: Sender<bool>,
}

async fn read_oneline(
    sender: Sender<String>,
    mut reader: BufReader<File>,
    mut ctrl: Receiver<bool>,
) -> Result<()> {
    loop {
        let mut buf = String::new();

        tokio::select! {
            Ok(_) = reader.read_line(&mut buf) => {
                sender.send(buf).await?;
            },

            _ret = ctrl.recv() => {
                break;
            }

        }
    }

    Ok(())
}

impl TraceLog {
    pub async fn new() -> Result<Self> {
        let mut trace_pipe = trace_top_dir().await?;
        trace_pipe.push_str("/trace_pipe");

        let trace_pipe_path = Path::new(&trace_pipe);
        if trace_pipe_path.is_file() {
            let file = File::open(trace_pipe_path).await?;
            let reader = BufReader::new(file);
            let (sender, receiver) = mpsc::channel::<String>(100);
            let (ctrl_sender, ctrl_receiver) = mpsc::channel::<bool>(100);

            let thread_handler =
                tokio::spawn(async move { read_oneline(sender, reader, ctrl_receiver).await });

            return Ok(TraceLog {
                thread_handler: Some(thread_handler),
                receiver,
                ctrl_sender,
            });
        }

        Err(Error::msg("trace_pipe not found"))
    }

    pub async fn trace_print(&mut self) -> Result<String> {
        let log = self
            .receiver
            .recv()
            .await
            .ok_or_else(|| Error::msg("nodata"))?;
        Ok(log)
    }

    pub async fn trace_fields(&mut self) -> Result<(String, u32, u8, String, f32, String)> {
        let task;
        let pid;
        let mut msg = String::new();

        let log = self
            .receiver
            .recv()
            .await
            .ok_or_else(|| Error::msg("Trace terminated"))?;

        let mut to_process = log.as_str();
        let errlog = || Error::msg(format!("Invalid log string : {}", &log));
        let mut spliter = to_process.find('[').ok_or_else(errlog)?;

        {
            let task_pid_str = &to_process[..spliter];
            jdebug!("task_pid_str: {}", task_pid_str);
            let tspliter = task_pid_str.rfind('-').ok_or_else(errlog)?;
            task = String::from(task_pid_str[..tspliter].trim());
            pid = task_pid_str[tspliter + 1..].trim().parse()?;
        }

        to_process = &to_process[spliter + 1..];
        jdebug!("log str: {}", to_process);
        spliter = to_process.find("] ").ok_or_else(errlog)?;
        let cpu = to_process[..spliter].parse()?;

        to_process = &to_process[spliter + 2..];
        spliter = to_process.find(' ').ok_or_else(errlog)?;
        let flag = String::from(to_process[..spliter].trim());

        to_process = &to_process[spliter + 1..];
        spliter = to_process.find(':').ok_or_else(errlog)?;
        let ts = to_process[..spliter].trim().parse()?;

        to_process = &to_process[spliter + 1..];
        msg.push_str(to_process.trim());

        Ok((
            task,
            pid,
            cpu,
            flag,
            ts,
            msg.trim_start_matches("0:")
                .trim_end_matches('\n')
                .trim()
                .to_owned(),
        ))
    }

    pub async fn terminate(mut self) {
        if self.ctrl_sender.send(false).await.is_ok() {
            if let Some(handle) = self.thread_handler.take() {
                let _ = handle.await;
            }
        }
    }
}