coap_lite/
observe.rs

1use alloc::{
2    collections::BTreeMap,
3    string::{String, ToString},
4    vec::Vec,
5};
6use core::{fmt::Display, marker::PhantomData};
7
8use crate::{request::CoapRequest, MessageClass, MessageType, Packet};
9
10const DEFAULT_UNACKNOWLEDGED_LIMIT: u8 = 10;
11
12type ResourcePath = String;
13
14/// An observer client.
15pub struct Observer<Endpoint: Display> {
16    pub endpoint: Endpoint,
17    pub token: Vec<u8>,
18    unacknowledged_messages: u8,
19    // The message id of the last update to be acknowledged
20    message_id: Option<u16>,
21}
22
23/// An observed resource.
24pub struct Resource<Endpoint: Display> {
25    pub observers: Vec<Observer<Endpoint>>,
26    pub sequence: u32,
27}
28
29/// Keeps track of the state of the observed resources.
30pub struct Subject<Endpoint: Display + PartialEq> {
31    resources: BTreeMap<ResourcePath, Resource<Endpoint>>,
32    unacknowledged_limit: u8,
33    // The Endpoint generic is needed internally for CoapRequest, but not as an
34    // actual field for this struct
35    phantom: PhantomData<Endpoint>,
36}
37
38impl<Endpoint: Display + PartialEq + Clone> Subject<Endpoint> {
39    /// Registers an observer interested in a resource.
40    pub fn register(&mut self, request: &CoapRequest<Endpoint>) {
41        let observer_endpoint = request.source.as_ref().unwrap();
42        let resource_path = request.get_path();
43        let token = request.message.get_token();
44
45        let observer = Observer {
46            endpoint: observer_endpoint.clone(),
47            token: token.to_vec(),
48            unacknowledged_messages: 0,
49            message_id: None,
50        };
51
52        coap_info!(
53            "Registering observer {} for resource {}",
54            observer_endpoint,
55            resource_path
56        );
57
58        let resource =
59            self.resources.entry(resource_path).or_insert(Resource {
60                observers: Vec::new(),
61                sequence: 0,
62            });
63
64        if let Some(position) = resource
65            .observers
66            .iter()
67            .position(|x| x.endpoint == observer.endpoint)
68        {
69            resource.observers[position] = observer;
70        } else {
71            resource.observers.push(observer);
72        }
73    }
74
75    /// Removes an observer from the interested resource.
76    pub fn deregister(&mut self, request: &CoapRequest<Endpoint>) {
77        let observer_endpoint = request.source.as_ref().unwrap();
78        let resource_path = request.get_path();
79        let token = request.message.get_token();
80
81        if let Some(resource) = self.resources.get_mut(&resource_path) {
82            let position = resource.observers.iter().position(|x| {
83                x.endpoint == *observer_endpoint && x.token == *token
84            });
85
86            if let Some(position) = position {
87                coap_info!(
88                    "Deregistering observer {} for resource {}",
89                    observer_endpoint,
90                    resource_path
91                );
92
93                resource.observers.remove(position);
94            }
95        }
96    }
97
98    /// Updates the resource information after having notified the observers.
99    ///
100    /// It increments the resource sequence and counter of unacknowledged
101    /// updates (the latter only if `is_confirmable` is `true`).
102    pub fn resource_changed(
103        &mut self,
104        resource: &str,
105        message_id: u16,
106        is_confirmable: bool,
107    ) {
108        let unacknowledged_limit = self.unacknowledged_limit;
109        coap_debug!("Resource changed");
110
111        self.resources
112            .entry(resource.to_string())
113            .and_modify(|resource| {
114                resource.sequence += 1;
115
116                resource.observers.iter_mut().for_each(|observer| {
117                    observer.message_id = Some(message_id);
118                    if is_confirmable {
119                        observer.unacknowledged_messages += 1;
120                    }
121                });
122
123                resource.observers.retain(|observer| {
124                    observer.unacknowledged_messages <= unacknowledged_limit
125                });
126            });
127    }
128
129    /// Resets the counter of unacknowledged updates for a resource observer.
130    pub fn acknowledge(&mut self, request: &CoapRequest<Endpoint>) {
131        let observer_endpoint = request.source.as_ref().unwrap();
132        let message_id = request.message.header.message_id;
133
134        for (resource_path, resource) in self.resources.iter_mut() {
135            let observer = resource.observers.iter_mut().find(|x| {
136                if let Some(observe_msg_id) = x.message_id {
137                    // Unacknowledgement doesn't officially require the token
138                    // to be passed in the ACK so it's not checked
139                    return x.endpoint == *observer_endpoint
140                        && observe_msg_id == message_id;
141                }
142
143                false
144            });
145
146            if let Some(observer) = observer {
147                coap_debug!("Received ack for resource {}", resource_path);
148
149                observer.unacknowledged_messages = 0;
150                observer.message_id = None;
151            }
152        }
153    }
154
155    /// Gets the tracked resources.
156    pub fn get_resource(&self, resource: &str) -> Option<&Resource<Endpoint>> {
157        self.resources.get(resource)
158    }
159
160    /// Gets the observers of a resource.
161    pub fn get_resource_observers(
162        &self,
163        resource: &str,
164    ) -> Option<Vec<&Observer<Endpoint>>> {
165        self.resources
166            .get(resource)
167            .map(|resource| resource.observers.iter().collect())
168    }
169
170    /// Sets the limit of unacknowledged updates before removing an observer.
171    pub fn set_unacknowledged_limit(&mut self, limit: u8) {
172        self.unacknowledged_limit = limit;
173    }
174}
175
176/// Creates a notification response for notifying observers about an update.
177pub fn create_notification(
178    message_id: u16,
179    token: Vec<u8>,
180    sequence: u32,
181    payload: Vec<u8>,
182    is_confirmable: bool,
183) -> Packet {
184    let mut packet = Packet::new();
185
186    packet.header.set_version(1);
187    packet.header.set_type(if is_confirmable {
188        MessageType::Confirmable
189    } else {
190        MessageType::NonConfirmable
191    });
192    packet.header.code = MessageClass::Response(crate::ResponseType::Content);
193    packet.header.message_id = message_id;
194    packet.set_token(token);
195    packet.payload = payload;
196    packet.set_observe_value(sequence);
197
198    packet
199}
200
201impl<Endpoint: Display + PartialEq + Clone> Default for Subject<Endpoint> {
202    fn default() -> Self {
203        Subject {
204            resources: BTreeMap::new(),
205            unacknowledged_limit: DEFAULT_UNACKNOWLEDGED_LIMIT,
206            phantom: PhantomData,
207        }
208    }
209}
210
211#[cfg(test)]
212mod test {
213    use super::*;
214    use crate::{
215        header::{MessageType, RequestType as Method},
216        packet::ObserveOption,
217    };
218
219    type Endpoint = String;
220
221    #[test]
222    fn register() {
223        let resource_path = "temp";
224
225        let mut request = CoapRequest::new();
226        request.source = Some(String::from("0.0.0.0"));
227        request.set_method(Method::Get);
228        request.set_path(resource_path);
229        request.message.set_token(vec![0x7d, 0x34]);
230        request.set_observe_flag(ObserveOption::Register);
231
232        let mut subject: Subject<Endpoint> = Subject::default();
233        subject.register(&request);
234
235        let observers = subject.get_resource_observers(resource_path).unwrap();
236
237        assert_eq!(observers.len(), 1);
238    }
239
240    #[test]
241    fn register_replace() {
242        let resource_path = "temp";
243
244        let mut request1 = CoapRequest::new();
245        request1.source = Some(String::from("0.0.0.0"));
246        request1.set_method(Method::Get);
247        request1.set_path(resource_path);
248        request1.message.set_token(vec![0x00, 0x00]);
249        request1.set_observe_flag(ObserveOption::Register);
250
251        let mut request2 = CoapRequest::new();
252        request2.source = Some(String::from("0.0.0.0"));
253        request2.set_method(Method::Get);
254        request2.set_path(resource_path);
255        request2.message.set_token(vec![0xff, 0xff]);
256        request2.set_observe_flag(ObserveOption::Register);
257
258        let mut subject: Subject<Endpoint> = Subject::default();
259        subject.register(&request1);
260        subject.register(&request2);
261
262        let observers = subject.get_resource_observers(resource_path).unwrap();
263
264        assert_eq!(observers.len(), 1);
265
266        let observer = observers.first().unwrap();
267
268        assert_eq!(observer.token, vec![0xff, 0xff]);
269    }
270
271    #[test]
272    fn ack_flow_ok() {
273        let resource_path = "temp";
274
275        let mut request1 = CoapRequest::new();
276        request1.source = Some(String::from("0.0.0.0"));
277        request1.set_method(Method::Get);
278        request1.set_path(resource_path);
279        request1.message.set_token(vec![0x00, 0x00]);
280        request1.set_observe_flag(ObserveOption::Register);
281
282        let mut subject: Subject<Endpoint> = Subject::default();
283        subject.register(&request1);
284
285        let sequence1 = subject.get_resource(resource_path).unwrap().sequence;
286        subject.resource_changed(resource_path, 1, true);
287        let sequence2 = subject.get_resource(resource_path).unwrap().sequence;
288
289        assert!(sequence2 > sequence1);
290
291        {
292            let observers =
293                subject.get_resource_observers(resource_path).unwrap();
294            let observer = observers.first().unwrap();
295
296            assert_eq!(observer.unacknowledged_messages, 1);
297        }
298
299        let mut ack = CoapRequest::new();
300        ack.source = Some(String::from("0.0.0.0"));
301        ack.message.header.set_type(MessageType::Acknowledgement);
302        ack.set_path(resource_path);
303        ack.message.set_token(vec![0x00, 0x00]);
304        ack.message.header.message_id = 1;
305
306        subject.acknowledge(&ack);
307
308        {
309            let observers =
310                subject.get_resource_observers(resource_path).unwrap();
311            let observer = observers.first().unwrap();
312
313            assert_eq!(observer.unacknowledged_messages, 0);
314        }
315    }
316
317    #[test]
318    fn ack_flow_forget_observer() {
319        let resource_path = "temp";
320
321        let mut request1 = CoapRequest::new();
322        request1.source = Some(String::from("0.0.0.0"));
323        request1.set_method(Method::Get);
324        request1.set_path(resource_path);
325        request1.message.set_token(vec![0x00, 0x00]);
326        request1.set_observe_flag(ObserveOption::Register);
327
328        let mut subject: Subject<Endpoint> = Subject::default();
329        subject.set_unacknowledged_limit(5);
330        subject.register(&request1);
331
332        subject.resource_changed(resource_path, 1, true);
333        subject.resource_changed(resource_path, 2, true);
334        subject.resource_changed(resource_path, 3, true);
335        subject.resource_changed(resource_path, 4, true);
336        subject.resource_changed(resource_path, 5, true);
337        subject.resource_changed(resource_path, 6, true);
338
339        let observers = subject.get_resource_observers(resource_path).unwrap();
340
341        assert_eq!(observers.len(), 0);
342    }
343
344    #[test]
345    fn combine_ack_and_no_ack() {
346        let resource_path = "temp";
347
348        let mut request1 = CoapRequest::new();
349        request1.source = Some(String::from("0.0.0.0"));
350        request1.set_method(Method::Get);
351        request1.set_path(resource_path);
352        request1.message.set_token(vec![0x00, 0x00]);
353        request1.set_observe_flag(ObserveOption::Register);
354
355        let mut subject: Subject<Endpoint> = Subject::default();
356        subject.set_unacknowledged_limit(2);
357        subject.register(&request1);
358
359        subject.resource_changed(resource_path, 1, false);
360        subject.resource_changed(resource_path, 2, false);
361        subject.resource_changed(resource_path, 3, false);
362
363        let observers = subject.get_resource_observers(resource_path).unwrap();
364        assert_eq!(observers.len(), 1);
365
366        subject.resource_changed(resource_path, 4, true);
367        subject.resource_changed(resource_path, 5, true);
368        subject.resource_changed(resource_path, 6, true);
369
370        let observers = subject.get_resource_observers(resource_path).unwrap();
371        assert_eq!(observers.len(), 0);
372    }
373}