zenoh_link_unixpipe/unix/
unicast.rs

1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use std::{
15    cell::UnsafeCell,
16    collections::HashMap,
17    fmt,
18    fs::{File, OpenOptions},
19    io::{ErrorKind, Read, Write},
20    os::unix::fs::OpenOptionsExt,
21    sync::Arc,
22};
23
24#[cfg(not(target_os = "macos"))]
25use advisory_lock::{AdvisoryFileLock, FileLockMode};
26use async_trait::async_trait;
27use filepath::FilePath;
28use nix::{libc, unistd::unlink};
29use rand::Rng;
30use tokio::{
31    fs::remove_file,
32    io::{unix::AsyncFd, Interest},
33    task::JoinHandle,
34};
35use tokio_util::sync::CancellationToken;
36use unix_named_pipe::{create, open_write};
37use zenoh_core::{zasyncread, zasyncwrite, ResolveFuture, Wait};
38use zenoh_link_commons::{
39    ConstructibleLinkManagerUnicast, LinkAuthId, LinkManagerUnicastTrait, LinkUnicast,
40    LinkUnicastTrait, NewLinkChannelSender,
41};
42use zenoh_protocol::{
43    core::{EndPoint, Locator},
44    transport::BatchSize,
45};
46use zenoh_result::{bail, ZResult};
47use zenoh_runtime::ZRuntime;
48
49use super::FILE_ACCESS_MASK;
50use crate::config;
51
// Batch size reported as the MTU of a pipe link (returned by `UnicastPipe::get_mtu`).
const LINUX_PIPE_MAX_MTU: BatchSize = BatchSize::MAX;
// Number of random suffixes `dedicate_pipe` tries before it gives up.
const LINUX_PIPE_DEDICATE_TRIES: usize = 100;

