Skip to main content

datex_core/references/
observers.rs

1use crate::{
2    dif::update::{DIFUpdate, DIFUpdateData},
3    references::{reference::Reference, value_reference::ValueReference},
4};
5
6use crate::prelude::*;
7use core::{cell::RefCell, fmt::Display, result::Result};
8use serde::{Deserialize, Serialize};
9
10#[derive(Debug)]
11pub enum ObserverError {
12    ObserverNotFound,
13    ImmutableReference,
14}
15
16impl Display for ObserverError {
17    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
18        match self {
19            ObserverError::ObserverNotFound => {
20                core::write!(f, "Observer not found")
21            }
22            ObserverError::ImmutableReference => {
23                core::write!(f, "Cannot observe an immutable reference")
24            }
25        }
26    }
27}
28
29pub type ObserverCallback = Rc<dyn Fn(&DIFUpdateData, TransceiverId)>;
30
31/// unique identifier for a transceiver (source of updates)
32/// 0-255 are reserved for DIF clients
33pub type TransceiverId = u32;
34
35#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)]
36pub struct ObserveOptions {
37    /// If true, the transceiver will be notified of changes that originated from itself
38    pub relay_own_updates: bool,
39}
40
41#[derive(Clone)]
42pub struct Observer {
43    pub transceiver_id: TransceiverId,
44    pub options: ObserveOptions,
45    pub callback: ObserverCallback,
46}
47
48impl Observer {
49    /// Creates a new observer with the given callback function,
50    /// using default options and a transceiver ID of 0.
51    pub fn new<F: Fn(&DIFUpdateData, TransceiverId) + 'static>(
52        callback: F,
53    ) -> Self {
54        Observer {
55            transceiver_id: 0,
56            options: ObserveOptions::default(),
57            callback: Rc::new(callback),
58        }
59    }
60}
61
62impl Reference {
63    /// Adds an observer to this reference that will be notified on value changes.
64    /// Returns an error if the reference is immutable or a type reference.
65    /// The returned u32 is an observer ID that can be used to remove the observer later.
66    pub fn observe(&self, observer: Observer) -> Result<u32, ObserverError> {
67        // Add the observer to the list of observers
68        Ok(self
69            .ensure_mutable_value_reference()?
70            .borrow_mut()
71            .observers
72            .add(observer))
73
74        // TODO #299: also set observers on child references if not yet active, keep track of active observers
75    }
76
77    /// Removes an observer by its ID.
78    /// Returns an error if the observer ID is not found or the reference is immutable.
79    pub fn unobserve(&self, observer_id: u32) -> Result<(), ObserverError> {
80        let removed = self
81            .ensure_mutable_value_reference()?
82            .borrow_mut()
83            .observers
84            .remove(observer_id);
85        if removed.is_some() {
86            Ok(())
87        } else {
88            Err(ObserverError::ObserverNotFound)
89        }
90    }
91
92    /// Updates the options for an existing observer by its ID.
93    /// Returns an error if the observer ID is not found or the reference is immutable.
94    pub fn update_observer_options(
95        &self,
96        observer_id: u32,
97        options: ObserveOptions,
98    ) -> Result<(), ObserverError> {
99        let vr = self.ensure_mutable_value_reference()?;
100        let mut vr_borrow = vr.borrow_mut();
101        if let Some(observer) = vr_borrow.observers.get_mut(&observer_id) {
102            observer.options = options;
103            Ok(())
104        } else {
105            Err(ObserverError::ObserverNotFound)
106        }
107    }
108
109    /// Returns a list of all observer IDs currently registered to this reference.
110    /// A type reference or immutable reference will always return an empty list.
111    pub fn observers_ids(&self) -> Vec<u32> {
112        match self {
113            Reference::TypeReference(_) => vec![],
114            Reference::ValueReference(vr) => {
115                vr.borrow().observers.keys().cloned().collect()
116            }
117        }
118    }
119
120    /// Removes all observers from this reference.
121    /// Returns an error if the reference is immutable.
122    pub fn unobserve_all(&self) -> Result<(), ObserverError> {
123        self.ensure_mutable_value_reference()?;
124        for id in self.observers_ids() {
125            let _ = self.unobserve(id);
126        }
127        Ok(())
128    }
129
130    /// Ensures that this reference is a mutable value reference and returns it.
131    /// Returns an ObserverError if the reference is immutable or a type reference.
132    fn ensure_mutable_value_reference(
133        &self,
134    ) -> Result<Rc<RefCell<ValueReference>>, ObserverError> {
135        self.mutable_reference()
136            .ok_or(ObserverError::ImmutableReference)
137    }
138
139    /// Notifies all observers of a change represented by the given DIFUpdate.
140    pub fn notify_observers(&self, dif: &DIFUpdate) {
141        let observer_callbacks: Vec<ObserverCallback> = match self {
142            Reference::TypeReference(_) => return, // no observers
143            Reference::ValueReference(vr) => {
144                // Clone observers while holding borrow
145                let vr_ref = vr.borrow();
146                vr_ref
147                    .observers
148                    .iter()
149                    .filter(|(_, f)| {
150                        // Filter out bounced back transceiver updates if relay_own_updates not enabled
151                        f.options.relay_own_updates
152                            || f.transceiver_id != dif.source_id
153                    })
154                    .map(|(_, f)| f.callback.clone())
155                    .collect()
156            }
157        };
158
159        // Call each observer synchronously
160        for callback in observer_callbacks {
161            callback(&dif.data, dif.source_id);
162        }
163    }
164
165    /// Check if there are any observers registered
166    pub fn has_observers(&self) -> bool {
167        match self {
168            Reference::TypeReference(_) => false,
169            Reference::ValueReference(vr) => !vr.borrow().observers.is_empty(),
170        }
171    }
172}
173
174#[cfg(test)]
175mod tests {
176    use crate::{
177        dif::{
178            representation::DIFValueRepresentation,
179            r#type::DIFTypeDefinition,
180            update::{DIFUpdate, DIFUpdateData},
181            value::{DIFValue, DIFValueContainer},
182        },
183        prelude::*,
184        references::{
185            observers::{
186                ObserveOptions, Observer, ObserverError, TransceiverId,
187            },
188            reference::{Reference, ReferenceMutability},
189        },
190        runtime::memory::Memory,
191        values::{core_values::map::Map, value_container::ValueContainer},
192    };
193    use core::{assert_matches, cell::RefCell};
194    /// Helper function to record DIF updates observed on a reference
195    /// Returns a Rc<RefCell<Vec<DIFUpdate>>> that contains all observed updates
196    /// The caller can borrow this to inspect the updates after performing operations on the reference
197    fn record_dif_updates(
198        reference: &Reference,
199        transceiver_id: TransceiverId,
200        observe_options: ObserveOptions,
201    ) -> Rc<RefCell<Vec<DIFUpdate<'static>>>> {
202        let update_collector = Rc::new(RefCell::new(Vec::new()));
203        let update_collector_clone = update_collector.clone();
204        reference
205            .observe(Observer {
206                transceiver_id,
207                options: observe_options,
208                callback: Rc::new(move |update_data, source_id| {
209                    update_collector_clone.borrow_mut().push(DIFUpdate {
210                        source_id,
211                        data: Cow::Owned(update_data.clone()),
212                    });
213                }),
214            })
215            .expect("Failed to attach observer");
216        update_collector
217    }
218
219    #[test]
220    fn immutable_reference_observe_fails() {
221        let r = Reference::try_new_from_value_container(
222            42.into(),
223            None,
224            None,
225            ReferenceMutability::Immutable,
226        )
227        .unwrap();
228        assert_matches!(
229            r.observe(Observer::new(|_, _| {})),
230            Err(ObserverError::ImmutableReference)
231        );
232
233        let r = Reference::try_new_from_value_container(
234            42.into(),
235            None,
236            None,
237            ReferenceMutability::Mutable,
238        )
239        .unwrap();
240        assert_matches!(r.observe(Observer::new(|_, _| {})), Ok(_));
241    }
242
243    #[test]
244    fn observe_and_unobserve() {
245        let r = Reference::try_mut_from(42.into()).unwrap();
246        assert!(!r.has_observers());
247        let observer_id = r.observe(Observer::new(|_, _| {})).unwrap();
248        assert!(observer_id == 0);
249        assert!(r.has_observers());
250        assert!(r.unobserve(observer_id).is_ok());
251        assert!(!r.has_observers());
252        assert_matches!(
253            r.unobserve(observer_id),
254            Err(ObserverError::ObserverNotFound)
255        );
256    }
257
258    #[test]
259    fn observer_ids_incremental() {
260        let r = Reference::try_mut_from(42.into()).unwrap();
261        let id1 = r.observe(Observer::new(|_, _| {})).unwrap();
262        let id2 = r.observe(Observer::new(|_, _| {})).unwrap();
263        assert!(id1 == 0);
264        assert!(id2 == 1);
265        assert!(r.unobserve(id1).is_ok());
266        let id3 = r.observe(Observer::new(|_, _| {})).unwrap();
267        assert!(id3 == 0);
268        let id4 = r.observe(Observer::new(|_, _| {})).unwrap();
269        assert!(id4 == 2);
270    }
271
272    #[test]
273    fn observe_replace() {
274        let memory = &RefCell::new(Memory::default());
275
276        let int_ref = Reference::try_mut_from(42.into()).unwrap();
277        let observed_updates =
278            record_dif_updates(&int_ref, 0, ObserveOptions::default());
279
280        // Update the value of the reference
281        int_ref
282            .try_replace(1, memory, 43)
283            .expect("Failed to set value");
284
285        // Verify the observed update matches the expected change
286        let expected_update = DIFUpdate {
287            source_id: 1,
288            data: Cow::Owned(DIFUpdateData::replace(
289                DIFValueContainer::from_value_container(
290                    &ValueContainer::from(43),
291                    memory,
292                ),
293            )),
294        };
295
296        assert_eq!(*observed_updates.borrow(), vec![expected_update]);
297    }
298
299    #[test]
300    fn observe_replace_same_transceiver() {
301        let memory = &RefCell::new(Memory::default());
302
303        let int_ref = Reference::try_mut_from(42.into()).unwrap();
304        let observed_update =
305            record_dif_updates(&int_ref, 0, ObserveOptions::default());
306
307        // Update the value of the reference
308        int_ref
309            .try_replace(0, memory, 43)
310            .expect("Failed to set value");
311
312        // No update triggered, same transceiver id
313        assert_eq!(*observed_update.borrow(), vec![]);
314    }
315
316    #[test]
317    fn observe_replace_same_transceiver_relay_own_updates() {
318        let memory = &RefCell::new(Memory::default());
319
320        let int_ref = Reference::try_mut_from(42.into()).unwrap();
321        let observed_update = record_dif_updates(
322            &int_ref,
323            0,
324            ObserveOptions {
325                relay_own_updates: true,
326            },
327        );
328
329        // Update the value of the reference
330        int_ref
331            .try_replace(0, memory, 43)
332            .expect("Failed to set value");
333
334        // update triggered, same transceiver id but relay_own_updates enabled
335        let expected_update = DIFUpdate {
336            source_id: 0,
337            data: Cow::Owned(DIFUpdateData::replace(
338                DIFValueContainer::from_value_container(
339                    &ValueContainer::from(43),
340                    memory,
341                ),
342            )),
343        };
344
345        assert_eq!(*observed_update.borrow(), vec![expected_update]);
346    }
347
348    #[test]
349    fn observe_update_property() {
350        let memory = &RefCell::new(Memory::default());
351
352        let reference = Reference::try_mut_from(
353            Map::from(vec![
354                ("a".to_string(), ValueContainer::from(1)),
355                ("b".to_string(), ValueContainer::from(2)),
356            ])
357            .into(),
358        )
359        .unwrap();
360        let observed_updates =
361            record_dif_updates(&reference, 0, ObserveOptions::default());
362        // Update a property
363        reference
364            .try_set_property(1, memory, "a", "val".into())
365            .expect("Failed to set property");
366        // Verify the observed update matches the expected change
367        let expected_update = DIFUpdate {
368            source_id: 1,
369            data: Cow::Owned(DIFUpdateData::set(
370                "a",
371                DIFValue::new(
372                    DIFValueRepresentation::String("val".to_string()),
373                    None as Option<DIFTypeDefinition>,
374                ),
375            )),
376        };
377        assert_eq!(*observed_updates.borrow(), vec![expected_update]);
378    }
379}