alfred_core/
connection.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4use crate::config::Config;
5use crate::message::{Message, MessageType};
6use crate::error::Error;
7use log::debug;
8use tokio::sync::Mutex;
9use crate::zmq_connection::{AlfredPublisher, AlfredSubscriber};
10
11pub const MODULE_INFO_TOPIC_REQUEST: &str = "module.info.request";
12pub const MODULE_INFO_TOPIC_RESPONSE: &str = "module.info.response";
13pub const TOPIC_PREFIX: &str = "event";
14
15#[derive(Clone)]
16pub struct Connection {
17    subscriber: Arc<Mutex<AlfredSubscriber>>,
18    publisher: Arc<Mutex<AlfredPublisher>>
19}
20
21impl Connection {
22    pub async fn new(config: &Config) -> Result<Self, Error> {
23        let subscriber = AlfredSubscriber::new(config.get_alfred_sub_url().as_str()).await?;
24        debug!("Connected as subscriber");
25        tokio::time::sleep(Duration::from_secs(1)).await;
26        let publisher = AlfredPublisher::new(config.get_alfred_pub_url().as_str()).await?;
27        debug!("Connected as publisher");
28        let mut connection = Self {
29            subscriber: Arc::new(Mutex::new(subscriber)),
30            publisher: Arc::new(Mutex::new(publisher))
31        };
32        connection.listen(MODULE_INFO_TOPIC_REQUEST).await?;
33        Ok(connection)
34    }
35
36    pub async fn listen(&mut self, topic: &str) -> Result<(), Error> {
37        self.subscriber.lock().await.listen(topic).await
38    }
39
40    pub async fn receive_all(&self) -> Result<(String, Message), Error> {
41        self.subscriber.lock().await.receive().await
42    }
43
44    async fn send_module_info(&self, module_name: &str, capabilities: &HashMap<String, String>) -> Result<(), Error> {
45        let info_msg = Message {
46            text: module_name.to_string(),
47            message_type: MessageType::Text,
48            params: capabilities.clone(),
49            ..Message::default()
50        };
51        self.send(MODULE_INFO_TOPIC_RESPONSE, &info_msg).await
52    }
53
54    pub async fn manage_module_info_request(&self, topic: &str, module_name: &str, capabilities: &HashMap<String, String>) -> Result<bool, Error> {
55        if topic != MODULE_INFO_TOPIC_REQUEST { return Ok(false); }
56        debug!("Received info request. Replying...");
57        self.send_module_info(module_name, capabilities).await?;
58        Ok(true)
59    }
60
61    pub async fn receive(&self, module_name: &str, capabilities: &HashMap<String, String>) -> Result<(String, Message), Error> {
62        loop {
63            let (topic, message) = self.receive_all().await?;
64            if self.manage_module_info_request(topic.as_str(), module_name, capabilities).await? {
65                continue;
66            }
67            return Ok((topic, message));
68        }
69    }
70
71    pub async fn send(&self, topic: &str, message: &Message) -> Result<(), Error> {
72        self.publisher.lock().await.send(topic, message).await
73    }
74
75    pub async fn send_event(&self, publisher_name: &str, event_name: &str, message: &Message) -> Result<(), Error> {
76        let topic = format!("{TOPIC_PREFIX}.{publisher_name}.{event_name}");
77        let topic_ref: &'static str = Box::leak(topic.into_boxed_str());
78        self.send(topic_ref, message).await
79    }
80}