use std::marker::PhantomData;
use std::time::Duration;
use pondsocket_common::{
ChannelEvent, PondEvent, PondSchema, PresenceMessage, ServerMessage, from_pond_map,
from_pond_value, to_pond_map,
};
use tokio::sync::broadcast;
use crate::{Channel, ClientError, PondClient};
/// A [`Channel`] wrapper tagged at compile time with a schema type `S`.
///
/// No value of type `S` is ever stored; the schema is only recorded through a
/// zero-sized [`PhantomData`] marker so that typed operations can be selected
/// by `S`.
pub struct TypedChannel<S> {
    /// The untyped channel that operations are forwarded to.
    raw: Channel,
    /// Zero-sized schema marker. `fn() -> S` is used instead of `S` so the
    /// marker itself never restricts `Send`/`Sync`: those auto traits then
    /// depend only on `raw`, not on the schema type.
    _schema: PhantomData<fn() -> S>,
}

// Written by hand rather than derived: `#[derive(Clone)]` would add an
// `S: Clone` bound even though no `S` value is held by this struct.
impl<S> Clone for TypedChannel<S> {
    fn clone(&self) -> Self {
        Self {
            raw: self.raw.clone(),
            _schema: PhantomData,
        }
    }
}
impl<S> TypedChannel<S>
where
S: PondSchema,
{
pub fn new(raw: Channel) -> Self {
Self {
raw,
_schema: PhantomData,
}
}
pub fn raw(&self) -> &Channel {
&self.raw
}
pub fn name(&self) -> &str {
self.raw.name()
}
pub fn state(&self) -> pondsocket_common::ChannelState {
self.raw.state()
}
pub fn subscribe_events(&self) -> broadcast::Receiver<ChannelEvent> {
self.raw.subscribe_events()
}
pub async fn presence(&self) -> Result<Vec<S::Presence>, ClientError> {
self.raw
.presence()
.await
.into_iter()
.map(from_pond_map)
.collect::<serde_json::Result<Vec<_>>>()
.map_err(ClientError::Serialization)
}
pub async fn join(&self) {
self.raw.join().await;
}
pub async fn leave(&self) {
self.raw.leave().await;
}
pub async fn send<E>(&self, payload: &E::Payload) -> Result<(), ClientError>
where
E: PondEvent,
{
self.raw
.send_message(E::NAME, Some(to_pond_map(payload)?))
.await;
Ok(())
}
pub async fn request<E>(
&self,
payload: &E::Payload,
timeout: Option<Duration>,
) -> Result<E::Response, ClientError>
where
E: PondEvent,
{
let response = self
.raw
.send_for_response(E::NAME, Some(to_pond_map(payload)?), timeout)
.await?;
from_pond_map(response).map_err(ClientError::Serialization)
}
pub fn decode_message<E>(
&self,
message: &ServerMessage,
) -> Result<Option<E::Payload>, ClientError>
where
E: PondEvent,
{
if message.event != E::NAME {
return Ok(None);
}
from_pond_map(message.payload.clone())
.map(Some)
.map_err(ClientError::Serialization)
}
pub fn decode_presence(
&self,
message: &PresenceMessage,
) -> Result<(S::Presence, Vec<S::Presence>), ClientError> {
let changed =
from_pond_map(message.payload.changed.clone()).map_err(ClientError::Serialization)?;
let presence = message
.payload
.presence
.iter()
.cloned()
.map(from_pond_map)
.collect::<serde_json::Result<Vec<_>>>()
.map_err(ClientError::Serialization)?;
Ok((changed, presence))
}
pub fn decode_event<E>(&self, event: ChannelEvent) -> Result<Option<E::Payload>, ClientError>
where
E: PondEvent,
{
match event {
ChannelEvent::Message(message) => self.decode_message::<E>(&message),
ChannelEvent::Presence(_) => Ok(None),
}
}
}
impl PondClient {
pub async fn create_typed_channel<S>(
&self,
name: impl Into<String>,
params: Option<&S::JoinParams>,
) -> Result<TypedChannel<S>, ClientError>
where
S: PondSchema,
{
let params = params.map(to_pond_map).transpose()?;
let channel = self.create_channel(name, params).await;
Ok(TypedChannel::new(channel))
}
}
/// Decodes the payload of `message` into `E::Payload`, consuming the message.
///
/// The message's event name is not compared against `E::NAME` here.
///
/// # Errors
///
/// Returns [`ClientError::Serialization`] when the payload cannot be decoded.
pub fn decode_payload<E>(message: ServerMessage) -> Result<E::Payload, ClientError>
where
    E: PondEvent,
{
    match from_pond_map(message.payload) {
        Ok(payload) => Ok(payload),
        Err(source) => Err(ClientError::Serialization(source)),
    }
}
/// Decodes a JSON value into the presence type of schema `S`.
///
/// # Errors
///
/// Returns [`ClientError::Serialization`] when `value` cannot be decoded.
pub fn decode_presence_value<S>(value: serde_json::Value) -> Result<S::Presence, ClientError>
where
    S: PondSchema,
{
    let decoded: serde_json::Result<S::Presence> = from_pond_value(value);
    decoded.map_err(ClientError::Serialization)
}