use core::mem;
use ibc_proto::google::protobuf::Any;
use ibc_relayer_types::core::ics24_host::identifier::ChainId;
use ibc_relayer_types::events::IbcEvent;
use ibc_relayer_types::Height;
use prost::Message;
use tendermint_rpc::endpoint::broadcast::tx_sync::Response;
use tendermint_rpc::HttpClient;
use tracing::debug;
use crate::chain::cosmos::encode::encoded_tx_metrics;
use crate::chain::cosmos::gas::gas_amount_to_fee;
use crate::chain::cosmos::retry::send_tx_with_account_sequence_retry;
use crate::chain::cosmos::types::account::Account;
use crate::chain::cosmos::types::config::TxConfig;
use crate::chain::cosmos::types::tx::{TxStatus, TxSyncResult};
use crate::chain::cosmos::wait::wait_for_block_commits;
use crate::config::types::Memo;
use crate::error::Error;
use crate::event::IbcEventWithHeight;
use crate::keyring::Secp256k1KeyPair;
pub async fn send_batched_messages_and_wait_commit(
    rpc_client: &HttpClient,
    config: &TxConfig,
    key_pair: &Secp256k1KeyPair,
    account: &mut Account,
    tx_memo: &Memo,
    messages: Vec<Any>,
) -> Result<Vec<IbcEventWithHeight>, Error> {
    if messages.is_empty() {
        return Ok(Vec::new());
    }
    let mut tx_sync_results =
        send_messages_as_batches(rpc_client, config, key_pair, account, tx_memo, messages).await?;
    wait_for_block_commits(
        &config.chain_id,
        rpc_client,
        &config.rpc_address,
        &config.rpc_timeout,
        &mut tx_sync_results,
    )
    .await?;
    let events = tx_sync_results
        .into_iter()
        .flat_map(|el| el.events)
        .collect();
    Ok(events)
}
pub async fn sequential_send_batched_messages_and_wait_commit(
    rpc_client: &HttpClient,
    config: &TxConfig,
    key_pair: &Secp256k1KeyPair,
    account: &mut Account,
    tx_memo: &Memo,
    messages: Vec<Any>,
) -> Result<Vec<IbcEventWithHeight>, Error> {
    if messages.is_empty() {
        return Ok(Vec::new());
    }
    let tx_sync_results = sequential_send_messages_as_batches(
        rpc_client, config, key_pair, account, tx_memo, messages,
    )
    .await?;
    let events = tx_sync_results
        .into_iter()
        .flat_map(|el| el.events)
        .collect();
    Ok(events)
}
pub async fn send_batched_messages_and_wait_check_tx(
    rpc_client: &HttpClient,
    config: &TxConfig,
    key_pair: &Secp256k1KeyPair,
    account: &mut Account,
    tx_memo: &Memo,
    messages: Vec<Any>,
) -> Result<Vec<Response>, Error> {
    if messages.is_empty() {
        return Ok(Vec::new());
    }
    let batches = batch_messages(config, key_pair, account, tx_memo, messages)?;
    let mut responses = Vec::new();
    for batch in batches {
        let response = send_tx_with_account_sequence_retry(
            rpc_client, config, key_pair, account, tx_memo, &batch,
        )
        .await?;
        responses.push(response);
    }
    Ok(responses)
}
async fn send_messages_as_batches(
    rpc_client: &HttpClient,
    config: &TxConfig,
    key_pair: &Secp256k1KeyPair,
    account: &mut Account,
    tx_memo: &Memo,
    messages: Vec<Any>,
) -> Result<Vec<TxSyncResult>, Error> {
    if messages.is_empty() {
        return Ok(Vec::new());
    }
    let message_count = messages.len();
    let batches = batch_messages(config, key_pair, account, tx_memo, messages)?;
    debug!(
        "sending {} messages as {} batches to chain {} in parallel",
        message_count,
        batches.len(),
        config.chain_id
    );
    let mut tx_sync_results = Vec::new();
    for batch in batches {
        let message_count = batch.len();
        let response = send_tx_with_account_sequence_retry(
            rpc_client, config, key_pair, account, tx_memo, &batch,
        )
        .await?;
        let tx_sync_result = response_to_tx_sync_result(&config.chain_id, message_count, response);
        tx_sync_results.push(tx_sync_result);
    }
    Ok(tx_sync_results)
}
async fn sequential_send_messages_as_batches(
    rpc_client: &HttpClient,
    config: &TxConfig,
    key_pair: &Secp256k1KeyPair,
    account: &mut Account,
    tx_memo: &Memo,
    messages: Vec<Any>,
) -> Result<Vec<TxSyncResult>, Error> {
    if messages.is_empty() {
        return Ok(Vec::new());
    }
    let message_count = messages.len();
    let batches = batch_messages(config, key_pair, account, tx_memo, messages)?;
    debug!(
        "sending {} messages as {} batches to chain {} in serial",
        message_count,
        batches.len(),
        config.chain_id
    );
    let mut tx_sync_results = Vec::new();
    for batch in batches {
        let message_count = batch.len();
        let response = send_tx_with_account_sequence_retry(
            rpc_client, config, key_pair, account, tx_memo, &batch,
        )
        .await?;
        let tx_sync_result = response_to_tx_sync_result(&config.chain_id, message_count, response);
        tx_sync_results.push(tx_sync_result);
        wait_for_block_commits(
            &config.chain_id,
            rpc_client,
            &config.rpc_address,
            &config.rpc_timeout,
            &mut tx_sync_results,
        )
        .await?;
    }
    Ok(tx_sync_results)
}
fn response_to_tx_sync_result(
    chain_id: &ChainId,
    message_count: usize,
    response: Response,
) -> TxSyncResult {
    if response.code.is_err() {
        let height = Height::new(chain_id.version(), 1).unwrap();
        let events_per_tx = vec![IbcEventWithHeight::new(IbcEvent::ChainError(format!(
            "check_tx (broadcast_tx_sync) on chain {} for Tx hash {} reports error: code={:?}, log={:?}",
            chain_id, response.hash, response.code, response.log
        )), height); message_count];
        TxSyncResult {
            response,
            events: events_per_tx,
            status: TxStatus::ReceivedResponse,
        }
    } else {
        TxSyncResult {
            response,
            events: Vec::new(),
            status: TxStatus::Pending { message_count },
        }
    }
}
fn batch_messages(
    config: &TxConfig,
    key_pair: &Secp256k1KeyPair,
    account: &Account,
    tx_memo: &Memo,
    messages: Vec<Any>,
) -> Result<Vec<Vec<Any>>, Error> {
    let max_message_count = config.max_msg_num.to_usize();
    let max_tx_size = config.max_tx_size.into();
    let mut batches = vec![];
    let max_fee = gas_amount_to_fee(&config.gas_config, config.gas_config.max_gas);
    let tx_metrics = encoded_tx_metrics(config, key_pair, account, tx_memo, &[], &max_fee)?;
    let tx_envelope_len = tx_metrics.envelope_len;
    let empty_body_len = tx_metrics.body_bytes_len;
    fn tx_len(envelope_len: usize, body_len: usize) -> usize {
        debug_assert!(body_len != 0);
        envelope_len + 1 + prost::length_delimiter_len(body_len) + body_len
    }
    let mut current_count = 0;
    let mut current_len = empty_body_len;
    let mut current_batch = vec![];
    for message in messages {
        let message_len = message.encoded_len();
        let tagged_len = 1 + prost::length_delimiter_len(message_len) + message_len;
        if current_count >= max_message_count
            || tx_len(tx_envelope_len, current_len + tagged_len) > max_tx_size
        {
            let insert_batch = mem::take(&mut current_batch);
            if insert_batch.is_empty() {
                assert!(max_message_count != 0);
                return Err(Error::message_too_big_for_tx(message_len));
            }
            batches.push(insert_batch);
            current_count = 0;
            current_len = empty_body_len;
        }
        current_count += 1;
        current_len += tagged_len;
        current_batch.push(message);
    }
    if !current_batch.is_empty() {
        batches.push(current_batch);
    }
    Ok(batches)
}
#[allow(clippy::redundant_clone)]
#[cfg(test)]
mod tests {
    use super::batch_messages;
    use crate::chain::cosmos::encode::sign_and_encode_tx;
    use crate::chain::cosmos::gas::gas_amount_to_fee;
    use crate::chain::cosmos::types::account::{
        Account, AccountAddress, AccountNumber, AccountSequence,
    };
    use crate::chain::cosmos::types::config::TxConfig;
    use crate::config;
    use crate::config::types::{MaxMsgNum, MaxTxSize, Memo};
    use crate::keyring::{self, KeyRing, Secp256k1KeyPair, SigningKeyPair};
    use ibc_proto::google::protobuf::Any;
    use ibc_relayer_types::core::ics24_host::identifier::ChainId;
    use std::fs;
    const COSMOS_HD_PATH: &str = "m/44'/118'/0'/0/0";
    fn test_fixture() -> (TxConfig, Secp256k1KeyPair, Account) {
        let path = concat!(
            env!("CARGO_MANIFEST_DIR"),
            "/tests/config/fixtures/relayer_conf_example.toml"
        );
        let config = config::load(path).expect("could not parse config");
        let chain_id = ChainId::from_string("chain_A");
        let chain_config = config.find_chain(&chain_id).unwrap();
        let tx_config = TxConfig::try_from(chain_config).expect("could not obtain tx config");
        let path = concat!(
            env!("CARGO_MANIFEST_DIR"),
            "/tests/config/fixtures/relayer-seed.json"
        );
        let seed_file_content = fs::read_to_string(path).unwrap();
        let _keyring = KeyRing::new_secp256k1(
            keyring::Store::Memory,
            "cosmos",
            &chain_id,
            &chain_config.key_store_folder,
        )
        .unwrap();
        let hd_path = COSMOS_HD_PATH.parse().unwrap();
        let key_pair = Secp256k1KeyPair::from_seed_file(&seed_file_content, &hd_path).unwrap();
        let account = Account {
            address: AccountAddress::new("".to_owned()),
            number: AccountNumber::new(0),
            sequence: AccountSequence::new(0),
        };
        (tx_config, key_pair, account)
    }
    #[test]
    fn batch_does_not_exceed_max_tx_size() {
        let (config, key_pair, account) = test_fixture();
        let max_fee = gas_amount_to_fee(&config.gas_config, config.gas_config.max_gas);
        let mut messages = vec![Any {
            type_url: "/example.Baz".into(),
            value: vec![0; 2],
        }];
        let memo = Memo::new("").unwrap();
        for n in 0..100 {
            messages.insert(
                messages.len() - 1,
                Any {
                    type_url: "/example.Foo".into(),
                    value: vec![0; n],
                },
            );
            let expected_batch_len = messages.len() - 1;
            let tx_bytes = sign_and_encode_tx(
                &config,
                &key_pair,
                &account,
                &memo,
                &messages[..expected_batch_len],
                &max_fee,
            )
            .unwrap();
            let max_tx_size = MaxTxSize::new(tx_bytes.len()).unwrap();
            let mut limited_config = config.clone();
            limited_config.max_msg_num = MaxMsgNum::new(100).unwrap();
            limited_config.max_tx_size = max_tx_size;
            let batches = batch_messages(
                &limited_config,
                &key_pair,
                &account,
                &memo,
                messages.clone(),
            )
            .unwrap();
            assert_eq!(batches.len(), 2);
            assert_eq!(batches[0].len(), expected_batch_len);
            let tx_bytes =
                sign_and_encode_tx(&config, &key_pair, &account, &memo, &batches[0], &max_fee)
                    .unwrap();
            assert_eq!(tx_bytes.len(), max_tx_size.to_usize());
            assert_eq!(batches[1].len(), 1);
            assert_eq!(batches[1][0].type_url, "/example.Baz");
            assert_eq!(batches[1][0].value.len(), 2);
        }
    }
    #[test]
    fn batch_error_on_oversized_message() {
        const MAX_TX_SIZE: usize = 203;
        let (config, key_pair, account) = test_fixture();
        let messages = vec![Any {
            type_url: "/example.Foo".into(),
            value: vec![0; 6],
        }];
        let memo = Memo::new("example").unwrap();
        let mut limited_config = config.clone();
        limited_config.max_msg_num = MaxMsgNum::default();
        limited_config.max_tx_size = MaxTxSize::new(MAX_TX_SIZE).unwrap();
        let batches = batch_messages(
            &limited_config,
            &key_pair,
            &account,
            &memo,
            messages.clone(),
        )
        .unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].len(), 1);
        let max_fee = gas_amount_to_fee(&config.gas_config, config.gas_config.max_gas);
        let tx_bytes =
            sign_and_encode_tx(&config, &key_pair, &account, &memo, &batches[0], &max_fee).unwrap();
        assert_eq!(tx_bytes.len(), MAX_TX_SIZE);
        limited_config.max_tx_size = MaxTxSize::new(MAX_TX_SIZE - 1).unwrap();
        let res = batch_messages(&limited_config, &key_pair, &account, &memo, messages);
        assert!(res.is_err());
    }
    #[test]
    fn test_batches_are_structured_appropriately_per_max_msg_num() {
        let (config, key_pair, account) = test_fixture();
        let messages = vec![
            Any {
                type_url: "/example.Foo".into(),
                value: vec![0; 6],
            },
            Any {
                type_url: "/example.Bar".into(),
                value: vec![0; 4],
            },
            Any {
                type_url: "/example.Baz".into(),
                value: vec![0; 2],
            },
            Any {
                type_url: "/example.Bux".into(),
                value: vec![0; 3],
            },
            Any {
                type_url: "/example.Qux".into(),
                value: vec![0; 5],
            },
        ];
        let mut limited_config = config;
        limited_config.max_msg_num = MaxMsgNum::new(1).unwrap();
        limited_config.max_tx_size = MaxTxSize::default();
        let batches = batch_messages(
            &limited_config,
            &key_pair,
            &account,
            &Memo::new("").unwrap(),
            messages.clone(),
        )
        .unwrap();
        assert_eq!(batches.len(), 5);
        for batch in batches {
            assert_eq!(batch.len(), 1);
        }
        limited_config.max_msg_num = MaxMsgNum::new(100).unwrap();
        let batches = batch_messages(
            &limited_config,
            &key_pair,
            &account,
            &Memo::new("").unwrap(),
            messages,
        )
        .unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].len(), 5);
    }
    #[test]
    fn test_batches_are_structured_appropriately_per_max_tx_size() {
        const MAX_TX_SIZE: usize = 198;
        let (config, key_pair, account) = test_fixture();
        let messages = vec![
            Any {
                type_url: "/example.Foo".into(),
                value: vec![0; 10],
            },
            Any {
                type_url: "/example.Bar".into(),
                value: vec![0; 10],
            },
            Any {
                type_url: "/example.Baz".into(),
                value: vec![0; 10],
            },
            Any {
                type_url: "/example.Bux".into(),
                value: vec![0; 10],
            },
            Any {
                type_url: "/example.Qux".into(),
                value: vec![0; 10],
            },
        ];
        let memo = Memo::new("").unwrap();
        let mut limited_config = config.clone();
        limited_config.max_msg_num = MaxMsgNum::default();
        limited_config.max_tx_size = MaxTxSize::new(MAX_TX_SIZE).unwrap();
        let batches = batch_messages(
            &limited_config,
            &key_pair,
            &account,
            &memo,
            messages.clone(),
        )
        .unwrap();
        assert_eq!(batches.len(), 5);
        let max_fee = gas_amount_to_fee(&config.gas_config, config.gas_config.max_gas);
        for batch in batches {
            assert_eq!(batch.len(), 1);
            let tx_bytes =
                sign_and_encode_tx(&config, &key_pair, &account, &memo, &batch, &max_fee).unwrap();
            assert_eq!(tx_bytes.len(), MAX_TX_SIZE);
        }
        limited_config.max_tx_size = MaxTxSize::max();
        let batches = batch_messages(
            &limited_config,
            &key_pair,
            &account,
            &Memo::new("").unwrap(),
            messages,
        )
        .unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].len(), 5);
    }
    #[test]
    #[should_panic(expected = "`max_msg_num` must be greater than or equal to 1, found 0")]
    fn test_max_msg_num_of_zero_panics() {
        let (mut config, key_pair, account) = test_fixture();
        config.max_msg_num = MaxMsgNum::new(0).unwrap();
        let _batches = batch_messages(
            &config,
            &key_pair,
            &account,
            &Memo::new("").unwrap(),
            vec![],
        );
    }
}