Skip to main content

zerodds_dcps_async/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3//! ZeroDDS async-DCPS-API (zerodds-async-1.0).
4//!
5//! Crate `zerodds-dcps-async`. Safety classification: **STANDARD**.
6//!
7//! Runtime-agnostische async-Wrappers um die DCPS-Sync-API. Newtypes
8//! teilen den internen `Arc<...>` mit den Sync-Pendants — kein
9//! State-Duplikat, kein Performance-Overhead.
10//!
11//! # Beispiel
12//!
13//! ```ignore
14//! use zerodds_dcps_async::AsyncDomainParticipantFactory;
15//! use futures_core::Stream;
16//! use std::time::Duration;
17//!
18//! #[tokio::main]
19//! async fn main() {
20//!     let factory = AsyncDomainParticipantFactory::instance();
21//!     let participant = factory.create_participant_offline(0);
22//!     // ... topic + reader + writer wie sync.
23//!     let writer = /* ... */;
24//!     let reader = /* ... */;
25//!
26//!     // write & take laufen ohne Thread-Block.
27//!     writer.write(&sample).await.unwrap();
28//!     let samples = reader.take(Duration::from_secs(1)).await.unwrap();
29//! }
30//! ```
31//!
32//! Spec: `docs/specs/zerodds-async-1.0.md`.
33
34#![cfg_attr(not(feature = "std"), no_std)]
35#![warn(missing_docs)]
36#![forbid(unsafe_code)]
37
38extern crate alloc;
39
40mod factory;
41mod participant;
42mod publisher;
43mod reader;
44mod subscriber;
45mod writer;
46
47pub use factory::AsyncDomainParticipantFactory;
48pub use participant::AsyncDomainParticipant;
49pub use publisher::AsyncPublisher;
50pub use reader::{AsyncDataReader, DataAvailableStream, PublicationMatchedStream, SampleStream};
51pub use subscriber::AsyncSubscriber;
52pub use writer::AsyncDataWriter;
53
54// Re-Exports der Sync-Types die der Caller weiterhin braucht.
55pub use zerodds_dcps::status::SubscriptionMatchedStatus;
56pub use zerodds_dcps::{
57    DataReaderQos, DataWriterQos, DdsError, DdsType, DomainParticipantQos, InstanceHandle,
58    PublisherQos, Result, SubscriberQos, Topic, TopicQos,
59};
60
/// Runtime-agnostic sleep helper: suspends the calling task for `d`
/// without blocking the thread that polls it.
///
/// Two back-ends exist, selected at compile time:
///
/// * with `--features tokio-glue` the wait is delegated to
///   `tokio::time::sleep`, so the wake-up comes from the Tokio timer and
///   no extra thread is needed;
/// * otherwise the crate-internal `SleepFuture` is awaited, which relies
///   on a helper thread to deliver the wake-up (no hard dependency on
///   tokio).
#[cfg(feature = "std")]
pub(crate) async fn yield_for(d: core::time::Duration) {
    #[cfg(feature = "tokio-glue")]
    {
        let timer = tokio::time::sleep(d);
        timer.await;
    }
    #[cfg(not(feature = "tokio-glue"))]
    {
        // Default path: thread-backed sleep that wakes the task via its waker.
        let timer = SleepFuture::new(d);
        timer.await;
    }
}
78
/// Mutable state shared between a pending [`SleepFuture`] and its timer
/// thread. Always accessed under `SleepShared::state`.
#[cfg(all(feature = "std", not(feature = "tokio-glue")))]
struct SleepState {
    /// Waker of the task that polled the future most recently.
    waker: Option<core::task::Waker>,
    /// Set by the timer thread once the deadline has passed and the
    /// wake-up has been (or is about to be) delivered.
    fired: bool,
    /// Set when the owning future is dropped before the deadline.
    cancelled: bool,
}

/// Synchronisation primitives shared between a [`SleepFuture`] and its
/// timer thread.
#[cfg(all(feature = "std", not(feature = "tokio-glue")))]
struct SleepShared {
    state: std::sync::Mutex<SleepState>,
    /// Notified on cancellation so the timer thread can stop waiting early.
    cancel: std::sync::Condvar,
}

#[cfg(all(feature = "std", not(feature = "tokio-glue")))]
impl SleepShared {
    /// Locks the state, ignoring poisoning: the state is a handful of
    /// plain flags that stay consistent even if a holder panicked.
    fn lock(&self) -> std::sync::MutexGuard<'_, SleepState> {
        self.state
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }
}

