datex_core/shared_values/
observers.rs1use crate::{
2 dif::update::{DIFUpdate, DIFUpdateData},
3 shared_values::{
4 shared_container::SharedContainer,
5 shared_value_container::SharedValueContainer,
6 },
7};
8
9use crate::prelude::*;
10use core::{cell::RefCell, fmt::Display, result::Result};
11use serde::{Deserialize, Serialize};
12
13#[derive(Debug)]
14pub enum ObserverError {
15 ObserverNotFound,
16 ImmutableReference,
17}
18
19impl Display for ObserverError {
20 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
21 match self {
22 ObserverError::ObserverNotFound => {
23 core::write!(f, "Observer not found")
24 }
25 ObserverError::ImmutableReference => {
26 core::write!(f, "Cannot observe an immutable reference")
27 }
28 }
29 }
30}
31
32pub type ObserverCallback = Rc<dyn Fn(&DIFUpdateData, TransceiverId)>;
33
34pub type TransceiverId = u32;
37
38#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)]
39pub struct ObserveOptions {
40 pub relay_own_updates: bool,
42}
43
44#[derive(Clone)]
45pub struct Observer {
46 pub transceiver_id: TransceiverId,
47 pub options: ObserveOptions,
48 pub callback: ObserverCallback,
49}
50
51impl Observer {
52 pub fn new<F: Fn(&DIFUpdateData, TransceiverId) + 'static>(
55 callback: F,
56 ) -> Self {
57 Observer {
58 transceiver_id: 0,
59 options: ObserveOptions::default(),
60 callback: Rc::new(callback),
61 }
62 }
63}
64
65impl SharedContainer {
66 pub fn observe(&self, observer: Observer) -> Result<u32, ObserverError> {
70 Ok(self
72 .ensure_mutable_value_reference()?
73 .borrow_mut()
74 .observers
75 .add(observer))
76
77 }
79
80 pub fn unobserve(&self, observer_id: u32) -> Result<(), ObserverError> {
83 let removed = self
84 .ensure_mutable_value_reference()?
85 .borrow_mut()
86 .observers
87 .remove(observer_id);
88 if removed.is_some() {
89 Ok(())
90 } else {
91 Err(ObserverError::ObserverNotFound)
92 }
93 }
94
95 pub fn update_observer_options(
98 &self,
99 observer_id: u32,
100 options: ObserveOptions,
101 ) -> Result<(), ObserverError> {
102 let vr = self.ensure_mutable_value_reference()?;
103 let mut vr_borrow = vr.borrow_mut();
104 if let Some(observer) = vr_borrow.observers.get_mut(&observer_id) {
105 observer.options = options;
106 Ok(())
107 } else {
108 Err(ObserverError::ObserverNotFound)
109 }
110 }
111
112 pub fn observers_ids(&self) -> Vec<u32> {
115 match self {
116 SharedContainer::Type(_) => vec![],
117 SharedContainer::Value(vr) => {
118 vr.borrow().observers.keys().cloned().collect()
119 }
120 }
121 }
122
123 pub fn unobserve_all(&self) -> Result<(), ObserverError> {
126 self.ensure_mutable_value_reference()?;
127 for id in self.observers_ids() {
128 let _ = self.unobserve(id);
129 }
130 Ok(())
131 }
132
133 fn ensure_mutable_value_reference(
136 &self,
137 ) -> Result<Rc<RefCell<SharedValueContainer>>, ObserverError> {
138 self.mutable_reference()
139 .ok_or(ObserverError::ImmutableReference)
140 }
141
142 pub fn notify_observers(&self, dif: &DIFUpdate) {
144 let observer_callbacks: Vec<ObserverCallback> = match self {
145 SharedContainer::Type(_) => return, SharedContainer::Value(vr) => {
147 let vr_ref = vr.borrow();
149 vr_ref
150 .observers
151 .iter()
152 .filter(|(_, f)| {
153 f.options.relay_own_updates
155 || f.transceiver_id != dif.source_id
156 })
157 .map(|(_, f)| f.callback.clone())
158 .collect()
159 }
160 };
161
162 for callback in observer_callbacks {
164 callback(&dif.data, dif.source_id);
165 }
166 }
167
168 pub fn has_observers(&self) -> bool {
170 match self {
171 SharedContainer::Type(_) => false,
172 SharedContainer::Value(vr) => !vr.borrow().observers.is_empty(),
173 }
174 }
175}
176
177#[cfg(test)]
178mod tests {
179 use crate::{
180 dif::{
181 representation::DIFValueRepresentation,
182 r#type::DIFTypeDefinition,
183 update::{DIFUpdate, DIFUpdateData},
184 value::{DIFValue, DIFValueContainer},
185 },
186 prelude::*,
187 runtime::memory::Memory,
188 shared_values::{
189 observers::{
190 ObserveOptions, Observer, ObserverError, TransceiverId,
191 },
192 pointer::Pointer,
193 shared_container::{SharedContainer, SharedContainerMutability},
194 },
195 values::{core_values::map::Map, value_container::ValueContainer},
196 };
197 use core::{assert_matches, cell::RefCell};
198
199 fn record_dif_updates(
203 reference: &SharedContainer,
204 transceiver_id: TransceiverId,
205 observe_options: ObserveOptions,
206 ) -> Rc<RefCell<Vec<DIFUpdate<'static>>>> {
207 let update_collector = Rc::new(RefCell::new(Vec::new()));
208 let update_collector_clone = update_collector.clone();
209 reference
210 .observe(Observer {
211 transceiver_id,
212 options: observe_options,
213 callback: Rc::new(move |update_data, source_id| {
214 update_collector_clone.borrow_mut().push(DIFUpdate {
215 source_id,
216 data: Cow::Owned(update_data.clone()),
217 });
218 }),
219 })
220 .expect("Failed to attach observer");
221 update_collector
222 }
223
224 #[test]
225 fn immutable_reference_observe_fails() {
226 let r = SharedContainer::try_boxed(
227 42.into(),
228 None,
229 Pointer::NULL,
230 SharedContainerMutability::Immutable,
231 )
232 .unwrap();
233 assert_matches!(
234 r.observe(Observer::new(|_, _| {})),
235 Err(ObserverError::ImmutableReference)
236 );
237
238 let r = SharedContainer::try_boxed(
239 42.into(),
240 None,
241 Pointer::NULL,
242 SharedContainerMutability::Mutable,
243 )
244 .unwrap();
245 assert_matches!(r.observe(Observer::new(|_, _| {})), Ok(_));
246 }
247
248 #[test]
249 fn observe_and_unobserve() {
250 let r = SharedContainer::boxed_mut(42.into(), Pointer::NULL).unwrap();
251 assert!(!r.has_observers());
252 let observer_id = r.observe(Observer::new(|_, _| {})).unwrap();
253 assert!(observer_id == 0);
254 assert!(r.has_observers());
255 assert!(r.unobserve(observer_id).is_ok());
256 assert!(!r.has_observers());
257 assert_matches!(
258 r.unobserve(observer_id),
259 Err(ObserverError::ObserverNotFound)
260 );
261 }
262
263 #[test]
264 fn observer_ids_incremental() {
265 let r = SharedContainer::boxed_mut(42.into(), Pointer::NULL).unwrap();
266 let id1 = r.observe(Observer::new(|_, _| {})).unwrap();
267 let id2 = r.observe(Observer::new(|_, _| {})).unwrap();
268 assert!(id1 == 0);
269 assert!(id2 == 1);
270 assert!(r.unobserve(id1).is_ok());
271 let id3 = r.observe(Observer::new(|_, _| {})).unwrap();
272 assert!(id3 == 0);
273 let id4 = r.observe(Observer::new(|_, _| {})).unwrap();
274 assert!(id4 == 2);
275 }
276
277 #[test]
278 fn observe_replace() {
279 let int_ref =
280 SharedContainer::boxed_mut(42.into(), Pointer::NULL).unwrap();
281 let observed_updates =
282 record_dif_updates(&int_ref, 0, ObserveOptions::default());
283
284 int_ref
286 .try_replace(1, None, 43)
287 .expect("Failed to set value");
288
289 let expected_update = DIFUpdate {
291 source_id: 1,
292 data: Cow::Owned(DIFUpdateData::replace(
293 DIFValueContainer::from_value_container(&ValueContainer::from(
294 43,
295 )),
296 )),
297 };
298
299 assert_eq!(*observed_updates.borrow(), vec![expected_update]);
300 }
301
302 #[test]
303 fn observe_replace_same_transceiver() {
304 let int_ref =
305 SharedContainer::boxed_mut(42.into(), Pointer::NULL).unwrap();
306 let observed_update =
307 record_dif_updates(&int_ref, 0, ObserveOptions::default());
308
309 int_ref
311 .try_replace(0, None, 43)
312 .expect("Failed to set value");
313
314 assert_eq!(*observed_update.borrow(), vec![]);
316 }
317
318 #[test]
319 fn observe_replace_same_transceiver_relay_own_updates() {
320 let int_ref =
321 SharedContainer::boxed_mut(42.into(), Pointer::NULL).unwrap();
322 let observed_update = record_dif_updates(
323 &int_ref,
324 0,
325 ObserveOptions {
326 relay_own_updates: true,
327 },
328 );
329
330 int_ref
332 .try_replace(0, None, 43)
333 .expect("Failed to set value");
334
335 let expected_update = DIFUpdate {
337 source_id: 0,
338 data: Cow::Owned(DIFUpdateData::replace(
339 DIFValueContainer::from_value_container(&ValueContainer::from(
340 43,
341 )),
342 )),
343 };
344
345 assert_eq!(*observed_update.borrow(), vec![expected_update]);
346 }
347
348 #[test]
349 fn observe_update_property() {
350 let reference = SharedContainer::boxed_mut(
351 Map::from(vec![
352 ("a".to_string(), ValueContainer::from(1)),
353 ("b".to_string(), ValueContainer::from(2)),
354 ])
355 .into(),
356 Pointer::NULL,
357 )
358 .unwrap();
359 let observed_updates =
360 record_dif_updates(&reference, 0, ObserveOptions::default());
361 reference
363 .try_set_property(1, None, "a", "val".into())
364 .expect("Failed to set property");
365 let expected_update = DIFUpdate {
367 source_id: 1,
368 data: Cow::Owned(DIFUpdateData::set(
369 "a",
370 DIFValue::new(
371 DIFValueRepresentation::String("val".to_string()),
372 None as Option<DIFTypeDefinition>,
373 ),
374 )),
375 };
376 assert_eq!(*observed_updates.borrow(), vec![expected_update]);
377 }
378}