coap_server/app/observers.rs
1use crate::app::path_matcher::{key_from_path, PathMatcher};
2use crate::app::u24::u24;
3use std::sync::Arc;
4use log::trace;
5use tokio::sync::{watch, Mutex, RwLock};
6
7/// Optional convenience mechanism to aid in managing dynamic [`Observers`] instances.
8///
9/// Intended to be used as:
10/// ```no_run
11/// use coap_server::app::{ObservableResource, Observers, ObserversHolder};
12/// struct MyResource {
13/// holder: ObserversHolder
14/// }
15/// #[async_trait::async_trait]
16/// impl ObservableResource for MyResource {
17/// async fn on_active(&self, observers: Observers) -> Observers {
18/// let attached = self.holder.attach(observers).await;
19/// attached.stay_active().await;
20/// attached.detach().await
21/// }
22/// }
23/// ```
24#[derive(Debug, Clone)]
25pub struct ObserversHolder {
26 inner: Arc<RwLock<PathMatcher<Arc<Observers>>>>,
27}
28
29/// Handle that can be used to inform the server when changes are detected.
30#[derive(Debug)]
31pub struct Observers {
32 relative_path_key: Vec<String>,
33
34 notify_change_tx: Arc<watch::Sender<NotificationState>>,
35
36 /// This will become the Observe value (i.e. the sequence number) if one is not provided
37 /// by the handler directly.
38 change_num: Arc<Mutex<u24>>,
39}
40
41#[derive(Debug, Copy, Clone)]
42pub enum NotificationState {
43 InitialSequence(u24),
44 ResourceChanged(u24),
45}
46
47impl ObserversHolder {
48 pub fn new() -> Self {
49 Self {
50 inner: Arc::new(RwLock::new(PathMatcher::new_empty())),
51 }
52 }
53
54 /// Attach a new [`Observers`] instance which affects how [`notify_change`] behaves.
55 pub async fn attach(&self, observers: Observers) -> Attached<'_> {
56 let key = observers.relative_path_key.clone();
57 let observers_arc = Arc::new(observers);
58 self.inner.write().await.insert(key.clone(), observers_arc.clone());
59 Attached { key, value: observers_arc, holder: self }
60 }
61
62 /// Defers to [`Observers::notify_change`] when attached; does nothing otherwise.
63 pub async fn notify_change(&self) {
64 for observers in self.inner.read().await.values() {
65 observers.notify_change().await;
66 }
67 }
68
69 /// Special variation of [`notify_change`] that indicates only observe requests grouped
70 /// under the provided path should be notified of the change. This optimization can help a lot
71 /// when you are observing dynamic resources (i.e. /resources/{resource_name}/) with a very
72 /// large number of updates across different resources.
73 ///
74 /// The provided `relative_path` is used for fuzzy matching of any "relevant" observing path. For
75 /// example, if `relative_path` is `"resources/abc"` then it will match against observe requests for
76 /// `"resources/abc/some_property"`, `"resources/abc"`, or even `"resources"`. It would not
77 /// match observe requests for `"/resources/xyz"`.
78 ///
79 /// `relative_path` is relative to the resource path that the [`crate::app::ObservableResource`]
80 /// was installed at.
81 pub async fn notify_change_for_path(&self, relative_path: &str) {
82 trace!("entered notify_change_for_path: {relative_path}");
83 for result in self
84 .inner
85 .read()
86 .await
87 .match_all(&key_from_path(relative_path))
88 {
89 trace!("entered notify_change");
90 result.value.notify_change().await;
91 trace!("...exit");
92 }
93 trace!("...exit");
94 }
95}
96
97pub struct Attached<'a> {
98 key: Vec<String>,
99 value: Arc<Observers>,
100 holder: &'a ObserversHolder,
101}
102
103impl<'a> Attached<'a> {
104 /// Keep an attached [`Observers`] instance active. Panics if none is attached.
105 pub async fn stay_active(&self) {
106 self.value.stay_active().await;
107 }
108
109 /// Detach and return the owned [`Observers`] instance, meant to be sent back to
110 /// [`crate::app::ObservableResource::on_active`].
111 pub async fn detach(self) -> Observers {
112 self.holder.inner.write().await.remove(&self.key).unwrap();
113 Arc::try_unwrap(self.value).unwrap()
114 }
115}
116
117impl Default for ObserversHolder {
118 fn default() -> Self {
119 ObserversHolder::new()
120 }
121}
122
123impl Observers {
124 pub(crate) fn new(relative_path_key: Vec<String>, change_num: u24) -> Self {
125 let init = NotificationState::InitialSequence(change_num);
126 let (notify_change_tx, _) = watch::channel(init);
127 Self {
128 relative_path_key,
129 notify_change_tx: Arc::new(notify_change_tx),
130 change_num: Arc::new(Mutex::new(change_num)),
131 }
132 }
133
134 pub fn relative_path(&self) -> String {
135 self.relative_path_key.join("/")
136 }
137
138 pub async fn stay_active(&self) {
139 self.notify_change_tx.closed().await;
140 }
141
142 pub(crate) fn leak_notify_change_tx(&self) -> Arc<watch::Sender<NotificationState>> {
143 self.notify_change_tx.clone()
144 }
145
146 /// Inform the server that a change to the underlying resource has potentially occurred. The
147 /// server responds by re-executing synthetic `Get` or `Fetch` requests roughly matching
148 /// the original client request, then delivering the results to the peer. Note that
149 /// spurious changes will be delivered if this method is spammed so callers must take care
150 /// to ensure it is only invoked when a genuine change is expected.
151 ///
152 /// Note that a sequence number will be generated for you if one is omitted
153 /// from response in the re-executed request. If you wish to provide your own,
154 /// simply set the observe value in the response with `response.message.set_observe_value(...)`.
155 /// Be sure that if you do this, you are taking care that the sequence number does not
156 /// run backwards within 256 seconds as per:
157 /// [RFC 7641, section 4.4](https://datatracker.ietf.org/doc/html/rfc7641#section-4.4)
158 pub async fn notify_change(&self) {
159 let new_change_num = {
160 let mut change_num = self.change_num.lock().await;
161 *change_num = change_num.wrapping_add(u24::from(1u8));
162 *change_num
163 };
164 let _ = self
165 .notify_change_tx
166 .send(NotificationState::ResourceChanged(new_change_num));
167 }
168}