use std::time::Duration;
use deno_broadcast_channel::BroadcastChannel;
use serde::{de::DeserializeOwned, Serialize};
use crate::{big_json_args, Error, Runtime};
pub struct BroadcastChannelWrapper<Channel: BroadcastChannel> {
channel: Channel,
resource: <Channel as BroadcastChannel>::Resource,
name: String,
}
impl<Channel: BroadcastChannel> BroadcastChannelWrapper<Channel> {
pub fn new(channel: &Channel, name: impl ToString) -> Result<Self, Error> {
let channel = channel.clone();
let resource = channel.subscribe()?;
let name = name.to_string();
Ok(Self {
channel,
resource,
name,
})
}
pub fn send_sync<T: Serialize>(&self, runtime: &mut Runtime, data: T) -> Result<(), Error> {
let tokio_rt = runtime.tokio_runtime();
tokio_rt.block_on(self.send(runtime, data))
}
pub async fn send<T: Serialize>(&self, runtime: &mut Runtime, data: T) -> Result<(), Error> {
let data: Vec<u8> = runtime
.call_function_async(None, "broadcast_serialize", &data)
.await?;
self.channel
.send(&self.resource, self.name.clone(), data)
.await?;
Ok(())
}
pub async fn recv<T: DeserializeOwned>(
&self,
runtime: &mut Runtime,
timeout: Option<Duration>,
) -> Result<Option<T>, Error> {
let msg = if let Some(timeout) = timeout {
tokio::select! {
msg = self.channel.recv(&self.resource) => msg,
() = tokio::time::sleep(timeout) => Ok(None),
}
} else {
self.channel.recv(&self.resource).await
}?;
let Some((name, data)) = msg else {
return Ok(None);
};
if name == self.name {
let data: T = runtime
.call_function_async(None, "broadcast_deserialize", big_json_args!(data))
.await?;
Ok(Some(data))
} else {
Ok(None)
}
}
pub fn recv_sync<T: DeserializeOwned>(
&self,
runtime: &mut Runtime,
timeout: Option<Duration>,
) -> Result<Option<T>, Error> {
let tokio_rt = runtime.tokio_runtime();
tokio_rt.block_on(self.recv(runtime, timeout))
}
}
impl<Channel: BroadcastChannel> Drop for BroadcastChannelWrapper<Channel> {
fn drop(&mut self) {
self.channel.unsubscribe(&self.resource).ok();
}
}