srad_client/
channel.rs

1use std::sync::{Arc, Mutex};
2
3use crate::{Event, LastWill, StatePayload};
4use async_trait::async_trait;
5use srad_types::{
6    payload::Payload,
7    topic::{DeviceTopic, NodeTopic, StateTopic, TopicFilter},
8};
9use tokio::sync::mpsc;
10
11/// A [Client](crate::Client) implementation that uses channels for message passing.
12///
13/// # Examples
14///
15/// See [ChannelEventLoop]
16#[derive(Clone)]
17pub struct ChannelClient {
18    tx: mpsc::UnboundedSender<OutboundMessage>,
19}
20
21#[async_trait]
22impl crate::Client for ChannelClient {
23    async fn disconnect(&self) -> Result<(), ()> {
24        match self.tx.send(OutboundMessage::Disconnect) {
25            Ok(_) => Ok(()),
26            Err(_) => Err(()),
27        }
28    }
29
30    async fn publish_state_message(
31        &self,
32        topic: StateTopic,
33        payload: StatePayload,
34    ) -> Result<(), ()> {
35        match self
36            .tx
37            .send(OutboundMessage::StateMessage { topic, payload })
38        {
39            Ok(_) => Ok(()),
40            Err(_) => Err(()),
41        }
42    }
43
44    async fn try_publish_state_message(
45        &self,
46        topic: StateTopic,
47        payload: StatePayload,
48    ) -> Result<(), ()> {
49        self.publish_state_message(topic, payload).await
50    }
51
52    async fn publish_node_message(&self, topic: NodeTopic, payload: Payload) -> Result<(), ()> {
53        match self
54            .tx
55            .send(OutboundMessage::NodeMessage { topic, payload })
56        {
57            Ok(_) => Ok(()),
58            Err(_) => Err(()),
59        }
60    }
61
62    async fn try_publish_node_message(&self, topic: NodeTopic, payload: Payload) -> Result<(), ()> {
63        self.publish_node_message(topic, payload).await
64    }
65
66    async fn publish_device_message(&self, topic: DeviceTopic, payload: Payload) -> Result<(), ()> {
67        match self
68            .tx
69            .send(OutboundMessage::DeviceMessage { topic, payload })
70        {
71            Ok(_) => Ok(()),
72            Err(_) => Err(()),
73        }
74    }
75
76    async fn try_publish_device_message(
77        &self,
78        topic: DeviceTopic,
79        payload: Payload,
80    ) -> Result<(), ()> {
81        self.publish_device_message(topic, payload).await
82    }
83
84    async fn subscribe_many(&self, topics: Vec<TopicFilter>) -> Result<(), ()> {
85        match self.tx.send(OutboundMessage::Subscribe(topics)) {
86            Ok(_) => Ok(()),
87            Err(_) => Err(()),
88        }
89    }
90}
91
92/// An Enum representing different messages and requests a [ChannelClient] can send to the [ChannelBroker]
93#[derive(Clone, Debug, PartialEq)]
94pub enum OutboundMessage {
95    Disconnect,
96    StateMessage {
97        topic: StateTopic,
98        payload: StatePayload,
99    },
100    NodeMessage {
101        topic: NodeTopic,
102        payload: Payload,
103    },
104    DeviceMessage {
105        topic: DeviceTopic,
106        payload: Payload,
107    },
108    Subscribe(Vec<TopicFilter>),
109}
110
111/// A "broker" that manages the communication between a [ChannelClient] and an [ChannelEventLoop].
112///
113/// Used to send messages to the eventloop and inspect messages/requests produced by the client
114///
115/// # Examples
116///
117/// ```no_run
118/// use srad_client::{Event, channel::{ChannelEventLoop, ChannelClient}};
119/// use tokio::runtime::Runtime;
120///
121/// let rt = Runtime::new().unwrap();
122/// rt.block_on(async {
123///     let (mut eventloop, client, mut broker) = ChannelEventLoop::new();
124///
125///     //create application that uses the EventLoop and client
126///     
127///     //Send an event to the EventLoop
128///     broker.tx_event.send(Event::Online).unwrap();
129///
130///     //Receive a message or request from the Client
131///     let message = broker.rx_outbound.recv().await.unwrap();
132/// });
133/// ```
134pub struct ChannelBroker {
135    pub rx_outbound: mpsc::UnboundedReceiver<OutboundMessage>,
136    pub tx_event: mpsc::UnboundedSender<Event>,
137    last_will: Arc<Mutex<Option<LastWill>>>,
138}
139
140impl ChannelBroker {
141    /// Retrieves the current last will message set by the EventLoop, if set.
142    pub fn last_will(&self) -> Option<LastWill> {
143        self.last_will.lock().unwrap().clone()
144    }
145}
146
147/// An [EventLoop](crate::EventLoop) implementation that uses channels
148///
149/// # Examples
150///
151/// See [ChannelBroker]
152pub struct ChannelEventLoop {
153    rx: mpsc::UnboundedReceiver<Event>,
154    last_will: Arc<Mutex<Option<LastWill>>>,
155}
156
157impl ChannelEventLoop {
158    /// Creates a new event loop along with the corresponding client and broker.
159    pub fn new() -> (Self, ChannelClient, ChannelBroker) {
160        let (tx_event, rx_event) = mpsc::unbounded_channel();
161        let (tx_outbound, rx_outbound) = mpsc::unbounded_channel();
162        let last_will = Arc::new(Mutex::new(None));
163        let el = Self {
164            rx: rx_event,
165            last_will: last_will.clone(),
166        };
167        (
168            el,
169            ChannelClient { tx: tx_outbound },
170            ChannelBroker {
171                rx_outbound,
172                tx_event,
173                last_will,
174            },
175        )
176    }
177}
178
179#[async_trait]
180impl crate::EventLoop for ChannelEventLoop {
181    async fn poll(&mut self) -> Event {
182        self.rx.recv().await.unwrap()
183    }
184
185    fn set_last_will(&mut self, will: LastWill) {
186        let mut lw = self.last_will.lock().unwrap();
187        *lw = Some(will)
188    }
189}