Skip to main content

zng_task/channel/
ipc_file.rs

1#![cfg_attr(not(ipc), allow(unused))]
2
3use std::{fmt, mem};
4
5use futures_lite::io;
6use serde::{Deserialize, Serialize};
7
8/// File handle that can be transferred to another process.
9///
10/// # File
11///
12/// This type can be converted  from and to [`std::fs::File`]. This type does not
13/// implement IO traits, it must be converted to read/write. The file handle is only closed on drop
14/// if it was not converted back.
15///
16/// # Serialization
17///
18/// This type implements serialization only for compatibility with IPC channel, attempting to
19/// serialize it outside of [`with_ipc_serialization`] context will return an error. On IPC serialization
20/// the handle is duplicated for the target process.
21///
22/// [`with_ipc_serialization`]: crate::channel::with_ipc_serialization
23pub struct IpcFileHandle {
24    #[cfg(ipc)]
25    handle: usize,
26    #[cfg(not(ipc))]
27    handle: std::fs::File,
28}
29impl fmt::Debug for IpcFileHandle {
30    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31        f.debug_struct("IpcFileHandle").field("handle", &self.handle).finish()
32    }
33}
34#[cfg(not(ipc))]
35impl From<std::fs::File> for IpcFileHandle {
36    fn from(file: std::fs::File) -> Self {
37        Self { handle: file }
38    }
39}
40#[cfg(ipc)]
41impl From<std::fs::File> for IpcFileHandle {
42    fn from(file: std::fs::File) -> Self {
43        #[cfg(not(any(windows, unix)))]
44        panic!("IpcFileHandle not implemented for {}", std::env::consts::OS);
45
46        #[cfg(windows)]
47        let handle = std::os::windows::io::IntoRawHandle::into_raw_handle(file) as usize;
48        #[cfg(unix)]
49        let handle = std::os::fd::IntoRawFd::into_raw_fd(file) as usize;
50        Self { handle }
51    }
52}
53#[cfg(not(ipc))]
54impl From<IpcFileHandle> for std::fs::File {
55    fn from(f: IpcFileHandle) -> Self {
56        f.handle
57    }
58}
59#[cfg(ipc)]
60impl From<IpcFileHandle> for std::fs::File {
61    fn from(mut f: IpcFileHandle) -> Self {
62        let handle = mem::take(&mut f.handle);
63        assert!(handle != 0);
64        // SAFETY: handle was not moved (not zero) and was converted from File
65        unsafe { into_file(handle) }
66    }
67}
68#[cfg(not(ipc))]
69impl From<IpcFileHandle> for crate::fs::File {
70    fn from(f: IpcFileHandle) -> Self {
71        crate::fs::File::from(f.handle)
72    }
73}
74#[cfg(ipc)]
75impl From<IpcFileHandle> for crate::fs::File {
76    fn from(f: IpcFileHandle) -> Self {
77        crate::fs::File::from(std::fs::File::from(f))
78    }
79}
80impl IpcFileHandle {
81    /// Duplicate file handle for the same process.
82    ///
83    /// Note that the read/write offset is associated with the system handle, if you convert
84    /// multiple duplicates to `File` any read in one instance advances the position in all instances.
85    pub fn duplicate(&self) -> io::Result<Self> {
86        #[cfg(ipc)]
87        {
88            let handle = self.handle;
89            assert!(handle != 0);
90            // SAFETY: handle was not moved (not zero)
91            let file = unsafe { into_file(handle) };
92
93            // let std call duplicate
94            let handle: Self = file.try_clone()?.into();
95
96            // drop file without cleanup
97            #[cfg(windows)]
98            let _ = std::os::windows::io::IntoRawHandle::into_raw_handle(file) as usize;
99            #[cfg(unix)]
100            let _ = std::os::fd::IntoRawFd::into_raw_fd(file) as usize;
101
102            Ok(handle)
103        }
104        #[cfg(not(ipc))]
105        {
106            Ok(Self {
107                handle: self.handle.try_clone()?,
108            })
109        }
110    }
111}
112#[cfg(ipc)]
113impl Drop for IpcFileHandle {
114    fn drop(&mut self) {
115        let handle = mem::take(&mut self.handle);
116        if handle != 0 {
117            // SAFETY: handle was not moved (not zero) and was converted from File
118            drop(unsafe { into_file(handle) });
119        }
120    }
121}
122#[cfg(ipc)]
123unsafe fn into_file(handle: usize) -> std::fs::File {
124    #[cfg(windows)]
125    unsafe {
126        std::os::windows::io::FromRawHandle::from_raw_handle(handle as _)
127    }
128    #[cfg(unix)]
129    unsafe {
130        std::os::fd::FromRawFd::from_raw_fd(handle as _)
131    }
132
133    #[cfg(not(any(windows, unix)))]
134    {
135        let _ = handle;
136        panic!("IpcFileHandle not implemented for {}", std::env::consts::OS)
137    }
138}
139#[cfg(not(ipc))]
140impl Serialize for IpcFileHandle {
141    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
142    where
143        S: serde::Serializer,
144    {
145        return Err(serde::ser::Error::custom("cannot serialize `IpcFileHandle` outside IPC"));
146    }
147}
148#[cfg(ipc)]
149impl Serialize for IpcFileHandle {
150    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
151    where
152        S: serde::Serializer,
153    {
154        if !crate::channel::is_ipc_serialization() {
155            return Err(serde::ser::Error::custom("cannot serialize `IpcFileHandle` outside IPC"));
156        }
157        let handle = self.duplicate().map_err(serde::ser::Error::custom)?;
158
159        #[cfg(windows)]
160        {
161            // -> Sends a channel sender to receive the target process id and a sender to continue the protocol
162            // <- Receives the target process id, DuplicateHandle
163            // -> Sends the new handle and a confirmation sender
164            // <- Receives confirmation, drops this handle
165
166            // ->
167            let (s, mut r) =
168                super::ipc_unbounded::<(u32, super::IpcSender<(usize, super::IpcSender<bool>)>)>().map_err(serde::ser::Error::custom)?;
169            let ok = Serialize::serialize(&s, serializer)?;
170
171            // <-
172            blocking::unblock(move || {
173                let _hold = &handle;
174                match r.recv_blocking() {
175                    Ok((process_id, mut shared_sender)) => {
176                        if let Some(handle) = duplicate_handle_for_process(process_id, handle.handle) {
177                            // ->
178                            match super::ipc_unbounded() {
179                                Ok((s, mut r)) => match shared_sender.send_blocking((handle, s)) {
180                                    Ok(()) => {
181                                        // <-
182                                        let _ = r.recv_blocking();
183                                    }
184                                    Err(e) => tracing::error!("cannot send IpcFileHandle, side channel disconnected, {e}"),
185                                },
186                                Err(e) => tracing::error!("cannot send IpcFileHandle, side channel disconnected, {e}"),
187                            }
188                        }
189                    }
190                    Err(e) => tracing::error!("cannot send IpcFileHandle, side channel disconnected, {e}"),
191                }
192            })
193            .detach();
194            Ok(ok)
195        }
196        #[cfg(unix)]
197        {
198            // -> Sends a channel sender to get a socket name from target process and a sender to continue the protocol
199            // <- Receives socket name and and connects UnixDatagram
200            // ~> Sends the FD using datagram
201            // <- Receives confirmation, drops this handle
202
203            // ->
204            let (s, mut r) = super::ipc_unbounded::<(String, super::IpcReceiver<bool>)>().map_err(serde::ser::Error::custom)?;
205            let ok = Serialize::serialize(&s, serializer)?;
206
207            // <-
208            blocking::unblock(move || {
209                let _hold = &handle;
210
211                match r.recv_blocking() {
212                    Ok((socket, mut confirm_rcv)) => match std::os::unix::net::UnixDatagram::unbound() {
213                        Ok(datagram) => {
214                            #[cfg(target_os = "linux")]
215                            let result = if let Some(socket) = socket.strip_prefix('\0') {
216                                use std::os::{linux::net::SocketAddrExt as _, unix::net::SocketAddr};
217                                datagram.connect_addr(&SocketAddr::from_abstract_name(socket.as_bytes()).unwrap())
218                            } else {
219                                let socket = std::path::PathBuf::from("/tmp/").join(socket);
220                                datagram.connect(&socket)
221                            };
222                            #[cfg(not(target_os = "linux"))]
223                            let result = {
224                                let socket = std::path::PathBuf::from("/tmp/").join(socket);
225                                datagram.connect(&socket)
226                            };
227                            match result {
228                                Ok(()) => {
229                                    // ~>
230                                    use sendfd::SendWithFd as _;
231                                    match datagram.send_with_fd(b"zng", &[handle.handle as _]) {
232                                        Ok(_) => {
233                                            // <-
234                                            let _ = confirm_rcv.recv_blocking();
235                                        }
236                                        Err(e) => tracing::error!("cannot send IpcFileHandle, {e}"),
237                                    }
238                                }
239                                Err(e) => tracing::error!("cannot send IpcFileHandle, cannot connect socket, {e}"),
240                            }
241                        }
242                        Err(e) => tracing::error!("cannot send IpcFileHandle, cannot create unbound datagram, {e}"),
243                    },
244                    Err(e) => tracing::error!("cannot send IpcFileHandle, side channel disconnected, {e}"),
245                }
246            })
247            .detach();
248
249            Ok(ok)
250        }
251
252        #[cfg(not(any(windows, unix)))]
253        {
254            panic!("IpcFileHandle not implemented for {}", std::env::consts::OS);
255        }
256    }
257}
258#[cfg(not(ipc))]
259impl<'de> Deserialize<'de> for IpcFileHandle {
260    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
261    where
262        D: serde::Deserializer<'de>,
263    {
264        return Err(serde::de::Error::custom("cannot deserialize `IpcFileHandle` outside IPC"));
265    }
266}
267#[cfg(ipc)]
268impl<'de> Deserialize<'de> for IpcFileHandle {
269    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
270    where
271        D: serde::Deserializer<'de>,
272    {
273        #[cfg(windows)]
274        {
275            type Confirm = bool;
276            type Handle = (usize, super::IpcSender<Confirm>);
277            type Process = (u32, super::IpcSender<Handle>);
278
279            let mut process_id_sender = <super::IpcSender<Process> as Deserialize<'de>>::deserialize(deserializer)?;
280            let (s, mut handle_receiver) = super::ipc_unbounded::<Handle>().map_err(serde::de::Error::custom)?;
281
282            process_id_sender
283                .send_blocking((std::process::id(), s))
284                .map_err(serde::de::Error::custom)?;
285
286            let (handle, mut confirm_sender) = handle_receiver.recv_blocking().map_err(serde::de::Error::custom)?;
287
288            use std::os::windows::io::FromRawHandle as _;
289            // SAFETY: this handle is the output of DuplicateHandle for the current process
290            let handle = unsafe { std::fs::File::from_raw_handle(handle as _) };
291
292            let _ = confirm_sender.send_blocking(true);
293
294            Ok(handle.into())
295        }
296
297        #[cfg(unix)]
298        {
299            use std::{os::unix::net::UnixDatagram, sync::atomic::AtomicUsize};
300
301            let mut socket_sender = <super::IpcSender<(String, super::IpcReceiver<bool>)> as Deserialize<'de>>::deserialize(deserializer)?;
302
303            static SOCKET_ID: AtomicUsize = AtomicUsize::new(0);
304            #[cfg_attr(not(target_os = "linux"), allow(unused_mut))]
305            let mut socket = format!(
306                "zng_task-ipc_file-{}-{}",
307                std::process::id(),
308                SOCKET_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
309            );
310            let mut socket_tmp = None;
311
312            #[cfg(target_os = "linux")]
313            let fd_recv = {
314                // try abstract name first
315                use std::os::{linux::net::SocketAddrExt as _, unix::net::SocketAddr};
316                match UnixDatagram::bind_addr(&SocketAddr::from_abstract_name(socket.as_bytes()).unwrap()) {
317                    Ok(r) => {
318                        socket = format!("\0{socket}");
319                        r
320                    }
321                    Err(e) => {
322                        if matches!(e.kind(), std::io::ErrorKind::InvalidInput) {
323                            // fallback to tmp file socket
324                            let socket = std::path::PathBuf::from("/tmp/").join(&socket);
325                            let _ = std::fs::remove_file(&socket);
326                            let r = UnixDatagram::bind(&socket).map_err(serde::de::Error::custom)?;
327                            socket_tmp = Some(socket);
328                            r
329                        } else {
330                            return Err(serde::de::Error::custom(e));
331                        }
332                    }
333                }
334            };
335            #[cfg(not(target_os = "linux"))]
336            let fd_recv = {
337                let socket = std::path::PathBuf::from("/tmp/").join(&socket);
338                let _ = std::fs::remove_file(&socket);
339                let r = UnixDatagram::bind(&socket).map_err(serde::de::Error::custom)?;
340                socket_tmp = Some(socket);
341                r
342            };
343            let _cleanup = zng_app_context::RunOnDrop::new(move || {
344                if let Some(socket) = socket_tmp {
345                    let _ = std::fs::remove_file(socket);
346                }
347            });
348
349            let (mut confirm_sender, r) = super::ipc_unbounded().map_err(serde::de::Error::custom)?;
350            socket_sender.send_blocking((socket, r)).map_err(serde::de::Error::custom)?;
351
352            use sendfd::RecvWithFd as _;
353            let mut ignore = [b'z', b'n', b'g'];
354            let mut fd = [0 as std::os::fd::RawFd];
355            fd_recv.recv_with_fd(&mut ignore, &mut fd).map_err(serde::de::Error::custom)?;
356
357            use std::os::fd::FromRawFd as _;
358            let handle = unsafe { std::fs::File::from_raw_fd(fd[0]) };
359            let _ = confirm_sender.send_blocking(true);
360
361            Ok(handle.into())
362        }
363
364        #[cfg(not(any(windows, unix)))]
365        {
366            panic!("IpcFile not implemented for {}", std::env::consts::OS);
367        }
368    }
369}
370
371#[cfg(all(ipc, windows))]
372fn duplicate_handle_for_process(process_id: u32, handle: usize) -> Option<usize> {
373    unsafe {
374        use windows_sys::Win32::Foundation::{DUPLICATE_SAME_ACCESS, DuplicateHandle, HANDLE};
375        use windows_sys::Win32::System::Threading::{GetCurrentProcess, OpenProcess, PROCESS_DUP_HANDLE};
376
377        let target_process = OpenProcess(PROCESS_DUP_HANDLE, 0, process_id);
378        if !target_process.is_null() {
379            let mut target_handle: HANDLE = std::ptr::null_mut();
380            let success = DuplicateHandle(
381                GetCurrentProcess(),
382                handle as HANDLE,
383                target_process,
384                &mut target_handle,
385                0,
386                0,
387                DUPLICATE_SAME_ACCESS,
388            );
389
390            windows_sys::Win32::Foundation::CloseHandle(target_process);
391
392            if success != 0 {
393                Some(target_handle as usize)
394            } else {
395                let error_code = windows_sys::Win32::Foundation::GetLastError();
396                tracing::error!("failed to duplicate IpcFile handle, error code: {error_code:x}");
397                None
398            }
399        } else {
400            tracing::error!("failed to connect to target process for IpcFile handle duplication");
401            None
402        }
403    }
404}