coap_server/app/
observers.rs

1use crate::app::path_matcher::{key_from_path, PathMatcher};
2use crate::app::u24::u24;
3use std::sync::Arc;
4use log::trace;
5use tokio::sync::{watch, Mutex, RwLock};
6
7/// Optional convenience mechanism to aid in managing dynamic [`Observers`] instances.
8///
9/// Intended to be used as:
10/// ```no_run
11/// use coap_server::app::{ObservableResource, Observers, ObserversHolder};
12/// struct MyResource {
13///     holder: ObserversHolder
14/// }
15/// #[async_trait::async_trait]
16/// impl ObservableResource for MyResource {
17///     async fn on_active(&self, observers: Observers) -> Observers {
18///         let attached = self.holder.attach(observers).await;
19///         attached.stay_active().await;
20///         attached.detach().await
21///     }
22/// }
23/// ```
24#[derive(Debug, Clone)]
25pub struct ObserversHolder {
26    inner: Arc<RwLock<PathMatcher<Arc<Observers>>>>,
27}
28
29/// Handle that can be used to inform the server when changes are detected.
30#[derive(Debug)]
31pub struct Observers {
32    relative_path_key: Vec<String>,
33
34    notify_change_tx: Arc<watch::Sender<NotificationState>>,
35
36    /// This will become the Observe value (i.e. the sequence number) if one is not provided
37    /// by the handler directly.
38    change_num: Arc<Mutex<u24>>,
39}
40
41#[derive(Debug, Copy, Clone)]
42pub enum NotificationState {
43    InitialSequence(u24),
44    ResourceChanged(u24),
45}
46
47impl ObserversHolder {
48    pub fn new() -> Self {
49        Self {
50            inner: Arc::new(RwLock::new(PathMatcher::new_empty())),
51        }
52    }
53
54    /// Attach a new [`Observers`] instance which affects how [`notify_change`] behaves.
55    pub async fn attach(&self, observers: Observers) -> Attached<'_> {
56        let key = observers.relative_path_key.clone();
57        let observers_arc = Arc::new(observers);
58        self.inner.write().await.insert(key.clone(), observers_arc.clone());
59        Attached { key, value: observers_arc, holder: self }
60    }
61
62    /// Defers to [`Observers::notify_change`] when attached; does nothing otherwise.
63    pub async fn notify_change(&self) {
64        for observers in self.inner.read().await.values() {
65            observers.notify_change().await;
66        }
67    }
68
69    /// Special variation of [`notify_change`] that indicates only observe requests grouped
70    /// under the provided path should be notified of the change.  This optimization can help a lot
71    /// when you are observing dynamic resources (i.e. /resources/{resource_name}/) with a very
72    /// large number of updates across different resources.
73    ///
74    /// The provided `relative_path` is used for fuzzy matching of any "relevant" observing path.  For
75    /// example, if `relative_path` is `"resources/abc"` then it will match against observe requests for
76    /// `"resources/abc/some_property"`, `"resources/abc"`, or even `"resources"`.  It would not
77    /// match observe requests for `"/resources/xyz"`.
78    ///
79    /// `relative_path` is relative to the resource path that the [`crate::app::ObservableResource`]
80    /// was installed at.
81    pub async fn notify_change_for_path(&self, relative_path: &str) {
82        trace!("entered notify_change_for_path: {relative_path}");
83        for result in self
84            .inner
85            .read()
86            .await
87            .match_all(&key_from_path(relative_path))
88        {
89            trace!("entered notify_change");
90            result.value.notify_change().await;
91            trace!("...exit");
92        }
93        trace!("...exit");
94    }
95}
96
97pub struct Attached<'a> {
98    key: Vec<String>,
99    value: Arc<Observers>,
100    holder: &'a ObserversHolder,
101}
102
103impl<'a> Attached<'a> {
104    /// Keep an attached [`Observers`] instance active.  Panics if none is attached.
105    pub async fn stay_active(&self) {
106        self.value.stay_active().await;
107    }
108
109    /// Detach and return the owned [`Observers`] instance, meant to be sent back to
110    /// [`crate::app::ObservableResource::on_active`].
111    pub async fn detach(self) -> Observers {
112        self.holder.inner.write().await.remove(&self.key).unwrap();
113        Arc::try_unwrap(self.value).unwrap()
114    }
115}
116
117impl Default for ObserversHolder {
118    fn default() -> Self {
119        ObserversHolder::new()
120    }
121}
122
123impl Observers {
124    pub(crate) fn new(relative_path_key: Vec<String>, change_num: u24) -> Self {
125        let init = NotificationState::InitialSequence(change_num);
126        let (notify_change_tx, _) = watch::channel(init);
127        Self {
128            relative_path_key,
129            notify_change_tx: Arc::new(notify_change_tx),
130            change_num: Arc::new(Mutex::new(change_num)),
131        }
132    }
133
134    pub fn relative_path(&self) -> String {
135        self.relative_path_key.join("/")
136    }
137
138    pub async fn stay_active(&self) {
139        self.notify_change_tx.closed().await;
140    }
141
142    pub(crate) fn leak_notify_change_tx(&self) -> Arc<watch::Sender<NotificationState>> {
143        self.notify_change_tx.clone()
144    }
145
146    /// Inform the server that a change to the underlying resource has potentially occurred.  The
147    /// server responds by re-executing synthetic `Get` or `Fetch` requests roughly matching
148    /// the original client request, then delivering the results to the peer.  Note that
149    /// spurious changes will be delivered if this method is spammed so callers must take care
150    /// to ensure it is only invoked when a genuine change is expected.
151    ///
152    /// Note that a sequence number will be generated for you if one is omitted
153    /// from response in the re-executed request.  If you wish to provide your own,
154    /// simply set the observe value in the response with `response.message.set_observe_value(...)`.
155    /// Be sure that if you do this, you are taking care that the sequence number does not
156    /// run backwards within 256 seconds as per:
157    /// [RFC 7641, section 4.4](https://datatracker.ietf.org/doc/html/rfc7641#section-4.4)
158    pub async fn notify_change(&self) {
159        let new_change_num = {
160            let mut change_num = self.change_num.lock().await;
161            *change_num = change_num.wrapping_add(u24::from(1u8));
162            *change_num
163        };
164        let _ = self
165            .notify_change_tx
166            .send(NotificationState::ResourceChanged(new_change_num));
167    }
168}