use async_stream::stream;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tokio::sync::broadcast::{channel as create_channel, Sender};
use tokio_stream::Stream;
use crate::errors::*;
/// Capacity handed to every broadcast channel that `PubSub` creates.
// NOTE(review): per the tokio broadcast docs this bounds how many unread
// messages a channel retains before slow receivers start lagging — confirm
// that 5 is the intended backlog.
const MESSAGES_TO_BE_RETAINED: usize = 5;
/// JSON request body for a GraphQL operation: the operation text plus its
/// variables. `T` is whatever serializable value holds the variables.
#[derive(Serialize)]
struct GQLQueryBody<T: Serialize> {
/// The GraphQL document (query or mutation) to execute.
query: String,
/// Values for the variables referenced by `query`.
variables: T,
}
/// Top-level shape of the JSON response that `Publisher::publish` parses.
#[derive(Deserialize)]
struct GQLPublishResponse {
/// Contents of the response's `data` field.
data: PublishResponse,
}
/// The `data` payload of a publish response.
#[derive(Deserialize)]
struct PublishResponse {
/// Value returned for the `publish` field; `Publisher::publish` treats
/// anything other than `true` as a failure.
publish: bool,
}
/// Client that publishes data to a remote endpoint via a GraphQL `publish`
/// mutation sent over HTTP.
pub struct Publisher {
/// HTTP client used for every request.
client: Client,
/// Full address the mutation is POSTed to.
address: String,
/// Token sent in the `Authorization` header as `Bearer <token>`.
token: String,
}
impl Publisher {
pub fn new(hostname: String, port: String, endpoint: String, token: String) -> Result<Self> {
let address = format!(
"{hostname}:{port}{endpoint}", hostname = hostname,
port = port,
endpoint = endpoint
);
let client = Client::new();
Ok(Self {
client,
address,
token,
})
}
pub async fn publish(&self, channel: &str, data: String) -> Result<()> {
let client = &self.client;
let mut variables = HashMap::new();
variables.insert("channel", channel.to_string());
variables.insert("data", data);
let body = GQLQueryBody {
query: "
mutation PublishData($channel: String!, $data: String!) {
publish(
channel: $channel,
data: $data
)
}
"
.to_string(),
variables,
};
let res = client
.post(&self.address)
.json(&body)
.header("Authorization", "Bearer ".to_string() + &self.token)
.send()
.await
.map_err(|_| ErrorKind::SubscriptionDataPublishFailed)?;
if res.status().to_string() != "200 OK" {
bail!(ErrorKind::SubscriptionDataPublishFailed)
}
let body: GQLPublishResponse = serde_json::from_str(
&res.text()
.await
.map_err(|_| ErrorKind::SubscriptionDataPublishFailed)?,
)
.map_err(|_| ErrorKind::SubscriptionDataPublishFailed)?;
match body.data.publish {
true => Ok(()),
_ => bail!(ErrorKind::SubscriptionDataPublishFailed),
}
}
}
/// In-process publish/subscribe hub: one broadcast channel per channel name.
///
/// `PubSub::default()` yields a hub with no channels; channels are created
/// lazily the first time a name is published or subscribed to.
#[derive(Default)]
pub struct PubSub {
    /// Broadcast sender for every channel that currently exists, keyed by
    /// channel name.
    channels: HashMap<String, Sender<String>>,
}
impl PubSub {
fn get_channel(&mut self, channel: &str) -> Sender<String> {
let channel_sender = self.channels.get(channel);
let channel_sender = match channel_sender {
Some(sub) => sub,
None => {
let (channel_sender, _receiver) = create_channel(MESSAGES_TO_BE_RETAINED);
self.channels.insert(channel.to_string(), channel_sender); self.channels.get(channel).unwrap() }
};
channel_sender.clone()
}
pub fn subscribe(&mut self, channel: &str) -> impl Stream<Item = String> {
let channel_sender = self.get_channel(channel);
let mut receiver = channel_sender.subscribe();
stream! {
loop {
let message = receiver.recv().await;
match message {
Ok(message) => yield message,
_ => continue
}
}
}
}
pub fn publish(&mut self, channel: &str, data: String) {
let channel_sender = self.get_channel(channel);
let _ = channel_sender.send(data);
}
pub fn close_channel(&mut self, channel: &str) {
self.channels.remove(channel);
}
}