1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use async_recursion::async_recursion;
use tracing::{info, trace};
use super::signer::Signer;
use crate::errors::{ProtocolError, Result};
use crate::protocol::dh_fill_pool::DhFillPoolRequest;
use crate::types::{Asset, Blockchain, Market};
#[derive(Debug)]
pub struct State {
    
    
    
    
    pub signer: Option<Signer>,
    
    
    pub asset_nonces: Option<HashMap<String, Vec<u32>>>,
    
    pub markets: Option<HashMap<String, Market>>,
    
    pub assets: Option<Vec<Asset>>,
    
    
    pub remaining_orders: AtomicU64,
    
    pub affiliate_code: Option<String>,
    pub assets_nonces_refresh: bool,
    pub dont_sign_states: bool, 
    pub place_order_semaphore: Arc<tokio::sync::Semaphore>,
    pub sign_all_states_semaphore: Arc<tokio::sync::Semaphore>,
    pub k1_fill_pool_semaphore: Arc<tokio::sync::Semaphore>,
    pub r1_fill_pool_semaphore: Arc<tokio::sync::Semaphore>,
}
impl State {
    pub fn new(signer: Option<Signer>) -> Self {
        Self {
            signer,
            asset_nonces: None,
            markets: None,
            assets: None,
            remaining_orders: AtomicU64::new(0),
            affiliate_code: None,
            assets_nonces_refresh: false,
            dont_sign_states: false,
            
            place_order_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
            sign_all_states_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
            k1_fill_pool_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
            r1_fill_pool_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
        }
    }
    pub fn from_keys_path(keys_path: Option<&str>) -> Result<Self> {
        let signer = match keys_path {
            Some(path) => Some(Signer::new(path)?),
            None => None,
        };
        Ok(Self::new(signer))
    }
    pub fn from_keys(secret: &str, session: &str) -> Result<Self> {
        let signer = Some(Signer::from_data(secret, session)?);
        Ok(Self::new(signer))
    }
    pub fn signer(&self) -> Result<&Signer> {
        self.signer
            .as_ref()
            .ok_or(ProtocolError("Signer not initiated"))
    }
    pub fn get_market(&self, market_name: &str) -> Result<Market> {
        let market_map = self
            .markets
            .as_ref()
            .ok_or(ProtocolError("Market map does not exist"))?;
        market_map
            .get(market_name)
            .ok_or(ProtocolError("Market name does not exist"))
            .map(|m| m.clone())
    }
    pub fn get_remaining_orders(&self) -> u64 {
        return self.remaining_orders.load(Ordering::Relaxed);
    }
    pub fn set_remaining_orders(&self, n: u64) {
        return self.remaining_orders.store(n, Ordering::Relaxed);
    }
    pub fn decr_remaining_orders(&self) {
        self.remaining_orders.fetch_sub(1, Ordering::Relaxed);
    }
    
    #[async_recursion]
    pub async fn acquire_fill_pool_schedules(
        &self,
        chains: Option<&'async_recursion Vec<Blockchain>>,
        r_val_fill_pool_threshold: Option<u32>,
    ) -> Result<Vec<(DhFillPoolRequest, tokio::sync::OwnedSemaphorePermit)>> {
        let mut schedules = Vec::new();
        let mut schedules_pool_types = HashSet::new();
        let threshold = r_val_fill_pool_threshold.unwrap_or(R_VAL_FILL_POOL_THRESHOLD);
        for chain in chains.unwrap_or(Blockchain::all().as_ref()) {
            let remaining = self.signer()?.get_remaining_r_vals(chain);
            info!("{:?}: {}", chain, remaining);
            if remaining < threshold {
                let (semaphore, pool_type) = match chain {
                    Blockchain::Bitcoin => (&self.k1_fill_pool_semaphore, RValPoolTypes::K1),
                    Blockchain::Ethereum => (&self.k1_fill_pool_semaphore, RValPoolTypes::K1),
                    Blockchain::NEO => (&self.r1_fill_pool_semaphore, RValPoolTypes::R1),
                };
                
                if !schedules_pool_types.contains(&pool_type) {
                    if let Ok(permit) = semaphore.clone().try_acquire_owned() {
                        let fill_size = MAX_R_VAL_POOL_SIZE - remaining;
                        schedules.push((DhFillPoolRequest::new(chain.clone(), fill_size)?, permit));
                        schedules_pool_types.insert(pool_type);
                        trace!(?chain, %remaining ,%fill_size, "created fill pool request");
                    } else {
                        
                        
                        
                        
                        let _ = semaphore
                            .acquire()
                            .await
                            .expect("Who closed the semaphore?");
                        return self
                            .acquire_fill_pool_schedules(chains, r_val_fill_pool_threshold)
                            .await;
                    }
                }
            }
        }
        Ok(schedules)
    }
}
#[derive(Hash, PartialEq, Eq)]
enum RValPoolTypes {
    R1,
    K1,
}
pub const MAX_R_VAL_POOL_SIZE: u32 = 100;
pub const R_VAL_FILL_POOL_THRESHOLD: u32 = 60;