/// Body of the timer thread: waits until `deadline` (or cancellation) and
/// then wakes the most recently registered waker.
#[cfg(all(feature = "std", not(feature = "tokio-glue")))]
fn run_sleep_timer(shared: &SleepShared, deadline: std::time::Instant) {
    let mut state = shared.lock();
    loop {
        if state.cancelled {
            // The future was dropped; nobody is left to wake.
            return;
        }
        let remaining = deadline.saturating_duration_since(std::time::Instant::now());
        if remaining.is_zero() {
            break;
        }
        // Loop guards against spurious wake-ups of the condvar.
        let (guard, _timed_out) = shared
            .cancel
            .wait_timeout(state, remaining)
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        state = guard;
    }
    // Mark as fired under the lock so a concurrent `poll` cannot register a
    // waker that would never be woken.
    state.fired = true;
    let waker = state.waker.take();
    drop(state);
    // Wake outside the lock: waker code is arbitrary user/executor code.
    if let Some(waker) = waker {
        waker.wake();
    }
}

/// Thread-backed sleep future used when no async runtime timer is
/// available.
///
/// The first poll that finds the deadline not yet reached starts a helper
/// thread which waits until the deadline and then wakes the task. Later
/// polls refresh the stored waker, so the task that polled last is the one
/// that gets woken. Dropping the future cancels the helper thread.
#[cfg(all(feature = "std", not(feature = "tokio-glue")))]
struct SleepFuture {
    /// Point in time at which the future completes. `None` means the
    /// requested duration does not fit into an `Instant`; such a sleep
    /// never elapses.
    deadline: Option<std::time::Instant>,
    /// Present once the timer thread has been started.
    shared: Option<std::sync::Arc<SleepShared>>,
}

#[cfg(all(feature = "std", not(feature = "tokio-glue")))]
impl SleepFuture {
    /// Creates a future that completes once `d` has elapsed, measured from
    /// the moment of this call.
    ///
    /// Never panics: a duration too large to be represented as a deadline
    /// (e.g. `Duration::MAX`) yields a future that stays pending forever.
    fn new(d: core::time::Duration) -> Self {
        Self {
            deadline: std::time::Instant::now().checked_add(d),
            shared: None,
        }
    }
}

#[cfg(all(feature = "std", not(feature = "tokio-glue")))]
impl core::future::Future for SleepFuture {
    type Output = ();

    /// Returns `Ready(())` once the deadline has passed; otherwise makes
    /// sure the waker from `cx` will be woken at the deadline and returns
    /// `Pending`.
    fn poll(
        self: core::pin::Pin<&mut Self>,
        cx: &mut core::task::Context<'_>,
    ) -> core::task::Poll<()> {
        // All fields are `Unpin`, so plain mutable access is fine.
        let this = self.get_mut();

        let deadline = match this.deadline {
            Some(deadline) => deadline,
            // Unrepresentable deadline: behaves like a sleep that never ends.
            None => return core::task::Poll::Pending,
        };
        if std::time::Instant::now() >= deadline {
            return core::task::Poll::Ready(());
        }

        if let Some(shared) = this.shared.as_ref() {
            let mut state = shared.lock();
            if state.fired {
                // The timer thread only fires after the deadline, and it will
                // not look at the waker slot again.
                return core::task::Poll::Ready(());
            }
            // The task may have migrated since the last poll; always keep the
            // most recent waker so the wake-up reaches the right task.
            let stale = state
                .waker
                .as_ref()
                .map_or(true, |current| !current.will_wake(cx.waker()));
            if stale {
                state.waker = Some(cx.waker().clone());
            }
            return core::task::Poll::Pending;
        }

        // First pending poll: start the timer thread.
        let shared = std::sync::Arc::new(SleepShared {
            state: std::sync::Mutex::new(SleepState {
                waker: Some(cx.waker().clone()),
                fired: false,
                cancelled: false,
            }),
            cancel: std::sync::Condvar::new(),
        });
        let timer_shared = std::sync::Arc::clone(&shared);
        let spawned = std::thread::Builder::new()
            .name(String::from("zerodds-async-sleep"))
            .spawn(move || run_sleep_timer(&timer_shared, deadline));
        match spawned {
            Ok(_detached) => this.shared = Some(shared),
            // The OS refused to create a thread. Rather than panicking inside
            // `poll`, ask to be polled again right away; the next poll either
            // sees the deadline reached or retries the spawn.
            Err(_) => cx.waker().wake_by_ref(),
        }
        core::task::Poll::Pending
    }
}

#[cfg(all(feature = "std", not(feature = "tokio-glue")))]
impl Drop for SleepFuture {
    /// Tells the timer thread (if any) to stop waiting so it does not
    /// linger until the deadline after the sleep was abandoned.
    fn drop(&mut self) {
        if let Some(shared) = self.shared.take() {
            let mut state = shared.lock();
            state.cancelled = true;
            let stale_waker = state.waker.take();
            drop(state);
            shared.cancel.notify_one();
            // Dropped outside the lock: waker drop code is not ours.
            drop(stale_waker);
        }
    }
}