Skip to main content

lefthk_core/
ipc.rs

1use crate::config::Command;
2use crate::config::command;
3use crate::config::command::utils::normalized_command::NormalizedCommand;
4use crate::errors::Result;
5use std::path::{Path, PathBuf};
6use tokio::{
7    fs,
8    io::{AsyncBufReadExt, BufReader},
9    sync::mpsc,
10};
11
12pub struct Pipe {
13    pipe_file: PathBuf,
14    rx: mpsc::UnboundedReceiver<NormalizedCommand>,
15}
16
17impl Drop for Pipe {
18    fn drop(&mut self) {
19        use std::os::unix::fs::OpenOptionsExt;
20        self.rx.close();
21
22        // Open fifo for write to unblock pending open for read operation that prevents tokio runtime
23        // from shutting down.
24        let _unplock_pending_open = std::fs::OpenOptions::new()
25            .write(true)
26            .custom_flags(nix::fcntl::OFlag::O_NONBLOCK.bits())
27            .open(self.pipe_file.clone());
28    }
29}
30
31impl Pipe {
32    /// Create and listen to the named pipe.
33    /// # Errors
34    ///
35    /// Will error if unable to `mkfifo`, likely a filesystem issue
36    /// such as inadequate permissions.
37    pub async fn new(pipe_file: PathBuf) -> Result<Self> {
38        let _pipe_reset = fs::remove_file(pipe_file.as_path()).await;
39        nix::unistd::mkfifo(&pipe_file, nix::sys::stat::Mode::S_IRWXU)?;
40
41        let path = pipe_file.clone();
42        let (tx, rx) = mpsc::unbounded_channel();
43        tokio::spawn(async move {
44            while !tx.is_closed() {
45                read_from_pipe(&path, &tx).await;
46            }
47            fs::remove_file(path).await.ok();
48        });
49
50        Ok(Self { pipe_file, rx })
51    }
52
53    #[must_use]
54    pub fn pipe_name() -> PathBuf {
55        let display = std::env::var("DISPLAY")
56            .ok()
57            .and_then(|d| d.rsplit_once(':').map(|(_, r)| r.to_owned()))
58            .unwrap_or_else(|| "0".to_string());
59
60        PathBuf::from(format!("command-{display}.pipe"))
61    }
62
63    pub async fn get_next_command(&mut self) -> Option<Box<dyn Command>> {
64        if let Some(normalized_command) = self.rx.recv().await {
65            return command::denormalize(&normalized_command).ok();
66        }
67        None
68    }
69}
70
71async fn read_from_pipe(pipe_file: &Path, tx: &mpsc::UnboundedSender<NormalizedCommand>) {
72    if let Ok(file) = fs::File::open(pipe_file).await {
73        let mut lines = BufReader::new(file).lines();
74
75        while let Ok(line) = lines.next_line().await {
76            if let Some(content) = line
77                && let Ok(normalized_command) = NormalizedCommand::try_from(content)
78                && command::denormalize(&normalized_command.clone()).is_ok()
79                && let Err(err) = tx.send(normalized_command)
80            {
81                tracing::error!("{}", err);
82            }
83        }
84    }
85}