1mod subscriber;
2mod topic;
3
4extern crate alloc;
5use crate::ipc::subscriber::{MessageNode, Subscriber};
6use crate::ipc::topic::Topic;
7use crate::task::get_cur_task;
8use alloc::collections::BTreeMap;
9use alloc::string::String;
10use alloc::sync::Arc;
11use alloc::vec::Vec;
12use fe_osi::semaphore::Semaphore;
13
14pub(crate) struct TopicRegistry {
15 pub(crate) topic_lookup: BTreeMap<String, Topic>,
16}
17
18pub(crate) static mut TOPIC_REGISTERY_LOCK: Semaphore = Semaphore::new_mutex();
19pub(crate) static mut TOPIC_REGISTERY: TopicRegistry = TopicRegistry {
20 topic_lookup: BTreeMap::new(),
21};
22
23impl TopicRegistry {
24 pub(crate) fn publish_to_topic(&mut self, message_topic: &str, message: &[u8]) {
25 let owned_topic = String::from(message_topic);
26 self.topic_lookup
27 .entry(owned_topic)
28 .and_modify(|topic| topic.add_message(message));
29 }
30
31 pub(crate) fn subscribe_to_topic(&mut self, subscriber_topic: &str) {
32 let pid: usize = unsafe { get_cur_task().pid };
33 let subscriber = Subscriber::new();
34 let owned_topic = String::from(subscriber_topic);
35 self.topic_lookup
36 .entry(owned_topic)
37 .or_insert_with(Topic::new)
38 .add_subscriber(pid, subscriber);
39 }
40
41 pub(crate) fn unsubscribe_from_topic(&mut self, subscriber_topic: &str) -> Option<Subscriber> {
42 let pid: usize = unsafe { get_cur_task().pid };
43 self.topic_lookup
44 .get_mut(subscriber_topic)
45 .and_then(|topic| topic.remove_subscriber(pid))
46 }
47
48 pub(crate) fn get_ipc_message(&mut self, msg_topic: &str) -> Option<Vec<u8>> {
49 let cur_pid: usize = unsafe { get_cur_task().pid };
50
51 self.topic_lookup
52 .get_mut(msg_topic)
53 .and_then(|topic| topic.subscribers.get_mut(&cur_pid))
54 .and_then(|subscriber| {
55 let next_node: Arc<MessageNode> = match &subscriber.next_message {
56 Some(m) => Arc::clone(&m),
57 None => return None,
58 };
59 subscriber.next_message = match &*next_node.next.borrow() {
60 Some(m) => Some(Arc::clone(&m)),
61 None => None,
62 };
63 Some(next_node.data.clone())
64 })
65 }
66
67 pub(crate) fn get_subscriber_lock(&mut self, msg_topic: &str) -> Option<&Semaphore> {
68 let cur_pid: usize = unsafe { get_cur_task().pid };
69 self.topic_lookup
70 .get_mut(msg_topic)
71 .and_then(|topic| topic.subscribers.get_mut(&cur_pid))
72 .map(|subscriber| &subscriber.lock)
73 }
74}