use std::fmt;
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use super::other::{get_network_type, sync_to_tip};
use ckb_index::{with_index_db, IndexDatabase};
use ckb_sdk::{GenesisInfo, HttpRpcClient};
use ckb_types::{
core::{service::Request, HeaderView},
prelude::*,
H256,
};
use ckb_util::RwLock;
use crossbeam_channel::Sender;
use serde_derive::{Deserialize, Serialize};
pub enum IndexRequest {
Kick,
RebuildCurrentDB,
UpdateUrl(String),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum IndexResponse {
Ok,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CapacityResult {
pub lock_hash: H256,
pub address: Option<String>,
pub capacity: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SimpleBlockInfo {
epoch: (u64, u64, u64),
number: u64,
hash: H256,
}
impl From<HeaderView> for SimpleBlockInfo {
fn from(header: HeaderView) -> SimpleBlockInfo {
let epoch = header.epoch();
SimpleBlockInfo {
number: header.number(),
epoch: (epoch.number(), epoch.index(), epoch.length()),
hash: header.hash().unpack(),
}
}
}
#[derive(Debug, Clone)]
pub enum IndexThreadState {
WaitToStart,
StartInit,
Processing(Option<SimpleBlockInfo>, u64),
Error(String),
Stopped,
}
impl IndexThreadState {
pub fn start_init(&mut self) {
*self = IndexThreadState::StartInit;
}
pub fn processing(&mut self, header: Option<HeaderView>, tip_number: u64) {
let block_info = header.map(Into::into);
*self = IndexThreadState::Processing(block_info, tip_number);
}
pub fn error(&mut self, err: String) {
*self = IndexThreadState::Error(err);
}
pub fn stop(&mut self) {
*self = IndexThreadState::Stopped;
}
pub fn get_error(&self) -> Option<String> {
match self {
IndexThreadState::Error(err) => Some(err.clone()),
_ => None,
}
}
pub fn is_started(&self) -> bool {
!matches!(self, IndexThreadState::WaitToStart)
}
pub fn is_stopped(&self) -> bool {
matches!(self, IndexThreadState::Stopped)
}
pub fn is_synced(&self) -> bool {
match self {
IndexThreadState::Processing(Some(SimpleBlockInfo { number, .. }), tip_number) => {
number == tip_number
}
_ => false,
}
}
pub fn is_error(&self) -> bool {
matches!(self, IndexThreadState::Error(_))
}
#[cfg_attr(windows, allow(dead_code))]
pub fn is_processing(&self) -> bool {
matches!(self, IndexThreadState::Processing(Some(_), _))
}
}
impl fmt::Display for IndexThreadState {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
let output = match self {
IndexThreadState::WaitToStart => "Waiting for first query".to_owned(),
IndexThreadState::StartInit => "Initializing".to_owned(),
IndexThreadState::Error(err) => format!("Error: {}", err),
IndexThreadState::Processing(Some(SimpleBlockInfo { number, .. }), tip_number) => {
let status = if tip_number == number {
"synced".to_owned()
} else {
format!("tip#{}", tip_number)
};
format!("Processed block#{} ({})", number, status)
}
IndexThreadState::Processing(None, tip_number) => {
format!("Initializing (tip#{})", tip_number)
}
IndexThreadState::Stopped => "Stopped".to_owned(),
};
write!(f, "{}", output)
}
}
impl Default for IndexThreadState {
fn default() -> IndexThreadState {
IndexThreadState::WaitToStart
}
}
pub struct IndexController {
state: Arc<RwLock<IndexThreadState>>,
sender: Sender<Request<IndexRequest, IndexResponse>>,
shutdown: Arc<AtomicBool>,
}
impl Clone for IndexController {
fn clone(&self) -> IndexController {
IndexController {
state: Arc::clone(&self.state),
shutdown: Arc::clone(&self.shutdown),
sender: self.sender.clone(),
}
}
}
impl IndexController {
pub fn new(
state: Arc<RwLock<IndexThreadState>>,
sender: Sender<Request<IndexRequest, IndexResponse>>,
shutdown: Arc<AtomicBool>,
) -> IndexController {
IndexController {
state,
sender,
shutdown,
}
}
pub fn state(&self) -> &Arc<RwLock<IndexThreadState>> {
&self.state
}
pub fn sender(&self) -> &Sender<Request<IndexRequest, IndexResponse>> {
&self.sender
}
pub fn shutdown(&self) {
let start_time = Instant::now();
self.shutdown.store(true, Ordering::Relaxed);
while self.state().read().is_started() && !self.state().read().is_stopped() {
if self.state().read().is_error() {
return;
}
if start_time.elapsed() < Duration::from_secs(10) {
thread::sleep(Duration::from_millis(50));
} else {
eprintln!(
"Stop index thread timeout(state: {}), give up",
self.state().read().to_string()
);
return;
}
}
}
}
pub fn with_db<F, T>(
func: F,
rpc_client: &mut HttpRpcClient,
genesis_info: GenesisInfo,
index_dir: &Path,
index_controller: IndexController,
wait_for_sync: bool,
) -> Result<T, String>
where
F: FnOnce(IndexDatabase) -> T,
{
if wait_for_sync {
sync_to_tip(&index_controller)?;
}
let network_type = get_network_type(rpc_client)?;
let genesis_hash: H256 = genesis_info.header().hash().unpack();
with_index_db(&index_dir, genesis_hash, |backend, cf| {
let db = IndexDatabase::from_db(backend, cf, network_type, genesis_info, false)?;
Ok(func(db))
})
.map_err(|_err| {
format!(
"Index database may not ready, sync process: {}",
index_controller.state().read().to_string()
)
})
}