// Magic bytes that open every 8-byte handshake message handled by `Invitation`.
static PIPE_INVITATION: &[u8] = &[0xDE, 0xAD, 0xBE, 0xEF];
56
57struct Invitation;
58impl Invitation {
59    async fn send(suffix: u32, pipe: &mut PipeW) -> ZResult<()> {
60        let msg: [u8; 8] = {
61            let mut msg: [u8; 8] = [0; 8];
62            let (one, two) = msg.split_at_mut(PIPE_INVITATION.len());
63            one.copy_from_slice(PIPE_INVITATION);
64            two.copy_from_slice(&suffix.to_ne_bytes());
65            msg
66        };
67        pipe.write_all(&msg).await
68    }
69
70    async fn receive(pipe: &mut PipeR) -> ZResult<u32> {
71        let mut msg: [u8; 8] = [0; 8];
72        pipe.read_exact(&mut msg).await?;
73        if !msg.starts_with(PIPE_INVITATION) {
74            bail!("Unexpected invitation received during pipe handshake!")
75        }
76
77        let suffix_bytes: &[u8; 4] = &msg[4..].try_into()?;
78        let suffix = u32::from_ne_bytes(*suffix_bytes);
79        Ok(suffix)
80    }
81
82    async fn confirm(suffix: u32, pipe: &mut PipeW) -> ZResult<()> {
83        Self::send(suffix, pipe).await
84    }
85
86    async fn expect(expected_suffix: u32, pipe: &mut PipeR) -> ZResult<()> {
87        let received_suffix = Self::receive(pipe).await?;
88        if received_suffix != expected_suffix {
89            bail!(
90                "Suffix mismatch: expected {} got {}",
91                expected_suffix,
92                received_suffix
93            )
94        }
95        Ok(())
96    }
97}
98
99struct PipeR {
100    pipe: AsyncFd<File>,
101}
102
103impl Drop for PipeR {
104    fn drop(&mut self) {
105        if let Ok(path) = self.pipe.get_ref().path() {
106            let _ = unlink(&path);
107        }
108    }
109}
110impl PipeR {
111    async fn new(path: &str, access_mode: u32) -> ZResult<Self> {
112        // create, open and lock named pipe
113        let pipe_file = Self::create_and_open_unique_pipe_for_read(path, access_mode).await?;
114        // create async_io wrapper for pipe's file descriptor
115        let pipe = AsyncFd::new(pipe_file)?;
116        Ok(Self { pipe })
117    }
118
119    async fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> ZResult<usize> {
120        let result = self
121            .pipe
122            .async_io_mut(Interest::READABLE, |pipe| match pipe.read(&mut buf[..]) {
123                Ok(0) => Err(ErrorKind::WouldBlock.into()),
124                Ok(val) => Ok(val),
125                Err(e) => Err(e),
126            })
127            .await?;
128        Ok(result)
129    }
130
131    async fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ZResult<()> {
132        let mut r: usize = 0;
133        self.pipe
134            .async_io_mut(Interest::READABLE, |pipe| match pipe.read(&mut buf[r..]) {
135                Ok(0) => Err(ErrorKind::WouldBlock.into()),
136                Ok(val) => {
137                    r += val;
138                    if r == buf.len() {
139                        return Ok(());
140                    }
141                    Err(ErrorKind::WouldBlock.into())
142                }
143                Err(e) => Err(e),
144            })
145            .await?;
146        Ok(())
147    }
148
149    async fn create_and_open_unique_pipe_for_read(path_r: &str, access_mode: u32) -> ZResult<File> {
150        let r_was_created = create(path_r, Some(access_mode));
151        let open_result = Self::open_unique_pipe_for_read(path_r);
152        match (open_result.as_ref(), r_was_created) {
153            (Err(_), Ok(_)) => {
154                // clean-up in case of failure
155                let _ = remove_file(path_r).await;
156            }
157            (Ok(mut pipe_file), Err(_)) => {
158                // drop all the data from the pipe in case if it already exists
159                let mut buf: [u8; 1] = [0; 1];
160                while let Ok(val) = pipe_file.read(&mut buf) {
161                    if val == 0 {
162                        break;
163                    }
164                }
165            }
166            _ => {}
167        }
168
169        open_result
170    }
171
172    fn open_unique_pipe_for_read(path: &str) -> ZResult<File> {
173        let read = OpenOptions::new()
174            .read(true)
175            .write(true)
176            .custom_flags(libc::O_NONBLOCK)
177            .open(path)?;
178
179        #[cfg(not(target_os = "macos"))]
180        AdvisoryFileLock::try_lock(&read, FileLockMode::Exclusive)?;
181        Ok(read)
182    }
183}
184
185struct PipeW {
186    pipe: AsyncFd<File>,
187}
188impl PipeW {
189    async fn new(path: &str) -> ZResult<Self> {
190        // create, open and lock named pipe
191        let pipe_file = Self::open_unique_pipe_for_write(path)?;
192        // create async_io wrapper for pipe's file descriptor
193        let pipe = AsyncFd::new(pipe_file)?;
194        Ok(Self { pipe })
195    }
196
197    async fn write<'a>(&'a mut self, buf: &'a [u8]) -> ZResult<usize> {
198        let result = self
199            .pipe
200            .async_io_mut(Interest::WRITABLE, |pipe| match pipe.write(buf) {
201                Ok(0) => Err(ErrorKind::WouldBlock.into()),
202                Ok(val) => Ok(val),
203                Err(e) => Err(e),
204            })
205            .await?;
206        Ok(result)
207    }
208
209    async fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> ZResult<()> {
210        let mut r: usize = 0;
211        self.pipe
212            .async_io_mut(Interest::WRITABLE, |pipe| match pipe.write(&buf[r..]) {
213                Ok(0) => Err(ErrorKind::WouldBlock.into()),
214                Ok(val) => {
215                    r += val;
216                    if r == buf.len() {
217                        return Ok(());
218                    }
219                    Err(ErrorKind::WouldBlock.into())
220                }
221                Err(e) => Err(e),
222            })
223            .await?;
224        Ok(())
225    }
226
227    fn open_unique_pipe_for_write(path: &str) -> ZResult<File> {
228        let write = open_write(path)?;
229        // the file must be already locked at the other side...
230        #[cfg(not(target_os = "macos"))]
231        if AdvisoryFileLock::try_lock(&write, FileLockMode::Exclusive).is_ok() {
232            let _ = AdvisoryFileLock::unlock(&write);
233            bail!("no listener...")
234        }
235        Ok(write)
236    }
237}
238
239async fn handle_incoming_connections(
240    endpoint: &EndPoint,
241    manager: &Arc<NewLinkChannelSender>,
242    request_channel: &mut PipeR,
243    path_downlink: &str,
244    path_uplink: &str,
245    access_mode: u32,
246) -> ZResult<()> {
247    // read invitation from the request channel
248    let suffix = Invitation::receive(request_channel).await?;
249
250    // generate uplink and downlink names
251    let (dedicated_downlink_path, dedicated_uplink_path) =
252        get_dedicated_pipe_names(path_downlink, path_uplink, suffix);
253
254    // create dedicated downlink and uplink
255    let mut dedicated_downlink = PipeW::new(&dedicated_downlink_path).await?;
256    let mut dedicated_uplink = PipeR::new(&dedicated_uplink_path, access_mode).await?;
257
258    // confirm over the dedicated channel
259    Invitation::confirm(suffix, &mut dedicated_downlink).await?;
260
261    // got confirmation over the dedicated channel
262    Invitation::expect(suffix, &mut dedicated_uplink).await?;
263
264    // create Locators
265    let local = Locator::new(
266        endpoint.protocol(),
267        dedicated_uplink_path,
268        endpoint.metadata(),
269    )?;
270    let remote = Locator::new(
271        endpoint.protocol(),
272        dedicated_downlink_path,
273        endpoint.metadata(),
274    )?;
275
276    let link = Arc::new(UnicastPipe {
277        r: UnsafeCell::new(dedicated_uplink),
278        w: UnsafeCell::new(dedicated_downlink),
279        local,
280        remote,
281    });
282
283    // send newly established link to manager
284    manager.send_async(LinkUnicast(link)).await?;
285
286    ZResult::Ok(())
287}
288
289struct UnicastPipeListener {
290    uplink_locator: Locator,
291    token: CancellationToken,
292    handle: JoinHandle<()>,
293}
294impl UnicastPipeListener {
295    async fn listen(endpoint: EndPoint, manager: Arc<NewLinkChannelSender>) -> ZResult<Self> {
296        let (path, access_mode) = endpoint_to_pipe_path(&endpoint);
297        let (path_uplink, path_downlink) = split_pipe_path(&path);
298        let local = Locator::new(endpoint.protocol(), path, endpoint.metadata())?;
299
300        // create request channel
301        let mut request_channel = PipeR::new(&path_uplink, access_mode).await?;
302
303        let token = CancellationToken::new();
304        let c_token = token.clone();
305
306        // WARN: The spawn_blocking is mandatory verified by the ping/pong test
307        // create listening task
308        let handle = tokio::task::spawn_blocking(move || {
309            ZRuntime::Acceptor.block_on(async move {
310                loop {
311                    tokio::select! {
312                        _ = handle_incoming_connections(
313                            &endpoint,
314                            &manager,
315                            &mut request_channel,
316                            &path_downlink,
317                            &path_uplink,
318                            access_mode,
319                        ) => {}
320
321                        _ = c_token.cancelled() => break
322                    }
323                }
324            })
325        });
326
327        Ok(Self {
328            uplink_locator: local,
329            token,
330            handle,
331        })
332    }
333
334    fn stop_listening(self) {
335        self.token.cancel();
336        let _ = ResolveFuture::new(self.handle).wait();
337    }
338}
339
/// Builds the paths of a dedicated pipe pair by appending the decimal form of `suffix` to each
/// base path.
///
/// Returns `(downlink, uplink)`, i.e. in the same order as the parameters.
fn get_dedicated_pipe_names(
    path_downlink: &str,
    path_uplink: &str,
    suffix: u32,
) -> (String, String) {
    let with_suffix = |base: &str| format!("{base}{suffix}");
    (with_suffix(path_downlink), with_suffix(path_uplink))
}
350
351async fn create_pipe(
352    path_uplink: &str,
353    path_downlink: &str,
354    access_mode: u32,
355) -> ZResult<(PipeR, u32, String, String)> {
356    // generate random suffix
357    let suffix: u32 = rand::thread_rng().gen();
358
359    // generate uplink and downlink names
360    let (path_downlink, path_uplink) = get_dedicated_pipe_names(path_downlink, path_uplink, suffix);
361
362    // try create uplink and downlink pipes to ensure that the selected suffix is available
363    let downlink = PipeR::new(&path_downlink, access_mode).await?;
364    let _uplink = PipeR::new(&path_uplink, access_mode).await?; // uplink would be dropped, that is OK!
365
366    Ok((downlink, suffix, path_downlink, path_uplink))
367}
368
369async fn dedicate_pipe(
370    path_uplink: &str,
371    path_downlink: &str,
372    access_mode: u32,
373) -> ZResult<(PipeR, u32, String, String)> {
374    for _ in 0..LINUX_PIPE_DEDICATE_TRIES {
375        match create_pipe(path_uplink, path_downlink, access_mode).await {
376            Err(_) => {}
377            val => {
378                return val;
379            }
380        }
381    }
382    bail!("Unabe to dedicate pipe!")
383}
384
385struct UnicastPipeClient;
386impl UnicastPipeClient {
387    async fn connect_to(endpoint: EndPoint) -> ZResult<UnicastPipe> {
388        let (path, access_mode) = endpoint_to_pipe_path(&endpoint);
389        let (path_uplink, path_downlink) = split_pipe_path(&path);
390
391        // open the request channel
392        // this channel would be used to invite listener to the dedicated channel
393        // listener owns the request channel, so failure of this call means that there is nobody listening on the provided endpoint
394        let mut request_channel = PipeW::new(&path_uplink).await?;
395
396        // create dedicated channel prerequisites. The creation code also ensures that nobody else would use the same channel concurrently
397        let (
398            mut dedicated_downlink,
399            dedicated_suffix,
400            dedicated_donlink_path,
401            dedicated_uplink_path,
402        ) = dedicate_pipe(&path_uplink, &path_downlink, access_mode).await?;
403
404        // invite the listener to our dedicated channel over the request channel
405        Invitation::send(dedicated_suffix, &mut request_channel).await?;
406
407        // read response that should be sent over the dedicated channel, confirming that everything is OK
408        // on the listener's side and it is already working with the dedicated channel
409        Invitation::expect(dedicated_suffix, &mut dedicated_downlink).await?;
410
411        // open dedicated uplink
412        let mut dedicated_uplink = PipeW::new(&dedicated_uplink_path).await?;
413
414        // final confirmation over the dedicated uplink
415        Invitation::confirm(dedicated_suffix, &mut dedicated_uplink).await?;
416
417        // create Locators
418        let local = Locator::new(
419            endpoint.protocol(),
420            dedicated_donlink_path,
421            endpoint.metadata(),
422        )?;
423        let remote = Locator::new(
424            endpoint.protocol(),
425            dedicated_uplink_path,
426            endpoint.metadata(),
427        )?;
428
429        Ok(UnicastPipe {
430            r: UnsafeCell::new(dedicated_downlink),
431            w: UnsafeCell::new(dedicated_uplink),
432            local,
433            remote,
434        })
435    }
436}
437
438struct UnicastPipe {
439    // The underlying pipes wrapped into async_io
440    // SAFETY: Async requires &mut for read and write operations. This means
441    //         that concurrent reads and writes are not possible. To achieve that,
442    //         we use an UnsafeCell for interior mutability. Using an UnsafeCell
443    //         is safe in our case since the transmission and reception logic
444    //         already ensures that no concurrent reads or writes can happen on
445    //         the same stream: there is only one task at the time that writes on
446    //         the stream and only one task at the time that reads from the stream.
447    r: UnsafeCell<PipeR>,
448    w: UnsafeCell<PipeW>,
449    local: Locator,
450    remote: Locator,
451}
452
453impl UnicastPipe {
454    // SAFETY: It is safe to suppress Clippy warning since no concurrent access will ever happen.
455    // The write and read pipes are independent and support full-duplex operation,
456    // and single-direction operations are aligned at the transport side and will never access link concurrently
457    #[allow(clippy::mut_from_ref)]
458    fn get_r_mut(&self) -> &mut PipeR {
459        unsafe { &mut *self.r.get() }
460    }
461
462    #[allow(clippy::mut_from_ref)]
463    fn get_w_mut(&self) -> &mut PipeW {
464        unsafe { &mut *self.w.get() }
465    }
466}
467// Promise that proper synchronization exists *around accesses*.
468unsafe impl Sync for UnicastPipe {}
469
470impl Drop for UnicastPipe {
471    fn drop(&mut self) {}
472}
473
474#[async_trait]
475impl LinkUnicastTrait for UnicastPipe {
476    async fn close(&self) -> ZResult<()> {
477        tracing::trace!("Closing Unix Pipe link: {}", self);
478        Ok(())
479    }
480
481    async fn write(&self, buffer: &[u8]) -> ZResult<usize> {
482        self.get_w_mut().write(buffer).await
483    }
484
485    async fn write_all(&self, buffer: &[u8]) -> ZResult<()> {
486        self.get_w_mut().write_all(buffer).await
487    }
488
489    async fn read(&self, buffer: &mut [u8]) -> ZResult<usize> {
490        self.get_r_mut().read(buffer).await
491    }
492
493    async fn read_exact(&self, buffer: &mut [u8]) -> ZResult<()> {
494        self.get_r_mut().read_exact(buffer).await
495    }
496
497    #[inline(always)]
498    fn get_src(&self) -> &Locator {
499        &self.local
500    }
501
502    #[inline(always)]
503    fn get_dst(&self) -> &Locator {
504        &self.remote
505    }
506
507    #[inline(always)]
508    fn get_mtu(&self) -> BatchSize {
509        LINUX_PIPE_MAX_MTU
510    }
511
512    #[inline(always)]
513    fn get_interface_names(&self) -> Vec<String> {
514        // @TODO: Not supported for now
515        tracing::debug!("The get_interface_names for UnicastPipe is not supported");
516        vec![]
517    }
518
519    #[inline(always)]
520    fn is_reliable(&self) -> bool {
521        super::IS_RELIABLE
522    }
523
524    #[inline(always)]
525    fn is_streamed(&self) -> bool {
526        true
527    }
528
529    #[inline(always)]
530    fn get_auth_id(&self) -> &LinkAuthId {
531        &LinkAuthId::Unixpipe
532    }
533}
534
535impl fmt::Display for UnicastPipe {
536    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
537        write!(f, "{} => {}", self.local, self.remote)?;
538        Ok(())
539    }
540}
541
542impl fmt::Debug for UnicastPipe {
543    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
544        f.debug_struct("UnicastPipe")
545            .field("src", &self.local)
546            .field("dst", &self.remote)
547            .finish()
548    }
549}
550
551pub struct LinkManagerUnicastPipe {
552    manager: Arc<NewLinkChannelSender>,
553    listeners: tokio::sync::RwLock<HashMap<EndPoint, UnicastPipeListener>>,
554}
555
556impl LinkManagerUnicastPipe {
557    pub fn new(manager: NewLinkChannelSender) -> Self {
558        Self {
559            manager: Arc::new(manager),
560            listeners: tokio::sync::RwLock::new(HashMap::new()),
561        }
562    }
563}
564impl ConstructibleLinkManagerUnicast<()> for LinkManagerUnicastPipe {
565    fn new(new_link_sender: NewLinkChannelSender, _: ()) -> ZResult<Self> {
566        Ok(Self::new(new_link_sender))
567    }
568}
569
570#[async_trait]
571impl LinkManagerUnicastTrait for LinkManagerUnicastPipe {
572    async fn new_link(&self, endpoint: EndPoint) -> ZResult<LinkUnicast> {
573        let pipe = UnicastPipeClient::connect_to(endpoint).await?;
574        Ok(LinkUnicast(Arc::new(pipe)))
575    }
576
577    async fn new_listener(&self, endpoint: EndPoint) -> ZResult<Locator> {
578        let listener = UnicastPipeListener::listen(endpoint.clone(), self.manager.clone()).await?;
579        let locator = listener.uplink_locator.clone();
580        zasyncwrite!(self.listeners).insert(endpoint, listener);
581        Ok(locator)
582    }
583
584    async fn del_listener(&self, endpoint: &EndPoint) -> ZResult<()> {
585        let removed = zasyncwrite!(self.listeners).remove(endpoint);
586        match removed {
587            Some(val) => {
588                val.stop_listening();
589                Ok(())
590            }
591            None => bail!("No listener found for endpoint {}", endpoint),
592        }
593    }
594
595    async fn get_listeners(&self) -> Vec<EndPoint> {
596        zasyncread!(self.listeners).keys().cloned().collect()
597    }
598
599    async fn get_locators(&self) -> Vec<Locator> {
600        zasyncread!(self.listeners)
601            .values()
602            .map(|v| v.uplink_locator.clone())
603            .collect()
604    }
605}
606
607fn endpoint_to_pipe_path(endpoint: &EndPoint) -> (String, u32) {
608    let path = endpoint.address().to_string();
609    let access_mode = endpoint
610        .config()
611        .get(config::FILE_ACCESS_MASK)
612        .map_or(*FILE_ACCESS_MASK, |val| {
613            val.parse().unwrap_or(*FILE_ACCESS_MASK)
614        });
615    (path, access_mode)
616}
617
/// Derives the two request-channel paths from the base `path`.
///
/// Returns `(uplink, downlink)`: `path` followed by `_uplink` and `_downlink` respectively.
fn split_pipe_path(path: &str) -> (String, String) {
    (format!("{path}_uplink"), format!("{path}_downlink"))
}