1use crate::config::Command;
2use crate::config::command;
3use crate::config::command::utils::normalized_command::NormalizedCommand;
4use crate::errors::Result;
5use std::path::{Path, PathBuf};
6use tokio::{
7 fs,
8 io::{AsyncBufReadExt, BufReader},
9 sync::mpsc,
10};
11
/// Receiving side of a command pipe backed by a FIFO on the filesystem.
///
/// Commands arrive over an unbounded channel and are handed out through
/// `get_next_command`; the sending half of that channel is owned by a
/// background task spawned in `Pipe::new`.
pub struct Pipe {
    /// Path of the FIFO created in `Pipe::new`; kept so `Drop` can reopen it.
    pipe_file: PathBuf,
    /// Receiving half of the channel carrying the parsed commands.
    rx: mpsc::UnboundedReceiver<NormalizedCommand>,
}
16
17impl Drop for Pipe {
18 fn drop(&mut self) {
19 use std::os::unix::fs::OpenOptionsExt;
20 self.rx.close();
21
22 let _unplock_pending_open = std::fs::OpenOptions::new()
25 .write(true)
26 .custom_flags(nix::fcntl::OFlag::O_NONBLOCK.bits())
27 .open(self.pipe_file.clone());
28 }
29}
30
31impl Pipe {
32 pub async fn new(pipe_file: PathBuf) -> Result<Self> {
38 let _pipe_reset = fs::remove_file(pipe_file.as_path()).await;
39 nix::unistd::mkfifo(&pipe_file, nix::sys::stat::Mode::S_IRWXU)?;
40
41 let path = pipe_file.clone();
42 let (tx, rx) = mpsc::unbounded_channel();
43 tokio::spawn(async move {
44 while !tx.is_closed() {
45 read_from_pipe(&path, &tx).await;
46 }
47 fs::remove_file(path).await.ok();
48 });
49
50 Ok(Self { pipe_file, rx })
51 }
52
53 #[must_use]
54 pub fn pipe_name() -> PathBuf {
55 let display = std::env::var("DISPLAY")
56 .ok()
57 .and_then(|d| d.rsplit_once(':').map(|(_, r)| r.to_owned()))
58 .unwrap_or_else(|| "0".to_string());
59
60 PathBuf::from(format!("command-{display}.pipe"))
61 }
62
63 pub async fn get_next_command(&mut self) -> Option<Box<dyn Command>> {
64 if let Some(normalized_command) = self.rx.recv().await {
65 return command::denormalize(&normalized_command).ok();
66 }
67 None
68 }
69}
70
71async fn read_from_pipe(pipe_file: &Path, tx: &mpsc::UnboundedSender<NormalizedCommand>) {
72 if let Ok(file) = fs::File::open(pipe_file).await {
73 let mut lines = BufReader::new(file).lines();
74
75 while let Ok(line) = lines.next_line().await {
76 if let Some(content) = line
77 && let Ok(normalized_command) = NormalizedCommand::try_from(content)
78 && command::denormalize(&normalized_command.clone()).is_ok()
79 && let Err(err) = tx.send(normalized_command)
80 {
81 tracing::error!("{}", err);
82 }
83 }
84 }
85}