use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use binary_options_tools_core_pre::error::CoreError;
use binary_options_tools_core_pre::reimports::{
bounded_async, AsyncReceiver, AsyncSender, Message,
};
use binary_options_tools_core_pre::traits::{ApiModule, Rule, RunnerCommand};
use tokio::select;
use tokio::sync::RwLock;
use uuid::Uuid;
use crate::pocketoption::error::PocketResult;
use crate::pocketoption::state::State;
use crate::traits::ValidatorTrait;
use crate::validator::Validator;
pub use crate::pocketoption::types::Outgoing;
#[derive(Debug)]
pub enum Command {
Create {
validator: Validator,
keep_alive: Option<Outgoing>,
command_id: Uuid,
},
Remove {
id: Uuid,
command_id: Uuid,
},
Send(Outgoing),
}
#[derive(Debug)]
pub enum CommandResponse {
Created {
command_id: Uuid,
id: Uuid,
stream_receiver: AsyncReceiver<Arc<Message>>,
},
Removed {
command_id: Uuid,
id: Uuid,
existed: bool,
},
}
#[derive(Clone)]
pub struct RawHandle {
sender: AsyncSender<Command>,
receiver: AsyncReceiver<CommandResponse>,
}
impl RawHandle {
pub async fn create(
&self,
validator: Validator,
keep_alive: Option<Outgoing>,
) -> PocketResult<RawHandler> {
let command_id = Uuid::new_v4();
self.sender
.send(Command::Create {
validator,
keep_alive,
command_id,
})
.await
.map_err(CoreError::from)?;
loop {
match self.receiver.recv().await {
Ok(CommandResponse::Created {
command_id: cid,
id,
stream_receiver,
}) if cid == command_id => {
return Ok(RawHandler {
id,
sender: self.sender.clone(),
receiver: stream_receiver,
});
}
Ok(_) => continue,
Err(e) => return Err(CoreError::from(e).into()),
}
}
}
pub async fn remove(&self, id: Uuid) -> PocketResult<bool> {
let command_id = Uuid::new_v4();
self.sender
.send(Command::Remove { id, command_id })
.await
.map_err(CoreError::from)?;
loop {
match self.receiver.recv().await {
Ok(CommandResponse::Removed {
command_id: cid,
id: rid,
existed,
}) if cid == command_id && rid == id => return Ok(existed),
Ok(_) => continue,
Err(e) => return Err(CoreError::from(e).into()),
}
}
}
}
pub struct RawHandler {
id: Uuid,
sender: AsyncSender<Command>,
receiver: AsyncReceiver<Arc<Message>>,
}
impl RawHandler {
pub fn id(&self) -> Uuid {
self.id
}
pub async fn send_text(&self, text: impl Into<String>) -> PocketResult<()> {
self.sender
.send(Command::Send(Outgoing::Text(text.into())))
.await
.map_err(CoreError::from)?;
Ok(())
}
pub async fn send_binary(&self, data: impl Into<Vec<u8>>) -> PocketResult<()> {
self.sender
.send(Command::Send(Outgoing::Binary(data.into())))
.await
.map_err(CoreError::from)?;
Ok(())
}
pub async fn send_and_wait(&self, msg: Outgoing) -> PocketResult<Arc<Message>> {
self.sender
.send(Command::Send(msg))
.await
.map_err(CoreError::from)?;
self.wait_next().await
}
pub async fn wait_next(&self) -> PocketResult<Arc<Message>> {
self.receiver
.recv()
.await
.map_err(CoreError::from)
.map_err(Into::into)
}
pub fn subscribe(&self) -> AsyncReceiver<Arc<Message>> {
self.receiver.clone()
}
}
impl Drop for RawHandler {
fn drop(&mut self) {
let _ = self.sender.as_sync().send(Command::Remove {
id: self.id,
command_id: Uuid::new_v4(),
});
}
}
pub struct RawApiModule {
state: Arc<State>,
command_receiver: AsyncReceiver<Command>,
command_responder: AsyncSender<CommandResponse>,
message_receiver: AsyncReceiver<Arc<Message>>,
to_ws_sender: AsyncSender<Message>,
#[allow(clippy::type_complexity)]
sinks: Arc<RwLock<HashMap<Uuid, Arc<AsyncSender<Arc<Message>>>>>>,
keep_alive_msgs: Arc<RwLock<HashMap<Uuid, Outgoing>>>,
}
pub struct RawRule {
state: Arc<State>,
}
impl Rule for RawRule {
fn call(&self, msg: &Message) -> bool {
let msg_str = match msg {
Message::Binary(bin) => String::from_utf8_lossy(bin.as_ref()).into_owned(),
Message::Text(text) => text.to_string(),
_ => return false,
};
let validators = self
.state
.raw_validators
.read()
.expect("Failed to acquire read lock");
for (_id, v) in validators.iter() {
if v.call(msg_str.as_str()) {
return true;
}
}
false
}
fn reset(&self) {
}
}
#[async_trait]
impl ApiModule<State> for RawApiModule {
type Command = Command;
type CommandResponse = CommandResponse;
type Handle = RawHandle;
fn new(
shared_state: Arc<State>,
command_receiver: AsyncReceiver<Self::Command>,
command_responder: AsyncSender<Self::CommandResponse>,
message_receiver: AsyncReceiver<Arc<Message>>,
to_ws_sender: AsyncSender<Message>,
_: AsyncSender<RunnerCommand>,
) -> Self {
Self {
state: shared_state,
command_receiver,
command_responder,
message_receiver,
to_ws_sender,
sinks: Arc::new(RwLock::new(HashMap::new())),
keep_alive_msgs: Arc::new(RwLock::new(HashMap::new())),
}
}
fn create_handle(
sender: AsyncSender<Self::Command>,
receiver: AsyncReceiver<Self::CommandResponse>,
) -> Self::Handle {
RawHandle { sender, receiver }
}
async fn run(&mut self) -> binary_options_tools_core_pre::error::CoreResult<()> {
loop {
select! {
Ok(cmd) = self.command_receiver.recv() => {
match cmd {
Command::Create { validator, keep_alive, command_id } => {
let id = Uuid::new_v4();
self.state.add_raw_validator(id, validator);
if let Some(msg) = keep_alive.clone() {
self.keep_alive_msgs.write().await.insert(id, msg);
}
let (tx, rx) = bounded_async(64);
self.sinks.write().await.insert(id, Arc::new(tx));
self.command_responder.send(CommandResponse::Created { command_id, id, stream_receiver: rx }).await?;
}
Command::Remove { id, command_id } => {
let existed_state = self.state.remove_raw_validator(&id);
let existed_sink = self.sinks.write().await.remove(&id).is_some();
self.keep_alive_msgs.write().await.remove(&id);
self.command_responder.send(CommandResponse::Removed { command_id, id, existed: existed_state || existed_sink }).await?;
}
Command::Send(Outgoing::Text(text)) => {
self.to_ws_sender.send(Message::text(text)).await.map_err(CoreError::from)?;
}
Command::Send(Outgoing::Binary(data)) => {
self.to_ws_sender.send(Message::binary(data)).await.map_err(CoreError::from)?;
}
}
},
Ok(msg) = self.message_receiver.recv() => {
let content = match msg.as_ref() {
Message::Binary(bin) => String::from_utf8_lossy(bin.as_ref()).into_owned(),
Message::Text(t) => t.to_string(),
_ => String::new(),
};
if content.is_empty() { continue; }
let mut targets = Vec::new();
{
let validators = self.state.raw_validators.read().expect("Failed to acquire read lock");
for (id, validator) in validators.iter() {
if validator.call(content.as_str()) {
targets.push(*id);
}
}
}
if !targets.is_empty() {
let sinks = self.sinks.read().await;
for id in targets {
if let Some(tx) = sinks.get(&id) {
let _ = tx.send(msg.clone()).await; }
}
}
}
}
}
}
fn rule(state: Arc<State>) -> Box<dyn Rule + Send + Sync> {
Box::new(RawRule { state })
}
fn callback(
shared_state: Arc<State>,
_command_receiver: AsyncReceiver<Self::Command>,
_command_responder: AsyncSender<Self::CommandResponse>,
_message_receiver: AsyncReceiver<Arc<Message>>,
_to_ws_sender: AsyncSender<Message>,
) -> binary_options_tools_core_pre::error::CoreResult<
Option<Box<dyn binary_options_tools_core_pre::traits::ReconnectCallback<State>>>,
> {
struct CB {
msgs: Arc<RwLock<HashMap<Uuid, Outgoing>>>,
}
#[async_trait]
impl binary_options_tools_core_pre::traits::ReconnectCallback<State> for CB {
async fn call(
&self,
_state: Arc<State>,
ws_sender: &AsyncSender<Message>,
) -> binary_options_tools_core_pre::error::CoreResult<()> {
let msgs = self.msgs.read().await.clone();
for (_id, msg) in msgs.into_iter() {
match msg {
Outgoing::Text(t) => {
let _ = ws_sender.send(Message::text(t)).await;
}
Outgoing::Binary(b) => {
let _ = ws_sender.send(Message::binary(b)).await;
}
}
}
Ok(())
}
}
Ok(Some(Box::new(CB {
msgs: shared_state.raw_keep_alive.clone(),
})))
}
}