datex_core/references/
observers.rs

1use crate::dif::update::{DIFUpdate, DIFUpdateData};
2use crate::references::{
3    reference::Reference, value_reference::ValueReference,
4};
5use crate::stdlib::vec;
6use crate::stdlib::vec::Vec;
7use crate::stdlib::{cell::RefCell, rc::Rc};
8use core::fmt::Display;
9use core::prelude::rust_2024::*;
10use core::result::Result;
11use serde::{Deserialize, Serialize};
12
/// Errors that can occur when registering, updating or removing observers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ObserverError {
    /// No observer is registered under the given observer ID.
    ObserverNotFound,
    /// The reference is not a mutable value reference and therefore cannot be observed.
    ImmutableReference,
}

impl Display for ObserverError {
    /// Writes the human-readable message for this error.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let message = match self {
            ObserverError::ObserverNotFound => "Observer not found",
            ObserverError::ImmutableReference => {
                "Cannot observe an immutable reference"
            }
        };
        f.write_str(message)
    }
}

// Implementing the standard error trait lets callers box this error, convert it
// into `dyn Error` with `?`, or attach it as the source of another error.
impl core::error::Error for ObserverError {}
31
/// Shared callback invoked for every update an observer is notified about.
/// It receives the update data and the ID of the transceiver the update came from.
pub type ObserverCallback = Rc<dyn Fn(&DIFUpdateData, TransceiverId)>;
33
/// Unique identifier for a transceiver (a source of updates).
/// IDs 0-255 are reserved for DIF clients.
pub type TransceiverId = u32;
37
/// Per-observer settings controlling which updates are delivered to it.
/// The derived `Default` leaves every flag disabled.
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)]
pub struct ObserveOptions {
    /// If true, the transceiver will be notified of changes that originated from itself
    pub relay_own_updates: bool,
}
43
/// A registered observer: the callback to invoke plus the metadata used to
/// decide whether a given update is delivered to it.
#[derive(Clone)]
pub struct Observer {
    /// ID of the transceiver this observer belongs to. It is compared against
    /// an update's source ID to filter out the transceiver's own updates.
    pub transceiver_id: TransceiverId,
    /// Delivery options for this observer.
    pub options: ObserveOptions,
    /// Function called with the update data and the update's source ID.
    pub callback: ObserverCallback,
}
50
51impl Observer {
52    /// Creates a new observer with the given callback function,
53    /// using default options and a transceiver ID of 0.
54    pub fn new<F: Fn(&DIFUpdateData, TransceiverId) + 'static>(
55        callback: F,
56    ) -> Self {
57        Observer {
58            transceiver_id: 0,
59            options: ObserveOptions::default(),
60            callback: Rc::new(callback),
61        }
62    }
63}
64
65impl Reference {
66    /// Adds an observer to this reference that will be notified on value changes.
67    /// Returns an error if the reference is immutable or a type reference.
68    /// The returned u32 is an observer ID that can be used to remove the observer later.
69    pub fn observe(&self, observer: Observer) -> Result<u32, ObserverError> {
70        // Add the observer to the list of observers
71        Ok(self
72            .ensure_mutable_value_reference()?
73            .borrow_mut()
74            .observers
75            .add(observer))
76
77        // TODO #299: also set observers on child references if not yet active, keep track of active observers
78    }
79
80    /// Removes an observer by its ID.
81    /// Returns an error if the observer ID is not found or the reference is immutable.
82    pub fn unobserve(&self, observer_id: u32) -> Result<(), ObserverError> {
83        let removed = self
84            .ensure_mutable_value_reference()?
85            .borrow_mut()
86            .observers
87            .remove(observer_id);
88        if removed.is_some() {
89            Ok(())
90        } else {
91            Err(ObserverError::ObserverNotFound)
92        }
93    }
94
95    /// Updates the options for an existing observer by its ID.
96    /// Returns an error if the observer ID is not found or the reference is immutable.
97    pub fn update_observer_options(
98        &self,
99        observer_id: u32,
100        options: ObserveOptions,
101    ) -> Result<(), ObserverError> {
102        let vr = self.ensure_mutable_value_reference()?;
103        let mut vr_borrow = vr.borrow_mut();
104        if let Some(observer) = vr_borrow.observers.get_mut(&observer_id) {
105            observer.options = options;
106            Ok(())
107        } else {
108            Err(ObserverError::ObserverNotFound)
109        }
110    }
111
112    /// Returns a list of all observer IDs currently registered to this reference.
113    /// A type reference or immutable reference will always return an empty list.
114    pub fn observers_ids(&self) -> Vec<u32> {
115        match self {
116            Reference::TypeReference(_) => vec![],
117            Reference::ValueReference(vr) => {
118                vr.borrow().observers.keys().cloned().collect()
119            }
120        }
121    }
122
123    /// Removes all observers from this reference.
124    /// Returns an error if the reference is immutable.
125    pub fn unobserve_all(&self) -> Result<(), ObserverError> {
126        self.ensure_mutable_value_reference()?;
127        for id in self.observers_ids() {
128            let _ = self.unobserve(id);
129        }
130        Ok(())
131    }
132
133    /// Ensures that this reference is a mutable value reference and returns it.
134    /// Returns an ObserverError if the reference is immutable or a type reference.
135    fn ensure_mutable_value_reference(
136        &self,
137    ) -> Result<Rc<RefCell<ValueReference>>, ObserverError> {
138        self.mutable_reference()
139            .ok_or(ObserverError::ImmutableReference)
140    }
141
142    /// Notifies all observers of a change represented by the given DIFUpdate.
143    pub fn notify_observers(&self, dif: &DIFUpdate) {
144        let observer_callbacks: Vec<ObserverCallback> = match self {
145            Reference::TypeReference(_) => return, // no observers
146            Reference::ValueReference(vr) => {
147                // Clone observers while holding borrow
148                let vr_ref = vr.borrow();
149                vr_ref
150                    .observers
151                    .iter()
152                    .filter(|(_, f)| {
153                        // Filter out bounced back transceiver updates if relay_own_updates not enabled
154                        f.options.relay_own_updates
155                            || f.transceiver_id != dif.source_id
156                    })
157                    .map(|(_, f)| f.callback.clone())
158                    .collect()
159            }
160        };
161
162        // Call each observer synchronously
163        for callback in observer_callbacks {
164            callback(&dif.data, dif.source_id);
165        }
166    }
167
168    /// Check if there are any observers registered
169    pub fn has_observers(&self) -> bool {
170        match self {
171            Reference::TypeReference(_) => false,
172            Reference::ValueReference(vr) => !vr.borrow().observers.is_empty(),
173        }
174    }
175}
176
#[cfg(test)]
mod tests {
    use crate::dif::r#type::DIFTypeDefinition;
    use crate::dif::update::{DIFUpdate, DIFUpdateData};
    use crate::references::observers::Observer;
    use crate::references::observers::{ObserveOptions, TransceiverId};
    use crate::runtime::memory::Memory;
    use crate::stdlib::borrow::Cow;
    use crate::stdlib::{
        assert_matches::assert_matches, cell::RefCell, rc::Rc,
    };
    use crate::values::core_values::map::Map;
    use crate::{
        dif::{
            representation::DIFValueRepresentation,
            value::{DIFValue, DIFValueContainer},
        },
        references::{
            observers::ObserverError,
            reference::{Reference, ReferenceMutability},
        },
        values::value_container::ValueContainer,
    };

