use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::fmt::{self, Debug, Display, Formatter};
use std::io::{Cursor, Read};
use std::ops::Add;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use std::{cmp, result};
use anyhow::{anyhow, bail, ensure};
use bech32::Variant::Bech32m;
use bech32::{FromBase32, ToBase32};
use bitcoin::secp256k1;
use bitcoin_hashes::sha256;
use fedimint_core::config::{ClientConfig, FederationId};
use fedimint_core::core::{DynOutputOutcome, ModuleInstanceId};
use fedimint_core::encoding::{Decodable, Encodable};
use fedimint_core::endpoint_constants::{
    AWAIT_SESSION_OUTCOME_ENDPOINT, SERVER_CONFIG_CONSENSUS_HASH_ENDPOINT,
};
use fedimint_core::fmt_utils::AbbreviateDebug;
use fedimint_core::module::SerdeModuleEncoding;
use fedimint_core::task::{MaybeSend, MaybeSync, RwLock, RwLockWriteGuard};
use fedimint_core::time::now;
use fedimint_core::{
    apply, async_trait_maybe_send, dyn_newtype_define, ModuleDecoderRegistry, NumPeers, OutPoint,
    PeerId, TransactionId,
};
use fedimint_logging::{LOG_CLIENT_NET_API, LOG_NET_API};
use futures::stream::FuturesUnordered;
use futures::{Future, StreamExt};
use jsonrpsee_core::client::ClientT;
use jsonrpsee_core::Error as JsonRpcError;
#[cfg(target_family = "wasm")]
use jsonrpsee_wasm_client::{Client as WsClient, WasmClientBuilder as WsClientBuilder};
#[cfg(not(target_family = "wasm"))]
use jsonrpsee_ws_client::{WsClient, WsClientBuilder};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use thiserror::Error;
use tracing::{debug, error, instrument, trace};
use crate::backup::ClientBackupSnapshot;
use crate::core::backup::SignedBackupRequest;
use crate::core::{Decoder, OutputOutcome};
use crate::encoding::DecodeError;
use crate::endpoint_constants::{
    AWAIT_OUTPUT_OUTCOME_ENDPOINT, AWAIT_TRANSACTION_ENDPOINT, BACKUP_ENDPOINT,
    CLIENT_CONFIG_ENDPOINT, RECOVER_ENDPOINT, SESSION_COUNT_ENDPOINT, SUBMIT_TRANSACTION_ENDPOINT,
    VERSION_ENDPOINT,
};
use crate::module::{ApiRequestErased, ApiVersion, SupportedApiVersionsSummary};
use crate::query::{
    DiscoverApiVersionSet, FilterMap, QueryStep, QueryStrategy, ThresholdConsensus,
    UnionResponsesSingle,
};
use crate::session_outcome::SessionOutcome;
use crate::task;
use crate::transaction::{SerdeTransaction, Transaction, TransactionError};
use crate::util::SafeUrl;
pub type PeerResult<T> = Result<T, PeerError>;
pub type JsonRpcResult<T> = Result<T, jsonrpsee_core::Error>;
pub type FederationResult<T> = Result<T, FederationError>;
pub type SerdeOutputOutcome = SerdeModuleEncoding<DynOutputOutcome>;
#[derive(Debug, Error)]
pub enum PeerError {
    #[error("Response deserialization error: {0}")]
    ResponseDeserialization(anyhow::Error),
    #[error("Invalid peer id: {peer_id}")]
    InvalidPeerId { peer_id: PeerId },
    #[error("Rpc error: {0}")]
    Rpc(#[from] JsonRpcError),
    #[error("Invalid response: {0}")]
    InvalidResponse(String),
}
impl PeerError {
    pub fn is_retryable(&self) -> bool {
        match self {
            PeerError::ResponseDeserialization(_) => false,
            PeerError::InvalidPeerId { peer_id: _ } => false,
            PeerError::Rpc(rpc_e) => match rpc_e {
                JsonRpcError::Transport(_) => true,
                JsonRpcError::MaxSlotsExceeded => true,
                JsonRpcError::RequestTimeout => true,
                JsonRpcError::RestartNeeded(_) => true,
                JsonRpcError::Call(e) => e.code() == 404,
                _ => false,
            },
            PeerError::InvalidResponse(_) => false,
        }
    }
}
#[derive(Debug, Error)]
pub struct FederationError {
    general: Option<anyhow::Error>,
    peers: BTreeMap<PeerId, PeerError>,
}
impl Display for FederationError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Federation rpc error {")?;
        if let Some(general) = self.general.as_ref() {
            f.write_fmt(format_args!("general => {general})"))?;
            if !self.peers.is_empty() {
                f.write_str(", ")?;
            }
        }
        for (i, (peer, e)) in self.peers.iter().enumerate() {
            f.write_fmt(format_args!("{peer} => {e})"))?;
            if i == self.peers.len() - 1 {
                f.write_str(", ")?;
            }
        }
        f.write_str("}")?;
        Ok(())
    }
}
impl FederationError {
    pub fn general(e: impl Into<anyhow::Error>) -> FederationError {
        FederationError {
            general: Some(e.into()),
            peers: Default::default(),
        }
    }
    pub fn is_retryable(&self) -> bool {
        self.peers.iter().any(|(_, e)| e.is_retryable())
    }
}
type OutputOutcomeResult<O> = result::Result<O, OutputOutcomeError>;
#[derive(Debug, Error)]
pub enum OutputOutcomeError {
    #[error("Response deserialization error: {0}")]
    ResponseDeserialization(anyhow::Error),
    #[error("Federation error: {0}")]
    Federation(#[from] FederationError),
    #[error("Core error: {0}")]
    Core(#[from] anyhow::Error),
    #[error("Transaction rejected: {0}")]
    Rejected(String),
    #[error("Invalid output index {out_idx}, larger than {outputs_num} in the transaction")]
    InvalidVout { out_idx: u64, outputs_num: usize },
    #[error("Timeout reached after waiting {}s", .0.as_secs())]
    Timeout(Duration),
}
#[apply(async_trait_maybe_send!)]
pub trait IFederationApi: Debug + MaybeSend + MaybeSync {
    fn all_peers(&self) -> &BTreeSet<PeerId>;
    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi;
    async fn request_raw(
        &self,
        peer_id: PeerId,
        method: &str,
        params: &[Value],
    ) -> result::Result<Value, jsonrpsee_core::Error>;
}
#[derive(Debug, Clone, Serialize, Deserialize, Encodable, Decodable)]
pub struct ApiVersionSet {
    pub core: ApiVersion,
    pub modules: BTreeMap<ModuleInstanceId, ApiVersion>,
}
#[apply(async_trait_maybe_send!)]
pub trait FederationApiExt: IFederationApi {
    async fn request_with_strategy<PeerRet: serde::de::DeserializeOwned, FedRet: Debug>(
        &self,
        mut strategy: impl QueryStrategy<PeerRet, FedRet> + MaybeSend,
        method: String,
        params: ApiRequestErased,
    ) -> FederationResult<FedRet> {
        let timeout = strategy.request_timeout();
        #[cfg(not(target_family = "wasm"))]
        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
        #[cfg(target_family = "wasm")]
        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
        let peers = self.all_peers();
        for peer_id in peers {
            futures.push(Box::pin(async {
                let request = async {
                    self.request_raw(*peer_id, &method, &[params.to_json()])
                        .await
                        .map(AbbreviateDebug)
                };
                let result = if let Some(timeout) = timeout {
                    match fedimint_core::task::timeout(timeout, request).await {
                        Ok(result) => result,
                        Err(_timeout) => Err(JsonRpcError::RequestTimeout),
                    }
                } else {
                    request.await
                };
                PeerResponse {
                    peer: *peer_id,
                    result,
                }
            }));
        }
        let mut peer_delay_ms = BTreeMap::new();
        let max_delay_ms = 1000;
        loop {
            let response = futures.next().await;
            trace!(target: LOG_CLIENT_NET_API, ?response, method, params = ?AbbreviateDebug(params.to_json()), "Received peer response");
            match response {
                Some(PeerResponse { peer, result }) => {
                    let result: PeerResult<PeerRet> =
                        result.map_err(PeerError::Rpc).and_then(|o| {
                            serde_json::from_value::<PeerRet>(o.0)
                                .map_err(|e| PeerError::ResponseDeserialization(e.into()))
                        });
                    let strategy_step = strategy.process(peer, result);
                    trace!(
                        target: LOG_CLIENT_NET_API,
                        method,
                        ?params,
                        ?strategy_step,
                        "Taking strategy step to the response after peer response"
                    );
                    match strategy_step {
                        QueryStep::Retry(peers) => {
                            for retry_peer in peers {
                                let mut delay_ms =
                                    peer_delay_ms.get(&retry_peer).copied().unwrap_or(10);
                                delay_ms = cmp::min(max_delay_ms, delay_ms * 2);
                                peer_delay_ms.insert(retry_peer, delay_ms);
                                futures.push(Box::pin({
                                    let method = &method;
                                    let params = ¶ms;
                                    async move {
                                        task::sleep(Duration::from_millis(delay_ms)).await;
                                        PeerResponse {
                                            peer: retry_peer,
                                            result: self
                                                .request_raw(
                                                    retry_peer,
                                                    method,
                                                    &[params.to_json()],
                                                )
                                                .await
                                                .map(AbbreviateDebug),
                                        }
                                    }
                                }));
                            }
                        }
                        QueryStep::Continue => {}
                        QueryStep::Failure { general, peers } => {
                            return Err(FederationError { general, peers })
                        }
                        QueryStep::Success(response) => return Ok(response),
                    }
                }
                None => {
                    panic!("Query strategy ran out of peers to query without returning a result");
                }
            }
        }
    }
    async fn request_current_consensus<Ret>(
        &self,
        method: String,
        params: ApiRequestErased,
    ) -> FederationResult<Ret>
    where
        Ret: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
    {
        self.request_with_strategy(
            ThresholdConsensus::overcome_evil(self.all_peers().total()),
            method,
            params,
        )
        .await
    }
}
#[apply(async_trait_maybe_send!)]
impl<T: ?Sized> FederationApiExt for T where T: IFederationApi {}
pub trait IModuleFederationApi: IFederationApi {}
dyn_newtype_define! {
    #[derive(Clone)]
    pub DynModuleApi(Arc<IModuleFederationApi>)
}
pub trait IGlobalFederationApi: IFederationApi {}
dyn_newtype_define! {
    #[derive(Clone)]
    pub DynGlobalApi(Arc<IGlobalFederationApi>)
}
impl AsRef<dyn IGlobalFederationApi + 'static> for DynGlobalApi {
    fn as_ref(&self) -> &(dyn IGlobalFederationApi + 'static) {
        self.inner.as_ref()
    }
}
#[apply(async_trait_maybe_send!)]
pub trait GlobalFederationApi {
    async fn submit_transaction(
        &self,
        tx: Transaction,
    ) -> FederationResult<SerdeModuleEncoding<Result<TransactionId, TransactionError>>>;
    async fn await_block(
        &self,
        block_index: u64,
        decoders: &ModuleDecoderRegistry,
    ) -> anyhow::Result<SessionOutcome>;
    async fn session_count(&self) -> FederationResult<u64>;
    async fn await_transaction(&self, txid: TransactionId) -> FederationResult<TransactionId>;
    async fn await_output_outcome<R>(
        &self,
        outpoint: OutPoint,
        timeout: Duration,
        module_decoder: &Decoder,
    ) -> OutputOutcomeResult<R>
    where
        R: OutputOutcome;
    async fn download_client_config(&self, info: &InviteCode) -> FederationResult<ClientConfig>;
    async fn server_config_consensus_hash(&self) -> FederationResult<sha256::Hash>;
    async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()>;
    async fn download_backup(
        &self,
        id: &secp256k1::PublicKey,
    ) -> FederationResult<Vec<ClientBackupSnapshot>>;
    async fn discover_api_version_set(
        &self,
        client_versions: &SupportedApiVersionsSummary,
    ) -> FederationResult<ApiVersionSet>;
}
pub fn deserialize_outcome<R>(
    outcome: SerdeOutputOutcome,
    module_decoder: &Decoder,
) -> OutputOutcomeResult<R>
where
    R: OutputOutcome + MaybeSend,
{
    let dyn_outcome = outcome
        .try_into_inner_known_module_kind(module_decoder)
        .map_err(|e| OutputOutcomeError::ResponseDeserialization(e.into()))?;
    let source_instance = dyn_outcome.module_instance_id();
    dyn_outcome.as_any().downcast_ref().cloned().ok_or_else(|| {
        let target_type = std::any::type_name::<R>();
        OutputOutcomeError::ResponseDeserialization(anyhow!(
            "Could not downcast output outcome with instance id {source_instance} to {target_type}"
        ))
    })
}
#[apply(async_trait_maybe_send!)]
impl<T: ?Sized> GlobalFederationApi for T
where
    T: IGlobalFederationApi + MaybeSend + MaybeSync + 'static,
{
    async fn submit_transaction(
        &self,
        tx: Transaction,
    ) -> FederationResult<SerdeModuleEncoding<Result<TransactionId, TransactionError>>> {
        self.request_current_consensus(
            SUBMIT_TRANSACTION_ENDPOINT.to_owned(),
            ApiRequestErased::new(&SerdeTransaction::from(&tx)),
        )
        .await
    }
    async fn await_block(
        &self,
        block_index: u64,
        decoders: &ModuleDecoderRegistry,
    ) -> anyhow::Result<SessionOutcome> {
        self.request_current_consensus::<SerdeModuleEncoding<SessionOutcome>>(
            AWAIT_SESSION_OUTCOME_ENDPOINT.to_string(),
            ApiRequestErased::new(block_index),
        )
        .await?
        .try_into_inner(decoders)
        .map_err(|e| anyhow!(e.to_string()))
    }
    async fn session_count(&self) -> FederationResult<u64> {
        self.request_current_consensus(
            SESSION_COUNT_ENDPOINT.to_owned(),
            ApiRequestErased::default(),
        )
        .await
    }
    async fn await_transaction(&self, txid: TransactionId) -> FederationResult<TransactionId> {
        self.request_current_consensus(
            AWAIT_TRANSACTION_ENDPOINT.to_owned(),
            ApiRequestErased::new(txid),
        )
        .await
    }
    async fn await_output_outcome<R>(
        &self,
        outpoint: OutPoint,
        timeout: Duration,
        module_decoder: &Decoder,
    ) -> OutputOutcomeResult<R>
    where
        R: OutputOutcome,
    {
        fedimint_core::task::timeout(timeout, async move {
            let outcome: SerdeOutputOutcome = self
                .request_current_consensus(
                    AWAIT_OUTPUT_OUTCOME_ENDPOINT.to_owned(),
                    ApiRequestErased::new(outpoint),
                )
                .await
                .map_err(OutputOutcomeError::Federation)?;
            deserialize_outcome(outcome, module_decoder)
        })
        .await
        .map_err(|_| OutputOutcomeError::Timeout(timeout))?
    }
    async fn download_client_config(
        &self,
        invite_code: &InviteCode,
    ) -> FederationResult<ClientConfig> {
        let id = invite_code.federation_id();
        let qs = FilterMap::new(
            move |cfg: ClientConfig| {
                if id.0 != cfg.global.api_endpoints.consensus_hash() {
                    bail!("Guardian api endpoint map does not hash to FederationId")
                }
                Ok(cfg.global.api_endpoints)
            },
            self.all_peers().total(),
        )
        .with_request_timeout(Duration::from_secs(5));
        let api_endpoints = self
            .request_with_strategy(
                qs,
                CLIENT_CONFIG_ENDPOINT.to_owned(),
                ApiRequestErased::default(),
            )
            .await?;
        let api_endpoints = api_endpoints
            .into_iter()
            .map(|(peer, url)| (peer, url.url))
            .collect();
        WsFederationApi::new(api_endpoints)
            .request_current_consensus(
                CLIENT_CONFIG_ENDPOINT.to_owned(),
                ApiRequestErased::default(),
            )
            .await
    }
    async fn server_config_consensus_hash(&self) -> FederationResult<sha256::Hash> {
        self.request_current_consensus(
            SERVER_CONFIG_CONSENSUS_HASH_ENDPOINT.to_owned(),
            ApiRequestErased::default(),
        )
        .await
    }
    async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()> {
        self.request_current_consensus(BACKUP_ENDPOINT.to_owned(), ApiRequestErased::new(request))
            .await
    }
    async fn download_backup(
        &self,
        id: &secp256k1::PublicKey,
    ) -> FederationResult<Vec<ClientBackupSnapshot>> {
        Ok(self
            .request_with_strategy(
                UnionResponsesSingle::<Option<ClientBackupSnapshot>>::new(self.all_peers().total()),
                RECOVER_ENDPOINT.to_owned(),
                ApiRequestErased::new(id),
            )
            .await?
            .into_iter()
            .flatten()
            .collect())
    }
    async fn discover_api_version_set(
        &self,
        client_versions: &SupportedApiVersionsSummary,
    ) -> FederationResult<ApiVersionSet> {
        let timeout = Duration::from_secs(60);
        self.request_with_strategy(
            DiscoverApiVersionSet::new(
                self.all_peers().len(),
                now().add(timeout),
                client_versions.clone(),
            ),
            VERSION_ENDPOINT.to_owned(),
            ApiRequestErased::default(),
        )
        .await
    }
}
#[derive(Debug, Clone)]
pub struct WsFederationApi<C = WsClient> {
    peer_ids: BTreeSet<PeerId>,
    peers: Arc<Vec<FederationPeer<C>>>,
    module_id: Option<ModuleInstanceId>,
}
#[derive(Debug)]
struct FederationPeer<C> {
    url: SafeUrl,
    peer_id: PeerId,
    client: RwLock<Option<C>>,
}
#[derive(Clone, Debug, Eq, PartialEq, Encodable)]
pub struct InviteCode(Vec<InviteCodeData>);
impl Decodable for InviteCode {
    fn consensus_decode<R: Read>(
        r: &mut R,
        modules: &ModuleDecoderRegistry,
    ) -> Result<Self, DecodeError> {
        let inner: Vec<InviteCodeData> = Decodable::consensus_decode(r, modules)?;
        if !inner
            .iter()
            .any(|data| matches!(data, InviteCodeData::Api { .. }))
        {
            return Err(DecodeError::from_str(
                "No API was provided in the invite code",
            ));
        }
        if !inner
            .iter()
            .any(|data| matches!(data, InviteCodeData::FederationId(_)))
        {
            return Err(DecodeError::from_str(
                "No Federation ID provided in invite code",
            ));
        }
        Ok(InviteCode(inner))
    }
}
impl InviteCode {
    pub fn new(url: SafeUrl, peer: PeerId, federation_id: FederationId) -> Self {
        InviteCode(vec![
            InviteCodeData::Api { url, peer },
            InviteCodeData::FederationId(federation_id),
        ])
    }
    pub fn url(&self) -> SafeUrl {
        self.0
            .iter()
            .find_map(|data| match data {
                InviteCodeData::Api { url, .. } => Some(url.clone()),
                _ => None,
            })
            .expect("Ensured by constructor")
    }
    pub fn peer(&self) -> PeerId {
        self.0
            .iter()
            .find_map(|data| match data {
                InviteCodeData::Api { peer, .. } => Some(*peer),
                _ => None,
            })
            .expect("Ensured by constructor")
    }
    pub fn federation_id(&self) -> FederationId {
        self.0
            .iter()
            .find_map(|data| match data {
                InviteCodeData::FederationId(federation_id) => Some(*federation_id),
                _ => None,
            })
            .expect("Ensured by constructor")
    }
}
#[derive(Clone, Debug, Eq, PartialEq, Encodable, Decodable)]
enum InviteCodeData {
    Api {
        url: SafeUrl,
        peer: PeerId,
    },
    FederationId(FederationId),
    #[encodable_default]
    Default { variant: u64, bytes: Vec<u8> },
}
const BECH32_HRP: &str = "fed1";
impl FromStr for InviteCode {
    type Err = anyhow::Error;
    fn from_str(encoded: &str) -> Result<Self, Self::Err> {
        let (hrp, data, variant) = bech32::decode(encoded)?;
        ensure!(hrp == BECH32_HRP, "Invalid HRP in bech32 encoding");
        ensure!(variant == Bech32m, "Expected Bech32m encoding");
        let bytes: Vec<u8> = Vec::<u8>::from_base32(&data)?;
        let invite = InviteCode::consensus_decode(&mut Cursor::new(bytes), &Default::default())?;
        Ok(invite)
    }
}
impl Display for InviteCode {
    fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
        let mut data = vec![];
        self.consensus_encode(&mut data)
            .expect("Vec<u8> provides capacity");
        let encode =
            bech32::encode(BECH32_HRP, data.to_base32(), Bech32m).map_err(|_| fmt::Error)?;
        formatter.write_str(&encode)
    }
}
impl Serialize for InviteCode {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        String::serialize(&self.to_string(), serializer)
    }
}
impl<'de> Deserialize<'de> for InviteCode {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let string = Cow::<str>::deserialize(deserializer)?;
        Self::from_str(&string).map_err(serde::de::Error::custom)
    }
}
impl<C: JsonRpcClient + Debug + 'static> IGlobalFederationApi for WsFederationApi<C> {}
impl<C: JsonRpcClient + Debug + 'static> IModuleFederationApi for WsFederationApi<C> {}
#[apply(async_trait_maybe_send!)]
impl<C: JsonRpcClient + Debug + 'static> IFederationApi for WsFederationApi<C> {
    fn all_peers(&self) -> &BTreeSet<PeerId> {
        &self.peer_ids
    }
    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
        WsFederationApi {
            peer_ids: self.peer_ids.clone(),
            peers: self.peers.clone(),
            module_id: Some(id),
        }
        .into()
    }
    async fn request_raw(
        &self,
        peer_id: PeerId,
        method: &str,
        params: &[Value],
    ) -> JsonRpcResult<Value> {
        let peer = self
            .peers
            .iter()
            .find(|m| m.peer_id == peer_id)
            .ok_or_else(|| JsonRpcError::Custom(format!("Invalid peer_id: {peer_id}")))?;
        let method = match self.module_id {
            None => method.to_string(),
            Some(id) => format!("module_{id}_{method}"),
        };
        peer.request(&method, params).await
    }
}
#[apply(async_trait_maybe_send!)]
pub trait JsonRpcClient: ClientT + Sized + MaybeSend + MaybeSync {
    async fn connect(url: &SafeUrl) -> result::Result<Self, JsonRpcError>;
    fn is_connected(&self) -> bool;
}
#[apply(async_trait_maybe_send!)]
impl JsonRpcClient for WsClient {
    async fn connect(url: &SafeUrl) -> result::Result<Self, JsonRpcError> {
        #[cfg(not(target_family = "wasm"))]
        return WsClientBuilder::default()
            .use_webpki_rustls()
            .max_concurrent_requests(u16::MAX as usize)
            .build(url.as_str())
            .await;
        #[cfg(target_family = "wasm")]
        WsClientBuilder::default()
            .max_concurrent_requests(u16::MAX as usize)
            .build(url.as_str())
            .await
    }
    fn is_connected(&self) -> bool {
        self.is_connected()
    }
}
impl WsFederationApi<WsClient> {
    pub fn new(peers: Vec<(PeerId, SafeUrl)>) -> Self {
        Self::new_with_client(peers)
    }
    pub fn from_config(config: &ClientConfig) -> Self {
        Self::new(
            config
                .global
                .api_endpoints
                .iter()
                .map(|(id, peer)| (*id, peer.url.clone()))
                .collect(),
        )
    }
    pub fn from_invite_code(info: &[InviteCode]) -> Self {
        Self::new(
            info.iter()
                .enumerate()
                .map(|(id, connect)| (PeerId::from(id as u16), connect.url()))
                .collect(),
        )
    }
}
impl<C> WsFederationApi<C> {
    pub fn peers(&self) -> Vec<PeerId> {
        self.peers.iter().map(|peer| peer.peer_id).collect()
    }
    pub fn new_with_client(peers: Vec<(PeerId, SafeUrl)>) -> Self {
        WsFederationApi {
            peer_ids: peers.iter().map(|m| m.0).collect(),
            peers: Arc::new(
                peers
                    .into_iter()
                    .map(|(peer_id, url)| {
                        assert!(
                            url.port_or_known_default().is_some(),
                            "API client requires a port"
                        );
                        assert!(url.host().is_some(), "API client requires a target host");
                        FederationPeer {
                            peer_id,
                            url,
                            client: RwLock::new(None),
                        }
                    })
                    .collect(),
            ),
            module_id: None,
        }
    }
}
#[derive(Debug)]
pub struct PeerResponse<R> {
    pub peer: PeerId,
    pub result: JsonRpcResult<R>,
}
impl<C: JsonRpcClient> FederationPeer<C> {
    #[instrument(level = "trace", fields(peer = %self.peer_id, %method), skip_all)]
    pub async fn request(&self, method: &str, params: &[Value]) -> JsonRpcResult<Value> {
        let rclient = self.client.read().await;
        match &*rclient {
            Some(client) if client.is_connected() => {
                return client.request::<_, _>(method, params).await;
            }
            _ => {}
        };
        debug!("web socket not connected, reconnecting");
        drop(rclient);
        let mut wclient = self.client.write().await;
        Ok(match &*wclient {
            Some(client) if client.is_connected() => {
                let rclient = RwLockWriteGuard::downgrade(wclient);
                rclient
                    .as_ref()
                    .unwrap()
                    .request::<_, _>(method, params)
                    .await?
            }
            _ => {
                match C::connect(&self.url).await {
                    Ok(client) => {
                        *wclient = Some(client);
                        let rclient = RwLockWriteGuard::downgrade(wclient);
                        rclient
                            .as_ref()
                            .unwrap()
                            .request::<_, _>(method, params)
                            .await?
                    }
                    Err(err) => {
                        debug!(
                            target: LOG_NET_API,
                            peer_id = %self.peer_id,
                            %err, "Unable to connect to peer");
                        return Err(err)?;
                    }
                }
            }
        })
    }
}
impl<C: JsonRpcClient> WsFederationApi<C> {}
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct FederationStatus {
    pub session_count: u64,
    pub status_by_peer: HashMap<PeerId, PeerStatus>,
    pub peers_online: u64,
    pub peers_offline: u64,
    pub peers_flagged: u64,
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct PeerStatus {
    pub last_contribution: Option<u64>,
    pub connection_status: PeerConnectionStatus,
    pub flagged: bool,
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PeerConnectionStatus {
    #[default]
    Disconnected,
    Connected,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum ServerStatus {
    #[default]
    AwaitingPassword,
    SharingConfigGenParams,
    ReadyForConfigGen,
    ConfigGenFailed,
    VerifyingConfigs,
    VerifiedConfigs,
    ConsensusRunning,
}
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
pub struct StatusResponse {
    pub server: ServerStatus,
    pub federation: Option<FederationStatus>,
}
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::fmt;
    use std::str::FromStr;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
    use std::sync::Mutex;
    use std::time::Duration;
    use anyhow::anyhow;
    use jsonrpsee_core::client::BatchResponse;
    use jsonrpsee_core::params::BatchRequestBuilder;
    use jsonrpsee_core::traits::ToRpcParams;
    use once_cell::sync::Lazy;
    use serde::de::DeserializeOwned;
    use tracing::error;
    use super::*;
    type Result<T = ()> = std::result::Result<T, JsonRpcError>;
    #[apply(async_trait_maybe_send!)]
    trait SimpleClient: Sized {
        async fn connect() -> Result<Self>;
        fn is_connected(&self) -> bool {
            true
        }
        async fn request(&self, method: &str) -> Result<String>;
    }
    struct Client<C: SimpleClient>(C);
    #[apply(async_trait_maybe_send!)]
    impl<C: SimpleClient + MaybeSend + MaybeSync> JsonRpcClient for Client<C> {
        fn is_connected(&self) -> bool {
            self.0.is_connected()
        }
        async fn connect(_url: &SafeUrl) -> Result<Self> {
            Ok(Self(C::connect().await?))
        }
    }
    #[apply(async_trait_maybe_send!)]
    impl<C: SimpleClient + MaybeSend + MaybeSync> ClientT for Client<C> {
        async fn request<R, P>(&self, method: &str, _params: P) -> Result<R>
        where
            R: jsonrpsee_core::DeserializeOwned,
            P: ToRpcParams + MaybeSend,
        {
            let json = self.0.request(method).await?;
            Ok(serde_json::from_str(&json).unwrap())
        }
        async fn notification<P>(&self, _method: &str, _params: P) -> Result<()>
        where
            P: ToRpcParams + MaybeSend,
        {
            unimplemented!()
        }
        async fn batch_request<'a, R>(
            &self,
            _batch: BatchRequestBuilder<'a>,
        ) -> std::result::Result<BatchResponse<'a, R>, jsonrpsee_core::Error>
        where
            R: DeserializeOwned + fmt::Debug + 'a,
        {
            unimplemented!()
        }
    }
    fn federation_peer<C: SimpleClient + MaybeSend + MaybeSync>() -> FederationPeer<Client<C>> {
        FederationPeer {
            url: SafeUrl::parse("http://127.0.0.1").expect("Could not parse"),
            peer_id: PeerId::from(0),
            client: RwLock::new(None),
        }
    }
    #[test_log::test(tokio::test)]
    async fn test_connect() {
        static CONNECTION_COUNT: AtomicUsize = AtomicUsize::new(0);
        static CONNECTED: AtomicBool = AtomicBool::new(true);
        struct Client;
        #[apply(async_trait_maybe_send!)]
        impl SimpleClient for Client {
            async fn connect() -> Result<Self> {
                CONNECTION_COUNT.fetch_add(1, Ordering::SeqCst);
                Ok(Client)
            }
            fn is_connected(&self) -> bool {
                CONNECTED.load(Ordering::SeqCst)
            }
            async fn request(&self, _method: &str) -> Result<String> {
                Ok("null".to_string())
            }
        }
        let fed = federation_peer::<Client>();
        assert_eq!(
            CONNECTION_COUNT.load(Ordering::SeqCst),
            0,
            "should not connect before first request"
        );
        fed.request("", &[]).await.unwrap();
        assert_eq!(
            CONNECTION_COUNT.load(Ordering::SeqCst),
            1,
            "should connect once after first request"
        );
        fed.request("", &[]).await.unwrap();
        assert_eq!(
            CONNECTION_COUNT.load(Ordering::SeqCst),
            1,
            "should not connect again before disconnect"
        );
        CONNECTED.store(false, Ordering::SeqCst);
        fed.request("", &[]).await.unwrap();
        assert_eq!(
            CONNECTION_COUNT.load(Ordering::SeqCst),
            2,
            "should connect again after disconnect"
        );
    }
    #[test_log::test(tokio::test)]
    async fn concurrent_requests() {
        static CONNECTION_COUNT: AtomicUsize = AtomicUsize::new(0);
        static FAIL: Lazy<Mutex<HashSet<usize>>> = Lazy::new(|| Mutex::new(HashSet::new()));
        struct Client(usize);
        #[apply(async_trait_maybe_send!)]
        impl SimpleClient for Client {
            async fn connect() -> Result<Self> {
                error!(target: LOG_NET_API, "connect");
                let id = CONNECTION_COUNT.fetch_add(1, Ordering::SeqCst);
                task::sleep(Duration::from_millis(100)).await;
                if FAIL.lock().unwrap().contains(&id) {
                    Err(jsonrpsee_core::Error::Transport(anyhow!(
                        "intentional error"
                    )))
                } else {
                    Ok(Client(id))
                }
            }
            fn is_connected(&self) -> bool {
                !FAIL.lock().unwrap().contains(&self.0)
            }
            async fn request(&self, _method: &str) -> Result<String> {
                if self.is_connected() {
                    Ok("null".to_string())
                } else {
                    Err(jsonrpsee_core::Error::Transport(anyhow!(
                        "client is disconnected"
                    )))
                }
            }
        }
        let fed = federation_peer::<Client>();
        FAIL.lock().unwrap().insert(0);
        assert!(
            fed.request("", &[]).await.is_err(),
            "connect for client 0 should fail"
        );
        fed.request("", &[]).await.unwrap();
        assert_eq!(
            CONNECTION_COUNT.load(Ordering::SeqCst),
            2,
            "should connect again after error in first connect"
        );
        FAIL.lock().unwrap().insert(1);
        let (reqa, reqb) = tokio::join!(fed.request("", &[]), fed.request("", &[]));
        reqa.expect("both request should be successful");
        reqb.expect("both request should be successful");
        assert_eq!(
            CONNECTION_COUNT.load(Ordering::SeqCst),
            3,
            "should connect once even for two concurrent requests",
        );
        FAIL.lock().unwrap().insert(2);
        FAIL.lock().unwrap().insert(3);
        let (reqa, reqb) = tokio::join!(fed.request("", &[]), fed.request("", &[]));
        assert_eq!(
            CONNECTION_COUNT.load(Ordering::SeqCst),
            5,
            "should connect again if first concurrent request fails",
        );
        assert!(
            reqa.is_err() ^ reqb.is_err(),
            "exactly one of two request should succeed"
        );
    }
    #[test]
    fn converts_invite_code() {
        let connect = InviteCode::new(
            "ws://test1".parse().unwrap(),
            PeerId(1),
            FederationId::dummy(),
        );
        let bech32 = connect.to_string();
        let connect_parsed = InviteCode::from_str(&bech32).expect("parses");
        assert_eq!(connect, connect_parsed);
        let json = serde_json::to_string(&connect).unwrap();
        let connect_as_string: String = serde_json::from_str(&json).unwrap();
        assert_eq!(connect_as_string, bech32);
        let connect_parsed_json: InviteCode = serde_json::from_str(&json).unwrap();
        assert_eq!(connect_parsed_json, connect_parsed);
    }
}