1#![cfg_attr(not(ipc), allow(unused))]
2
3use std::{fmt, mem};
4
5use futures_lite::io;
6use serde::{Deserialize, Serialize};
7
/// File handle that can be handed over to another process.
///
/// The representation depends on the `ipc` cfg: without it the value simply owns a
/// [`std::fs::File`], with it the value owns the raw OS handle (Windows) or file
/// descriptor (Unix) stored as an `usize`.
pub struct IpcFileHandle {
    // Without IPC the file is owned directly.
    #[cfg(not(ipc))]
    handle: std::fs::File,
    // With IPC this is the raw handle/descriptor; `0` is used by the conversions and by
    // `Drop` to mark a handle whose ownership was already moved out.
    #[cfg(ipc)]
    handle: usize,
}
29impl fmt::Debug for IpcFileHandle {
30 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31 f.debug_struct("IpcFileHandle").field("handle", &self.handle).finish()
32 }
33}
34#[cfg(not(ipc))]
35impl From<std::fs::File> for IpcFileHandle {
36 fn from(file: std::fs::File) -> Self {
37 Self { handle: file }
38 }
39}
#[cfg(ipc)]
impl From<std::fs::File> for IpcFileHandle {
    /// Consumes the file and keeps only its raw OS handle (Windows) or file descriptor
    /// (Unix), so closing it becomes the responsibility of the returned value.
    ///
    /// # Panics
    ///
    /// Panics on targets that are neither Windows nor Unix.
    fn from(file: std::fs::File) -> Self {
        #[cfg(not(any(windows, unix)))]
        panic!("IpcFileHandle not implemented for {}", std::env::consts::OS);

        #[cfg(unix)]
        let handle = {
            use std::os::fd::IntoRawFd as _;
            file.into_raw_fd() as usize
        };
        #[cfg(windows)]
        let handle = {
            use std::os::windows::io::IntoRawHandle as _;
            file.into_raw_handle() as usize
        };
        Self { handle }
    }
}
53#[cfg(not(ipc))]
54impl From<IpcFileHandle> for std::fs::File {
55 fn from(f: IpcFileHandle) -> Self {
56 f.handle
57 }
58}
#[cfg(ipc)]
impl From<IpcFileHandle> for std::fs::File {
    /// Moves ownership of the raw handle into a [`std::fs::File`].
    ///
    /// # Panics
    ///
    /// Panics if the stored handle is `0`.
    fn from(mut f: IpcFileHandle) -> Self {
        // Leave `0` behind so that dropping `f` at the end of this function does not close
        // the handle that the returned file now owns.
        let handle = mem::replace(&mut f.handle, 0);
        assert!(handle != 0);
        // SAFETY: the handle was just taken out of `f`, so the returned file is its only owner.
        unsafe { into_file(handle) }
    }
}
68#[cfg(not(ipc))]
69impl From<IpcFileHandle> for crate::fs::File {
70 fn from(f: IpcFileHandle) -> Self {
71 crate::fs::File::from(f.handle)
72 }
73}
#[cfg(ipc)]
impl From<IpcFileHandle> for crate::fs::File {
    /// Converts to a [`std::fs::File`] first and then to `crate::fs::File`.
    fn from(f: IpcFileHandle) -> Self {
        let std_file = std::fs::File::from(f);
        Self::from(std_file)
    }
}
80impl IpcFileHandle {
81 pub fn duplicate(&self) -> io::Result<Self> {
86 #[cfg(ipc)]
87 {
88 let handle = self.handle;
89 assert!(handle != 0);
90 let file = unsafe { into_file(handle) };
92
93 let handle: Self = file.try_clone()?.into();
95
96 #[cfg(windows)]
98 let _ = std::os::windows::io::IntoRawHandle::into_raw_handle(file) as usize;
99 #[cfg(unix)]
100 let _ = std::os::fd::IntoRawFd::into_raw_fd(file) as usize;
101
102 Ok(handle)
103 }
104 #[cfg(not(ipc))]
105 {
106 Ok(Self {
107 handle: self.handle.try_clone()?,
108 })
109 }
110 }
111}
#[cfg(ipc)]
impl Drop for IpcFileHandle {
    /// Closes the raw handle, unless it was already moved out (stored value is `0`).
    fn drop(&mut self) {
        let handle = mem::replace(&mut self.handle, 0);
        if handle == 0 {
            // Ownership was transferred elsewhere, nothing to close.
            return;
        }
        // SAFETY: the handle was just taken out of `self`, the temporary file is its only
        // owner and closes it when dropped.
        let file = unsafe { into_file(handle) };
        drop(file);
    }
}
/// Rebuilds a [`std::fs::File`] from a raw handle (Windows) or file descriptor (Unix)
/// that was stored as an `usize`.
///
/// # Safety
///
/// The returned file closes the handle when dropped, so the caller must uphold the
/// requirements of `FromRawHandle::from_raw_handle` / `FromRawFd::from_raw_fd`: `handle`
/// must be open and must not end up closed by more than one owner.
///
/// # Panics
///
/// Panics on targets that are neither Windows nor Unix.
#[cfg(ipc)]
unsafe fn into_file(handle: usize) -> std::fs::File {
    #[cfg(unix)]
    {
        use std::os::fd::{FromRawFd as _, RawFd};
        // SAFETY: forwarded to the caller, see the function contract.
        unsafe { std::fs::File::from_raw_fd(handle as RawFd) }
    }
    #[cfg(windows)]
    {
        use std::os::windows::io::{FromRawHandle as _, RawHandle};
        // SAFETY: forwarded to the caller, see the function contract.
        unsafe { std::fs::File::from_raw_handle(handle as RawHandle) }
    }

    #[cfg(not(any(windows, unix)))]
    {
        let _ = handle;
        panic!("IpcFileHandle not implemented for {}", std::env::consts::OS)
    }
}
139#[cfg(not(ipc))]
140impl Serialize for IpcFileHandle {
141 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
142 where
143 S: serde::Serializer,
144 {
145 return Err(serde::ser::Error::custom("cannot serialize `IpcFileHandle` outside IPC"));
146 }
147}
#[cfg(ipc)]
impl Serialize for IpcFileHandle {
    /// Serializes the sender end of a side channel; the file handle itself is transferred
    /// afterwards by a detached blocking task.
    ///
    /// The task keeps a duplicate of the handle open until the receiver confirms the
    /// transfer or one of the steps fails (failures are logged with `tracing::error!`).
    ///
    /// * Windows: the receiver answers with its process ID and a reply sender, the handle
    ///   is duplicated into that process and the resulting handle value is sent back.
    /// * Unix: the receiver answers with the name of a datagram socket, the file
    ///   descriptor is sent to that socket together with the `b"zng"` payload.
    ///
    /// # Errors
    ///
    /// Fails when not called during IPC serialization, when the handle cannot be
    /// duplicated, when the side channel cannot be created or when the serializer fails.
    ///
    /// # Panics
    ///
    /// Panics on targets that are neither Windows nor Unix.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        use serde::ser::Error as _;

