use crate::options::{
ChannelsOption, ChannelsOptions, DisconnectOption, DisconnectOptions, HistoryOption,
HistoryOptions, PublishOption, PublishOptions, SubscribeOption, SubscribeOptions,
UnsubscribeOption, UnsubscribeOptions,
};
use serde::{Deserialize, Serialize};
pub use std::error::Error;
use std::sync::{Arc, Mutex};
/// A batch of [`Command`]s collected behind a shared, mutex-guarded list.
///
/// The list lives in an `Arc<Mutex<..>>`, so handles that share the same `Arc`
/// observe the same queued commands.
#[derive(Debug)]
pub struct Pipe {
/// The queued commands; `Pipe::add` appends to the end of this list and
/// `Pipe::reset` empties it.
pub commands: Arc<Mutex<Vec<Command>>>,
}
/// Parameters of a `"publish"` command: one JSON payload aimed at one channel.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PublishRequest {
/// Name of the target channel.
pub channel: String,
/// Payload as an arbitrary JSON value.
pub data: serde_json::Value,
/// Publish options (type defined in `crate::options`).
pub options: PublishOptions,
}
/// Parameters of a `"broadcast"` command: one JSON payload aimed at several channels.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct BroadcastRequest {
/// Names of the target channels.
pub channels: Vec<String>,
/// Payload as an arbitrary JSON value.
pub data: serde_json::Value,
/// Publish options (the same option type as [`PublishRequest`] uses).
pub options: PublishOptions,
}
/// Parameters of a `"subscribe"` command, identifying a channel/user pair.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct SubscribeRequest {
/// Name of the channel.
pub channel: String,
/// Identifier of the user, as a string.
pub user: String,
/// Subscribe options (type defined in `crate::options`).
pub options: SubscribeOptions,
}
/// Parameters of an `"unsubscribe"` command, identifying a channel/user pair.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct UnsubscribeRequest {
/// Name of the channel.
pub channel: String,
/// Identifier of the user, as a string.
pub user: String,
/// Unsubscribe options (type defined in `crate::options`).
pub options: UnsubscribeOptions,
}
/// Parameters of a `"disconnect"` command, identifying a user.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct DisconnectRequest {
/// Identifier of the user, as a string.
pub user: String,
/// Disconnect options (type defined in `crate::options`).
pub options: DisconnectOptions,
}
/// Parameters of a `"history"` command, identifying a channel.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct HistoryRequest {
/// Name of the channel.
pub channel: String,
/// History options (type defined in `crate::options`).
pub options: HistoryOptions,
}
/// Parameters of a `"channels"` command.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ChannelsRequest {
/// Optional pattern string; when `None` the key is omitted from the
/// serialized output entirely instead of being written as `null`.
#[serde(skip_serializing_if = "Option::is_none")]
pub pattern: Option<String>,
}
/// Parameter payload carried by a [`Command`].
///
/// The enum is `#[serde(untagged)]`: a value serializes as the bare inner payload
/// with no variant marker, and on deserialization serde tries the variants in
/// declaration order and keeps the first one that parses. Declaration order is
/// therefore part of the deserialization behavior:
///
/// * [`ChannelsRequest`] has a single optional field and does not reject unknown
///   keys, so it accepts every JSON object. It must come after all variants that
///   have required fields, otherwise those variants can never be produced.
/// * `Value` accepts any JSON at all and must stay last.
///
/// NOTE(review): `SubscribeRequest` and `UnsubscribeRequest` use the same keys;
/// which of the two an incoming payload resolves to depends on the option types
/// in `crate::options`, which are not visible here — confirm if round-tripping
/// those commands matters.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(untagged)]
pub enum RequestKind {
    PublishRequest(PublishRequest),
    BroadcastRequest(BroadcastRequest),
    SubscribeRequest(SubscribeRequest),
    UnsubscribeRequest(UnsubscribeRequest),
    DisconnectRequest(DisconnectRequest),
    HistoryRequest(HistoryRequest),
    // Matches any JSON object (its only field is optional), so it sits behind
    // every variant that has required fields.
    ChannelsRequest(ChannelsRequest),
    // Catch-all for payloads without a dedicated struct.
    Value(serde_json::Value),
}
/// One queued command: a method name plus that method's parameters.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Command {
/// Method name as a string, e.g. `"publish"` or `"info"`.
pub method: String,
/// Parameters for the method; see [`RequestKind`] for the possible shapes.
pub params: RequestKind,
}
impl Pipe {
pub fn reset(&self) {
let mut commands = self.commands.lock().unwrap();
*commands = Vec::new();
}
pub fn add(&self, cmd: Command) -> Result<(), Box<dyn Error>> {
let mut commands = self.commands.lock().map_err(|_| "Lock poisoned")?;
commands.push(cmd);
Ok(())
}
pub fn add_publish(
&self,
channel: String,
data: &str,
opts: &[PublishOption],
) -> Result<(), Box<dyn Error>> {
let mut options = PublishOptions::default();
for opt in opts {
opt(&mut options)
}
let cmd = Command {
method: "publish".to_string(),
params: RequestKind::PublishRequest(PublishRequest {
channel,
data: serde_json::from_str(data)?,
options,
}),
};
self.add(cmd.clone())?;
Ok(())
}
pub fn add_broadcast(
&self,
channels: Vec<String>,
data: &str,
opts: &[PublishOption],
) -> Result<(), Box<dyn Error>> {
let mut options = PublishOptions::default();
for opt in opts {
opt(&mut options);
}
let cmd = Command {
method: "broadcast".to_string(),
params: RequestKind::BroadcastRequest(BroadcastRequest {
channels: channels,
data: serde_json::from_str(data)?,
options,
}),
};
self.add(cmd)?;
Ok(())
}
pub fn add_subscribe(
&self,
channel: String,
user: String,
opts: &[SubscribeOption],
) -> Result<(), Box<dyn Error>> {
let mut options = SubscribeOptions::default();
for opt in opts {
opt(&mut options);
}
let cmd = Command {
method: "subscribe".to_string(),
params: RequestKind::SubscribeRequest(SubscribeRequest {
channel,
user,
options,
}),
};
self.add(cmd)?;
Ok(())
}
pub fn add_unsubscribe(
&self,
channel: String,
user: String,
opts: &[UnsubscribeOption],
) -> Result<(), Box<dyn Error>> {
let mut options = UnsubscribeOptions::default();
for opt in opts {
opt(&mut options);
}
let cmd = Command {
method: "unsubscribe".to_string(),
params: RequestKind::UnsubscribeRequest(UnsubscribeRequest {
channel,
user,
options,
}),
};
self.add(cmd)?;
Ok(())
}
pub fn add_disconnect(
&self,
user: String,
opts: &[DisconnectOption],
) -> Result<(), Box<dyn Error>> {
let mut options = DisconnectOptions::default();
for opt in opts {
opt(&mut options);
}
let cmd = Command {
method: "disconnect".to_string(),
params: RequestKind::DisconnectRequest(DisconnectRequest { user, options }),
};
self.add(cmd)?;
Ok(())
}
pub fn add_presence(&self, channel: String) -> Result<(), Box<dyn Error>> {
let cmd = Command {
method: "presence".to_string(),
params: RequestKind::Value(serde_json::json!({
"channel": channel,
})),
};
self.add(cmd)?;
Ok(())
}
pub fn add_presence_stats(&self, channel: String) -> Result<(), Box<dyn Error>> {
let cmd = Command {
method: "presence_stats".to_string(),
params: RequestKind::Value(serde_json::json!({
"channel": channel,
})),
};
self.add(cmd)?;
Ok(())
}
pub fn add_history(
&self,
channel: String,
opts: &[HistoryOption],
) -> Result<(), Box<dyn Error>> {
let mut options = HistoryOptions::default();
for opt in opts {
opt(&mut options);
}
let cmd = Command {
method: "history".to_string(),
params: RequestKind::HistoryRequest(HistoryRequest { channel, options }),
};
self.add(cmd)?;
Ok(())
}
pub fn add_history_remove(&self, channel: String) -> Result<(), Box<dyn Error>> {
let cmd = Command {
method: "history_remove".to_string(),
params: RequestKind::Value(serde_json::json!({
"channel": channel,
})),
};
self.add(cmd)?;
Ok(())
}
pub fn add_channels(&self, opts: &[ChannelsOption]) -> Result<(), Box<dyn Error>> {
let mut options = ChannelsOptions::default();
for opt in opts {
opt(&mut options);
}
let cmd = Command {
method: "channels".to_string(),
params: RequestKind::ChannelsRequest(ChannelsRequest {
pattern: options.pattern,
}),
};
self.add(cmd)?;
Ok(())
}
pub fn add_info(&self) -> Result<(), Box<dyn Error>> {
let cmd = Command {
method: "info".to_string(),
params: RequestKind::Value(serde_json::json!({})),
};
self.add(cmd)?;
Ok(())
}
}