dimas_com/zenoh/
publisher.rs1#[doc(hidden)]
6extern crate alloc;
7
8#[cfg(feature = "std")]
9extern crate std;
10
11use crate::error::Error;
13use alloc::{string::String, sync::Arc};
14use core::fmt::Debug;
15use dimas_core::{Result, enums::OperationState, message_types::Message, traits::Capability};
16use tracing::{Level, instrument};
17use zenoh::{
18 Session, Wait,
19 qos::{CongestionControl, Priority},
20};
21#[cfg(feature = "unstable")]
22use zenoh::{qos::Reliability, sample::Locality};
23pub struct Publisher {
28 session: Arc<Session>,
30 selector: String,
31 activation_state: OperationState,
32 #[cfg(feature = "unstable")]
33 allowed_destination: Locality,
34 congestion_control: CongestionControl,
35 encoding: String,
36 express: bool,
37 priority: Priority,
38 #[cfg(feature = "unstable")]
39 reliability: Reliability,
40 declared_publ: std::sync::Mutex<Option<zenoh::pubsub::Publisher<'static>>>,
41}
42
43impl Debug for Publisher {
44 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
45 f.debug_struct("Publisher")
46 .field("selector", &self.selector)
47 .field("initialized", &self.declared_publ)
48 .finish_non_exhaustive()
49 }
50}
51
52impl crate::traits::Publisher for Publisher {
53 fn selector(&self) -> &str {
55 &self.selector
56 }
57
58 #[instrument(name="publish", level = Level::ERROR, skip_all)]
62 fn put(&self, message: Message) -> Result<()> {
63 self.declared_publ.lock().map_or_else(
64 |_| todo!(),
65 |publisher| match publisher
66 .as_ref()
67 .ok_or(Error::AccessPublisher)?
68 .put(message.value())
69 .wait()
70 {
71 Ok(()) => Ok(()),
72 Err(source) => Err(Error::PublishingPut { source }.into()),
73 },
74 )
75 }
76
77 #[instrument(level = Level::ERROR, skip_all)]
81 fn delete(&self) -> Result<()> {
82 self.declared_publ.lock().map_or_else(
83 |_| todo!(),
84 |publisher| match publisher
85 .as_ref()
86 .ok_or(Error::AccessPublisher)?
87 .delete()
88 .wait()
89 {
90 Ok(()) => Ok(()),
91 Err(source) => Err(Error::PublishingDelete { source }.into()),
92 },
93 )
94 }
95}
96
97impl Capability for Publisher {
98 fn manage_operation_state(&self, state: &OperationState) -> Result<()> {
99 if state >= &self.activation_state {
100 return self.init();
101 } else if state < &self.activation_state {
102 return self.de_init();
103 }
104 Ok(())
105 }
106}
107
108impl Publisher {
109 #[allow(clippy::too_many_arguments)]
111 #[must_use]
112 pub const fn new(
113 session: Arc<Session>,
114 selector: String,
115 activation_state: OperationState,
116 #[cfg(feature = "unstable")] allowed_destination: Locality,
117 congestion_control: CongestionControl,
118 encoding: String,
119 express: bool,
120 priority: Priority,
121 #[cfg(feature = "unstable")] reliability: Reliability,
122 ) -> Self {
123 Self {
124 session,
125 selector,
126 activation_state,
127 #[cfg(feature = "unstable")]
128 allowed_destination,
129 congestion_control,
130 encoding,
131 express,
132 priority,
133 #[cfg(feature = "unstable")]
134 reliability,
135 declared_publ: std::sync::Mutex::new(None),
136 }
137 }
138
139 fn init(&self) -> Result<()> {
143 self.de_init()?;
144
145 let builder = self
146 .session
147 .declare_publisher(self.selector.clone())
148 .congestion_control(self.congestion_control)
149 .encoding(self.encoding.as_str())
150 .express(self.express)
151 .priority(self.priority);
152
153 #[cfg(feature = "unstable")]
154 let builder = builder
155 .allowed_destination(self.allowed_destination)
156 .reliability(self.reliability);
157
158 let new_publisher = builder.wait()?;
159 self.declared_publ.lock().map_or_else(
161 |_| todo!(),
162 |mut publisher| {
163 publisher.replace(new_publisher);
164 Ok(())
165 },
166 )
167 }
168
169 #[allow(clippy::unnecessary_wraps)]
173 fn de_init(&self) -> Result<()> {
174 self.declared_publ.lock().map_or_else(
175 |_| todo!(),
176 |mut publisher| {
177 publisher.take();
178 Ok(())
179 },
180 )
181 }
182}
#[cfg(test)]
mod tests {
    use super::*;

    /// Compile-time probe: only instantiable for `Sized + Send + Sync` types.
    const fn is_normal<T>()
    where
        T: Sized + Send + Sync,
    {
    }

    /// Ensures `Publisher` satisfies the bounds required by `is_normal`.
    #[test]
    const fn normal_types() {
        is_normal::<Publisher>();
    }
}