// giallo-kak 0.2.2
//
// Kakoune syntax highlighter using TextMate grammars
use std::fs;
use std::io;
use std::os::fd::FromRawFd;
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use std::thread;

use giallo::Registry;
use log;

use crate::config::Config;
use crate::highlight::{highlight_and_send, BufferContext};

/// Creates a named pipe (FIFO) at `path`, requesting permission bits `0o644`.
///
/// # Errors
///
/// Returns [`io::ErrorKind::InvalidInput`] if `path` contains an interior NUL byte and
/// therefore cannot be passed to the C API. Otherwise returns the OS error reported by
/// `mkfifo` (for example when the path already exists).
pub fn create_fifo(path: &Path) -> io::Result<()> {
    use std::os::unix::ffi::OsStrExt;

    // Hand the raw path bytes to the OS unchanged. A lossy UTF-8 conversion would replace
    // invalid sequences and silently create the FIFO under a different name.
    let c_path = std::ffi::CString::new(path.as_os_str().as_bytes())
        .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid fifo path"))?;

    // SAFETY: `c_path` is a valid NUL-terminated C string that outlives the call.
    let ret = unsafe { libc::mkfifo(c_path.as_ptr(), 0o644) };
    if ret != 0 {
        return Err(io::Error::last_os_error());
    }
    Ok(())
}

/// Opens the FIFO at `path` for reading without waiting for a writer, then switches the
/// descriptor back to blocking mode.
///
/// The descriptor is opened with `O_RDONLY | O_NONBLOCK` so that the `open` call itself
/// returns immediately even when no writer is connected yet. `O_NONBLOCK` is cleared again
/// afterwards, so subsequent reads on the returned [`std::fs::File`] are ordinary blocking
/// reads.
///
/// # Errors
///
/// Returns [`io::ErrorKind::InvalidInput`] if `path` contains an interior NUL byte, or the
/// OS error from `open` / `fcntl` if any of those calls fails. The descriptor is closed on
/// every error path.
pub fn open_fifo_nonblocking(path: &Path) -> io::Result<std::fs::File> {
    use std::os::unix::ffi::OsStrExt;

    // Use the raw path bytes; a lossy UTF-8 conversion would open a different path when the
    // name is not valid UTF-8.
    let c_path = std::ffi::CString::new(path.as_os_str().as_bytes())
        .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid fifo path"))?;

    // SAFETY: `c_path` is a valid NUL-terminated C string that outlives the call.
    let fd = unsafe { libc::open(c_path.as_ptr(), libc::O_RDONLY | libc::O_NONBLOCK, 0o644) };
    if fd < 0 {
        return Err(io::Error::last_os_error());
    }

    // Take ownership immediately so the descriptor is closed by `Drop` on every early
    // return. The error value is built *before* `file` is dropped, so `close` can no longer
    // clobber the `errno` we want to report.
    // SAFETY: `fd` was just returned by a successful `open` and is owned by nothing else.
    let file = unsafe { std::fs::File::from_raw_fd(fd) };

    // SAFETY: `fd` is a valid open descriptor, kept alive by `file`.
    let flags = unsafe { libc::fcntl(fd, libc::F_GETFL, 0) };
    if flags < 0 {
        return Err(io::Error::last_os_error());
    }

    // SAFETY: as above; only the status flags of our own descriptor are modified.
    let ret = unsafe { libc::fcntl(fd, libc::F_SETFL, flags & !libc::O_NONBLOCK) };
    if ret < 0 {
        return Err(io::Error::last_os_error());
    }

    Ok(file)
}

