1use std::task::{Poll, ready};
2
3use futures_util::Stream;
4use serde::Serialize;
5use thiserror::Error;
6use tokio::sync::oneshot;
7
8use crate::{
9 DeviceId, DeviceIndicator,
10 protocol::{
11 ClientPluginMessage, InspectorContext, PluginId, ServerPluginMessage, TileIcon, TileId,
12 TileLabel, TileModel,
13 },
14 subscription::{Subscriber, Subscriptions},
15 ws::{WsMessage, WsRx, WsTx},
16};
17
18#[derive(Debug, Error)]
19pub enum SessionError {
20 #[error(transparent)]
22 Serde(#[from] serde_json::Error),
23
24 #[error("session closed")]
27 Closed,
28
29 #[error("unexpected message")]
31 UnexpectedMessage,
32}
33
34#[derive(Clone)]
36pub struct PluginSessionHandle {
37 tx: WsTx,
38 subscriptions: Subscriptions,
39}
40
41impl PluginSessionHandle {
42 pub(crate) fn new(tx: WsTx, subscriptions: Subscriptions) -> Self {
43 Self { tx, subscriptions }
44 }
45}
46
47impl PluginSessionHandle {
48 pub(crate) fn send_message(&self, msg: ClientPluginMessage) -> Result<(), SessionError> {
50 let msg = serde_json::to_string(&msg)?;
51 let message = WsMessage::text(msg);
52 tracing::debug!(?message, "sending message to server");
53 self.tx.send(message).map_err(|_| SessionError::Closed)?;
54 Ok(())
55 }
56
57 pub(crate) fn register(&self, plugin_id: PluginId) -> Result<(), SessionError> {
59 self.send_message(ClientPluginMessage::RegisterPlugin { plugin_id })?;
60 Ok(())
61 }
62
63 pub fn request_properties(&self) -> Result<(), SessionError> {
65 self.send_message(ClientPluginMessage::GetProperties {})?;
66 Ok(())
67 }
68
69 pub async fn get_properties(&self) -> Result<serde_json::Value, SessionError> {
72 let (tx, rx) = oneshot::channel();
73
74 self.subscriptions.add(Subscriber::new(
75 |msg| matches!(msg, ServerPluginMessage::Properties { .. }),
76 tx,
77 ));
78
79 self.request_properties()?;
80
81 let msg = rx.await.map_err(|_| SessionError::Closed)?;
83 let msg = match msg {
84 ServerPluginMessage::Properties { properties } => properties,
85 _ => return Err(SessionError::UnexpectedMessage),
86 };
87
88 Ok(msg)
89 }
90
91 pub fn set_properties<T>(&self, properties: T) -> Result<(), SessionError>
98 where
99 T: Serialize,
100 {
101 let properties = serde_json::to_value(properties)?;
102 self.send_message(ClientPluginMessage::SetProperties {
103 properties,
104 partial: false,
105 })
106 }
107
108 pub fn set_properties_partial<T>(&self, properties: T) -> Result<(), SessionError>
115 where
116 T: Serialize,
117 {
118 let properties = serde_json::to_value(properties)?;
119 self.send_message(ClientPluginMessage::SetProperties {
120 properties,
121 partial: true,
122 })
123 }
124
125 pub fn request_tile_properties(&self, tile_id: TileId) -> Result<(), SessionError> {
127 self.send_message(ClientPluginMessage::GetTileProperties { tile_id })?;
128 Ok(())
129 }
130
131 pub async fn get_tile_properties(
134 &self,
135 tile_id: TileId,
136 ) -> Result<serde_json::Value, SessionError> {
137 let (tx, rx) = oneshot::channel();
138
139 self.subscriptions.add(Subscriber::new(
140 move |msg| match msg {
141 ServerPluginMessage::TileProperties {
142 tile_id: other_id, ..
143 } => other_id.eq(&tile_id),
144 _ => false,
145 },
146 tx,
147 ));
148
149 self.request_tile_properties(tile_id)?;
150
151 let msg = rx.await.map_err(|_| SessionError::Closed)?;
153 let msg = match msg {
154 ServerPluginMessage::TileProperties { properties, .. } => properties,
155 _ => return Err(SessionError::UnexpectedMessage),
156 };
157
158 Ok(msg)
159 }
160
161 pub fn request_visible_tiles(&self) -> Result<(), SessionError> {
163 self.send_message(ClientPluginMessage::GetVisibleTiles)?;
164 Ok(())
165 }
166
167 pub async fn get_visible_tiles(&self) -> Result<Vec<TileModel>, SessionError> {
170 let (tx, rx) = oneshot::channel();
171
172 self.subscriptions.add(Subscriber::new(
173 move |msg| matches!(msg, ServerPluginMessage::VisibleTiles { .. }),
174 tx,
175 ));
176
177 self.request_visible_tiles()?;
178
179 let msg = rx.await.map_err(|_| SessionError::Closed)?;
181 let msg = match msg {
182 ServerPluginMessage::VisibleTiles { tiles } => tiles,
183 _ => return Err(SessionError::UnexpectedMessage),
184 };
185
186 Ok(msg)
187 }
188
189 pub fn display_indicator(
195 &self,
196 device_id: DeviceId,
197 tile_id: TileId,
198 indicator: DeviceIndicator,
199 duration: u32,
200 ) -> Result<(), SessionError> {
201 self.send_message(ClientPluginMessage::DisplayIndicator {
202 device_id,
203 tile_id,
204 indicator,
205 duration,
206 })
207 }
208
209 pub fn set_tile_properties<T>(&self, tile_id: TileId, properties: T) -> Result<(), SessionError>
219 where
220 T: Serialize,
221 {
222 let properties = serde_json::to_value(properties)?;
223 self.send_message(ClientPluginMessage::SetTileProperties {
224 tile_id,
225 properties,
226 partial: false,
227 })
228 }
229
230 pub fn set_tile_properties_partial<T>(
240 &self,
241 tile_id: TileId,
242 properties: T,
243 ) -> Result<(), SessionError>
244 where
245 T: Serialize,
246 {
247 let properties = serde_json::to_value(properties)?;
248 self.send_message(ClientPluginMessage::SetTileProperties {
249 tile_id,
250 properties,
251 partial: true,
252 })
253 }
254
255 pub fn set_tile_icon(&self, tile_id: TileId, icon: TileIcon) -> Result<(), SessionError> {
260 self.send_message(ClientPluginMessage::SetTileIcon { tile_id, icon })
261 }
262
263 pub fn set_tile_label(&self, tile_id: TileId, label: TileLabel) -> Result<(), SessionError> {
268 self.send_message(ClientPluginMessage::SetTileLabel { tile_id, label })
269 }
270
271 pub fn send_to_inspector<T>(&self, ctx: InspectorContext, msg: T) -> Result<(), SessionError>
274 where
275 T: Serialize,
276 {
277 let message = serde_json::to_value(msg)?;
278 self.send_message(ClientPluginMessage::SendToInspector { ctx, message })
279 }
280
281 pub fn open_url(&self, url: String) -> Result<(), SessionError> {
284 self.send_message(ClientPluginMessage::OpenUrl { url })
285 }
286}
287
288pub(crate) struct PluginSessionRx {
289 rx: WsRx,
290}
291
292impl PluginSessionRx {
293 pub(crate) fn new(rx: WsRx) -> Self {
294 Self { rx }
295 }
296}
297
298impl Stream for PluginSessionRx {
299 type Item = Result<ServerPluginMessage, SessionError>;
300
301 fn poll_next(
302 self: std::pin::Pin<&mut Self>,
303 cx: &mut std::task::Context<'_>,
304 ) -> std::task::Poll<Option<Self::Item>> {
305 let this = self.get_mut();
306
307 loop {
308 let msg = match ready!(this.rx.poll_recv(cx)) {
310 Some(value) => value,
311 None => return Poll::Ready(None),
312 };
313
314 let msg = match msg {
315 WsMessage::Text(utf8_bytes) => utf8_bytes,
316
317 WsMessage::Ping(_) | WsMessage::Pong(_) | WsMessage::Frame(_) => continue,
319
320 WsMessage::Binary(_) => {
322 return Poll::Ready(Some(Err(SessionError::UnexpectedMessage)));
323 }
324
325 WsMessage::Close(_) => return Poll::Ready(None),
327 };
328
329 tracing::debug!(?msg, "received message from server");
330
331 let msg: ServerPluginMessage = match serde_json::from_str(msg.as_str()) {
332 Ok(value) => value,
333 Err(cause) => {
334 tracing::error!(?cause, "invalid or unknown message");
335 continue;
336 }
337 };
338
339 return Poll::Ready(Some(Ok(msg)));
340 }
341 }
342}