otx-pool 0.1.0

The open transaction pool service core
Documentation
use crate::notify::NotifyController;
use crate::pool::OtxPool;

use otx_format::jsonrpc_types::OpenTransaction;
use otx_format::types::OpenTxStatus;
use otx_pool_plugin_protocol::{HostServiceHandler, MessageFromHost, MessageFromPlugin};

use anyhow::{anyhow, Result};
use ckb_types::core::service::Request;
use ckb_types::H256;
use crossbeam_channel::{bounded, select, Sender};

use std::sync::Arc;
use std::thread::{self, JoinHandle};

#[derive(Debug)]
pub struct HostServiceProvider {
    handler: HostServiceHandler,
    stop_handler: Sender<()>,
    _thread: Option<JoinHandle<()>>,
}

impl HostServiceProvider {
    pub fn start(
        notify_ctrl: NotifyController,
        otx_pool: Arc<OtxPool>,
    ) -> Result<HostServiceProvider, String> {
        let (sender, receiver) = bounded(5);
        let (stop_sender, stop_receiver) = bounded(1);

        let handle = thread::spawn(move || loop {
            select! {
                recv(receiver) -> request => {
                    match request {
                        Err(err) => {
                            log::warn!("ServiceProvider receive request error: {:?}", err);
                            break;
                        }
                        Ok(Request {
                            responder,
                            arguments,
                        }) => {
                            log::debug!("ServiceProvider received a request: {:?}", arguments);
                            match arguments {
                                MessageFromPlugin::DiscardOtx(_id) => {
                                    let _ = responder.send(MessageFromHost::Ok);
                                }
                                MessageFromPlugin::SentToCkb(otx_hash) => {
                                    Self::handle_sent_ckb_tx(
                                        otx_hash,
                                        notify_ctrl.clone(),
                                        otx_pool.clone(),
                                    );
                                    let _ = responder.send(MessageFromHost::Ok);
                                }
                                MessageFromPlugin::MergeOtxsAndSentToCkb((otx_hashes, tx_hash)) => {
                                    Self::handle_merge_otxs_and_sent(
                                        otx_hashes,
                                        tx_hash,
                                        notify_ctrl.clone(),
                                        otx_pool.clone(),
                                    );
                                    let _ = responder.send(MessageFromHost::Ok);
                                }
                                MessageFromPlugin::NewMergedOtx((merged_otx, otx_hashes)) => {
                                    match Self::handle_new_merged_otx(
                                        merged_otx,
                                        otx_hashes,
                                        otx_pool.clone()
                                    ) {
                                        Ok(_) => {let _ = responder.send(MessageFromHost::Ok);}
                                        Err(err) => {
                                            log::warn!("handle new merged otx error: {:?}", err);
                                            let _ = responder.send(MessageFromHost::Error(err.to_string()));
                                        }
                                    }
                                }
                                _ => unreachable!(),
                            }
                        }
                    }
                }
                recv(stop_receiver) -> request => {
                    match request {
                        Err(err) => {
                            log::warn!("ServiceProvider receive stop request error: {:?}", err);
                            break;
                        }
                        Ok(_) => {
                            log::info!("ServiceProvider received stop signal");
                            break;
                        }
                    }
                }
            }
        });

        Ok(HostServiceProvider {
            handler: sender,
            stop_handler: stop_sender,
            _thread: Some(handle),
        })
    }

    pub fn handler(&self) -> HostServiceHandler {
        self.handler.clone()
    }

    fn handle_new_merged_otx(
        new_merged_otx: OpenTransaction,
        included_otx_hashes: Vec<H256>,
        otx_pool: Arc<OtxPool>,
    ) -> Result<()> {
        let merged_otx_hash = if let Ok(hash) = new_merged_otx.get_tx_hash() {
            hash
        } else {
            return Err(anyhow!("invalid merged otx"));
        };
        log::info!(
            "handle new merged otx: {:?}, includes otxs: {:?}",
            merged_otx_hash,
            included_otx_hashes
                .iter()
                .map(|hash| hash.to_string())
                .collect::<Vec<String>>()
        );
        for otx_hash in included_otx_hashes.iter() {
            otx_pool.update_otx_status(otx_hash, OpenTxStatus::Merged(merged_otx_hash.clone()));
        }
        otx_pool.insert(new_merged_otx).expect("insert merged otx");
        Ok(())
    }

    fn handle_sent_ckb_tx(
        final_otx_hash: H256,
        notify_ctrl: NotifyController,
        otx_pool: Arc<OtxPool>,
    ) {
        let otx_hashes: Vec<H256> = otx_pool
            .get_otxs_by_merged_otx_id(&final_otx_hash)
            .iter_mut()
            .map(|otx| otx.otx.get_or_insert_otx_id().expect("get otx id"))
            .collect();
        log::info!(
            "handle sent ckb tx: {:?}, includes otxs: {:?}",
            final_otx_hash.to_string(),
            otx_hashes
                .iter()
                .map(|hash| hash.to_string())
                .collect::<Vec<String>>()
        );

        for otx_hash in otx_hashes.iter() {
            otx_pool.update_otx_status(otx_hash, OpenTxStatus::Committed(final_otx_hash.clone()));
        }
        otx_pool.update_otx_status(
            &final_otx_hash,
            OpenTxStatus::Committed(final_otx_hash.clone()),
        );
        notify_ctrl.notify_commit_open_tx(otx_hashes.clone());
        otx_pool.insert_sent_tx(final_otx_hash, otx_hashes);
    }

    fn handle_merge_otxs_and_sent(
        otx_hashes: Vec<H256>,
        tx_hash: H256,
        notify_ctrl: NotifyController,
        otx_pool: Arc<OtxPool>,
    ) {
        log::info!(
            "handle sent ckb tx: {:?}, includes otxs: {:?}",
            tx_hash.to_string(),
            otx_hashes
                .iter()
                .map(|hash| hash.to_string())
                .collect::<Vec<String>>()
        );

        for otx_hash in otx_hashes.iter() {
            otx_pool.update_otx_status(otx_hash, OpenTxStatus::Committed(tx_hash.clone()));
        }
        notify_ctrl.notify_commit_open_tx(otx_hashes.clone());
        otx_pool.insert_sent_tx(tx_hash, otx_hashes);
    }
}

impl Drop for HostServiceProvider {
    fn drop(&mut self) {
        log::info!("HostServiceProvider drop");
        let _ = self.stop_handler.try_send(());
    }
}