datex_core/references/
observers.rs

1use crate::dif::update::DIFUpdate;
2use crate::references::{
3    reference::Reference, value_reference::ValueReference,
4};
5use serde::{Deserialize, Serialize};
6use std::{cell::RefCell, fmt::Display, rc::Rc};
7
/// Errors returned by the observer management methods of a reference.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ObserverError {
    /// No observer is registered under the requested observer ID.
    ObserverNotFound,
    /// The reference cannot be observed because it is final.
    FinalReference,
}

impl Display for ObserverError {
    /// Writes the human-readable message for this error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            ObserverError::ObserverNotFound => "Observer not found",
            ObserverError::FinalReference => {
                "Cannot observe a final reference"
            }
        };
        f.write_str(message)
    }
}

// Implementing `Error` lets callers box this type as `dyn Error`, use it as a
// `source()` and propagate it with `?` into generic error containers.
impl std::error::Error for ObserverError {}
26
27pub type ObserverCallback = Rc<dyn Fn(&DIFUpdate)>;
28
/// Unique identifier of a transceiver, i.e. a source of updates.
///
/// The IDs 0-255 are reserved for DIF clients.
pub type TransceiverId = u32;
32
33#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)]
34pub struct ObserveOptions {
35    /// If true, the transceiver will be notified of changes that originated from itself
36    pub relay_own_updates: bool,
37}
38
39#[derive(Clone)]
40pub struct Observer {
41    pub transceiver_id: TransceiverId,
42    pub options: ObserveOptions,
43    pub callback: ObserverCallback,
44}
45
46impl Observer {
47    /// Creates a new observer with the given callback function,
48    /// using default options and a transceiver ID of 0.
49    pub fn new<F: Fn(&DIFUpdate) + 'static>(callback: F) -> Self {
50        Observer {
51            transceiver_id: 0,
52            options: ObserveOptions::default(),
53            callback: Rc::new(callback),
54        }
55    }
56}
57
58impl Reference {
59    /// Adds an observer to this reference that will be notified on value changes.
60    /// Returns an error if the reference is immutable or a type reference.
61    /// The returned u32 is an observer ID that can be used to remove the observer later.
62    pub fn observe(&self, observer: Observer) -> Result<u32, ObserverError> {
63        // Add the observer to the list of observers
64        Ok(self
65            .ensure_non_final_value_reference()?
66            .borrow_mut()
67            .observers
68            .add(observer))
69
70        // TODO #299: also set observers on child references if not yet active, keep track of active observers
71    }
72
73    /// Removes an observer by its ID.
74    /// Returns an error if the observer ID is not found or the reference is immutable.
75    pub fn unobserve(&self, observer_id: u32) -> Result<(), ObserverError> {
76        let removed = self
77            .ensure_non_final_value_reference()?
78            .borrow_mut()
79            .observers
80            .remove(observer_id);
81        if removed.is_some() {
82            Ok(())
83        } else {
84            Err(ObserverError::ObserverNotFound)
85        }
86    }
87
88    /// Updates the options for an existing observer by its ID.
89    /// Returns an error if the observer ID is not found or the reference is immutable.
90    pub fn update_observer_options(
91        &self,
92        observer_id: u32,
93        options: ObserveOptions,
94    ) -> Result<(), ObserverError> {
95        let vr = self.ensure_non_final_value_reference()?;
96        let mut vr_borrow = vr.borrow_mut();
97        if let Some(observer) = vr_borrow.observers.get_mut(&observer_id) {
98            observer.options = options;
99            Ok(())
100        } else {
101            Err(ObserverError::ObserverNotFound)
102        }
103    }
104
105    /// Returns a list of all observer IDs currently registered to this reference.
106    /// A type reference or immutable reference will always return an empty list.
107    pub fn observers_ids(&self) -> Vec<u32> {
108        match self {
109            Reference::TypeReference(_) => vec![],
110            Reference::ValueReference(vr) => {
111                vr.borrow().observers.keys().cloned().collect()
112            }
113        }
114    }
115
116    /// Removes all observers from this reference.
117    /// Returns an error if the reference is immutable.
118    pub fn unobserve_all(&self) -> Result<(), ObserverError> {
119        self.ensure_non_final_value_reference()?;
120        for id in self.observers_ids() {
121            let _ = self.unobserve(id);
122        }
123        Ok(())
124    }
125
126    /// Ensures that this reference is a non-final value reference and returns it.
127    /// Returns an ObserverError if the reference is final or a type reference.
128    fn ensure_non_final_value_reference(
129        &self,
130    ) -> Result<Rc<RefCell<ValueReference>>, ObserverError> {
131        self.non_final_reference()
132            .ok_or(ObserverError::FinalReference)
133    }
134
135    /// Notifies all observers of a change represented by the given DIFUpdate.
136    pub fn notify_observers(&self, dif: &DIFUpdate) {
137        let observer_callbacks: Vec<ObserverCallback> = match self {
138            Reference::TypeReference(_) => return, // no observers
139            Reference::ValueReference(vr) => {
140                // Clone observers while holding borrow
141                let vr_ref = vr.borrow();
142                vr_ref
143                    .observers
144                    .iter()
145                    .filter(|(_, f)| {
146                        // Filter out bounced back transceiver updates if relay_own_updates not enabled
147                        f.options.relay_own_updates
148                            || f.transceiver_id != dif.source_id
149                    })
150                    .map(|(_, f)| f.callback.clone())
151                    .collect()
152            }
153        };
154
155        // Call each observer synchronously
156        for callback in observer_callbacks {
157            callback(dif);
158        }
159    }
160
161    /// Check if there are any observers registered
162    pub fn has_observers(&self) -> bool {
163        match self {
164            Reference::TypeReference(_) => false,
165            Reference::ValueReference(vr) => !vr.borrow().observers.is_empty(),
166        }
167    }
168}
169
#[cfg(test)]
mod tests {
    use crate::dif::update::{DIFUpdate, DIFUpdateData};
    use crate::references::observers::{
        ObserveOptions, Observer, TransceiverId,
    };
    use crate::runtime::memory::Memory;
    use crate::values::core_values::map::Map;
    use crate::{
        dif::{
            representation::DIFValueRepresentation,
            r#type::DIFTypeContainer,
            value::{DIFValue, DIFValueContainer},
        },
        references::{
            observers::ObserverError,
            reference::{Reference, ReferenceMutability},
        },
        values::value_container::ValueContainer,
    };
    use std::{assert_matches::assert_matches, cell::RefCell, rc::Rc};

