use crate::{AsGear, Event, Result, config::GearConfig};
use futures::{Stream, StreamExt};
use gear_core::ids::{ActorId, MessageId};
use gear_core_errors::ReplyCode;
use serde::{Deserialize, Deserializer, Serialize, de::Error as DeError};
use sp_core::Bytes;
use std::{convert::TryInto, marker::Unpin, ops::Deref, pin::Pin, task::Poll};
use subxt::{
OnlineClient, backend::StreamOfResults, blocks::Block, events::Events as SubxtEvents,
ext::subxt_rpcs::client::RpcSubscription, utils::H256,
};
/// A `subxt` block parameterized by `GearConfig` and backed by an `OnlineClient`.
type SubxtBlock = Block<GearConfig, OnlineClient<GearConfig>>;
/// A `subxt` stream of results whose successful items are [`SubxtBlock`]s.
type BlockSubscription = StreamOfResults<SubxtBlock>;
/// Stream of blocks wrapping a raw `subxt` block subscription.
///
/// The `Stream` implementation forwards every item of the wrapped
/// subscription and converts its errors into this crate's error type.
pub struct Blocks(BlockSubscription);
// Explicitly marks the wrapper as `Unpin`; the `Stream` implementation polls
// the inner subscription through `poll_next_unpin`.
impl Unpin for Blocks {}
impl Stream for Blocks {
    type Item = Result<SubxtBlock>;

    /// Polls the wrapped subscription once.
    ///
    /// Blocks are passed through untouched; subscription errors are converted
    /// into this crate's error type. `Pending` and end-of-stream are forwarded
    /// as they are.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        match self.0.poll_next_unpin(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Ready(Some(Ok(block))) => Poll::Ready(Some(Ok(block))),
            Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err.into()))),
        }
    }
}
impl Blocks {
    /// Waits for the next block and fetches its events.
    ///
    /// Returns `Ok(None)` once the underlying subscription has ended.
    ///
    /// # Errors
    ///
    /// Fails if the subscription yields an error or if the events of the
    /// received block cannot be fetched.
    pub async fn next_events(&mut self) -> Result<Option<BlockEvents>> {
        match StreamExt::next(self).await {
            Some(block) => BlockEvents::new(block?).await.map(Some),
            None => Ok(None),
        }
    }
}
impl From<BlockSubscription> for Blocks {
fn from(sub: BlockSubscription) -> Self {
Self(sub)
}
}
/// Subscription that yields the decoded events of each incoming block,
/// built on top of a [`Blocks`] stream.
pub struct Events(Blocks);
impl Events {
    /// Waits for the next block and returns its decoded events.
    ///
    /// When the underlying block stream has ended an empty vector is
    /// returned, so callers cannot tell end-of-stream apart from a block
    /// that produced no events.
    ///
    /// # Errors
    ///
    /// Fails if the block subscription reports an error, or if fetching or
    /// decoding the block's events fails.
    pub async fn next(&mut self) -> Result<Vec<Event>> {
        match self.0.next_events().await? {
            Some(block_events) => block_events.events(),
            None => Ok(Vec::new()),
        }
    }
}
impl From<BlockSubscription> for Events {
fn from(sub: BlockSubscription) -> Self {
Self(sub.into())
}
}
/// Events of a single block together with the hash of that block.
///
/// Dereferences to the underlying `subxt` events collection.
#[derive(Clone, Debug)]
pub struct BlockEvents {
// Hash of the block the events were fetched from.
block_hash: H256,
// Raw `subxt` events of that block.
events: SubxtEvents<GearConfig>,
}
impl BlockEvents {
    /// Fetches the events of `block` and pairs them with the block's hash.
    ///
    /// # Errors
    ///
    /// Fails if the events of the block cannot be fetched.
    pub async fn new(block: Block<GearConfig, OnlineClient<GearConfig>>) -> Result<Self> {
        let block_hash = block.hash();
        let events = block.events().await?;
        Ok(Self { block_hash, events })
    }

    /// Hash of the block these events belong to.
    pub fn block_hash(&self) -> H256 {
        self.block_hash
    }

    /// Converts every event of the block through `as_gear`.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first event that fails to be read or
    /// converted.
    pub fn events(&self) -> Result<Vec<Event>> {
        let mut converted = Vec::new();
        for raw in self.events.iter() {
            converted.push(raw?.as_gear()?);
        }
        Ok(converted)
    }
}
/// A single notification delivered by a [`ProgramStateChanges`] subscription.
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
pub struct ProgramStateChange {
/// Block hash carried by the notification.
pub block_hash: H256,
/// Program ids carried by the notification (presumably the programs whose
/// state changed in that block; confirm against the node's RPC).
pub program_ids: Vec<H256>,
/// Optional acknowledgement flag; defaults to `None` when absent.
/// `ProgramStateChanges` skips items where this is `Some(true)`.
#[serde(default)]
pub ack: Option<bool>,
}
/// Stream of [`ProgramStateChange`] notifications backed by an RPC
/// subscription; acknowledgement items are filtered out by its `Stream` impl.
pub struct ProgramStateChanges(RpcSubscription<ProgramStateChange>);
impl ProgramStateChanges {
pub(crate) fn new(inner: RpcSubscription<ProgramStateChange>) -> Self {
Self(inner)
}
pub fn subscription_id(&self) -> Option<&str> {
self.0.subscription_id()
}
}
impl Stream for ProgramStateChanges {
    type Item = Result<ProgramStateChange>;

