1use std::task::{Poll, ready};
2
3use futures_util::Stream;
4use serde::Serialize;
5use thiserror::Error;
6use tokio::sync::oneshot;
7
8use crate::{
9 protocol::{
10 ClientPluginMessage, InspectorContext, PluginId, ServerPluginMessage, TileIcon, TileId,
11 TileLabel,
12 },
13 subscription::{Subscriber, Subscriptions},
14 ws::{WsMessage, WsRx, WsTx},
15};
16
17#[derive(Debug, Error)]
18pub enum SessionError {
19 #[error(transparent)]
21 Serde(#[from] serde_json::Error),
22
23 #[error("session closed")]
26 Closed,
27
28 #[error("unexpected message")]
30 UnexpectedMessage,
31}
32
33#[derive(Clone)]
35pub struct PluginSessionHandle {
36 tx: WsTx,
37 subscriptions: Subscriptions,
38}
39
40impl PluginSessionHandle {
41 pub(crate) fn new(tx: WsTx, subscriptions: Subscriptions) -> Self {
42 Self { tx, subscriptions }
43 }
44}
45
46impl PluginSessionHandle {
47 pub(crate) fn send_message(&self, msg: ClientPluginMessage) -> Result<(), SessionError> {
49 let msg = serde_json::to_string(&msg)?;
50 let message = WsMessage::text(msg);
51 tracing::debug!(?message, "sending message to server");
52 self.tx.send(message).map_err(|_| SessionError::Closed)?;
53 Ok(())
54 }
55
56 pub(crate) fn register(&self, plugin_id: PluginId) -> Result<(), SessionError> {
58 self.send_message(ClientPluginMessage::RegisterPlugin { plugin_id })?;
59 Ok(())
60 }
61
62 pub fn request_properties(&self) -> Result<(), SessionError> {
64 self.send_message(ClientPluginMessage::GetProperties {})?;
65 Ok(())
66 }
67
68 pub async fn get_properties(&self) -> Result<serde_json::Value, SessionError> {
71 let (tx, rx) = oneshot::channel();
72
73 self.subscriptions.add(Subscriber::new(
74 |msg| matches!(msg, ServerPluginMessage::Properties { .. }),
75 tx,
76 ));
77
78 self.request_properties()?;
79
80 let msg = rx.await.map_err(|_| SessionError::Closed)?;
82 let msg = match msg {
83 ServerPluginMessage::Properties { properties } => properties,
84 _ => return Err(SessionError::UnexpectedMessage),
85 };
86
87 Ok(msg)
88 }
89
90 pub fn set_properties<T>(&self, properties: T) -> Result<(), SessionError>
97 where
98 T: Serialize,
99 {
100 let properties = serde_json::to_value(properties)?;
101 self.send_message(ClientPluginMessage::SetProperties {
102 properties,
103 partial: false,
104 })
105 }
106
107 pub fn set_properties_partial<T>(&self, properties: T) -> Result<(), SessionError>
114 where
115 T: Serialize,
116 {
117 let properties = serde_json::to_value(properties)?;
118 self.send_message(ClientPluginMessage::SetProperties {
119 properties,
120 partial: true,
121 })
122 }
123
124 pub fn request_tile_properties(&self, tile_id: TileId) -> Result<(), SessionError> {
126 self.send_message(ClientPluginMessage::GetTileProperties { tile_id })?;
127 Ok(())
128 }
129
130 pub async fn get_tile_properties(
133 &self,
134 tile_id: TileId,
135 ) -> Result<serde_json::Value, SessionError> {
136 let (tx, rx) = oneshot::channel();
137
138 self.subscriptions.add(Subscriber::new(
139 move |msg| match msg {
140 ServerPluginMessage::TileProperties {
141 tile_id: other_id, ..
142 } => other_id.eq(&tile_id),
143 _ => false,
144 },
145 tx,
146 ));
147
148 self.request_tile_properties(tile_id)?;
149
150 let msg = rx.await.map_err(|_| SessionError::Closed)?;
152 let msg = match msg {
153 ServerPluginMessage::TileProperties { properties, .. } => properties,
154 _ => return Err(SessionError::UnexpectedMessage),
155 };
156
157 Ok(msg)
158 }
159
160 pub fn set_tile_properties<T>(&self, tile_id: TileId, properties: T) -> Result<(), SessionError>
170 where
171 T: Serialize,
172 {
173 let properties = serde_json::to_value(properties)?;
174 self.send_message(ClientPluginMessage::SetTileProperties {
175 tile_id,
176 properties,
177 partial: false,
178 })
179 }
180
181 pub fn set_tile_properties_partial<T>(
191 &self,
192 tile_id: TileId,
193 properties: T,
194 ) -> Result<(), SessionError>
195 where
196 T: Serialize,
197 {
198 let properties = serde_json::to_value(properties)?;
199 self.send_message(ClientPluginMessage::SetTileProperties {
200 tile_id,
201 properties,
202 partial: true,
203 })
204 }
205
206 pub fn set_tile_icon(&self, tile_id: TileId, icon: TileIcon) -> Result<(), SessionError> {
211 self.send_message(ClientPluginMessage::SetTileIcon { tile_id, icon })
212 }
213
214 pub fn set_tile_label(&self, tile_id: TileId, label: TileLabel) -> Result<(), SessionError> {
219 self.send_message(ClientPluginMessage::SetTileLabel { tile_id, label })
220 }
221
222 pub fn send_to_inspector<T>(&self, ctx: InspectorContext, msg: T) -> Result<(), SessionError>
225 where
226 T: Serialize,
227 {
228 let message = serde_json::to_value(msg)?;
229 self.send_message(ClientPluginMessage::SendToInspector { ctx, message })
230 }
231
232 pub fn open_url(&self, url: String) -> Result<(), SessionError> {
235 self.send_message(ClientPluginMessage::OpenUrl { url })
236 }
237}
238
239pub(crate) struct PluginSessionRx {
240 rx: WsRx,
241}
242
243impl PluginSessionRx {
244 pub(crate) fn new(rx: WsRx) -> Self {
245 Self { rx }
246 }
247}
248
249impl Stream for PluginSessionRx {
250 type Item = Result<ServerPluginMessage, SessionError>;
251
252 fn poll_next(
253 self: std::pin::Pin<&mut Self>,
254 cx: &mut std::task::Context<'_>,
255 ) -> std::task::Poll<Option<Self::Item>> {
256 let this = self.get_mut();
257
258 loop {
259 let msg = match ready!(this.rx.poll_recv(cx)) {
261 Some(value) => value,
262 None => return Poll::Ready(None),
263 };
264
265 let msg = match msg {
266 WsMessage::Text(utf8_bytes) => utf8_bytes,
267
268 WsMessage::Ping(_) | WsMessage::Pong(_) | WsMessage::Frame(_) => continue,
270
271 WsMessage::Binary(_) => {
273 return Poll::Ready(Some(Err(SessionError::UnexpectedMessage)));
274 }
275
276 WsMessage::Close(_) => return Poll::Ready(None),
278 };
279
280 tracing::debug!(?msg, "received message from server");
281
282 let msg: ServerPluginMessage = match serde_json::from_str(msg.as_str()) {
283 Ok(value) => value,
284 Err(cause) => {
285 tracing::error!(?cause, "invalid or unknown message");
286 continue;
287 }
288 };
289
290 return Poll::Ready(Some(Ok(msg)));
291 }
292 }
293}