1use std::task::{Poll, ready};
2
3use futures_util::Stream;
4use serde::Serialize;
5use thiserror::Error;
6use tokio::sync::oneshot;
7
8use crate::{
9 protocol::{
10 ClientPluginMessage, InspectorContext, PluginId, ServerPluginMessage, TileIcon, TileId,
11 TileLabel, TileModel,
12 },
13 subscription::{Subscriber, Subscriptions},
14 ws::{WsMessage, WsRx, WsTx},
15};
16
17#[derive(Debug, Error)]
18pub enum SessionError {
19 #[error(transparent)]
21 Serde(#[from] serde_json::Error),
22
23 #[error("session closed")]
26 Closed,
27
28 #[error("unexpected message")]
30 UnexpectedMessage,
31}
32
33#[derive(Clone)]
35pub struct PluginSessionHandle {
36 tx: WsTx,
37 subscriptions: Subscriptions,
38}
39
40impl PluginSessionHandle {
41 pub(crate) fn new(tx: WsTx, subscriptions: Subscriptions) -> Self {
42 Self { tx, subscriptions }
43 }
44}
45
46impl PluginSessionHandle {
47 pub(crate) fn send_message(&self, msg: ClientPluginMessage) -> Result<(), SessionError> {
49 let msg = serde_json::to_string(&msg)?;
50 let message = WsMessage::text(msg);
51 tracing::debug!(?message, "sending message to server");
52 self.tx.send(message).map_err(|_| SessionError::Closed)?;
53 Ok(())
54 }
55
56 pub(crate) fn register(&self, plugin_id: PluginId) -> Result<(), SessionError> {
58 self.send_message(ClientPluginMessage::RegisterPlugin { plugin_id })?;
59 Ok(())
60 }
61
62 pub fn request_properties(&self) -> Result<(), SessionError> {
64 self.send_message(ClientPluginMessage::GetProperties {})?;
65 Ok(())
66 }
67
68 pub async fn get_properties(&self) -> Result<serde_json::Value, SessionError> {
71 let (tx, rx) = oneshot::channel();
72
73 self.subscriptions.add(Subscriber::new(
74 |msg| matches!(msg, ServerPluginMessage::Properties { .. }),
75 tx,
76 ));
77
78 self.request_properties()?;
79
80 let msg = rx.await.map_err(|_| SessionError::Closed)?;
82 let msg = match msg {
83 ServerPluginMessage::Properties { properties } => properties,
84 _ => return Err(SessionError::UnexpectedMessage),
85 };
86
87 Ok(msg)
88 }
89
90 pub fn set_properties<T>(&self, properties: T) -> Result<(), SessionError>
97 where
98 T: Serialize,
99 {
100 let properties = serde_json::to_value(properties)?;
101 self.send_message(ClientPluginMessage::SetProperties {
102 properties,
103 partial: false,
104 })
105 }
106
107 pub fn set_properties_partial<T>(&self, properties: T) -> Result<(), SessionError>
114 where
115 T: Serialize,
116 {
117 let properties = serde_json::to_value(properties)?;
118 self.send_message(ClientPluginMessage::SetProperties {
119 properties,
120 partial: true,
121 })
122 }
123
124 pub fn request_tile_properties(&self, tile_id: TileId) -> Result<(), SessionError> {
126 self.send_message(ClientPluginMessage::GetTileProperties { tile_id })?;
127 Ok(())
128 }
129
130 pub async fn get_tile_properties(
133 &self,
134 tile_id: TileId,
135 ) -> Result<serde_json::Value, SessionError> {
136 let (tx, rx) = oneshot::channel();
137
138 self.subscriptions.add(Subscriber::new(
139 move |msg| match msg {
140 ServerPluginMessage::TileProperties {
141 tile_id: other_id, ..
142 } => other_id.eq(&tile_id),
143 _ => false,
144 },
145 tx,
146 ));
147
148 self.request_tile_properties(tile_id)?;
149
150 let msg = rx.await.map_err(|_| SessionError::Closed)?;
152 let msg = match msg {
153 ServerPluginMessage::TileProperties { properties, .. } => properties,
154 _ => return Err(SessionError::UnexpectedMessage),
155 };
156
157 Ok(msg)
158 }
159
160 pub fn request_visible_tiles(&self) -> Result<(), SessionError> {
162 self.send_message(ClientPluginMessage::GetVisibleTiles)?;
163 Ok(())
164 }
165
166 pub async fn get_visible_tiles(&self) -> Result<Vec<TileModel>, SessionError> {
169 let (tx, rx) = oneshot::channel();
170
171 self.subscriptions.add(Subscriber::new(
172 move |msg| matches!(msg, ServerPluginMessage::VisibleTiles { .. }),
173 tx,
174 ));
175
176 self.request_visible_tiles()?;
177
178 let msg = rx.await.map_err(|_| SessionError::Closed)?;
180 let msg = match msg {
181 ServerPluginMessage::VisibleTiles { tiles } => tiles,
182 _ => return Err(SessionError::UnexpectedMessage),
183 };
184
185 Ok(msg)
186 }
187
188 pub fn set_tile_properties<T>(&self, tile_id: TileId, properties: T) -> Result<(), SessionError>
198 where
199 T: Serialize,
200 {
201 let properties = serde_json::to_value(properties)?;
202 self.send_message(ClientPluginMessage::SetTileProperties {
203 tile_id,
204 properties,
205 partial: false,
206 })
207 }
208
209 pub fn set_tile_properties_partial<T>(
219 &self,
220 tile_id: TileId,
221 properties: T,
222 ) -> Result<(), SessionError>
223 where
224 T: Serialize,
225 {
226 let properties = serde_json::to_value(properties)?;
227 self.send_message(ClientPluginMessage::SetTileProperties {
228 tile_id,
229 properties,
230 partial: true,
231 })
232 }
233
234 pub fn set_tile_icon(&self, tile_id: TileId, icon: TileIcon) -> Result<(), SessionError> {
239 self.send_message(ClientPluginMessage::SetTileIcon { tile_id, icon })
240 }
241
242 pub fn set_tile_label(&self, tile_id: TileId, label: TileLabel) -> Result<(), SessionError> {
247 self.send_message(ClientPluginMessage::SetTileLabel { tile_id, label })
248 }
249
250 pub fn send_to_inspector<T>(&self, ctx: InspectorContext, msg: T) -> Result<(), SessionError>
253 where
254 T: Serialize,
255 {
256 let message = serde_json::to_value(msg)?;
257 self.send_message(ClientPluginMessage::SendToInspector { ctx, message })
258 }
259
260 pub fn open_url(&self, url: String) -> Result<(), SessionError> {
263 self.send_message(ClientPluginMessage::OpenUrl { url })
264 }
265}
266
267pub(crate) struct PluginSessionRx {
268 rx: WsRx,
269}
270
271impl PluginSessionRx {
272 pub(crate) fn new(rx: WsRx) -> Self {
273 Self { rx }
274 }
275}
276
277impl Stream for PluginSessionRx {
278 type Item = Result<ServerPluginMessage, SessionError>;
279
280 fn poll_next(
281 self: std::pin::Pin<&mut Self>,
282 cx: &mut std::task::Context<'_>,
283 ) -> std::task::Poll<Option<Self::Item>> {
284 let this = self.get_mut();
285
286 loop {
287 let msg = match ready!(this.rx.poll_recv(cx)) {
289 Some(value) => value,
290 None => return Poll::Ready(None),
291 };
292
293 let msg = match msg {
294 WsMessage::Text(utf8_bytes) => utf8_bytes,
295
296 WsMessage::Ping(_) | WsMessage::Pong(_) | WsMessage::Frame(_) => continue,
298
299 WsMessage::Binary(_) => {
301 return Poll::Ready(Some(Err(SessionError::UnexpectedMessage)));
302 }
303
304 WsMessage::Close(_) => return Poll::Ready(None),
306 };
307
308 tracing::debug!(?msg, "received message from server");
309
310 let msg: ServerPluginMessage = match serde_json::from_str(msg.as_str()) {
311 Ok(value) => value,
312 Err(cause) => {
313 tracing::error!(?cause, "invalid or unknown message");
314 continue;
315 }
316 };
317
318 return Poll::Ready(Some(Ok(msg)));
319 }
320 }
321}