pub struct Subscriber { /* private fields */ }Expand description
Subscriber — Entity-Gruppe fuer DataReader.
Implementations§
Source§impl Subscriber
impl Subscriber
Sourcepub fn contains_reader(&self, handle: InstanceHandle) -> bool
pub fn contains_reader(&self, handle: InstanceHandle) -> bool
Spec §2.2.2.2.1.10 — true wenn handle ein DataReader ist,
der ueber diesen Subscriber erzeugt wurde.
Sourcepub fn begin_access(&self)
pub fn begin_access(&self)
Spec §2.2.2.5.2.8 begin_access — markiert den Beginn eines
kohaerenten Read-Sets. Verschachtelung ist erlaubt; jeder
Aufruf erhoeht einen internen Counter, jedes end_access
erniedrigt ihn.
Sourcepub fn end_access(&self) -> Result<()>
pub fn end_access(&self) -> Result<()>
Spec §2.2.2.5.2.9 end_access — Gegenstueck zu begin_access.
§Errors
DdsError::PreconditionNotMet wenn end_access ohne
vorhergehendes begin_access gerufen wird.
Sourcepub fn is_access_open(&self) -> bool
pub fn is_access_open(&self) -> bool
true wenn aktuell ein Group-Access offen ist.
Sourcepub fn set_listener(
&self,
listener: Option<ArcSubscriberListener>,
mask: StatusMask,
)
pub fn set_listener( &self, listener: Option<ArcSubscriberListener>, mask: StatusMask, )
setzt den SubscriberListener + StatusMask. None
loescht den Slot. Spec §2.2.2.5.6.x set_listener.
Sourcepub fn get_listener(&self) -> Option<ArcSubscriberListener>
pub fn get_listener(&self) -> Option<ArcSubscriberListener>
aktueller Listener-Klon.
Sourcepub fn create_datareader<T: DdsType + Send + 'static>(
&self,
topic: &Topic<T>,
qos: DataReaderQos,
) -> Result<DataReader<T>>
pub fn create_datareader<T: DdsType + Send + 'static>( &self, topic: &Topic<T>, qos: DataReaderQos, ) -> Result<DataReader<T>>
Examples found in repository?
examples/hello_dds_subscriber.rs (line 33)
27fn main() -> Result<(), Box<dyn std::error::Error>> {
28 let factory = DomainParticipantFactory::instance();
29 let participant = factory.create_participant(0, DomainParticipantQos::default())?;
30
31 let topic = participant.create_topic::<RawBytes>("Chatter", TopicQos::default())?;
32 let subscriber = participant.create_subscriber(SubscriberQos::default());
33 let reader = subscriber.create_datareader::<RawBytes>(&topic, DataReaderQos::default())?;
34
35 println!("hello_dds_subscriber: reading on Domain 0 Topic 'Chatter' — Ctrl-C to stop");
36
37 loop {
38 match reader.wait_for_data(Duration::from_secs(1)) {
39 Ok(()) => {
40 for sample in reader.take()? {
41 match std::str::from_utf8(&sample.data) {
42 Ok(s) => println!(" <- {s}"),
43 Err(_) => println!(" <- <{} bytes of non-UTF8>", sample.data.len()),
44 }
45 }
46 }
47 Err(DdsError::Timeout) => {} // idle tick
48 Err(e) => return Err(e.into()),
49 }
50 }
51}More examples
examples/shapes_demo_subscriber.rs (line 27)
18fn main() -> Result<(), Box<dyn std::error::Error>> {
19 let args: Vec<String> = env::args().collect();
20 let topic_name = args.get(1).map_or("Square", String::as_str);
21 let domain_id: i32 = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(0);
22
23 let factory = DomainParticipantFactory::instance();
24 let participant = factory.create_participant(domain_id, DomainParticipantQos::default())?;
25 let topic = participant.create_topic::<ShapeType>(topic_name, TopicQos::default())?;
26 let subscriber = participant.create_subscriber(SubscriberQos::default());
27 let reader = subscriber.create_datareader::<ShapeType>(&topic, DataReaderQos::default())?;
28
29 println!("shapes_demo_subscriber: Topic={topic_name} Domain={domain_id} — Ctrl-C to stop");
30
31 loop {
32 match reader.wait_for_data(Duration::from_secs(1)) {
33 Ok(()) => {
34 for sample in reader.take()? {
35 println!(
36 " <- color={:8} x={:4} y={:4} size={}",
37 sample.color, sample.x, sample.y, sample.shapesize
38 );
39 }
40 }
41 Err(DdsError::Timeout) => {}
42 Err(e) => return Err(e.into()),
43 }
44 }
45}examples/zerodds_perf.rs (line 97)
92fn run_sub(runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
93 let factory = DomainParticipantFactory::instance();
94 let participant = factory.create_participant(0, DomainParticipantQos::default())?;
95 let topic = participant.create_topic::<ShapeType>(PERF_TOPIC, TopicQos::default())?;
96 let subscriber = participant.create_subscriber(SubscriberQos::default());
97 let reader = subscriber.create_datareader::<ShapeType>(&topic, DataReaderQos::default())?;
98
99 let total = Arc::new(AtomicU64::new(0));
100 let start = Instant::now();
101 let mut last_report = start;
102 let mut last_total = 0u64;
103
104 while start.elapsed() < runtime {
105 if let Ok(samples) = reader.take() {
106 for _ in samples {
107 total.fetch_add(1, Ordering::Relaxed);
108 }
109 }
110 if last_report.elapsed() >= Duration::from_secs(1) {
111 let now_total = total.load(Ordering::Relaxed);
112 let delta = now_total - last_total;
113 let elapsed_s = last_report.elapsed().as_secs_f64();
114 let rate_ks = (delta as f64 / elapsed_s) / 1000.0;
115 println!(
116 "{:.3} size N total {} delta {} rate {:.2} kS/s",
117 start.elapsed().as_secs_f64(),
118 now_total,
119 delta,
120 rate_ks
121 );
122 last_report = Instant::now();
123 last_total = now_total;
124 }
125 std::thread::sleep(Duration::from_millis(1));
126 }
127 println!(
128 "# sub-done: total={} runtime={:.3}s",
129 total.load(Ordering::Relaxed),
130 start.elapsed().as_secs_f64()
131 );
132 Ok(())
133}
134
135fn run_pingpong(runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
136 // Beide Rollen in einem Prozess via zwei Topics; Pinger schreibt
137 // PerfPing (mit "send-time"-Marker als shapesize-i32-Diff), Ponger
138 // (zweiter Prozess) liest und echot auf PerfPong. Dieser Prozess
139 // ist der PINGER.
140 let factory = DomainParticipantFactory::instance();
141 let participant = factory.create_participant(0, DomainParticipantQos::default())?;
142 let ping_topic = participant.create_topic::<ShapeType>(PING_TOPIC, TopicQos::default())?;
143 let pong_topic = participant.create_topic::<ShapeType>(PONG_TOPIC, TopicQos::default())?;
144 let publisher = participant.create_publisher(PublisherQos::default());
145 let subscriber = participant.create_subscriber(SubscriberQos::default());
146 let writer = publisher.create_datawriter::<ShapeType>(&ping_topic, DataWriterQos::default())?;
147 let reader =
148 subscriber.create_datareader::<ShapeType>(&pong_topic, DataReaderQos::default())?;
149
150 let _ = writer.wait_for_matched_subscription(1, Duration::from_secs(5));
151 let _ = reader.wait_for_matched_publication(1, Duration::from_secs(5));
152
153 let mut rtts_us: Vec<u64> = Vec::new();
154 let start = Instant::now();
155 let mut seq = 0i32;
156 while start.elapsed() < runtime {
157 let send_us = now_micros();
158 // shapesize codiert die unteren 32 bit von send_us als pseudo-id
159 // (kollisionsanfaellig, aber nur fuer matching im Pong); echte
160 // RTT-Berechnung passiert lokal via timestamp-storage.
161 let _ = writer.write(&ShapeType::new("PING", 0, 0, seq));
162 seq = seq.wrapping_add(1);
163
164 // Wartet bis 50ms auf Pong
165 let deadline = Instant::now() + Duration::from_millis(50);
166 while Instant::now() < deadline {
167 if let Ok(samples) = reader.take() {
168 for s in samples {
169 if s.color == "PONG" && s.shapesize == seq.wrapping_sub(1) {
170 let recv_us = now_micros();
171 rtts_us.push(recv_us - send_us);
172 break;
173 }
174 }
175 if !rtts_us.is_empty() && rtts_us.last().is_some() {
176 break;
177 }
178 }
179 std::thread::sleep(Duration::from_micros(100));
180 }
181 std::thread::sleep(Duration::from_millis(100)); // 10Hz ping rate
182 }
183
184 if rtts_us.is_empty() {
185 println!("# pingpong: no RTTs collected — pong missing?");
186 return Ok(());
187 }
188 rtts_us.sort_unstable();
189 let n = rtts_us.len();
190 let mean = rtts_us.iter().sum::<u64>() / n as u64;
191 let p50 = rtts_us[n / 2];
192 let p90 = rtts_us[(n * 9) / 10];
193 let p99 = rtts_us[(n * 99) / 100];
194 println!(
195 "{:.3} rtt mean {}us min {} 50% {} 90% {} 99% {} max {} cnt {}",
196 start.elapsed().as_secs_f64(),
197 mean,
198 rtts_us.first().copied().unwrap_or(0),
199 p50,
200 p90,
201 p99,
202 rtts_us.last().copied().unwrap_or(0),
203 n
204 );
205 Ok(())
206}
207
208fn run_pong(runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
209 // Liest PerfPing, schreibt PerfPong mit gleichem shapesize.
210 let factory = DomainParticipantFactory::instance();
211 let participant = factory.create_participant(0, DomainParticipantQos::default())?;
212 let ping_topic = participant.create_topic::<ShapeType>(PING_TOPIC, TopicQos::default())?;
213 let pong_topic = participant.create_topic::<ShapeType>(PONG_TOPIC, TopicQos::default())?;
214 let publisher = participant.create_publisher(PublisherQos::default());
215 let subscriber = participant.create_subscriber(SubscriberQos::default());
216 let reader =
217 subscriber.create_datareader::<ShapeType>(&ping_topic, DataReaderQos::default())?;
218 let writer = publisher.create_datawriter::<ShapeType>(&pong_topic, DataWriterQos::default())?;
219 let _ = writer.wait_for_matched_subscription(1, Duration::from_secs(5));
220 let _ = reader.wait_for_matched_publication(1, Duration::from_secs(5));
221
222 let start = Instant::now();
223 let mut echoed = 0u64;
224 while start.elapsed() < runtime {
225 if let Ok(samples) = reader.take() {
226 for s in samples {
227 if s.color == "PING" {
228 let _ = writer.write(&ShapeType::new("PONG", s.x, s.y, s.shapesize));
229 echoed += 1;
230 }
231 }
232 }
233 std::thread::sleep(Duration::from_micros(100));
234 }
235 println!(
236 "# pong-done: echoed={echoed} runtime={:.3}s",
237 start.elapsed().as_secs_f64()
238 );
239 Ok(())
240}examples/multi_endpoint_perf.rs (line 100)
90fn run_sub_n(count: usize, runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
91 let factory = DomainParticipantFactory::instance();
92 let participant = factory.create_participant(0, DomainParticipantQos::default())?;
93 let subscriber = participant.create_subscriber(SubscriberQos::default());
94
95 let mut readers = Vec::with_capacity(count);
96 println!("[multi_sub] creating {count} readers");
97 let create_start = Instant::now();
98 for i in 0..count {
99 let topic = participant.create_topic::<ShapeType>(&topic_name(i), TopicQos::default())?;
100 let reader = subscriber.create_datareader::<ShapeType>(&topic, DataReaderQos::default())?;
101 readers.push(reader);
102 }
103 println!(
104 "[multi_sub] created {count} readers in {:.2}s",
105 create_start.elapsed().as_secs_f64()
106 );
107
108 let start = Instant::now();
109 let mut total_received = 0u64;
110 let mut last_report = start;
111 while start.elapsed() < runtime {
112 for r in &readers {
113 if let Ok(samples) = r.take() {
114 total_received += samples.len() as u64;
115 }
116 }
117 if last_report.elapsed() >= Duration::from_secs(60) {
118 println!(
119 "{:.3} multi_sub readers={} total_received={}",
120 start.elapsed().as_secs_f64(),
121 count,
122 total_received
123 );
124 last_report = Instant::now();
125 }
126 std::thread::sleep(Duration::from_millis(10));
127 }
128 println!(
129 "# multi_sub-done: readers={count} total_received={} runtime={:.3}s",
130 total_received,
131 start.elapsed().as_secs_f64()
132 );
133 Ok(())
134}examples/shapes_demo_viewer.rs (line 132)
117fn main() -> Result<(), Box<dyn std::error::Error>> {
118 let args: Vec<String> = env::args().collect();
119 let domain_id: i32 = args.get(1).and_then(|s| s.parse().ok()).unwrap_or(0);
120
121 let stop = Arc::new(AtomicBool::new(false));
122 install_signal_handler(stop.clone());
123
124 let factory = DomainParticipantFactory::instance();
125 let participant = factory.create_participant(domain_id, DomainParticipantQos::default())?;
126 let subscriber = participant.create_subscriber(SubscriberQos::default());
127
128 let topics = ["Square", "Circle", "Triangle"];
129 let mut readers = Vec::new();
130 for t in &topics {
131 let topic = participant.create_topic::<ShapeType>(t, TopicQos::default())?;
132 let reader = subscriber.create_datareader::<ShapeType>(&topic, DataReaderQos::default())?;
133 readers.push((*t, reader));
134 }
135
136 eprintln!(
137 "shapes_demo_viewer: Domain={domain_id} — subscribed to Square / Circle / Triangle. Ctrl-C beendet."
138 );
139 eprintln!("Warte auf Discovery + erste Samples...");
140
141 // Hide cursor + clear screen.
142 print!("\x1B[?25l\x1B[2J");
143 let mut stdout = std::io::stdout();
144 stdout.flush().ok();
145
146 // Pro (topic, color) speichern wir die letzte Position.
147 let mut shapes: HashMap<(String, String), (i32, i32)> = HashMap::new();
148 let mut sample_count: u64 = 0;
149 let mut grid: Vec<Vec<(char, &'static str)>> = vec![vec![(' ', ""); VIEW_W]; VIEW_H];
150
151 while !stop.load(Ordering::Relaxed) {
152 // 1) Take all pending samples per topic.
153 for (topic_name, reader) in &readers {
154 if let Ok(samples) = reader.take() {
155 for sample in samples {
156 shapes.insert(
157 ((*topic_name).to_string(), sample.color.clone()),
158 (sample.x, sample.y),
159 );
160 sample_count += 1;
161 }
162 }
163 }
164
165 // 2) Clear grid.
166 for row in &mut grid {
167 for cell in row {
168 *cell = (' ', "");
169 }
170 }
171
172 // 3) Plot shapes.
173 for ((topic, color), (x, y)) in &shapes {
174 let gx = map_x(*x);
175 let gy = map_y(*y);
176 if gy < VIEW_H && gx < VIEW_W {
177 grid[gy][gx] = (glyph(topic), ansi_color(color));
178 }
179 }
180
181 // 4) Render: home, draw, status line.
182 print!("\x1B[H");
183 // top border
184 print!("┌");
185 for _ in 0..VIEW_W {
186 print!("─");
187 }
188 println!("┐");
189 for row in &grid {
190 print!("│");
191 for (ch, color) in row {
192 if color.is_empty() {
193 print!(" ");
194 } else {
195 print!("{color}{ch}{ANSI_RESET}");
196 }
197 }
198 println!("│");
199 }
200 print!("└");
201 for _ in 0..VIEW_W {
202 print!("─");
203 }
204 println!("┘");
205 println!(
206 "shapes={:3} samples={:6} domain={} — Ctrl-C beendet ",
207 shapes.len(),
208 sample_count,
209 domain_id,
210 );
211 stdout.flush().ok();
212
213 thread::sleep(Duration::from_millis(60));
214 }
215
216 // Show cursor again.
217 print!("\x1B[?25h");
218 println!("[shapes_demo_viewer] beendet. Total samples empfangen: {sample_count}");
219 Ok(())
220}Trait Implementations§
Source§impl Debug for Subscriber
impl Debug for Subscriber
Source§impl Entity for Subscriber
Available on crate feature std only.
impl Entity for Subscriber
Available on crate feature
std only.Source§type Qos = SubscriberQos
type Qos = SubscriberQos
QoS-Typ fuer diese Entity (z.B.
DomainParticipantQos,
DataWriterQos, …).Source§fn set_qos(&self, qos: Self::Qos) -> Result<()>
fn set_qos(&self, qos: Self::Qos) -> Result<()>
Aendert QoS. Pre-enable: alles erlaubt. Post-enable: nur
Felder mit “Changeable=YES” — sonst
ImmutablePolicy-Error.
Spec §2.2.2.1.2 set_qos. Read moreSource§fn enable(&self) -> Result<()>
fn enable(&self) -> Result<()>
Enabled die Entity (idempotent). Spec §2.2.2.1.4
enable. Read moreSource§fn entity_state(&self) -> Arc<EntityState> ⓘ
fn entity_state(&self) -> Arc<EntityState> ⓘ
Interner Accessor — jede Impl liefert ihren
Arc<EntityState>.Source§fn is_enabled(&self) -> bool
fn is_enabled(&self) -> bool
True wenn die Entity bereits enabled ist.
Source§fn get_status_condition(&self) -> StatusCondition
fn get_status_condition(&self) -> StatusCondition
StatusCondition dieser Entity.
Spec §2.2.2.1.6 get_status_condition.Source§fn get_status_changes(&self) -> StatusMask
fn get_status_changes(&self) -> StatusMask
Bitmask der Status-Kinds, die seit letztem Read geaendert haben.
Spec §2.2.2.1.5
get_status_changes.Source§fn get_instance_handle(&self) -> InstanceHandle
fn get_instance_handle(&self) -> InstanceHandle
Lokaler 64-Bit-Identifier. Spec §2.2.2.1.7
get_instance_handle.Auto Trait Implementations§
impl Freeze for Subscriber
impl !RefUnwindSafe for Subscriber
impl Send for Subscriber
impl Sync for Subscriber
impl Unpin for Subscriber
impl UnsafeUnpin for Subscriber
impl !UnwindSafe for Subscriber
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more