1use std::{
15 cell::UnsafeCell,
16 collections::HashMap,
17 fmt,
18 fs::{File, OpenOptions},
19 io::{ErrorKind, Read, Write},
20 os::unix::fs::OpenOptionsExt,
21 sync::Arc,
22};
23
24#[cfg(not(target_os = "macos"))]
25use advisory_lock::{AdvisoryFileLock, FileLockMode};
26use async_trait::async_trait;
27use filepath::FilePath;
28use nix::{libc, unistd::unlink};
29use rand::Rng;
30use tokio::{
31 fs::remove_file,
32 io::{unix::AsyncFd, Interest},
33 task::JoinHandle,
34};
35use tokio_util::sync::CancellationToken;
36use unix_named_pipe::{create, open_write};
37use zenoh_core::{zasyncread, zasyncwrite, ResolveFuture, Wait};
38use zenoh_link_commons::{
39 ConstructibleLinkManagerUnicast, LinkAuthId, LinkManagerUnicastTrait, LinkUnicast,
40 LinkUnicastTrait, NewLinkChannelSender,
41};
42use zenoh_protocol::{
43 core::{EndPoint, Locator},
44 transport::BatchSize,
45};
46use zenoh_result::{bail, ZResult};
47use zenoh_runtime::ZRuntime;
48
49use super::FILE_ACCESS_MASK;
50use crate::config;
51
/// MTU reported by pipe links: the largest value `BatchSize` can hold.
const LINUX_PIPE_MAX_MTU: BatchSize = BatchSize::MAX;
/// How many random suffixes `dedicate_pipe` tries before giving up.
const LINUX_PIPE_DEDICATE_TRIES: usize = 100;