        if !crate::channel::is_ipc_serialization() {
            return Err(S::Error::custom("cannot serialize `IpcFileHandle` outside IPC"));
        }
        let handle = self.duplicate().map_err(S::Error::custom)?;

        #[cfg(windows)]
        {
            type Confirm = bool;
            type Handle = (usize, super::IpcSender<Confirm>);
            type Process = (u32, super::IpcSender<Handle>);

            let (request_sender, mut request_receiver) = super::ipc_unbounded::<Process>().map_err(S::Error::custom)?;
            let ok = Serialize::serialize(&request_sender, serializer)?;

            blocking::unblock(move || {
                // Keeps the duplicate open for as long as this task runs.
                let _hold = &handle;

                let (process_id, mut shared_sender) = match request_receiver.recv_blocking() {
                    Ok(request) => request,
                    Err(e) => {
                        tracing::error!("cannot send IpcFileHandle, side channel disconnected, {e}");
                        return;
                    }
                };
                let Some(remote_handle) = duplicate_handle_for_process(process_id, handle.handle) else {
                    return;
                };
                let (confirm_sender, mut confirm_receiver) = match super::ipc_unbounded::<Confirm>() {
                    Ok(channel) => channel,
                    Err(e) => {
                        tracing::error!("cannot send IpcFileHandle, side channel disconnected, {e}");
                        return;
                    }
                };
                match shared_sender.send_blocking((remote_handle, confirm_sender)) {
                    Ok(()) => {
                        // Wait for the receiver before the local duplicate is closed.
                        let _ = confirm_receiver.recv_blocking();
                    }
                    Err(e) => tracing::error!("cannot send IpcFileHandle, side channel disconnected, {e}"),
                }
            })
            .detach();

            Ok(ok)
        }
        #[cfg(unix)]
        {
            type Socket = (String, super::IpcReceiver<bool>);

            let (socket_sender, mut socket_receiver) = super::ipc_unbounded::<Socket>().map_err(S::Error::custom)?;
            let ok = Serialize::serialize(&socket_sender, serializer)?;

            blocking::unblock(move || {
                // Keeps the duplicate open for as long as this task runs.
                let _hold = &handle;

                let (socket, mut confirm_rcv) = match socket_receiver.recv_blocking() {
                    Ok(reply) => reply,
                    Err(e) => {
                        tracing::error!("cannot send IpcFileHandle, side channel disconnected, {e}");
                        return;
                    }
                };
                let datagram = match std::os::unix::net::UnixDatagram::unbound() {
                    Ok(datagram) => datagram,
                    Err(e) => {
                        tracing::error!("cannot send IpcFileHandle, cannot create unbound datagram, {e}");
                        return;
                    }
                };

                // A leading `\0` marks an abstract socket name (Linux only), anything else
                // is a file name inside `/tmp/`.
                #[cfg(target_os = "linux")]
                let result = match socket.strip_prefix('\0') {
                    Some(name) => {
                        use std::os::{linux::net::SocketAddrExt as _, unix::net::SocketAddr};
                        datagram.connect_addr(&SocketAddr::from_abstract_name(name.as_bytes()).unwrap())
                    }
                    None => {
                        let path = std::path::PathBuf::from("/tmp/").join(&socket);
                        datagram.connect(&path)
                    }
                };
                #[cfg(not(target_os = "linux"))]
                let result = {
                    let path = std::path::PathBuf::from("/tmp/").join(&socket);
                    datagram.connect(&path)
                };
                if let Err(e) = result {
                    tracing::error!("cannot send IpcFileHandle, cannot connect socket, {e}");
                    return;
                }

                use sendfd::SendWithFd as _;
                match datagram.send_with_fd(b"zng", &[handle.handle as std::os::fd::RawFd]) {
                    Ok(_) => {
                        // Wait for the receiver before the local duplicate is closed.
                        let _ = confirm_rcv.recv_blocking();
                    }
                    Err(e) => tracing::error!("cannot send IpcFileHandle, {e}"),
                }
            })
            .detach();

            Ok(ok)
        }

