use futures_core::future::BoxFuture;
use futures_core::stream::BoxStream;
use futures_util::TryStreamExt;
use super::subscribe::MirrorQueryExecute;
use crate::topic::TopicMessageQueryData;
use crate::{
MirrorQuery,
NodeAddress,
NodeAddressBookQueryData,
TopicMessage,
};
pub type AnyMirrorQuery = MirrorQuery<AnyMirrorQueryData>;
#[derive(Debug, Clone)]
#[cfg_attr(feature = "ffi", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "ffi", serde(rename_all = "camelCase", tag = "$type"))]
pub enum AnyMirrorQueryData {
NodeAddressBook(NodeAddressBookQueryData),
TopicMessage(TopicMessageQueryData),
}
#[derive(Debug, Clone)]
#[cfg_attr(feature = "ffi", derive(serde::Serialize))]
#[cfg_attr(feature = "ffi", serde(rename_all = "camelCase", tag = "$type"))]
pub enum AnyMirrorQueryMessage {
NodeAddressBook(NodeAddress),
TopicMessage(TopicMessage),
}
#[cfg_attr(feature = "ffi", derive(serde::Serialize))]
#[cfg_attr(feature = "ffi", serde(rename_all = "camelCase", tag = "$type"))]
pub enum AnyMirrorQueryResponse {
NodeAddressBook(<NodeAddressBookQueryData as MirrorQueryExecute>::Response),
TopicMessage(<TopicMessageQueryData as MirrorQueryExecute>::Response),
}
impl MirrorQueryExecute for AnyMirrorQueryData {
type Item = AnyMirrorQueryMessage;
type Response = AnyMirrorQueryResponse;
type ItemStream<'a> = BoxStream<'a, crate::Result<Self::Item>>
where
Self: 'a;
fn subscribe_with_optional_timeout<'a>(
&self,
params: &crate::mirror_query::MirrorQueryCommon,
client: &'a crate::Client,
timeout: Option<std::time::Duration>,
) -> Self::ItemStream<'a>
where
Self: 'a,
{
match self {
AnyMirrorQueryData::NodeAddressBook(it) => Box::pin(
it.subscribe_with_optional_timeout(params, client, timeout)
.map_ok(Self::Item::from),
),
AnyMirrorQueryData::TopicMessage(it) => Box::pin(
it.subscribe_with_optional_timeout(params, client, timeout)
.map_ok(Self::Item::from),
),
}
}
fn execute_with_optional_timeout<'a>(
&'a self,
params: &'a super::MirrorQueryCommon,
client: &'a crate::Client,
timeout: Option<std::time::Duration>,
) -> BoxFuture<'a, crate::Result<Self::Response>> {
match self {
AnyMirrorQueryData::NodeAddressBook(it) => Box::pin(async move {
it.execute_with_optional_timeout(params, client, timeout)
.await
.map(Self::Response::from)
}),
AnyMirrorQueryData::TopicMessage(it) => Box::pin(async move {
it.execute_with_optional_timeout(params, client, timeout)
.await
.map(Self::Response::from)
}),
}
}
}
#[cfg(feature = "ffi")]
#[cfg_attr(feature = "ffi", derive(serde::Serialize, serde::Deserialize))]
struct AnyMirrorQueryProxy {
#[cfg_attr(feature = "ffi", serde(flatten))]
data: AnyMirrorQueryData,
#[cfg_attr(feature = "ffi", serde(flatten))]
common: super::MirrorQueryCommon,
}
#[cfg(feature = "ffi")]
impl<D> serde::Serialize for MirrorQuery<D>
where
D: MirrorQueryExecute + Clone,
{
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
AnyMirrorQueryProxy { data: self.data.clone().into(), common: self.common.clone() }
.serialize(serializer)
}
}
#[cfg(feature = "ffi")]
impl<'de> serde::Deserialize<'de> for AnyMirrorQuery {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
<AnyMirrorQueryProxy as serde::Deserialize>::deserialize(deserializer)
.map(|query| Self { data: query.data, common: query.common })
}
}