1use std::sync::{Arc, Mutex};
2
3use crate::{Event, LastWill, StatePayload};
4use async_trait::async_trait;
5use srad_types::{
6 payload::Payload,
7 topic::{DeviceTopic, NodeTopic, StateTopic, TopicFilter},
8};
9use tokio::sync::mpsc;
10
11#[derive(Clone)]
17pub struct ChannelClient {
18 tx: mpsc::UnboundedSender<OutboundMessage>,
19}
20
21#[async_trait]
22impl crate::Client for ChannelClient {
23 async fn disconnect(&self) -> Result<(), ()> {
24 match self.tx.send(OutboundMessage::Disconnect) {
25 Ok(_) => Ok(()),
26 Err(_) => Err(()),
27 }
28 }
29
30 async fn publish_state_message(
31 &self,
32 topic: StateTopic,
33 payload: StatePayload,
34 ) -> Result<(), ()> {
35 match self
36 .tx
37 .send(OutboundMessage::StateMessage { topic, payload })
38 {
39 Ok(_) => Ok(()),
40 Err(_) => Err(()),
41 }
42 }
43
44 async fn try_publish_state_message(
45 &self,
46 topic: StateTopic,
47 payload: StatePayload,
48 ) -> Result<(), ()> {
49 self.publish_state_message(topic, payload).await
50 }
51
52 async fn publish_node_message(&self, topic: NodeTopic, payload: Payload) -> Result<(), ()> {
53 match self
54 .tx
55 .send(OutboundMessage::NodeMessage { topic, payload })
56 {
57 Ok(_) => Ok(()),
58 Err(_) => Err(()),
59 }
60 }
61
62 async fn try_publish_node_message(&self, topic: NodeTopic, payload: Payload) -> Result<(), ()> {
63 self.publish_node_message(topic, payload).await
64 }
65
66 async fn publish_device_message(&self, topic: DeviceTopic, payload: Payload) -> Result<(), ()> {
67 match self
68 .tx
69 .send(OutboundMessage::DeviceMessage { topic, payload })
70 {
71 Ok(_) => Ok(()),
72 Err(_) => Err(()),
73 }
74 }
75
76 async fn try_publish_device_message(
77 &self,
78 topic: DeviceTopic,
79 payload: Payload,
80 ) -> Result<(), ()> {
81 self.publish_device_message(topic, payload).await
82 }
83
84 async fn subscribe_many(&self, topics: Vec<TopicFilter>) -> Result<(), ()> {
85 match self.tx.send(OutboundMessage::Subscribe(topics)) {
86 Ok(_) => Ok(()),
87 Err(_) => Err(()),
88 }
89 }
90}
91
92#[derive(Clone, Debug, PartialEq)]
94pub enum OutboundMessage {
95 Disconnect,
96 StateMessage {
97 topic: StateTopic,
98 payload: StatePayload,
99 },
100 NodeMessage {
101 topic: NodeTopic,
102 payload: Payload,
103 },
104 DeviceMessage {
105 topic: DeviceTopic,
106 payload: Payload,
107 },
108 Subscribe(Vec<TopicFilter>),
109}
110
111pub struct ChannelBroker {
135 pub rx_outbound: mpsc::UnboundedReceiver<OutboundMessage>,
136 pub tx_event: mpsc::UnboundedSender<Event>,
137 last_will: Arc<Mutex<Option<LastWill>>>,
138}
139
140impl ChannelBroker {
141 pub fn last_will(&self) -> Option<LastWill> {
143 self.last_will.lock().unwrap().clone()
144 }
145}
146
147pub struct ChannelEventLoop {
153 rx: mpsc::UnboundedReceiver<Event>,
154 last_will: Arc<Mutex<Option<LastWill>>>,
155}
156
157impl ChannelEventLoop {
158 pub fn new() -> (Self, ChannelClient, ChannelBroker) {
160 let (tx_event, rx_event) = mpsc::unbounded_channel();
161 let (tx_outbound, rx_outbound) = mpsc::unbounded_channel();
162 let last_will = Arc::new(Mutex::new(None));
163 let el = Self {
164 rx: rx_event,
165 last_will: last_will.clone(),
166 };
167 (
168 el,
169 ChannelClient { tx: tx_outbound },
170 ChannelBroker {
171 rx_outbound,
172 tx_event,
173 last_will,
174 },
175 )
176 }
177}
178
179#[async_trait]
180impl crate::EventLoop for ChannelEventLoop {
181 async fn poll(&mut self) -> Event {
182 self.rx.recv().await.unwrap()
183 }
184
185 fn set_last_will(&mut self, will: LastWill) {
186 let mut lw = self.last_will.lock().unwrap();
187 *lw = Some(will)
188 }
189}