1use alloc::{
2 collections::BTreeMap,
3 string::{String, ToString},
4 vec::Vec,
5};
6use core::{fmt::Display, marker::PhantomData};
7
8use crate::{request::CoapRequest, MessageClass, MessageType, Packet};
9
/// Unacknowledged-notification limit installed by `Subject::default()`.
///
/// An observer is dropped once its count of unacknowledged confirmable
/// notifications exceeds the subject's limit.
const DEFAULT_UNACKNOWLEDGED_LIMIT: u8 = 10;

/// Key type of the resource map held by `Subject`.
type ResourcePath = String;
13
/// One client that is observing a resource.
pub struct Observer<Endpoint>
where
    Endpoint: Display,
{
    /// Endpoint the registration request came from.
    pub endpoint: Endpoint,
    /// Token copied from the registration request.
    pub token: Vec<u8>,
    /// Confirmable notifications recorded for this observer since its last
    /// acknowledgement.
    unacknowledged_messages: u8,
    /// Message id of the most recent notification recorded for this
    /// observer; cleared once that notification is acknowledged.
    message_id: Option<u16>,
}
22
23pub struct Resource<Endpoint: Display> {
25 pub observers: Vec<Observer<Endpoint>>,
26 pub sequence: u32,
27}
28
29pub struct Subject<Endpoint: Display + PartialEq> {
31 resources: BTreeMap<ResourcePath, Resource<Endpoint>>,
32 unacknowledged_limit: u8,
33 phantom: PhantomData<Endpoint>,
36}
37
38impl<Endpoint: Display + PartialEq + Clone> Subject<Endpoint> {
39 pub fn register(&mut self, request: &CoapRequest<Endpoint>) {
41 let observer_endpoint = request.source.as_ref().unwrap();
42 let resource_path = request.get_path();
43 let token = request.message.get_token();
44
45 let observer = Observer {
46 endpoint: observer_endpoint.clone(),
47 token: token.to_vec(),
48 unacknowledged_messages: 0,
49 message_id: None,
50 };
51
52 coap_info!(
53 "Registering observer {} for resource {}",
54 observer_endpoint,
55 resource_path
56 );
57
58 let resource =
59 self.resources.entry(resource_path).or_insert(Resource {
60 observers: Vec::new(),
61 sequence: 0,
62 });
63
64 if let Some(position) = resource
65 .observers
66 .iter()
67 .position(|x| x.endpoint == observer.endpoint)
68 {
69 resource.observers[position] = observer;
70 } else {
71 resource.observers.push(observer);
72 }
73 }
74
75 pub fn deregister(&mut self, request: &CoapRequest<Endpoint>) {
77 let observer_endpoint = request.source.as_ref().unwrap();
78 let resource_path = request.get_path();
79 let token = request.message.get_token();
80
81 if let Some(resource) = self.resources.get_mut(&resource_path) {
82 let position = resource.observers.iter().position(|x| {
83 x.endpoint == *observer_endpoint && x.token == *token
84 });
85
86 if let Some(position) = position {
87 coap_info!(
88 "Deregistering observer {} for resource {}",
89 observer_endpoint,
90 resource_path
91 );
92
93 resource.observers.remove(position);
94 }
95 }
96 }
97
98 pub fn resource_changed(
103 &mut self,
104 resource: &str,
105 message_id: u16,
106 is_confirmable: bool,
107 ) {
108 let unacknowledged_limit = self.unacknowledged_limit;
109 coap_debug!("Resource changed");
110
111 self.resources
112 .entry(resource.to_string())
113 .and_modify(|resource| {
114 resource.sequence += 1;
115
116 resource.observers.iter_mut().for_each(|observer| {
117 observer.message_id = Some(message_id);
118 if is_confirmable {
119 observer.unacknowledged_messages += 1;
120 }
121 });
122
123 resource.observers.retain(|observer| {
124 observer.unacknowledged_messages <= unacknowledged_limit
125 });
126 });
127 }
128
129 pub fn acknowledge(&mut self, request: &CoapRequest<Endpoint>) {
131 let observer_endpoint = request.source.as_ref().unwrap();
132 let message_id = request.message.header.message_id;
133
134 for (resource_path, resource) in self.resources.iter_mut() {
135 let observer = resource.observers.iter_mut().find(|x| {
136 if let Some(observe_msg_id) = x.message_id {
137 return x.endpoint == *observer_endpoint
140 && observe_msg_id == message_id;
141 }
142
143 false
144 });
145
146 if let Some(observer) = observer {
147 coap_debug!("Received ack for resource {}", resource_path);
148
149 observer.unacknowledged_messages = 0;
150 observer.message_id = None;
151 }
152 }
153 }
154
155 pub fn get_resource(&self, resource: &str) -> Option<&Resource<Endpoint>> {
157 self.resources.get(resource)
158 }
159
160 pub fn get_resource_observers(
162 &self,
163 resource: &str,
164 ) -> Option<Vec<&Observer<Endpoint>>> {
165 self.resources
166 .get(resource)
167 .map(|resource| resource.observers.iter().collect())
168 }
169
170 pub fn set_unacknowledged_limit(&mut self, limit: u8) {
172 self.unacknowledged_limit = limit;
173 }
174}
175
176pub fn create_notification(
178 message_id: u16,
179 token: Vec<u8>,
180 sequence: u32,
181 payload: Vec<u8>,
182 is_confirmable: bool,
183) -> Packet {
184 let mut packet = Packet::new();
185
186 packet.header.set_version(1);
187 packet.header.set_type(if is_confirmable {
188 MessageType::Confirmable
189 } else {
190 MessageType::NonConfirmable
191 });
192 packet.header.code = MessageClass::Response(crate::ResponseType::Content);
193 packet.header.message_id = message_id;
194 packet.set_token(token);
195 packet.payload = payload;
196 packet.set_observe_value(sequence);
197
198 packet
199}
200
201impl<Endpoint: Display + PartialEq + Clone> Default for Subject<Endpoint> {
202 fn default() -> Self {
203 Subject {
204 resources: BTreeMap::new(),
205 unacknowledged_limit: DEFAULT_UNACKNOWLEDGED_LIMIT,
206 phantom: PhantomData,
207 }
208 }
209}
210
#[cfg(test)]
mod test {
    use super::*;
    use crate::{
        header::{MessageType, RequestType as Method},
        packet::ObserveOption,
    };

