// Source file: datex_core/shared_values/observers.rs

1use crate::{
2    dif::update::{DIFUpdate, DIFUpdateData},
3    shared_values::{
4        shared_container::SharedContainer,
5        shared_value_container::SharedValueContainer,
6    },
7};
8
9use crate::prelude::*;
10use core::{cell::RefCell, fmt::Display, result::Result};
11use serde::{Deserialize, Serialize};
12
/// Errors returned by the observer API of a [`SharedContainer`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ObserverError {
    /// No observer is registered under the requested observer ID.
    ObserverNotFound,
    /// The target is not a mutable value reference and therefore cannot be
    /// observed.
    ImmutableReference,
}

impl Display for ObserverError {
    /// Writes the human-readable message for this error.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let message = match self {
            ObserverError::ObserverNotFound => "Observer not found",
            ObserverError::ImmutableReference => {
                "Cannot observe an immutable reference"
            }
        };
        f.write_str(message)
    }
}

// Implementing the standard error trait lets callers propagate this error
// with `?` into `Box<dyn Error>` or wrap it as the source of another error.
impl core::error::Error for ObserverError {}
31
32pub type ObserverCallback = Rc<dyn Fn(&DIFUpdateData, TransceiverId)>;
33
34/// unique identifier for a transceiver (source of updates)
35/// 0-255 are reserved for DIF clients
36pub type TransceiverId = u32;
37
38#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)]
39pub struct ObserveOptions {
40    /// If true, the transceiver will be notified of changes that originated from itself
41    pub relay_own_updates: bool,
42}
43
44#[derive(Clone)]
45pub struct Observer {
46    pub transceiver_id: TransceiverId,
47    pub options: ObserveOptions,
48    pub callback: ObserverCallback,
49}
50
51impl Observer {
52    /// Creates a new observer with the given callback function,
53    /// using default options and a transceiver ID of 0.
54    pub fn new<F: Fn(&DIFUpdateData, TransceiverId) + 'static>(
55        callback: F,
56    ) -> Self {
57        Observer {
58            transceiver_id: 0,
59            options: ObserveOptions::default(),
60            callback: Rc::new(callback),
61        }
62    }
63}
64
65impl SharedContainer {
66    /// Adds an observer to this reference that will be notified on value changes.
67    /// Returns an error if the reference is immutable or a type reference.
68    /// The returned u32 is an observer ID that can be used to remove the observer later.
69    pub fn observe(&self, observer: Observer) -> Result<u32, ObserverError> {
70        // Add the observer to the list of observers
71        Ok(self
72            .ensure_mutable_value_reference()?
73            .borrow_mut()
74            .observers
75            .add(observer))
76
77        // TODO #299: also set observers on child references if not yet active, keep track of active observers
78    }
79
80    /// Removes an observer by its ID.
81    /// Returns an error if the observer ID is not found or the reference is immutable.
82    pub fn unobserve(&self, observer_id: u32) -> Result<(), ObserverError> {
83        let removed = self
84            .ensure_mutable_value_reference()?
85            .borrow_mut()
86            .observers
87            .remove(observer_id);
88        if removed.is_some() {
89            Ok(())
90        } else {
91            Err(ObserverError::ObserverNotFound)
92        }
93    }
94
95    /// Updates the options for an existing observer by its ID.
96    /// Returns an error if the observer ID is not found or the reference is immutable.
97    pub fn update_observer_options(
98        &self,
99        observer_id: u32,
100        options: ObserveOptions,
101    ) -> Result<(), ObserverError> {
102        let vr = self.ensure_mutable_value_reference()?;
103        let mut vr_borrow = vr.borrow_mut();
104        if let Some(observer) = vr_borrow.observers.get_mut(&observer_id) {
105            observer.options = options;
106            Ok(())
107        } else {
108            Err(ObserverError::ObserverNotFound)
109        }
110    }
111
112    /// Returns a list of all observer IDs currently registered to this reference.
113    /// A type reference or immutable reference will always return an empty list.
114    pub fn observers_ids(&self) -> Vec<u32> {
115        match self {
116            SharedContainer::Type(_) => vec![],
117            SharedContainer::Value(vr) => {
118                vr.borrow().observers.keys().cloned().collect()
119            }
120        }
121    }
122
123    /// Removes all observers from this reference.
124    /// Returns an error if the reference is immutable.
125    pub fn unobserve_all(&self) -> Result<(), ObserverError> {
126        self.ensure_mutable_value_reference()?;
127        for id in self.observers_ids() {
128            let _ = self.unobserve(id);
129        }
130        Ok(())
131    }
132
133    /// Ensures that this reference is a mutable value reference and returns it.
134    /// Returns an ObserverError if the reference is immutable or a type reference.
135    fn ensure_mutable_value_reference(
136        &self,
137    ) -> Result<Rc<RefCell<SharedValueContainer>>, ObserverError> {
138        self.mutable_reference()
139            .ok_or(ObserverError::ImmutableReference)
140    }
141
142    /// Notifies all observers of a change represented by the given DIFUpdate.
143    pub fn notify_observers(&self, dif: &DIFUpdate) {
144        let observer_callbacks: Vec<ObserverCallback> = match self {
145            SharedContainer::Type(_) => return, // no observers
146            SharedContainer::Value(vr) => {
147                // Clone observers while holding borrow
148                let vr_ref = vr.borrow();
149                vr_ref
150                    .observers
151                    .iter()
152                    .filter(|(_, f)| {
153                        // Filter out bounced back transceiver updates if relay_own_updates not enabled
154                        f.options.relay_own_updates
155                            || f.transceiver_id != dif.source_id
156                    })
157                    .map(|(_, f)| f.callback.clone())
158                    .collect()
159            }
160        };
161
162        // Call each observer synchronously
163        for callback in observer_callbacks {
164            callback(&dif.data, dif.source_id);
165        }
166    }
167
168    /// Check if there are any observers registered
169    pub fn has_observers(&self) -> bool {
170        match self {
171            SharedContainer::Type(_) => false,
172            SharedContainer::Value(vr) => !vr.borrow().observers.is_empty(),
173        }
174    }
175}
176
#[cfg(test)]
mod tests {
    use crate::{
        dif::{
            representation::DIFValueRepresentation,
            r#type::DIFTypeDefinition,
            update::{DIFUpdate, DIFUpdateData},
            value::{DIFValue, DIFValueContainer},
        },
        prelude::*,
        shared_values::{
            observers::{
                ObserveOptions, Observer, ObserverError, TransceiverId,
            },
            pointer::Pointer,
            shared_container::{SharedContainer, SharedContainerMutability},
        },
        values::{core_values::map::Map, value_container::ValueContainer},
    };
    use core::{assert_matches, cell::RefCell};