/// Magic bytes placed at the start of every handshake message.
static PIPE_INVITATION: &[u8] = &[0xDE, 0xAD, 0xBE, 0xEF];
56
57struct Invitation;
58impl Invitation {
59 async fn send(suffix: u32, pipe: &mut PipeW) -> ZResult<()> {
60 let msg: [u8; 8] = {
61 let mut msg: [u8; 8] = [0; 8];
62 let (one, two) = msg.split_at_mut(PIPE_INVITATION.len());
63 one.copy_from_slice(PIPE_INVITATION);
64 two.copy_from_slice(&suffix.to_ne_bytes());
65 msg
66 };
67 pipe.write_all(&msg).await
68 }
69
70 async fn receive(pipe: &mut PipeR) -> ZResult<u32> {
71 let mut msg: [u8; 8] = [0; 8];
72 pipe.read_exact(&mut msg).await?;
73 if !msg.starts_with(PIPE_INVITATION) {
74 bail!("Unexpected invitation received during pipe handshake!")
75 }
76
77 let suffix_bytes: &[u8; 4] = &msg[4..].try_into()?;
78 let suffix = u32::from_ne_bytes(*suffix_bytes);
79 Ok(suffix)
80 }
81
82 async fn confirm(suffix: u32, pipe: &mut PipeW) -> ZResult<()> {
83 Self::send(suffix, pipe).await
84 }
85
86 async fn expect(expected_suffix: u32, pipe: &mut PipeR) -> ZResult<()> {
87 let received_suffix = Self::receive(pipe).await?;
88 if received_suffix != expected_suffix {
89 bail!(
90 "Suffix mismatch: expected {} got {}",
91 expected_suffix,
92 received_suffix
93 )
94 }
95 Ok(())
96 }
97}
98
99struct PipeR {
100 pipe: AsyncFd<File>,
101}
102
103impl Drop for PipeR {
104 fn drop(&mut self) {
105 if let Ok(path) = self.pipe.get_ref().path() {
106 let _ = unlink(&path);
107 }
108 }
109}
110impl PipeR {
111 async fn new(path: &str, access_mode: u32) -> ZResult<Self> {
112 let pipe_file = Self::create_and_open_unique_pipe_for_read(path, access_mode).await?;
114 let pipe = AsyncFd::new(pipe_file)?;
116 Ok(Self { pipe })
117 }
118
119 async fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> ZResult<usize> {
120 let result = self
121 .pipe
122 .async_io_mut(Interest::READABLE, |pipe| match pipe.read(&mut buf[..]) {
123 Ok(0) => Err(ErrorKind::WouldBlock.into()),
124 Ok(val) => Ok(val),
125 Err(e) => Err(e),
126 })
127 .await?;
128 Ok(result)
129 }
130
131 async fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ZResult<()> {
132 let mut r: usize = 0;
133 self.pipe
134 .async_io_mut(Interest::READABLE, |pipe| match pipe.read(&mut buf[r..]) {
135 Ok(0) => Err(ErrorKind::WouldBlock.into()),
136 Ok(val) => {
137 r += val;
138 if r == buf.len() {
139 return Ok(());
140 }
141 Err(ErrorKind::WouldBlock.into())
142 }
143 Err(e) => Err(e),
144 })
145 .await?;
146 Ok(())
147 }
148
149 async fn create_and_open_unique_pipe_for_read(path_r: &str, access_mode: u32) -> ZResult<File> {
150 let r_was_created = create(path_r, Some(access_mode));
151 let open_result = Self::open_unique_pipe_for_read(path_r);
152 match (open_result.as_ref(), r_was_created) {
153 (Err(_), Ok(_)) => {
154 let _ = remove_file(path_r).await;
156 }
157 (Ok(mut pipe_file), Err(_)) => {
158 let mut buf: [u8; 1] = [0; 1];
160 while let Ok(val) = pipe_file.read(&mut buf) {
161 if val == 0 {
162 break;
163 }
164 }
165 }
166 _ => {}
167 }
168
169 open_result
170 }
171
172 fn open_unique_pipe_for_read(path: &str) -> ZResult<File> {
173 let read = OpenOptions::new()
174 .read(true)
175 .write(true)
176 .custom_flags(libc::O_NONBLOCK)
177 .open(path)?;
178
179 #[cfg(not(target_os = "macos"))]
180 AdvisoryFileLock::try_lock(&read, FileLockMode::Exclusive)?;
181 Ok(read)
182 }
183}
184
185struct PipeW {
186 pipe: AsyncFd<File>,
187}
188impl PipeW {
189 async fn new(path: &str) -> ZResult<Self> {
190 let pipe_file = Self::open_unique_pipe_for_write(path)?;
192 let pipe = AsyncFd::new(pipe_file)?;
194 Ok(Self { pipe })
195 }
196
197 async fn write<'a>(&'a mut self, buf: &'a [u8]) -> ZResult<usize> {
198 let result = self
199 .pipe
200 .async_io_mut(Interest::WRITABLE, |pipe| match pipe.write(buf) {
201 Ok(0) => Err(ErrorKind::WouldBlock.into()),
202 Ok(val) => Ok(val),
203 Err(e) => Err(e),
204 })
205 .await?;
206 Ok(result)
207 }
208
209 async fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> ZResult<()> {
210 let mut r: usize = 0;
211 self.pipe
212 .async_io_mut(Interest::WRITABLE, |pipe| match pipe.write(&buf[r..]) {
213 Ok(0) => Err(ErrorKind::WouldBlock.into()),
214 Ok(val) => {
215 r += val;
216 if r == buf.len() {
217 return Ok(());
218 }
219 Err(ErrorKind::WouldBlock.into())
220 }
221 Err(e) => Err(e),
222 })
223 .await?;
224 Ok(())
225 }
226
227 fn open_unique_pipe_for_write(path: &str) -> ZResult<File> {
228 let write = open_write(path)?;
229 #[cfg(not(target_os = "macos"))]
231 if AdvisoryFileLock::try_lock(&write, FileLockMode::Exclusive).is_ok() {
232 let _ = AdvisoryFileLock::unlock(&write);
233 bail!("no listener...")
234 }
235 Ok(write)
236 }
237}
238
239async fn handle_incoming_connections(
240 endpoint: &EndPoint,
241 manager: &Arc<NewLinkChannelSender>,
242 request_channel: &mut PipeR,
243 path_downlink: &str,
244 path_uplink: &str,
245 access_mode: u32,
246) -> ZResult<()> {
247 let suffix = Invitation::receive(request_channel).await?;
249
250 let (dedicated_downlink_path, dedicated_uplink_path) =
252 get_dedicated_pipe_names(path_downlink, path_uplink, suffix);
253
254 let mut dedicated_downlink = PipeW::new(&dedicated_downlink_path).await?;
256 let mut dedicated_uplink = PipeR::new(&dedicated_uplink_path, access_mode).await?;
257
258 Invitation::confirm(suffix, &mut dedicated_downlink).await?;
260
261 Invitation::expect(suffix, &mut dedicated_uplink).await?;
263
264 let local = Locator::new(
266 endpoint.protocol(),
267 dedicated_uplink_path,
268 endpoint.metadata(),
269 )?;
270 let remote = Locator::new(
271 endpoint.protocol(),
272 dedicated_downlink_path,
273 endpoint.metadata(),
274 )?;
275
276 let link = Arc::new(UnicastPipe {
277 r: UnsafeCell::new(dedicated_uplink),
278 w: UnsafeCell::new(dedicated_downlink),
279 local,
280 remote,
281 });
282
283 manager.send_async(LinkUnicast(link)).await?;
285
286 ZResult::Ok(())
287}
288
289struct UnicastPipeListener {
290 uplink_locator: Locator,
291 token: CancellationToken,
292 handle: JoinHandle<()>,
293}
294impl UnicastPipeListener {
295 async fn listen(endpoint: EndPoint, manager: Arc<NewLinkChannelSender>) -> ZResult<Self> {
296 let (path, access_mode) = endpoint_to_pipe_path(&endpoint);
297 let (path_uplink, path_downlink) = split_pipe_path(&path);
298 let local = Locator::new(endpoint.protocol(), path, endpoint.metadata())?;
299
300 let mut request_channel = PipeR::new(&path_uplink, access_mode).await?;
302
303 let token = CancellationToken::new();
304 let c_token = token.clone();
305
306 let handle = tokio::task::spawn_blocking(move || {
309 ZRuntime::Acceptor.block_on(async move {
310 loop {
311 tokio::select! {
312 _ = handle_incoming_connections(
313 &endpoint,
314 &manager,
315 &mut request_channel,
316 &path_downlink,
317 &path_uplink,
318 access_mode,
319 ) => {}
320
321 _ = c_token.cancelled() => break
322 }
323 }
324 })
325 });
326
327 Ok(Self {
328 uplink_locator: local,
329 token,
330 handle,
331 })
332 }
333
334 fn stop_listening(self) {
335 self.token.cancel();
336 let _ = ResolveFuture::new(self.handle).wait();
337 }
338}
339
/// Builds the per-connection pipe paths by appending the decimal `suffix` to
/// each base path.
///
/// Returns `(dedicated_downlink_path, dedicated_uplink_path)`, in the same
/// order as the parameters.
fn get_dedicated_pipe_names(
    path_downlink: &str,
    path_uplink: &str,
    suffix: u32,
) -> (String, String) {
    let dedicated = |base: &str| format!("{base}{suffix}");
    (dedicated(path_downlink), dedicated(path_uplink))
}
350
351async fn create_pipe(
352 path_uplink: &str,
353 path_downlink: &str,
354 access_mode: u32,
355) -> ZResult<(PipeR, u32, String, String)> {
356 let suffix: u32 = rand::thread_rng().gen();
358
359 let (path_downlink, path_uplink) = get_dedicated_pipe_names(path_downlink, path_uplink, suffix);
361
362 let downlink = PipeR::new(&path_downlink, access_mode).await?;
364 let _uplink = PipeR::new(&path_uplink, access_mode).await?; Ok((downlink, suffix, path_downlink, path_uplink))
367}
368
369async fn dedicate_pipe(
370 path_uplink: &str,
371 path_downlink: &str,
372 access_mode: u32,
373) -> ZResult<(PipeR, u32, String, String)> {
374 for _ in 0..LINUX_PIPE_DEDICATE_TRIES {
375 match create_pipe(path_uplink, path_downlink, access_mode).await {
376 Err(_) => {}
377 val => {
378 return val;
379 }
380 }
381 }
382 bail!("Unabe to dedicate pipe!")
383}
384
385struct UnicastPipeClient;
386impl UnicastPipeClient {
387 async fn connect_to(endpoint: EndPoint) -> ZResult<UnicastPipe> {
388 let (path, access_mode) = endpoint_to_pipe_path(&endpoint);
389 let (path_uplink, path_downlink) = split_pipe_path(&path);
390
391 let mut request_channel = PipeW::new(&path_uplink).await?;
395
396 let (
398 mut dedicated_downlink,
399 dedicated_suffix,
400 dedicated_donlink_path,
401 dedicated_uplink_path,
402 ) = dedicate_pipe(&path_uplink, &path_downlink, access_mode).await?;
403
404 Invitation::send(dedicated_suffix, &mut request_channel).await?;
406
407 Invitation::expect(dedicated_suffix, &mut dedicated_downlink).await?;
410
411 let mut dedicated_uplink = PipeW::new(&dedicated_uplink_path).await?;
413
414 Invitation::confirm(dedicated_suffix, &mut dedicated_uplink).await?;
416
417 let local = Locator::new(
419 endpoint.protocol(),
420 dedicated_donlink_path,
421 endpoint.metadata(),
422 )?;
423 let remote = Locator::new(
424 endpoint.protocol(),
425 dedicated_uplink_path,
426 endpoint.metadata(),
427 )?;
428
429 Ok(UnicastPipe {
430 r: UnsafeCell::new(dedicated_downlink),
431 w: UnsafeCell::new(dedicated_uplink),
432 local,
433 remote,
434 })
435 }
436}
437
438struct UnicastPipe {
439 r: UnsafeCell<PipeR>,
448 w: UnsafeCell<PipeW>,
449 local: Locator,
450 remote: Locator,
451}
452
453impl UnicastPipe {
454 #[allow(clippy::mut_from_ref)]
458 fn get_r_mut(&self) -> &mut PipeR {
459 unsafe { &mut *self.r.get() }
460 }
461
462 #[allow(clippy::mut_from_ref)]
463 fn get_w_mut(&self) -> &mut PipeW {
464 unsafe { &mut *self.w.get() }
465 }
466}
467unsafe impl Sync for UnicastPipe {}
469
470impl Drop for UnicastPipe {
471 fn drop(&mut self) {}
472}
473
474#[async_trait]
475impl LinkUnicastTrait for UnicastPipe {
476 async fn close(&self) -> ZResult<()> {
477 tracing::trace!("Closing Unix Pipe link: {}", self);
478 Ok(())
479 }
480
481 async fn write(&self, buffer: &[u8]) -> ZResult<usize> {
482 self.get_w_mut().write(buffer).await
483 }
484
485 async fn write_all(&self, buffer: &[u8]) -> ZResult<()> {
486 self.get_w_mut().write_all(buffer).await
487 }
488
489 async fn read(&self, buffer: &mut [u8]) -> ZResult<usize> {
490 self.get_r_mut().read(buffer).await
491 }
492
493 async fn read_exact(&self, buffer: &mut [u8]) -> ZResult<()> {
494 self.get_r_mut().read_exact(buffer).await
495 }
496
497 #[inline(always)]
498 fn get_src(&self) -> &Locator {
499 &self.local
500 }
501
502 #[inline(always)]
503 fn get_dst(&self) -> &Locator {
504 &self.remote
505 }
506
507 #[inline(always)]
508 fn get_mtu(&self) -> BatchSize {
509 LINUX_PIPE_MAX_MTU
510 }
511
512 #[inline(always)]
513 fn get_interface_names(&self) -> Vec<String> {
514 tracing::debug!("The get_interface_names for UnicastPipe is not supported");
516 vec![]
517 }
518
519 #[inline(always)]
520 fn is_reliable(&self) -> bool {
521 super::IS_RELIABLE
522 }
523
524 #[inline(always)]
525 fn is_streamed(&self) -> bool {
526 true
527 }
528
529 #[inline(always)]
530 fn get_auth_id(&self) -> &LinkAuthId {
531 &LinkAuthId::Unixpipe
532 }
533}
534
535impl fmt::Display for UnicastPipe {
536 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
537 write!(f, "{} => {}", self.local, self.remote)?;
538 Ok(())
539 }
540}
541
542impl fmt::Debug for UnicastPipe {
543 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
544 f.debug_struct("UnicastPipe")
545 .field("src", &self.local)
546 .field("dst", &self.remote)
547 .finish()
548 }
549}
550
551pub struct LinkManagerUnicastPipe {
552 manager: Arc<NewLinkChannelSender>,
553 listeners: tokio::sync::RwLock<HashMap<EndPoint, UnicastPipeListener>>,
554}
555
556impl LinkManagerUnicastPipe {
557 pub fn new(manager: NewLinkChannelSender) -> Self {
558 Self {
559 manager: Arc::new(manager),
560 listeners: tokio::sync::RwLock::new(HashMap::new()),
561 }
562 }
563}
564impl ConstructibleLinkManagerUnicast<()> for LinkManagerUnicastPipe {
565 fn new(new_link_sender: NewLinkChannelSender, _: ()) -> ZResult<Self> {
566 Ok(Self::new(new_link_sender))
567 }
568}
569
570#[async_trait]
571impl LinkManagerUnicastTrait for LinkManagerUnicastPipe {
572 async fn new_link(&self, endpoint: EndPoint) -> ZResult<LinkUnicast> {
573 let pipe = UnicastPipeClient::connect_to(endpoint).await?;
574 Ok(LinkUnicast(Arc::new(pipe)))
575 }
576
577 async fn new_listener(&self, endpoint: EndPoint) -> ZResult<Locator> {
578 let listener = UnicastPipeListener::listen(endpoint.clone(), self.manager.clone()).await?;
579 let locator = listener.uplink_locator.clone();
580 zasyncwrite!(self.listeners).insert(endpoint, listener);
581 Ok(locator)
582 }
583
584 async fn del_listener(&self, endpoint: &EndPoint) -> ZResult<()> {
585 let removed = zasyncwrite!(self.listeners).remove(endpoint);
586 match removed {
587 Some(val) => {
588 val.stop_listening();
589 Ok(())
590 }
591 None => bail!("No listener found for endpoint {}", endpoint),
592 }
593 }
594
595 async fn get_listeners(&self) -> Vec<EndPoint> {
596 zasyncread!(self.listeners).keys().cloned().collect()
597 }
598
599 async fn get_locators(&self) -> Vec<Locator> {
600 zasyncread!(self.listeners)
601 .values()
602 .map(|v| v.uplink_locator.clone())
603 .collect()
604 }
605}
606
607fn endpoint_to_pipe_path(endpoint: &EndPoint) -> (String, u32) {
608 let path = endpoint.address().to_string();
609 let access_mode = endpoint
610 .config()
611 .get(config::FILE_ACCESS_MASK)
612 .map_or(*FILE_ACCESS_MASK, |val| {
613 val.parse().unwrap_or(*FILE_ACCESS_MASK)
614 });
615 (path, access_mode)
616}
617
/// Derives the two pipe paths from a base path.
///
/// Returns `(uplink, downlink)`: `path` with `_uplink` and `_downlink`
/// appended, respectively.
fn split_pipe_path(path: &str) -> (String, String) {
    let with_suffix = |suffix: &str| [path, suffix].concat();
    (with_suffix("_uplink"), with_suffix("_downlink"))
}