pass_it_on/interfaces/
pipe.rs

1//! Pipe [`Interface`] and [`InterfaceConfig`]  implementation
2//!
3//! # Server Configuration Example
4//! ```toml
5//! [[server.interface]]
6//! type = "pipe"
7//! path = '/path/to/pipe.fifo'
8//! group_read_permission = true
9//! ```
10//!
11//! # Client Configuration Example
12//! ```toml
13//! [[client.interface]]
14//! type = "pipe"
15//! path = '/path/to/pipe.fifo'
16//! group_read_permission = true
17//! group_write_permission = true
18//! ```
19
20#[cfg(feature = "pipe-client")]
21pub(crate) mod pipe_client;
22#[cfg(feature = "pipe-server")]
23pub(crate) mod pipe_server;
24
25use crate::interfaces::{Interface, InterfaceConfig};
26use crate::notifications::Notification;
27use crate::Error;
28use async_trait::async_trait;
29#[cfg(feature = "pipe-server")]
30use nix::sys::stat::Mode;
31use serde::Deserialize;
32#[cfg(feature = "pipe-server")]
33use std::path::Path;
34use std::path::PathBuf;
35use nix::fcntl::AT_FDCWD;
36use tokio::sync::mpsc::Sender;
37use tokio::sync::{broadcast, watch};
38
39/// Data structure to represent the Named Pipe [`Interface`].
40#[derive(Debug, Clone)]
41pub struct PipeInterface {
42    path: PathBuf,
43    group_read: bool,
44    group_write: bool,
45    other_read: bool,
46    other_write: bool,
47}
48
49/// Data structure to represent the Named Pipe [`InterfaceConfig`].
50#[derive(Debug, Deserialize, PartialEq, Eq, Hash, Clone)]
51pub(crate) struct PipeConfigFile {
52    path: String,
53    group_read_permission: Option<bool>,
54    group_write_permission: Option<bool>,
55    other_read_permission: Option<bool>,
56    other_write_permission: Option<bool>,
57}
58
59impl PipeInterface {
60    /// Create a new `PipeInterface`.
61    pub fn new(path: &str, group_read: bool, group_write: bool, other_read: bool, other_write: bool) -> Self {
62        let path = PathBuf::from(path);
63        Self { path, group_read, group_write, other_read, other_write }
64    }
65
66    /// Return the pipe file path.
67    pub fn path(&self) -> &PathBuf {
68        &self.path
69    }
70
71    /// Group read permission set.
72    pub fn group_read(&self) -> bool {
73        self.group_read
74    }
75
76    /// Group write permission set.
77    pub fn group_write(&self) -> bool {
78        self.group_write
79    }
80
81    /// Other read permission set.
82    pub fn other_read(&self) -> bool {
83        self.other_read
84    }
85
86    /// Other write permission set.
87    pub fn other_write(&self) -> bool {
88        self.other_write
89    }
90}
91
92impl TryFrom<&PipeConfigFile> for PipeInterface {
93    type Error = Error;
94
95    fn try_from(value: &PipeConfigFile) -> Result<Self, Self::Error> {
96        if value.path.is_empty() {
97            return Err(Error::InvalidInterfaceConfiguration("Pipe path is empty".to_string()));
98        }
99
100        Ok(Self {
101            path: PathBuf::from(value.path.as_str()),
102            group_read: value.group_read_permission.unwrap_or(false),
103            group_write: value.group_write_permission.unwrap_or(false),
104            other_read: value.other_read_permission.unwrap_or(false),
105            other_write: value.other_write_permission.unwrap_or(false),
106        })
107    }
108}
109
110#[typetag::deserialize(name = "pipe")]
111impl InterfaceConfig for PipeConfigFile {
112    fn to_interface(&self) -> Result<Box<dyn Interface + Send>, Error> {
113        Ok(Box::new(PipeInterface::try_from(self)?))
114    }
115}
116
117#[async_trait]
118impl Interface for PipeInterface {
119    #[cfg(feature = "pipe-server")]
120    async fn receive(&self, interface_tx: Sender<String>, shutdown: watch::Receiver<bool>) -> Result<(), Error> {
121        use crate::interfaces::pipe::pipe_server::read_pipe;
122        use tracing::info;
123
124        const USER_RWX: Mode = Mode::S_IRWXU;
125        const GROUP_READ: Mode = Mode::S_IRGRP;
126        const GROUP_WRITE: Mode = Mode::S_IWGRP;
127        const OTHER_READ: Mode = Mode::S_IROTH;
128        const OTHER_WRITE: Mode = Mode::S_IWOTH;
129
130        let path = self.path().clone();
131        let pipe_permissions = {
132            let mut permissions = vec![USER_RWX];
133            if self.group_read() {
134                permissions.push(GROUP_READ);
135            }
136
137            if self.group_write() {
138                permissions.push(GROUP_WRITE);
139            }
140
141            if self.other_read() {
142                permissions.push(OTHER_READ);
143            }
144
145            if self.other_write() {
146                permissions.push(OTHER_WRITE);
147            }
148
149            create_permissions(permissions)
150        };
151
152        tokio::spawn(async move {
153            if !path.exists() {
154                create_pipe(&path, pipe_permissions)?
155            }
156            info!("Setting up Interface: Pipe on -> {}", &path.to_str().unwrap_or_default());
157            read_pipe(&path, interface_tx, shutdown).await
158        });
159        Ok(())
160    }
161
162    #[cfg(not(feature = "pipe-server"))]
163    async fn receive(&self, _interface_tx: Sender<String>, _shutdown: watch::Receiver<bool>) -> Result<(), Error> {
164        Err(Error::DisabledInterfaceFeature("pipe-server".to_string()))
165    }
166
167    #[cfg(feature = "pipe-client")]
168    async fn send(
169        &self,
170        interface_tx: broadcast::Receiver<Notification>,
171        shutdown: watch::Receiver<bool>,
172    ) -> Result<(), Error> {
173        use crate::interfaces::pipe::pipe_client::write_pipe;
174        use tracing::error;
175
176        let path = self.path.clone();
177        tokio::spawn(async move {
178            match write_pipe(path, interface_tx, shutdown).await {
179                Ok(_) => (),
180                Err(error) => error!("Pipe write error {}", error),
181            }
182        });
183        Ok(())
184    }
185
186    #[cfg(not(feature = "pipe-client"))]
187    async fn send(
188        &self,
189        _interface_rx: broadcast::Receiver<Notification>,
190        _shutdown: watch::Receiver<bool>,
191    ) -> Result<(), Error> {
192        Err(Error::DisabledInterfaceFeature("pipe-client".to_string()))
193    }
194}
195
196#[cfg(feature = "pipe-server")]
197fn create_pipe<P: AsRef<Path>>(path: P, permissions: Mode) -> Result<(), Error> {
198    match nix::unistd::mkfifo(path.as_ref(), permissions) {
199        Err(e) => Err(Error::NixErrorNoError(e)),
200        Ok(_) => set_permissions(path, permissions),
201    }
202}
203
204#[cfg(feature = "pipe-server")]
205fn create_permissions(permissions: Vec<Mode>) -> Mode {
206    let mut set_permission = Mode::empty();
207    for permission in permissions {
208        set_permission.insert(permission)
209    }
210    if set_permission.is_empty() {
211        set_permission.insert(Mode::S_IRWXU)
212    }
213    set_permission
214}
215
216#[cfg(feature = "pipe-server")]
217fn set_permissions<P: AsRef<Path>>(path: P, permissions: Mode) -> Result<(), Error> {
218    use nix::sys::stat::FchmodatFlags;
219    nix::sys::stat::fchmodat(AT_FDCWD, path.as_ref(), permissions, FchmodatFlags::NoFollowSymlink)?;
220    Ok(())
221}
222
223#[cfg(feature = "pipe-server")]
224async fn cleanup_pipe<P: AsRef<Path>>(path: P) -> Result<(), Error> {
225    std::fs::remove_file(path)?;
226    Ok(())
227}