    /// Attaches an observer to `reference` that records every update it
    /// receives.
    ///
    /// Returns the shared list of recorded updates; borrow it after
    /// performing operations on the reference to inspect what was observed.
    fn record_dif_updates(
        reference: &SharedContainer,
        transceiver_id: TransceiverId,
        observe_options: ObserveOptions,
    ) -> Rc<RefCell<Vec<DIFUpdate<'static>>>> {
        let recorded = Rc::new(RefCell::new(Vec::new()));
        let sink = Rc::clone(&recorded);
        let observer = Observer {
            transceiver_id,
            options: observe_options,
            callback: Rc::new(move |update_data, source_id| {
                sink.borrow_mut().push(DIFUpdate {
                    source_id,
                    data: Cow::Owned(update_data.clone()),
                });
            }),
        };
        reference
            .observe(observer)
            .expect("Failed to attach observer");
        recorded
    }

    /// Builds the update an observer is expected to record when the value
    /// is replaced with the integer `43` by the transceiver `source_id`.
    fn replaced_with_43(source_id: TransceiverId) -> DIFUpdate<'static> {
        DIFUpdate {
            source_id,
            data: Cow::Owned(DIFUpdateData::replace(
                DIFValueContainer::from_value_container(&ValueContainer::from(
                    43,
                )),
            )),
        }
    }

    #[test]
    fn immutable_reference_observe_fails() {
        // Observing an immutable container must be rejected ...
        let immutable = SharedContainer::try_boxed(
            42.into(),
            None,
            Pointer::NULL,
            SharedContainerMutability::Immutable,
        )
        .unwrap();
        assert_matches!(
            immutable.observe(Observer::new(|_, _| {})),
            Err(ObserverError::ImmutableReference)
        );

        // ... while the same container created as mutable accepts it.
        let mutable = SharedContainer::try_boxed(
            42.into(),
            None,
            Pointer::NULL,
            SharedContainerMutability::Mutable,
        )
        .unwrap();
        assert_matches!(mutable.observe(Observer::new(|_, _| {})), Ok(_));
    }

    #[test]
    fn observe_and_unobserve() {
        let r = SharedContainer::boxed_mut(42.into(), Pointer::NULL).unwrap();
        assert!(!r.has_observers());

        let observer_id = r.observe(Observer::new(|_, _| {})).unwrap();
        assert_eq!(observer_id, 0);
        assert!(r.has_observers());

        assert!(r.unobserve(observer_id).is_ok());
        assert!(!r.has_observers());

        // Removing the same observer a second time must fail.
        assert_matches!(
            r.unobserve(observer_id),
            Err(ObserverError::ObserverNotFound)
        );
    }

    #[test]
    fn observer_ids_incremental() {
        let r = SharedContainer::boxed_mut(42.into(), Pointer::NULL).unwrap();
        let id1 = r.observe(Observer::new(|_, _| {})).unwrap();
        let id2 = r.observe(Observer::new(|_, _| {})).unwrap();
        assert_eq!(id1, 0);
        assert_eq!(id2, 1);

        // A freed ID is handed out again before a new one is allocated.
        assert!(r.unobserve(id1).is_ok());
        let id3 = r.observe(Observer::new(|_, _| {})).unwrap();
        assert_eq!(id3, 0);
        let id4 = r.observe(Observer::new(|_, _| {})).unwrap();
        assert_eq!(id4, 2);
    }

    #[test]
    fn observe_replace() {
        let int_ref =
            SharedContainer::boxed_mut(42.into(), Pointer::NULL).unwrap();
        let observed_updates =
            record_dif_updates(&int_ref, 0, ObserveOptions::default());

        // Replace the value as transceiver 1 (not the observing one).
        int_ref
            .try_replace(1, None, 43)
            .expect("Failed to set value");

        // The observer must have recorded exactly that replacement.
        assert_eq!(*observed_updates.borrow(), vec![replaced_with_43(1)]);
    }

    #[test]
    fn observe_replace_same_transceiver() {
        let int_ref =
            SharedContainer::boxed_mut(42.into(), Pointer::NULL).unwrap();
        let observed_update =
            record_dif_updates(&int_ref, 0, ObserveOptions::default());

        // Replace the value as the observing transceiver itself.
        int_ref
            .try_replace(0, None, 43)
            .expect("Failed to set value");

        // No update triggered, same transceiver id
        assert_eq!(*observed_update.borrow(), vec![]);
    }

    #[test]
    fn observe_replace_same_transceiver_relay_own_updates() {
        let int_ref =
            SharedContainer::boxed_mut(42.into(), Pointer::NULL).unwrap();
        let observed_update = record_dif_updates(
            &int_ref,
            0,
            ObserveOptions {
                relay_own_updates: true,
            },
        );

        // Replace the value as the observing transceiver itself.
        int_ref
            .try_replace(0, None, 43)
            .expect("Failed to set value");

        // Update triggered: same transceiver id, but relay_own_updates is
        // enabled.
        assert_eq!(*observed_update.borrow(), vec![replaced_with_43(0)]);
    }

    #[test]
    fn observe_update_property() {
        let reference = SharedContainer::boxed_mut(
            Map::from(vec![
                ("a".to_string(), ValueContainer::from(1)),
                ("b".to_string(), ValueContainer::from(2)),
            ])
            .into(),
            Pointer::NULL,
        )
        .unwrap();
        let observed_updates =
            record_dif_updates(&reference, 0, ObserveOptions::default());

        // Update a property as transceiver 1.
        reference
            .try_set_property(1, None, "a", "val".into())
            .expect("Failed to set property");

        // The observer must have recorded exactly that property update.
        let expected_update = DIFUpdate {
            source_id: 1,
            data: Cow::Owned(DIFUpdateData::set(
                "a",
                DIFValue::new(
                    DIFValueRepresentation::String("val".to_string()),
                    None as Option<DIFTypeDefinition>,
                ),
            )),
        };
        assert_eq!(*observed_updates.borrow(), vec![expected_update]);
    }
}