use alloc::{
collections::BTreeMap,
string::{String, ToString},
vec::Vec,
};
use core::{fmt::Display, marker::PhantomData};
use crate::{request::CoapRequest, MessageClass, MessageType, Packet};
const DEFAULT_UNACKNOWLEDGED_LIMIT: u8 = 10;
type ResourcePath = String;
pub struct Observer<Endpoint: Display> {
pub endpoint: Endpoint,
pub token: Vec<u8>,
unacknowledged_messages: u8,
message_id: Option<u16>,
}
pub struct Resource<Endpoint: Display> {
pub observers: Vec<Observer<Endpoint>>,
pub sequence: u32,
}
pub struct Subject<Endpoint: Display + PartialEq> {
resources: BTreeMap<ResourcePath, Resource<Endpoint>>,
unacknowledged_limit: u8,
phantom: PhantomData<Endpoint>,
}
impl<Endpoint: Display + PartialEq + Clone> Subject<Endpoint> {
pub fn register(&mut self, request: &CoapRequest<Endpoint>) {
let observer_endpoint = request.source.as_ref().unwrap();
let resource_path = request.get_path();
let token = request.message.get_token();
let observer = Observer {
endpoint: observer_endpoint.clone(),
token: token.to_vec(),
unacknowledged_messages: 0,
message_id: None,
};
coap_info!(
"Registering observer {} for resource {}",
observer_endpoint,
resource_path
);
let resource =
self.resources.entry(resource_path).or_insert(Resource {
observers: Vec::new(),
sequence: 0,
});
if let Some(position) = resource
.observers
.iter()
.position(|x| x.endpoint == observer.endpoint)
{
resource.observers[position] = observer;
} else {
resource.observers.push(observer);
}
}
pub fn deregister(&mut self, request: &CoapRequest<Endpoint>) {
let observer_endpoint = request.source.as_ref().unwrap();
let resource_path = request.get_path();
let token = request.message.get_token();
if let Some(resource) = self.resources.get_mut(&resource_path) {
let position = resource.observers.iter().position(|x| {
x.endpoint == *observer_endpoint && x.token == *token
});
if let Some(position) = position {
coap_info!(
"Deregistering observer {} for resource {}",
observer_endpoint,
resource_path
);
resource.observers.remove(position);
}
}
}
pub fn resource_changed(&mut self, resource: &str, message_id: u16) {
let unacknowledged_limit = self.unacknowledged_limit;
coap_debug!("Resource changed");
self.resources
.entry(resource.to_string())
.and_modify(|resource| {
resource.sequence += 1;
resource.observers.iter_mut().for_each(|observer| {
observer.unacknowledged_messages += 1;
observer.message_id = Some(message_id);
});
resource.observers.retain(|observer| {
observer.unacknowledged_messages <= unacknowledged_limit
});
});
}
pub fn acknowledge(&mut self, request: &CoapRequest<Endpoint>) {
let observer_endpoint = request.source.as_ref().unwrap();
let message_id = request.message.header.message_id;
for (resource_path, resource) in self.resources.iter_mut() {
let observer = resource.observers.iter_mut().find(|x| {
if let Some(observe_msg_id) = x.message_id {
return x.endpoint == *observer_endpoint
&& observe_msg_id == message_id;
}
false
});
if let Some(observer) = observer {
coap_debug!("Received ack for resource {}", resource_path);
observer.unacknowledged_messages = 0;
observer.message_id = None;
}
}
}
pub fn get_resource(&self, resource: &str) -> Option<&Resource<Endpoint>> {
self.resources.get(resource)
}
pub fn get_resource_observers(
&self,
resource: &str,
) -> Option<Vec<&Observer<Endpoint>>> {
self.resources
.get(resource)
.map(|resource| resource.observers.iter().collect())
}
pub fn set_unacknowledged_limit(&mut self, limit: u8) {
self.unacknowledged_limit = limit;
}
}
pub fn create_notification(
message_id: u16,
token: Vec<u8>,
sequence: u32,
payload: Vec<u8>,
) -> Packet {
let mut packet = Packet::new();
packet.header.set_version(1);
packet.header.set_type(MessageType::Confirmable);
packet.header.code = MessageClass::Response(crate::ResponseType::Content);
packet.header.message_id = message_id;
packet.set_token(token);
packet.payload = payload;
packet.set_observe_value(sequence);
packet
}
impl<Endpoint: Display + PartialEq + Clone> Default for Subject<Endpoint> {
fn default() -> Self {
Subject {
resources: BTreeMap::new(),
unacknowledged_limit: DEFAULT_UNACKNOWLEDGED_LIMIT,
phantom: PhantomData,
}
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::{
header::{MessageType, RequestType as Method},
packet::ObserveOption,
};
type Endpoint = String;
#[test]
fn register() {
let resource_path = "temp";
let mut request = CoapRequest::new();
request.source = Some(String::from("0.0.0.0"));
request.set_method(Method::Get);
request.set_path(resource_path.clone());
request.message.set_token(vec![0x7d, 0x34]);
request.set_observe_flag(ObserveOption::Register);
let mut subject: Subject<Endpoint> = Subject::default();
subject.register(&request);
let observers = subject
.get_resource_observers(resource_path.clone())
.unwrap();
assert_eq!(observers.len(), 1);
}
#[test]
fn register_replace() {
let resource_path = "temp";
let mut request1 = CoapRequest::new();
request1.source = Some(String::from("0.0.0.0"));
request1.set_method(Method::Get);
request1.set_path(resource_path.clone());
request1.message.set_token(vec![0x00, 0x00]);
request1.set_observe_flag(ObserveOption::Register);
let mut request2 = CoapRequest::new();
request2.source = Some(String::from("0.0.0.0"));
request2.set_method(Method::Get);
request2.set_path(resource_path.clone());
request2.message.set_token(vec![0xff, 0xff]);
request2.set_observe_flag(ObserveOption::Register);
let mut subject: Subject<Endpoint> = Subject::default();
subject.register(&request1);
subject.register(&request2);
let observers = subject
.get_resource_observers(resource_path.clone())
.unwrap();
assert_eq!(observers.len(), 1);
let observer = observers.get(0).unwrap();
assert_eq!(observer.token, vec![0xff, 0xff]);
}
#[test]
fn ack_flow_ok() {
let resource_path = "temp";
let mut request1 = CoapRequest::new();
request1.source = Some(String::from("0.0.0.0"));
request1.set_method(Method::Get);
request1.set_path(resource_path.clone());
request1.message.set_token(vec![0x00, 0x00]);
request1.set_observe_flag(ObserveOption::Register);
let mut subject: Subject<Endpoint> = Subject::default();
subject.register(&request1);
let sequence1 = subject.get_resource(resource_path).unwrap().sequence;
subject.resource_changed(resource_path, 1);
let sequence2 = subject.get_resource(resource_path).unwrap().sequence;
assert!(sequence2 > sequence1);
{
let observers = subject
.get_resource_observers(resource_path.clone())
.unwrap();
let observer = observers.get(0).unwrap();
assert_eq!(observer.unacknowledged_messages, 1);
}
let mut ack = CoapRequest::new();
ack.source = Some(String::from("0.0.0.0"));
ack.message.header.set_type(MessageType::Acknowledgement);
ack.set_path(resource_path.clone());
ack.message.set_token(vec![0x00, 0x00]);
ack.message.header.message_id = 1;
subject.acknowledge(&ack);
{
let observers = subject
.get_resource_observers(resource_path.clone())
.unwrap();
let observer = observers.get(0).unwrap();
assert_eq!(observer.unacknowledged_messages, 0);
}
}
#[test]
fn ack_flow_forget_observer() {
let resource_path = "temp";
let mut request1 = CoapRequest::new();
request1.source = Some(String::from("0.0.0.0"));
request1.set_method(Method::Get);
request1.set_path(resource_path.clone());
request1.message.set_token(vec![0x00, 0x00]);
request1.set_observe_flag(ObserveOption::Register);
let mut subject: Subject<Endpoint> = Subject::default();
subject.set_unacknowledged_limit(5);
subject.register(&request1);
subject.resource_changed(resource_path, 1);
subject.resource_changed(resource_path, 2);
subject.resource_changed(resource_path, 3);
subject.resource_changed(resource_path, 4);
subject.resource_changed(resource_path, 5);
subject.resource_changed(resource_path, 6);
let observers = subject
.get_resource_observers(resource_path.clone())
.unwrap();
assert_eq!(observers.len(), 0);
}
}