    /// Helper function to record DIF updates observed on a reference.
    ///
    /// Attaches an observer with the given transceiver ID and options and
    /// returns a shared vector that receives a clone of every update the
    /// observer is notified about. The caller can borrow it to inspect the
    /// updates after performing operations on the reference.
    fn record_dif_updates(
        reference: &Reference,
        transceiver_id: TransceiverId,
        observe_options: ObserveOptions,
    ) -> Rc<RefCell<Vec<DIFUpdate>>> {
        let updates: Rc<RefCell<Vec<DIFUpdate>>> =
            Rc::new(RefCell::new(vec![]));
        let updates_clone = Rc::clone(&updates);
        reference
            .observe(Observer {
                transceiver_id,
                options: observe_options,
                callback: Rc::new(move |update| {
                    updates_clone.borrow_mut().push(update.clone());
                }),
            })
            .expect("Failed to attach observer");
        updates
    }

    #[test]
    fn final_reference_observe_fails() {
        // A final reference must reject observers.
        let r = Reference::try_new_from_value_container(
            42.into(),
            None,
            None,
            ReferenceMutability::Final,
        )
        .unwrap();
        assert_matches!(
            r.observe(Observer::new(|_| {})),
            Err(ObserverError::FinalReference)
        );

        // A mutable reference can be observed.
        let r = Reference::try_new_from_value_container(
            42.into(),
            None,
            None,
            ReferenceMutability::Mutable,
        )
        .unwrap();
        assert_matches!(r.observe(Observer::new(|_| {})), Ok(_));

        // An immutable (but non-final) reference can be observed as well.
        let r = Reference::try_new_from_value_container(
            42.into(),
            None,
            None,
            ReferenceMutability::Immutable,
        )
        .unwrap();
        assert_matches!(r.observe(Observer::new(|_| {})), Ok(_));
    }

