1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
use crate::app::path_matcher::{key_from_path, PathMatcher};
use crate::app::u24::u24;
use std::sync::Arc;
use log::trace;
use tokio::sync::{watch, Mutex, RwLock};
/// Optional convenience mechanism to aid in managing dynamic [`Observers`] instances.
///
/// Intended to be used as:
/// ```no_run
/// use coap_server::app::{ObservableResource, Observers, ObserversHolder};
/// struct MyResource {
/// holder: ObserversHolder
/// }
/// #[async_trait::async_trait]
/// impl ObservableResource for MyResource {
/// async fn on_active(&self, observers: Observers) -> Observers {
/// let attached = self.holder.attach(observers).await;
/// attached.stay_active().await;
/// attached.detach().await
/// }
/// }
/// ```
#[derive(Debug, Clone)]
pub struct ObserversHolder {
inner: Arc<RwLock<PathMatcher<Arc<Observers>>>>,
}
/// Handle that can be used to inform the server when changes are detected.
#[derive(Debug)]
pub struct Observers {
relative_path_key: Vec<String>,
notify_change_tx: Arc<watch::Sender<NotificationState>>,
/// This will become the Observe value (i.e. the sequence number) if one is not provided
/// by the handler directly.
change_num: Arc<Mutex<u24>>,
}
#[derive(Debug, Copy, Clone)]
pub enum NotificationState {
InitialSequence(u24),
ResourceChanged(u24),
}
impl ObserversHolder {
pub fn new() -> Self {
Self {
inner: Arc::new(RwLock::new(PathMatcher::new_empty())),
}
}
/// Attach a new [`Observers`] instance which affects how [`notify_change`] behaves.
pub async fn attach(&self, observers: Observers) -> Attached<'_> {
let key = observers.relative_path_key.clone();
let observers_arc = Arc::new(observers);
self.inner.write().await.insert(key.clone(), observers_arc.clone());
Attached { key, value: observers_arc, holder: self }
}
/// Defers to [`Observers::notify_change`] when attached; does nothing otherwise.
pub async fn notify_change(&self) {
for observers in self.inner.read().await.values() {
observers.notify_change().await;
}
}
/// Special variation of [`notify_change`] that indicates only observe requests grouped
/// under the provided path should be notified of the change. This optimization can help a lot
/// when you are observing dynamic resources (i.e. /resources/{resource_name}/) with a very
/// large number of updates across different resources.
///
/// The provided `relative_path` is used for fuzzy matching of any "relevant" observing path. For
/// example, if `relative_path` is `"resources/abc"` then it will match against observe requests for
/// `"resources/abc/some_property"`, `"resources/abc"`, or even `"resources"`. It would not
/// match observe requests for `"/resources/xyz"`.
///
/// `relative_path` is relative to the resource path that the [`crate::app::ObservableResource`]
/// was installed at.
pub async fn notify_change_for_path(&self, relative_path: &str) {
trace!("entered notify_change_for_path: {relative_path}");
for result in self
.inner
.read()
.await
.match_all(&key_from_path(relative_path))
{
trace!("entered notify_change");
result.value.notify_change().await;
trace!("...exit");
}
trace!("...exit");
}
}
pub struct Attached<'a> {
key: Vec<String>,
value: Arc<Observers>,
holder: &'a ObserversHolder,
}
impl<'a> Attached<'a> {
/// Keep an attached [`Observers`] instance active. Panics if none is attached.
pub async fn stay_active(&self) {
self.value.stay_active().await;
}
/// Detach and return the owned [`Observers`] instance, meant to be sent back to
/// [`crate::app::ObservableResource::on_active`].
pub async fn detach(self) -> Observers {
self.holder.inner.write().await.remove(&self.key).unwrap();
Arc::try_unwrap(self.value).unwrap()
}
}
impl Default for ObserversHolder {
fn default() -> Self {
ObserversHolder::new()
}
}
impl Observers {
pub(crate) fn new(relative_path_key: Vec<String>, change_num: u24) -> Self {
let init = NotificationState::InitialSequence(change_num);
let (notify_change_tx, _) = watch::channel(init);
Self {
relative_path_key,
notify_change_tx: Arc::new(notify_change_tx),
change_num: Arc::new(Mutex::new(change_num)),
}
}
pub fn relative_path(&self) -> String {
self.relative_path_key.join("/")
}
pub async fn stay_active(&self) {
self.notify_change_tx.closed().await;
}
pub(crate) fn leak_notify_change_tx(&self) -> Arc<watch::Sender<NotificationState>> {
self.notify_change_tx.clone()
}
/// Inform the server that a change to the underlying resource has potentially occurred. The
/// server responds by re-executing synthetic `Get` or `Fetch` requests roughly matching
/// the original client request, then delivering the results to the peer. Note that
/// spurious changes will be delivered if this method is spammed so callers must take care
/// to ensure it is only invoked when a genuine change is expected.
///
/// Note that a sequence number will be generated for you if one is omitted
/// from response in the re-executed request. If you wish to provide your own,
/// simply set the observe value in the response with `response.message.set_observe_value(...)`.
/// Be sure that if you do this, you are taking care that the sequence number does not
/// run backwards within 256 seconds as per:
/// [RFC 7641, section 4.4](https://datatracker.ietf.org/doc/html/rfc7641#section-4.4)
pub async fn notify_change(&self) {
let new_change_num = {
let mut change_num = self.change_num.lock().await;
*change_num = change_num.wrapping_add(u24::from(1u8));
*change_num
};
let _ = self
.notify_change_tx
.send(NotificationState::ResourceChanged(new_change_num));
}
}