    /// Polls the wrapped subscription, skipping acknowledgement items.
    ///
    /// Items whose `ack` flag is `Some(true)` are dropped and the
    /// subscription is polled again; every other item, error and the end of
    /// the stream are forwarded, with errors converted into this crate's
    /// error type.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        loop {
            let polled = match self.0.poll_next_unpin(cx) {
                Poll::Ready(polled) => polled,
                Poll::Pending => return Poll::Pending,
            };
            match polled {
                None => return Poll::Ready(None),
                Some(Err(err)) => return Poll::Ready(Some(Err(err.into()))),
                // Acknowledgement: nothing to report, poll for the next item.
                Some(Ok(change)) if change.ack == Some(true) => {}
                Some(Ok(change)) => return Poll::Ready(Some(Ok(change))),
            }
        }
    }
}
// Explicitly marks the wrapper as `Unpin`; its `Stream` implementation polls
// the inner subscription through `poll_next_unpin`.
impl Unpin for ProgramStateChanges {}
/// Serializable set of optional criteria for selecting `UserMessageSent`
/// items (presumably sent to the node when subscribing; confirm against the
/// caller).
///
/// Unset options and an empty filter list are omitted from the serialized
/// form.
#[derive(Clone, Debug, Default, Serialize)]
pub struct UserMessageSentFilter {
/// Source criterion as a 32-byte hash; omitted when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub source: Option<H256>,
/// Destination criterion as a 32-byte hash; omitted when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub destination: Option<H256>,
/// Payload pattern criteria; omitted when empty.
#[serde(skip_serializing_if = "Vec::is_empty")]
pub payload_filters: Vec<PayloadFilter>,
/// Starting block number criterion; omitted when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub from_block: Option<u64>,
/// Finalized-only flag; omitted when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub finalized_only: Option<bool>,
}
/// A byte pattern paired with an offset into a message payload.
///
/// `UserMessageSentFilter::with_payload_prefix` uses offset `0` to express a
/// payload prefix.
#[derive(Clone, Debug, Serialize)]
pub struct PayloadFilter {
/// Offset associated with `pattern` (presumably a byte offset into the
/// payload; confirm against the node's RPC).
pub offset: u32,
/// Bytes of the pattern.
pub pattern: Bytes,
}
impl PayloadFilter {
    /// Creates a payload filter from an offset and anything convertible into
    /// a byte vector.
    pub fn new(offset: u32, pattern: impl Into<Vec<u8>>) -> Self {
        let raw_pattern: Vec<u8> = pattern.into();
        Self {
            offset,
            pattern: Bytes(raw_pattern),
        }
    }
}
impl UserMessageSentFilter {
    /// Creates a filter with every criterion unset.
    pub fn new() -> Self {
        Default::default()
    }

    /// Reinterprets the bytes of an actor id as an `H256`.
    fn actor_to_h256(actor: ActorId) -> H256 {
        let raw: &[u8] = actor.as_ref();
        H256::from_slice(raw)
    }

    /// Sets the source criterion, replacing any previous one.
    pub fn with_source(self, source: ActorId) -> Self {
        Self {
            source: Some(Self::actor_to_h256(source)),
            ..self
        }
    }

    /// Sets the destination criterion, replacing any previous one.
    pub fn with_destination(self, destination: ActorId) -> Self {
        Self {
            destination: Some(Self::actor_to_h256(destination)),
            ..self
        }
    }

    /// Appends a payload filter made of `offset` and `pattern`.
    pub fn with_payload_filter(self, offset: u32, pattern: impl Into<Vec<u8>>) -> Self {
        let mut filter = self;
        filter
            .payload_filters
            .push(PayloadFilter::new(offset, pattern));
        filter
    }

    /// Appends a payload filter for `prefix` at offset `0`.
    pub fn with_payload_prefix(self, prefix: impl Into<Vec<u8>>) -> Self {
        self.with_payload_filter(0, prefix)
    }

    /// Sets the starting block number criterion.
    pub fn from_block(self, block: u64) -> Self {
        Self {
            from_block: Some(block),
            ..self
        }
    }