    type Endpoint = String;

    /// Resource path shared by every test.
    const RESOURCE_PATH: &str = "temp";
    /// Source endpoint shared by every test request.
    const SOURCE: &str = "0.0.0.0";

    /// Builds an observe-registration GET for `RESOURCE_PATH` carrying
    /// `token`.
    fn registration(token: Vec<u8>) -> CoapRequest<Endpoint> {
        let mut request = CoapRequest::new();
        request.source = Some(String::from(SOURCE));
        request.set_method(Method::Get);
        request.set_path(RESOURCE_PATH);
        request.message.set_token(token);
        request.set_observe_flag(ObserveOption::Register);
        request
    }

    /// Number of observers currently registered for `RESOURCE_PATH`.
    fn observer_count(subject: &Subject<Endpoint>) -> usize {
        subject.get_resource_observers(RESOURCE_PATH).unwrap().len()
    }

    /// Unacknowledged counter of the first observer of `RESOURCE_PATH`.
    fn first_unacknowledged(subject: &Subject<Endpoint>) -> u8 {
        let observers = subject.get_resource_observers(RESOURCE_PATH).unwrap();
        observers.first().unwrap().unacknowledged_messages
    }

    #[test]
    fn register() {
        let mut subject: Subject<Endpoint> = Subject::default();
        subject.register(&registration(vec![0x7d, 0x34]));

        assert_eq!(observer_count(&subject), 1);
    }

    #[test]
    fn register_replace() {
        let first = registration(vec![0x00, 0x00]);
        let second = registration(vec![0xff, 0xff]);

        let mut subject: Subject<Endpoint> = Subject::default();
        subject.register(&first);
        subject.register(&second);

        let observers = subject.get_resource_observers(RESOURCE_PATH).unwrap();

        assert_eq!(observers.len(), 1);
        assert_eq!(observers[0].token, vec![0xff, 0xff]);
    }

    #[test]
    fn ack_flow_ok() {
        let mut subject: Subject<Endpoint> = Subject::default();
        subject.register(&registration(vec![0x00, 0x00]));

        let before = subject.get_resource(RESOURCE_PATH).unwrap().sequence;
        subject.resource_changed(RESOURCE_PATH, 1, true);
        let after = subject.get_resource(RESOURCE_PATH).unwrap().sequence;

        assert!(after > before);
        assert_eq!(first_unacknowledged(&subject), 1);

        let mut ack = CoapRequest::new();
        ack.source = Some(String::from(SOURCE));
        ack.message.header.set_type(MessageType::Acknowledgement);
        ack.set_path(RESOURCE_PATH);
        ack.message.set_token(vec![0x00, 0x00]);
        ack.message.header.message_id = 1;

        subject.acknowledge(&ack);

        assert_eq!(first_unacknowledged(&subject), 0);
    }

    #[test]
    fn ack_flow_forget_observer() {
        let mut subject: Subject<Endpoint> = Subject::default();
        subject.set_unacknowledged_limit(5);
        subject.register(&registration(vec![0x00, 0x00]));

        // Six unacknowledged confirmable notifications exceed the limit of 5.
        for message_id in 1..=6 {
            subject.resource_changed(RESOURCE_PATH, message_id, true);
        }

        assert_eq!(observer_count(&subject), 0);
    }

    #[test]
    fn combine_ack_and_no_ack() {
        let mut subject: Subject<Endpoint> = Subject::default();
        subject.set_unacknowledged_limit(2);
        subject.register(&registration(vec![0x00, 0x00]));

        // Non-confirmable notifications never count against the observer.
        for message_id in 1..=3 {
            subject.resource_changed(RESOURCE_PATH, message_id, false);
        }
        assert_eq!(observer_count(&subject), 1);

        // Three confirmable notifications exceed the limit of 2.
        for message_id in 4..=6 {
            subject.resource_changed(RESOURCE_PATH, message_id, true);
        }
        assert_eq!(observer_count(&subject), 0);
    }
}