    #[test]
    fn observe_and_unobserve() {
        let r = Reference::try_mut_from(42.into()).unwrap();
        assert!(!r.has_observers());
        let observer_id = r.observe(Observer::new(|_| {})).unwrap();
        assert_eq!(observer_id, 0);
        assert!(r.has_observers());
        assert!(r.unobserve(observer_id).is_ok());
        assert!(!r.has_observers());
        // Removing the same observer twice must fail.
        assert_matches!(
            r.unobserve(observer_id),
            Err(ObserverError::ObserverNotFound)
        );
    }

    #[test]
    fn observer_ids_incremental() {
        let r = Reference::try_mut_from(42.into()).unwrap();
        let id1 = r.observe(Observer::new(|_| {})).unwrap();
        let id2 = r.observe(Observer::new(|_| {})).unwrap();
        assert_eq!(id1, 0);
        assert_eq!(id2, 1);
        assert!(r.unobserve(id1).is_ok());
        // The freed ID is handed out again before a new one is allocated.
        let id3 = r.observe(Observer::new(|_| {})).unwrap();
        assert_eq!(id3, 0);
        let id4 = r.observe(Observer::new(|_| {})).unwrap();
        assert_eq!(id4, 2);
    }

    #[test]
    fn observe_replace() {
        let memory = &RefCell::new(Memory::default());

        let int_ref = Reference::try_mut_from(42.into()).unwrap();
        let observed_update =
            record_dif_updates(&int_ref, 0, ObserveOptions::default());

        // Update the value of the reference
        int_ref
            .try_set_value(1, 43, memory)
            .expect("Failed to set value");

        // Verify the observed update matches the expected change
        let expected_update = DIFUpdate {
            source_id: 1,
            data: DIFUpdateData::replace(
                DIFValueContainer::from_value_container(
                    &ValueContainer::from(43),
                    memory,
                ),
            ),
        };

        assert_eq!(*observed_update.borrow(), vec![expected_update]);
    }

    #[test]
    fn observe_replace_same_transceiver() {
        let memory = &RefCell::new(Memory::default());

        let int_ref = Reference::try_mut_from(42.into()).unwrap();
        let observed_update =
            record_dif_updates(&int_ref, 0, ObserveOptions::default());

        // Update the value of the reference
        int_ref
            .try_set_value(0, 43, memory)
            .expect("Failed to set value");

        // No update triggered, same transceiver id
        assert_eq!(*observed_update.borrow(), vec![]);
    }

    #[test]
    fn observe_replace_same_transceiver_relay_own_updates() {
        let memory = &RefCell::new(Memory::default());

        let int_ref = Reference::try_mut_from(42.into()).unwrap();
        let observed_update = record_dif_updates(
            &int_ref,
            0,
            ObserveOptions {
                relay_own_updates: true,
            },
        );

        // Update the value of the reference
        int_ref
            .try_set_value(0, 43, memory)
            .expect("Failed to set value");

        // update triggered, same transceiver id but relay_own_updates enabled
        let expected_update = DIFUpdate {
            source_id: 0,
            data: DIFUpdateData::replace(
                DIFValueContainer::from_value_container(
                    &ValueContainer::from(43),
                    memory,
                ),
            ),
        };

        assert_eq!(*observed_update.borrow(), vec![expected_update]);
    }

    #[test]
    fn observe_update_property() {
        let memory = &RefCell::new(Memory::default());

        let reference = Reference::try_mut_from(
            Map::from(vec![
                ("a".to_string(), ValueContainer::from(1)),
                ("b".to_string(), ValueContainer::from(2)),
            ])
            .into(),
        )
        .unwrap();
        let observed_updates =
            record_dif_updates(&reference, 0, ObserveOptions::default());
        // Update a property
        reference
            .try_set_text_property(1, "a", "val".into(), memory)
            .expect("Failed to set property");
        // Verify the observed update matches the expected change
        let expected_update = DIFUpdate {
            source_id: 1,
            data: DIFUpdateData::set(
                "a",
                DIFValue::new(
                    DIFValueRepresentation::String("val".to_string()),
                    None as Option<DIFTypeContainer>,
                ),
            ),
        };
        assert_eq!(*observed_updates.borrow(), vec![expected_update]);
    }
}
381        assert_eq!(*observed_updates.borrow(), vec![expected_update]);
382    }
383}