        #[cfg(not(any(windows, unix)))]
        {
            panic!("IpcFileHandle not implemented for {}", std::env::consts::OS);
        }
    }
}
258#[cfg(not(ipc))]
259impl<'de> Deserialize<'de> for IpcFileHandle {
260 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
261 where
262 D: serde::Deserializer<'de>,
263 {
264 return Err(serde::de::Error::custom("cannot deserialize `IpcFileHandle` outside IPC"));
265 }
266}
#[cfg(ipc)]
impl<'de> Deserialize<'de> for IpcFileHandle {
    /// Deserializes the side channel sender written by `Serialize` and uses it to receive
    /// the actual file handle from the sending process.
    ///
    /// * Windows: sends this process ID and a reply sender, receives the value of a handle
    ///   already duplicated into this process, then confirms.
    /// * Unix: binds a datagram socket (abstract name on Linux when supported, otherwise
    ///   a file inside `/tmp/` that is removed when this function returns), sends the
    ///   socket name, receives the file descriptor on it, then confirms.
    ///
    /// # Errors
    ///
    /// Fails when the sender cannot be deserialized, when any side channel or socket
    /// operation fails, or (Unix) when the received datagram carries no file descriptor.
    ///
    /// # Panics
    ///
    /// Panics on targets that are neither Windows nor Unix.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        #[cfg(windows)]
        {
            type Confirm = bool;
            type Handle = (usize, super::IpcSender<Confirm>);
            type Process = (u32, super::IpcSender<Handle>);

            let mut process_id_sender = <super::IpcSender<Process> as Deserialize<'de>>::deserialize(deserializer)?;
            let (s, mut handle_receiver) = super::ipc_unbounded::<Handle>().map_err(serde::de::Error::custom)?;

            process_id_sender
                .send_blocking((std::process::id(), s))
                .map_err(serde::de::Error::custom)?;

            let (handle, mut confirm_sender) = handle_receiver.recv_blocking().map_err(serde::de::Error::custom)?;

            use std::os::windows::io::FromRawHandle as _;
            // SAFETY: the value was sent by the serializing side as the handle it
            // duplicated into this process, this file becomes its owner.
            let handle = unsafe { std::fs::File::from_raw_handle(handle as _) };

            // Lets the sender release its own duplicate, a failure here changes nothing
            // for the handle already received.
            let _ = confirm_sender.send_blocking(true);

            Ok(handle.into())
        }

