pass_it_on/interfaces/
pipe.rs1#[cfg(feature = "pipe-client")]
21pub(crate) mod pipe_client;
22#[cfg(feature = "pipe-server")]
23pub(crate) mod pipe_server;
24
25use crate::interfaces::{Interface, InterfaceConfig};
26use crate::notifications::Notification;
27use crate::Error;
28use async_trait::async_trait;
29#[cfg(feature = "pipe-server")]
30use nix::sys::stat::Mode;
31use serde::Deserialize;
32#[cfg(feature = "pipe-server")]
33use std::path::Path;
34use std::path::PathBuf;
35use nix::fcntl::AT_FDCWD;
36use tokio::sync::mpsc::Sender;
37use tokio::sync::{broadcast, watch};
38
/// Interface that exchanges data through a named pipe on the filesystem.
///
/// Besides the pipe location, it carries four opt-in flags selecting the
/// group/other read and write permission bits used when the server side
/// creates the pipe.
#[derive(Debug, Clone)]
pub struct PipeInterface {
    /// Filesystem location of the pipe.
    path: PathBuf,
    /// Request the group-read permission bit on a newly created pipe.
    group_read: bool,
    /// Request the group-write permission bit on a newly created pipe.
    group_write: bool,
    /// Request the other-read permission bit on a newly created pipe.
    other_read: bool,
    /// Request the other-write permission bit on a newly created pipe.
    other_write: bool,
}
48
49#[derive(Debug, Deserialize, PartialEq, Eq, Hash, Clone)]
51pub(crate) struct PipeConfigFile {
52 path: String,
53 group_read_permission: Option<bool>,
54 group_write_permission: Option<bool>,
55 other_read_permission: Option<bool>,
56 other_write_permission: Option<bool>,
57}
58
59impl PipeInterface {
60 pub fn new(path: &str, group_read: bool, group_write: bool, other_read: bool, other_write: bool) -> Self {
62 let path = PathBuf::from(path);
63 Self { path, group_read, group_write, other_read, other_write }
64 }
65
66 pub fn path(&self) -> &PathBuf {
68 &self.path
69 }
70
71 pub fn group_read(&self) -> bool {
73 self.group_read
74 }
75
76 pub fn group_write(&self) -> bool {
78 self.group_write
79 }
80
81 pub fn other_read(&self) -> bool {
83 self.other_read
84 }
85
86 pub fn other_write(&self) -> bool {
88 self.other_write
89 }
90}
91
92impl TryFrom<&PipeConfigFile> for PipeInterface {
93 type Error = Error;
94
95 fn try_from(value: &PipeConfigFile) -> Result<Self, Self::Error> {
96 if value.path.is_empty() {
97 return Err(Error::InvalidInterfaceConfiguration("Pipe path is empty".to_string()));
98 }
99
100 Ok(Self {
101 path: PathBuf::from(value.path.as_str()),
102 group_read: value.group_read_permission.unwrap_or(false),
103 group_write: value.group_write_permission.unwrap_or(false),
104 other_read: value.other_read_permission.unwrap_or(false),
105 other_write: value.other_write_permission.unwrap_or(false),
106 })
107 }
108}
109
110#[typetag::deserialize(name = "pipe")]
111impl InterfaceConfig for PipeConfigFile {
112 fn to_interface(&self) -> Result<Box<dyn Interface + Send>, Error> {
113 Ok(Box::new(PipeInterface::try_from(self)?))
114 }
115}
116
117#[async_trait]
118impl Interface for PipeInterface {
119 #[cfg(feature = "pipe-server")]
120 async fn receive(&self, interface_tx: Sender<String>, shutdown: watch::Receiver<bool>) -> Result<(), Error> {
121 use crate::interfaces::pipe::pipe_server::read_pipe;
122 use tracing::info;
123
124 const USER_RWX: Mode = Mode::S_IRWXU;
125 const GROUP_READ: Mode = Mode::S_IRGRP;
126 const GROUP_WRITE: Mode = Mode::S_IWGRP;
127 const OTHER_READ: Mode = Mode::S_IROTH;
128 const OTHER_WRITE: Mode = Mode::S_IWOTH;
129
130 let path = self.path().clone();
131 let pipe_permissions = {
132 let mut permissions = vec![USER_RWX];
133 if self.group_read() {
134 permissions.push(GROUP_READ);
135 }
136
137 if self.group_write() {
138 permissions.push(GROUP_WRITE);
139 }
140
141 if self.other_read() {
142 permissions.push(OTHER_READ);
143 }
144
145 if self.other_write() {
146 permissions.push(OTHER_WRITE);
147 }
148
149 create_permissions(permissions)
150 };
151
152 tokio::spawn(async move {
153 if !path.exists() {
154 create_pipe(&path, pipe_permissions)?
155 }
156 info!("Setting up Interface: Pipe on -> {}", &path.to_str().unwrap_or_default());
157 read_pipe(&path, interface_tx, shutdown).await
158 });
159 Ok(())
160 }
161
162 #[cfg(not(feature = "pipe-server"))]
163 async fn receive(&self, _interface_tx: Sender<String>, _shutdown: watch::Receiver<bool>) -> Result<(), Error> {
164 Err(Error::DisabledInterfaceFeature("pipe-server".to_string()))
165 }
166
167 #[cfg(feature = "pipe-client")]
168 async fn send(
169 &self,
170 interface_tx: broadcast::Receiver<Notification>,
171 shutdown: watch::Receiver<bool>,
172 ) -> Result<(), Error> {
173 use crate::interfaces::pipe::pipe_client::write_pipe;
174 use tracing::error;
175
176 let path = self.path.clone();
177 tokio::spawn(async move {
178 match write_pipe(path, interface_tx, shutdown).await {
179 Ok(_) => (),
180 Err(error) => error!("Pipe write error {}", error),
181 }
182 });
183 Ok(())
184 }
185
186 #[cfg(not(feature = "pipe-client"))]
187 async fn send(
188 &self,
189 _interface_rx: broadcast::Receiver<Notification>,
190 _shutdown: watch::Receiver<bool>,
191 ) -> Result<(), Error> {
192 Err(Error::DisabledInterfaceFeature("pipe-client".to_string()))
193 }
194}
195
/// Creates a FIFO at `path` and then applies `permissions` to it explicitly.
///
/// # Errors
/// A `mkfifo` failure is wrapped in `Error::NixErrorNoError`; a failure while
/// setting the permissions is returned as produced by `set_permissions`.
#[cfg(feature = "pipe-server")]
fn create_pipe<P: AsRef<Path>>(path: P, permissions: Mode) -> Result<(), Error> {
    nix::unistd::mkfifo(path.as_ref(), permissions).map_err(Error::NixErrorNoError)?;
    // NOTE(review): the follow-up chmod presumably exists because the mode
    // passed to mkfifo is filtered by the process umask — confirm.
    set_permissions(path, permissions)
}
203
/// Combines individual permission bits into a single `Mode`.
///
/// If the combined mode is empty (no bits supplied), owner
/// read/write/execute (`S_IRWXU`) is returned instead.
#[cfg(feature = "pipe-server")]
fn create_permissions(permissions: Vec<Mode>) -> Mode {
    let combined = permissions
        .into_iter()
        .fold(Mode::empty(), |acc, bit| acc | bit);

    if combined.is_empty() {
        Mode::S_IRWXU
    } else {
        combined
    }
}
215
/// Sets the permission bits of `path` to `permissions` using `fchmodat`
/// with `AT_FDCWD` and the `NoFollowSymlink` flag.
///
/// # Errors
/// Returns the `fchmodat` failure converted into `Error`.
#[cfg(feature = "pipe-server")]
fn set_permissions<P: AsRef<Path>>(path: P, permissions: Mode) -> Result<(), Error> {
    use nix::sys::stat::{fchmodat, FchmodatFlags};

    let target = path.as_ref();
    fchmodat(AT_FDCWD, target, permissions, FchmodatFlags::NoFollowSymlink)?;
    Ok(())
}
222
/// Removes the file at `path` from the filesystem.
///
/// # Errors
/// Returns the I/O error from `std::fs::remove_file`, converted into `Error`.
#[cfg(feature = "pipe-server")]
async fn cleanup_pipe<P: AsRef<Path>>(path: P) -> Result<(), Error> {
    std::fs::remove_file(path).map_err(Error::from)
}