    /// Attaches a recording observer to `reference` and returns the shared log
    /// it writes to.
    ///
    /// Every update delivered to the observer is appended to the returned
    /// `Rc<RefCell<Vec<DIFUpdate>>>`; borrow it after operating on the reference
    /// to inspect what was observed.
    fn record_dif_updates(
        reference: &Reference,
        transceiver_id: TransceiverId,
        observe_options: ObserveOptions,
    ) -> Rc<RefCell<Vec<DIFUpdate<'static>>>> {
        let recorded = Rc::new(RefCell::new(Vec::new()));
        // Second handle to the same log, moved into the observer callback
        let sink = Rc::clone(&recorded);
        let observer = Observer {
            transceiver_id,
            options: observe_options,
            callback: Rc::new(move |update_data, source_id| {
                sink.borrow_mut().push(DIFUpdate {
                    source_id,
                    data: Cow::Owned(update_data.clone()),
                });
            }),
        };
        reference
            .observe(observer)
            .expect("Failed to attach observer");
        recorded
    }

    #[test]
    fn immutable_reference_observe_fails() {
        // Observing an immutable reference is rejected
        let immutable_ref = Reference::try_new_from_value_container(
            42.into(),
            None,
            None,
            ReferenceMutability::Immutable,
        )
        .unwrap();
        assert_matches!(
            immutable_ref.observe(Observer::new(|_, _| {})),
            Err(ObserverError::ImmutableReference)
        );

        // The same call on a mutable reference succeeds
        let mutable_ref = Reference::try_new_from_value_container(
            42.into(),
            None,
            None,
            ReferenceMutability::Mutable,
        )
        .unwrap();
        assert_matches!(mutable_ref.observe(Observer::new(|_, _| {})), Ok(_));
    }

    #[test]
    fn observe_and_unobserve() {
        let reference = Reference::try_mut_from(42.into()).unwrap();
        assert!(!reference.has_observers());

        let observer_id = reference.observe(Observer::new(|_, _| {})).unwrap();
        assert_eq!(observer_id, 0);
        assert!(reference.has_observers());

        assert_matches!(reference.unobserve(observer_id), Ok(()));
        assert!(!reference.has_observers());

        // Removing the same observer a second time must fail
        assert_matches!(
            reference.unobserve(observer_id),
            Err(ObserverError::ObserverNotFound)
        );
    }

    #[test]
    fn observer_ids_incremental() {
        let reference = Reference::try_mut_from(42.into()).unwrap();
        let first_id = reference.observe(Observer::new(|_, _| {})).unwrap();
        let second_id = reference.observe(Observer::new(|_, _| {})).unwrap();
        assert_eq!(first_id, 0);
        assert_eq!(second_id, 1);

        // A freed ID is handed out again before a new one is allocated
        assert!(reference.unobserve(first_id).is_ok());
        let reused_id = reference.observe(Observer::new(|_, _| {})).unwrap();
        assert_eq!(reused_id, 0);
        let next_id = reference.observe(Observer::new(|_, _| {})).unwrap();
        assert_eq!(next_id, 2);
    }

    #[test]
    fn observe_replace() {
        let memory = RefCell::new(Memory::default());

        let int_ref = Reference::try_mut_from(42.into()).unwrap();
        let recorded =
            record_dif_updates(&int_ref, 0, ObserveOptions::default());

        // Replace the value on behalf of a different transceiver (id 1)
        int_ref
            .try_replace(1, &memory, 43)
            .expect("Failed to set value");

        // Exactly one replace update carrying the new value is expected
        let replaced_with = DIFValueContainer::from_value_container(
            &ValueContainer::from(43),
            &memory,
        );
        let expected_update = DIFUpdate {
            source_id: 1,
            data: Cow::Owned(DIFUpdateData::replace(replaced_with)),
        };

        assert_eq!(*recorded.borrow(), vec![expected_update]);
    }

    #[test]
    fn observe_replace_same_transceiver() {
        let memory = RefCell::new(Memory::default());

        let int_ref = Reference::try_mut_from(42.into()).unwrap();
        let recorded =
            record_dif_updates(&int_ref, 0, ObserveOptions::default());

        // Replace the value on behalf of the observer's own transceiver (id 0)
        int_ref
            .try_replace(0, &memory, 43)
            .expect("Failed to set value");

        // Nothing is delivered: the update came from the same transceiver id
        assert!(recorded.borrow().is_empty());
    }

    #[test]
    fn observe_replace_same_transceiver_relay_own_updates() {
        let memory = RefCell::new(Memory::default());

        let int_ref = Reference::try_mut_from(42.into()).unwrap();
        let relay_options = ObserveOptions {
            relay_own_updates: true,
        };
        let recorded = record_dif_updates(&int_ref, 0, relay_options);

        // Replace the value on behalf of the observer's own transceiver (id 0)
        int_ref
            .try_replace(0, &memory, 43)
            .expect("Failed to set value");

        // The update is delivered despite the matching transceiver id,
        // because relay_own_updates is enabled
        let replaced_with = DIFValueContainer::from_value_container(
            &ValueContainer::from(43),
            &memory,
        );
        let expected_update = DIFUpdate {
            source_id: 0,
            data: Cow::Owned(DIFUpdateData::replace(replaced_with)),
        };

        assert_eq!(*recorded.borrow(), vec![expected_update]);
    }

    #[test]
    fn observe_update_property() {
        let memory = RefCell::new(Memory::default());

        let initial_map = Map::from(vec![
            ("a".to_string(), ValueContainer::from(1)),
            ("b".to_string(), ValueContainer::from(2)),
        ]);
        let reference = Reference::try_mut_from(initial_map.into()).unwrap();
        let recorded =
            record_dif_updates(&reference, 0, ObserveOptions::default());

        // Overwrite property "a" on behalf of a different transceiver (id 1)
        reference
            .try_set_property(1, &memory, "a", "val".into())
            .expect("Failed to set property");

        // Exactly one set update for key "a" with the new string value is expected
        let new_value = DIFValue::new(
            DIFValueRepresentation::String("val".to_string()),
            None as Option<DIFTypeDefinition>,
        );
        let expected_update = DIFUpdate {
            source_id: 1,
            data: Cow::Owned(DIFUpdateData::set("a", new_value)),
        };
        assert_eq!(*recorded.borrow(), vec![expected_update]);
    }
}
384        assert_eq!(*observed_updates.borrow(), vec![expected_update]);
385    }
386}