Skip to main content

fe_rtos/ipc/
mod.rs

1mod subscriber;
2mod topic;
3
4extern crate alloc;
5use crate::ipc::subscriber::{MessageNode, Subscriber};
6use crate::ipc::topic::Topic;
7use crate::task::get_cur_task;
8use alloc::collections::BTreeMap;
9use alloc::string::String;
10use alloc::sync::Arc;
11use alloc::vec::Vec;
12use fe_osi::semaphore::Semaphore;
13
14pub(crate) struct TopicRegistry {
15    pub(crate) topic_lookup: BTreeMap<String, Topic>,
16}
17
18pub(crate) static mut TOPIC_REGISTERY_LOCK: Semaphore = Semaphore::new_mutex();
19pub(crate) static mut TOPIC_REGISTERY: TopicRegistry = TopicRegistry {
20    topic_lookup: BTreeMap::new(),
21};
22
23impl TopicRegistry {
24    pub(crate) fn publish_to_topic(&mut self, message_topic: &str, message: &[u8]) {
25        let owned_topic = String::from(message_topic);
26        self.topic_lookup
27            .entry(owned_topic)
28            .and_modify(|topic| topic.add_message(message));
29    }
30
31    pub(crate) fn subscribe_to_topic(&mut self, subscriber_topic: &str) {
32        let pid: usize = unsafe { get_cur_task().pid };
33        let subscriber = Subscriber::new();
34        let owned_topic = String::from(subscriber_topic);
35        self.topic_lookup
36            .entry(owned_topic)
37            .or_insert_with(Topic::new)
38            .add_subscriber(pid, subscriber);
39    }
40
41    pub(crate) fn unsubscribe_from_topic(&mut self, subscriber_topic: &str) -> Option<Subscriber> {
42        let pid: usize = unsafe { get_cur_task().pid };
43        self.topic_lookup
44            .get_mut(subscriber_topic)
45            .and_then(|topic| topic.remove_subscriber(pid))
46    }
47
48    pub(crate) fn get_ipc_message(&mut self, msg_topic: &str) -> Option<Vec<u8>> {
49        let cur_pid: usize = unsafe { get_cur_task().pid };
50
51        self.topic_lookup
52            .get_mut(msg_topic)
53            .and_then(|topic| topic.subscribers.get_mut(&cur_pid))
54            .and_then(|subscriber| {
55                let next_node: Arc<MessageNode> = match &subscriber.next_message {
56                    Some(m) => Arc::clone(&m),
57                    None => return None,
58                };
59                subscriber.next_message = match &*next_node.next.borrow() {
60                    Some(m) => Some(Arc::clone(&m)),
61                    None => None,
62                };
63                Some(next_node.data.clone())
64            })
65    }
66
67    pub(crate) fn get_subscriber_lock(&mut self, msg_topic: &str) -> Option<&Semaphore> {
68        let cur_pid: usize = unsafe { get_cur_task().pid };
69        self.topic_lookup
70            .get_mut(msg_topic)
71            .and_then(|topic| topic.subscribers.get_mut(&cur_pid))
72            .map(|subscriber| &subscriber.lock)
73    }
74}