        #[cfg(unix)]
        {
            use std::{os::unix::net::UnixDatagram, sync::atomic::AtomicUsize};

            let mut socket_sender = <super::IpcSender<(String, super::IpcReceiver<bool>)> as Deserialize<'de>>::deserialize(deserializer)?;

            // Process ID plus a counter, so each transfer in this process gets its own name.
            static SOCKET_ID: AtomicUsize = AtomicUsize::new(0);
            #[cfg_attr(not(target_os = "linux"), allow(unused_mut))]
            let mut socket = format!(
                "zng_task-ipc_file-{}-{}",
                std::process::id(),
                SOCKET_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
            );
            // Path of the socket file to remove on exit, `None` for abstract sockets.
            let mut socket_tmp = None;

            #[cfg(target_os = "linux")]
            let fd_recv = {
                use std::os::{linux::net::SocketAddrExt as _, unix::net::SocketAddr};
                match UnixDatagram::bind_addr(&SocketAddr::from_abstract_name(socket.as_bytes()).unwrap()) {
                    Ok(r) => {
                        // The leading `\0` tells the sender to use the abstract namespace.
                        socket = format!("\0{socket}");
                        r
                    }
                    Err(e) if matches!(e.kind(), std::io::ErrorKind::InvalidInput) => {
                        // Fallback to a socket file when the abstract address is rejected.
                        let socket = std::path::PathBuf::from("/tmp/").join(&socket);
                        let _ = std::fs::remove_file(&socket);
                        let r = UnixDatagram::bind(&socket).map_err(serde::de::Error::custom)?;
                        socket_tmp = Some(socket);
                        r
                    }
                    Err(e) => return Err(serde::de::Error::custom(e)),
                }
            };
            #[cfg(not(target_os = "linux"))]
            let fd_recv = {
                let socket = std::path::PathBuf::from("/tmp/").join(&socket);
                let _ = std::fs::remove_file(&socket);
                let r = UnixDatagram::bind(&socket).map_err(serde::de::Error::custom)?;
                socket_tmp = Some(socket);
                r
            };
            // Removes the socket file on every exit path, including the error returns below.
            let _cleanup = zng_app_context::RunOnDrop::new(move || {
                if let Some(socket) = socket_tmp {
                    let _ = std::fs::remove_file(socket);
                }
            });

            let (mut confirm_sender, r) = super::ipc_unbounded().map_err(serde::de::Error::custom)?;
            socket_sender.send_blocking((socket, r)).map_err(serde::de::Error::custom)?;

            use sendfd::RecvWithFd as _;
            let mut ignore = [b'z', b'n', b'g'];
            let mut fd = [0 as std::os::fd::RawFd];
            let (_, fd_count) = fd_recv.recv_with_fd(&mut ignore, &mut fd).map_err(serde::de::Error::custom)?;
            if fd_count != 1 {
                // Without this check `fd[0]` would still be the `0` placeholder and the
                // descriptor of stdin would be wrapped as if it was the received file.
                return Err(serde::de::Error::custom(
                    "cannot receive IpcFileHandle, datagram did not carry a file descriptor",
                ));
            }

            use std::os::fd::FromRawFd as _;
            // SAFETY: exactly one descriptor was received into `fd`, this file becomes
            // its only owner.
            let handle = unsafe { std::fs::File::from_raw_fd(fd[0]) };
            // Lets the sender release its own duplicate, a failure here changes nothing
            // for the descriptor already received.
            let _ = confirm_sender.send_blocking(true);

            Ok(handle.into())
        }

        #[cfg(not(any(windows, unix)))]
        {
            panic!("IpcFile not implemented for {}", std::env::consts::OS);
        }
    }
}
370
/// Duplicates `handle`, a handle of the current process, into the process identified by
/// `process_id`.
///
/// Returns the value of the new handle, which is only meaningful inside the target
/// process. Returns `None` if the target process cannot be opened with
/// `PROCESS_DUP_HANDLE` access or if `DuplicateHandle` fails; both failures are logged
/// with `tracing::error!`.
#[cfg(all(ipc, windows))]
fn duplicate_handle_for_process(process_id: u32, handle: usize) -> Option<usize> {
    use windows_sys::Win32::Foundation::{CloseHandle, DUPLICATE_SAME_ACCESS, DuplicateHandle, GetLastError, HANDLE};
    use windows_sys::Win32::System::Threading::{GetCurrentProcess, OpenProcess, PROCESS_DUP_HANDLE};

    // SAFETY: plain Win32 calls, `target_handle` outlives the `DuplicateHandle` call that
    // writes to it and `target_process` is closed exactly once after a successful open.
    unsafe {
        let target_process = OpenProcess(PROCESS_DUP_HANDLE, 0, process_id);
        if target_process.is_null() {
            tracing::error!("failed to connect to target process for IpcFile handle duplication");
            return None;
        }

        let mut target_handle: HANDLE = std::ptr::null_mut();
        let success = DuplicateHandle(
            GetCurrentProcess(),
            handle as HANDLE,
            target_process,
            &mut target_handle,
            0,
            0,
            DUPLICATE_SAME_ACCESS,
        );
        // Read the error code right after the failing call, `CloseHandle` below may
        // overwrite the last-error value of the thread.
        let error_code = if success == 0 { GetLastError() } else { 0 };

        CloseHandle(target_process);

        if success == 0 {
            tracing::error!("failed to duplicate IpcFile handle, error code: {error_code:x}");
            return None;
        }
        Some(target_handle as usize)
    }
}