/// Prepares the request FIFO and sentinel string for a client identified by `token`.
///
/// `base_dir` is created if missing. The token is hashed with the standard library's
/// `DefaultHasher`, and the hexadecimal hash is used both for the FIFO file name
/// (`<hash>.req.fifo` inside `base_dir`) and for the sentinel (`giallo-<hash>`). The FIFO is
/// created unless something already exists at that path.
///
/// NOTE(review): `DefaultHasher`'s algorithm is not specified to be stable across Rust
/// releases, so the derived names should not be relied on across differently built binaries.
///
/// # Errors
///
/// Returns any error from creating `base_dir` or from creating the FIFO, except that a FIFO
/// which turns out to exist already is not treated as an error.
pub fn handle_init(token: &str, base_dir: &Path) -> io::Result<(std::path::PathBuf, String)> {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    fs::create_dir_all(base_dir)?;

    let mut hasher = DefaultHasher::new();
    token.hash(&mut hasher);
    let hash = format!("{:x}", hasher.finish());

    let req = base_dir.join(format!("{hash}.req.fifo"));
    let sentinel = format!("giallo-{hash}");

    if !req.exists() {
        match create_fifo(&req) {
            Ok(()) => {}
            // Another initialiser for the same token can create the FIFO between the
            // `exists` check and `mkfifo`; losing that race is not a failure.
            Err(err) if err.kind() == io::ErrorKind::AlreadyExists => {}
            Err(err) => return Err(err),
        }
    }

    Ok((req, sentinel))
}

pub fn run_buffer_fifo(
    req_path: &Path,
    registry: &Registry,
    config: &Config,
    ctx: BufferContext,
    quit_flag: Option<&Arc<AtomicBool>>,
) -> io::Result<()> {
    log::debug!(
        "buffer FIFO: starting for buffer={} sentinel={}",
        ctx.buffer,
        ctx.sentinel
    );

    let (tx, rx): (Sender<String>, Receiver<String>) = channel();

    let ctx_clone = ctx.clone();
    let quit_flag_clone = quit_flag.map(|f| f.clone());
    let req_path_owned = req_path.to_path_buf();

    let reader_handle = thread::spawn(move || {
        let mut buf = String::new();
        let sentinel = ctx_clone.sentinel.clone();

        let mut file = match open_fifo_nonblocking(&req_path_owned) {
            Ok(f) => f,
            Err(err) => {
                log::error!("reader: failed to open FIFO: {}", err);
                return;
            }
        };

        loop {
            if let Some(ref flag) = quit_flag_clone {
                if flag.load(Ordering::Relaxed) {
                    break;
                }
            }

            let mut read_buf = String::new();
            match std::io::Read::read_to_string(&mut file, &mut read_buf) {
                Ok(0) => {
                    thread::sleep(std::time::Duration::from_millis(5));
                    continue;
                }
                Ok(_) => {
                    buf.push_str(&read_buf);
                }
                Err(err) => {
                    if err.kind() == io::ErrorKind::WouldBlock {
                        thread::sleep(std::time::Duration::from_millis(5));
                        continue;
                    } else {
                        log::warn!("reader: read error: {}", err);
                        thread::sleep(std::time::Duration::from_millis(50));
                        continue;
                    }
                }
            }

            while let Some(index) = buf.find(&sentinel) {
                let content = buf[..index].to_string();
                let end_index = index + sentinel.len();
                buf.drain(..end_index);

                if tx.send(content).is_err() {
                    log::debug!("reader: channel closed, exiting");
                    return;
                }
            }
        }
    });

    loop {
        if let Some(flag) = quit_flag {
            if flag.load(Ordering::Relaxed) {
                drop(rx);
                let _ = reader_handle.join();
                break;
            }
        }

        match rx.recv_timeout(std::time::Duration::from_millis(100)) {
            Ok(content) => {
                let lang = ctx.lang.lock().unwrap().clone();
                let theme = ctx.theme.lock().unwrap().clone();

                log::debug!(
                    "processor: received buffer (lang={} theme={} len={})",
                    lang,
                    theme,
                    content.len()
                );

                if !lang.is_empty() {
                    highlight_and_send(&content, &lang, &theme, registry, config, &ctx);
                } else {
                    log::warn!(
                        "processor: empty language, skipping highlight for buffer={}",
                        ctx.buffer
                    );
                }
            }
            Err(_) => {
                continue;
            }
        }
    }

    log::debug!("buffer FIFO: exiting for buffer={}", ctx.buffer);
    Ok(())
}