    /// Sets the finalized-only flag.
    pub fn finalized_only(self, finalized_only: bool) -> Self {
        Self {
            finalized_only: Some(finalized_only),
            ..self
        }
    }
}
/// A decoded `UserMessageSent` notification.
///
/// Built by the hand-written `Deserialize` implementation from the raw wire
/// representation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UserMessageSent {
/// Block hash carried by the notification.
pub block: H256,
/// Index carried by the notification (presumably the event index within
/// the block; confirm against the node's RPC).
pub index: u32,
/// Id of the message.
pub id: MessageId,
/// Source actor of the message.
pub source: ActorId,
/// Destination actor of the message.
pub destination: ActorId,
/// Raw payload bytes.
pub payload: Vec<u8>,
/// Value of the message, parsed from its string representation.
pub value: u128,
/// Reply details, present only when the notification carries them.
pub reply: Option<UserMessageReply>,
/// `true` when the raw notification has `ack` set to `true`; such items
/// are skipped by `UserMessageSentSubscription`.
pub is_ack: bool,
}
/// Reply details attached to a [`UserMessageSent`] notification.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UserMessageReply {
/// Id of the message this reply refers to.
pub to: MessageId,
/// Reply code decoded from exactly four raw bytes.
pub code: ReplyCode,
/// Optional textual description taken from the raw `code_description` field.
pub code_text: Option<String>,
}
impl<'de> Deserialize<'de> for UserMessageSent {
    /// Deserializes the raw wire representation and converts it into the
    /// typed [`UserMessageSent`].
    ///
    /// # Errors
    ///
    /// Besides errors of the raw representation itself, fails with
    /// `"invalid reply.code length"` when the reply code is not exactly four
    /// bytes long, and with the integer parse error when `value` is not a
    /// valid `u128` string. The reply code is validated before the value.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        // Wire form of the optional reply details.
        #[derive(Deserialize)]
        struct RawReplyDetails {
            to: [u8; 32],
            code: Bytes,
            code_description: Option<String>,
        }

        // Wire form of the whole notification.
        // NOTE(review): `id`, `source` and `destination` are read as 32-element
        // byte arrays rather than hex strings; confirm this matches the node's
        // JSON encoding.
        #[derive(Deserialize)]
        struct RawUserMessageSent {
            block: H256,
            index: u32,
            id: [u8; 32],
            source: [u8; 32],
            destination: [u8; 32],
            payload: Bytes,
            value: String,
            #[serde(default)]
            reply: Option<RawReplyDetails>,
            #[serde(default)]
            ack: Option<bool>,
        }

        let RawUserMessageSent {
            block,
            index,
            id,
            source,
            destination,
            payload,
            value,
            reply,
            ack,
        } = RawUserMessageSent::deserialize(deserializer)?;

        // The reply code must be exactly four bytes long.
        let reply = if let Some(details) = reply {
            let code_bytes: [u8; 4] = match details.code.0.as_slice().try_into() {
                Ok(bytes) => bytes,
                Err(_) => return Err(DeError::custom("invalid reply.code length")),
            };
            Some(UserMessageReply {
                to: MessageId::from(details.to),
                code: ReplyCode::from_bytes(code_bytes),
                code_text: details.code_description,
            })
        } else {
            None
        };

        // The value travels as a string and is parsed into `u128` here.
        let value = value.parse::<u128>().map_err(DeError::custom)?;

        Ok(Self {
            block,
            index,
            id: MessageId::from(id),
            source: ActorId::from(source),
            destination: ActorId::from(destination),
            payload: payload.0,
            value,
            reply,
            // A missing `ack` field means the item is not an acknowledgement.
            is_ack: ack.unwrap_or(false),
        })
    }
}
/// Stream of [`UserMessageSent`] notifications backed by an RPC subscription;
/// acknowledgement items are filtered out by its `Stream` impl.
pub struct UserMessageSentSubscription(RpcSubscription<UserMessageSent>);
impl UserMessageSentSubscription {
pub(crate) fn new(inner: RpcSubscription<UserMessageSent>) -> Self {
Self(inner)
}
pub fn subscription_id(&self) -> Option<&str> {
self.0.subscription_id()
}
}
impl Stream for UserMessageSentSubscription {
    type Item = Result<UserMessageSent>;

    /// Polls the wrapped subscription, skipping acknowledgement items.
    ///
    /// Messages with `is_ack` set are dropped and the subscription is polled
    /// again; every other message, error and the end of the stream are
    /// forwarded, with errors converted into this crate's error type.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        loop {
            let polled = match self.0.poll_next_unpin(cx) {
                Poll::Ready(polled) => polled,
                Poll::Pending => return Poll::Pending,
            };
            match polled {
                None => return Poll::Ready(None),
                Some(Err(err)) => return Poll::Ready(Some(Err(err.into()))),
                // Acknowledgement: nothing to report, poll for the next item.
                Some(Ok(message)) if message.is_ack => {}
                Some(Ok(message)) => return Poll::Ready(Some(Ok(message))),
            }
        }
    }
}
// Explicitly marks the wrapper as `Unpin`; its `Stream` implementation polls
// the inner subscription through `poll_next_unpin`.
impl Unpin for UserMessageSentSubscription {}
impl Deref for BlockEvents {
    type Target = SubxtEvents<GearConfig>;

    /// Gives read access to the underlying `subxt` events collection.
    fn deref(&self) -> &Self::Target {
        let Self { events, .. } = self;
        events
    }
}