pondsocket_client/
typed.rs1use std::marker::PhantomData;
2use std::time::Duration;
3
4use pondsocket_common::{
5 ChannelEvent, PondEvent, PondSchema, PresenceMessage, ServerMessage, from_pond_map,
6 from_pond_value, to_pond_map,
7};
8use tokio::sync::broadcast;
9
10use crate::{Channel, ClientError, PondClient};
11
12#[derive(Clone)]
13pub struct TypedChannel<S> {
14 raw: Channel,
15 _schema: PhantomData<S>,
16}
17
18impl<S> TypedChannel<S>
19where
20 S: PondSchema,
21{
22 pub fn new(raw: Channel) -> Self {
23 Self {
24 raw,
25 _schema: PhantomData,
26 }
27 }
28
29 pub fn raw(&self) -> &Channel {
30 &self.raw
31 }
32
33 pub fn name(&self) -> &str {
34 self.raw.name()
35 }
36
37 pub fn state(&self) -> pondsocket_common::ChannelState {
38 self.raw.state()
39 }
40
41 pub fn subscribe_events(&self) -> broadcast::Receiver<ChannelEvent> {
42 self.raw.subscribe_events()
43 }
44
45 pub async fn presence(&self) -> Result<Vec<S::Presence>, ClientError> {
46 self.raw
47 .presence()
48 .await
49 .into_iter()
50 .map(from_pond_map)
51 .collect::<serde_json::Result<Vec<_>>>()
52 .map_err(ClientError::Serialization)
53 }
54
55 pub async fn join(&self) {
56 self.raw.join().await;
57 }
58
59 pub async fn leave(&self) {
60 self.raw.leave().await;
61 }
62
63 pub async fn send<E>(&self, payload: &E::Payload) -> Result<(), ClientError>
64 where
65 E: PondEvent,
66 {
67 self.raw
68 .send_message(E::NAME, Some(to_pond_map(payload)?))
69 .await;
70 Ok(())
71 }
72
73 pub async fn request<E>(
74 &self,
75 payload: &E::Payload,
76 timeout: Option<Duration>,
77 ) -> Result<E::Response, ClientError>
78 where
79 E: PondEvent,
80 {
81 let response = self
82 .raw
83 .send_for_response(E::NAME, Some(to_pond_map(payload)?), timeout)
84 .await?;
85 from_pond_map(response).map_err(ClientError::Serialization)
86 }
87
88 pub fn decode_message<E>(
89 &self,
90 message: &ServerMessage,
91 ) -> Result<Option<E::Payload>, ClientError>
92 where
93 E: PondEvent,
94 {
95 if message.event != E::NAME {
96 return Ok(None);
97 }
98 from_pond_map(message.payload.clone())
99 .map(Some)
100 .map_err(ClientError::Serialization)
101 }
102
103 pub fn decode_presence(
104 &self,
105 message: &PresenceMessage,
106 ) -> Result<(S::Presence, Vec<S::Presence>), ClientError> {
107 let changed =
108 from_pond_map(message.payload.changed.clone()).map_err(ClientError::Serialization)?;
109 let presence = message
110 .payload
111 .presence
112 .iter()
113 .cloned()
114 .map(from_pond_map)
115 .collect::<serde_json::Result<Vec<_>>>()
116 .map_err(ClientError::Serialization)?;
117 Ok((changed, presence))
118 }
119
120 pub fn decode_event<E>(&self, event: ChannelEvent) -> Result<Option<E::Payload>, ClientError>
121 where
122 E: PondEvent,
123 {
124 match event {
125 ChannelEvent::Message(message) => self.decode_message::<E>(&message),
126 ChannelEvent::Presence(_) => Ok(None),
127 }
128 }
129}
130
131impl PondClient {
132 pub async fn create_typed_channel<S>(
133 &self,
134 name: impl Into<String>,
135 params: Option<&S::JoinParams>,
136 ) -> Result<TypedChannel<S>, ClientError>
137 where
138 S: PondSchema,
139 {
140 let params = params.map(to_pond_map).transpose()?;
141 let channel = self.create_channel(name, params).await;
142 Ok(TypedChannel::new(channel))
143 }
144}
145
146pub fn decode_payload<E>(message: ServerMessage) -> Result<E::Payload, ClientError>
147where
148 E: PondEvent,
149{
150 from_pond_map(message.payload).map_err(ClientError::Serialization)
151}
152
153pub fn decode_presence_value<S>(value: serde_json::Value) -> Result<S::Presence, ClientError>
154where
155 S: PondSchema,
156{
157 from_pond_value(value).map_err(ClientError::Serialization)
158}