use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use async_recursion::async_recursion;
use tracing::{info, trace};
use super::signer::Signer;
use crate::errors::{ProtocolError, Result};
use crate::protocol::dh_fill_pool::DhFillPoolRequest;
use crate::types::{Asset, Blockchain, Market};
#[derive(Debug)]
pub struct State {
pub signer: Option<Signer>,
pub asset_nonces: Option<HashMap<String, Vec<u32>>>,
pub markets: Option<HashMap<String, Market>>,
pub assets: Option<Vec<Asset>>,
pub remaining_orders: AtomicU64,
pub affiliate_code: Option<String>,
pub assets_nonces_refresh: bool,
pub dont_sign_states: bool,
pub place_order_semaphore: Arc<tokio::sync::Semaphore>,
pub sign_all_states_semaphore: Arc<tokio::sync::Semaphore>,
pub k1_fill_pool_semaphore: Arc<tokio::sync::Semaphore>,
pub r1_fill_pool_semaphore: Arc<tokio::sync::Semaphore>,
}
impl State {
pub fn new(signer: Option<Signer>) -> Self {
Self {
signer,
asset_nonces: None,
markets: None,
assets: None,
remaining_orders: AtomicU64::new(0),
affiliate_code: None,
assets_nonces_refresh: false,
dont_sign_states: false,
place_order_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
sign_all_states_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
k1_fill_pool_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
r1_fill_pool_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
}
}
pub fn from_keys_path(keys_path: Option<&str>) -> Result<Self> {
let signer = match keys_path {
Some(path) => Some(Signer::new(path)?),
None => None,
};
Ok(Self::new(signer))
}
pub fn from_keys(secret: &str, session: &str) -> Result<Self> {
let signer = Some(Signer::from_data(secret, session)?);
Ok(Self::new(signer))
}
pub fn signer(&self) -> Result<&Signer> {
self.signer
.as_ref()
.ok_or(ProtocolError("Signer not initiated"))
}
pub fn get_market(&self, market_name: &str) -> Result<Market> {
let market_map = self
.markets
.as_ref()
.ok_or(ProtocolError("Market map does not exist"))?;
market_map
.get(market_name)
.ok_or(ProtocolError("Market name does not exist"))
.map(|m| m.clone())
}
pub fn get_remaining_orders(&self) -> u64 {
return self.remaining_orders.load(Ordering::Relaxed);
}
pub fn set_remaining_orders(&self, n: u64) {
return self.remaining_orders.store(n, Ordering::Relaxed);
}
pub fn decr_remaining_orders(&self) {
self.remaining_orders.fetch_sub(1, Ordering::Relaxed);
}
#[async_recursion]
pub async fn acquire_fill_pool_schedules(
&self,
chains: Option<&'async_recursion Vec<Blockchain>>,
r_val_fill_pool_threshold: Option<u32>,
) -> Result<Vec<(DhFillPoolRequest, tokio::sync::OwnedSemaphorePermit)>> {
let mut schedules = Vec::new();
let mut schedules_pool_types = HashSet::new();
let threshold = r_val_fill_pool_threshold.unwrap_or(R_VAL_FILL_POOL_THRESHOLD);
for chain in chains.unwrap_or(Blockchain::all().as_ref()) {
let remaining = self.signer()?.get_remaining_r_vals(chain);
info!("{:?}: {}", chain, remaining);
if remaining < threshold {
let (semaphore, pool_type) = match chain {
Blockchain::Bitcoin => (&self.k1_fill_pool_semaphore, RValPoolTypes::K1),
Blockchain::Ethereum => (&self.k1_fill_pool_semaphore, RValPoolTypes::K1),
Blockchain::NEO => (&self.r1_fill_pool_semaphore, RValPoolTypes::R1),
};
if !schedules_pool_types.contains(&pool_type) {
if let Ok(permit) = semaphore.clone().try_acquire_owned() {
let fill_size = MAX_R_VAL_POOL_SIZE - remaining;
schedules.push((DhFillPoolRequest::new(chain.clone(), fill_size)?, permit));
schedules_pool_types.insert(pool_type);
trace!(?chain, %remaining ,%fill_size, "created fill pool request");
} else {
let _ = semaphore
.acquire()
.await
.expect("Who closed the semaphore?");
return self
.acquire_fill_pool_schedules(chains, r_val_fill_pool_threshold)
.await;
}
}
}
}
Ok(schedules)
}
}
#[derive(Hash, PartialEq, Eq)]
enum RValPoolTypes {
R1,
K1,
}
pub const MAX_R_VAL_POOL_SIZE: u32 = 100;
pub const R_VAL_FILL_POOL_THRESHOLD: u32 = 60;