eas_sdk_rs/eas/
timestamp.rs

1use std::sync::Arc;
2
3use crate::contracts::eas::TimestampedFilter;
4use ethers::{
5    contract::{EthLogDecode, Event},
6    providers::{Middleware, PendingTransaction, PubsubClient},
7    types::{BlockNumber, H256},
8};
9use futures::{stream::BoxStream, Stream, StreamExt};
10use tracing::debug;
11
12use crate::transaction::{Error as TxError, PendingTx};
13
14use super::{Error, EAS};
15
16pub type Timestamp = u64;
17
18#[derive(Debug)]
19pub struct Timestamped {
20    pub data: H256,
21    pub timestamp: u64,
22}
23
24impl From<TimestampedFilter> for Timestamped {
25    fn from(event: TimestampedFilter) -> Self {
26        Self {
27            data: H256::from_slice(&event.data),
28            timestamp: event.timestamp,
29        }
30    }
31}
32
33impl<M: Middleware + 'static> EAS<M> {
34    pub async fn get_timestamped(&self, data: H256) -> Result<Timestamp, Error<M>> {
35        let binding = self.contract.get_timestamp(data.into());
36        let timestamp = binding.call().await?;
37        Ok(timestamp)
38    }
39
40    pub async fn timestamp(&self, data: H256) -> Result<PendingTx<Timestamped>, Error<M>> {
41        let binding = self.contract.timestamp(data.into());
42
43        let tx = binding.send().await?;
44        debug!("Transaction sent: {:?}", tx.tx_hash());
45
46        // PendingTransaction is not copy and it is dropped there, but it is recreated from tx_hash and provider
47        let tx_hash = tx.tx_hash();
48        let provider = self.contract.client().clone();
49
50        let future = async move {
51            let tx = PendingTransaction::new(tx_hash, provider.provider());
52            let receipt = tx.await?;
53            let receipt = receipt.ok_or_else(|| TxError::TransactionDropped(tx_hash))?;
54
55            debug!("Receipt: {:?}", receipt);
56
57            let log = receipt.logs.first().ok_or_else(|| TxError::MissingLogs)?;
58
59            let timestamped = TimestampedFilter::decode_log(&log.clone().into())?;
60
61            Ok(timestamped.into())
62        };
63
64        Ok(PendingTx::new(future))
65    }
66
67    pub async fn multi_timestamp(
68        &self,
69        data: Vec<H256>,
70    ) -> Result<PendingTx<Vec<Result<Timestamped, TxError>>>, Error<M>> {
71        let binding = self
72            .contract
73            .multi_timestamp(data.into_iter().map(|d| d.into()).collect());
74
75        let tx = binding.send().await?;
76        debug!("Transaction sent: {:?}", tx.tx_hash());
77
78        // PendingTransaction is not copy and it is dropped there, but it is recreated from tx_hash and provider
79        let tx_hash = tx.tx_hash();
80        let provider = self.contract.client().clone();
81
82        let future = async move {
83            let tx = PendingTransaction::new(tx_hash, provider.provider());
84            let receipt = tx.await?;
85            let receipt = receipt.ok_or_else(|| TxError::TransactionDropped(tx_hash))?;
86
87            debug!("Receipt: {:?}", receipt);
88
89            let timestamped: Vec<Result<Timestamped, TxError>> = receipt
90                .logs
91                .iter()
92                .map(|log| {
93                    TimestampedFilter::decode_log(&log.clone().into())
94                        .map(Into::into)
95                        .map_err(TxError::from)
96                })
97                .collect();
98
99            Ok(timestamped)
100        };
101
102        Ok(PendingTx::new(future))
103    }
104
105    pub fn timestamped_event<T>(
106        &self,
107        start_block: T,
108        end_block: Option<T>,
109        uids: Option<Vec<H256>>,
110    ) -> TimestampedEvent<M>
111    where
112        T: Into<BlockNumber>,
113    {
114        let mut event = self.contract.timestamped_filter().from_block(start_block);
115
116        if let Some(end_block) = end_block {
117            event = event.to_block(end_block);
118        }
119
120        if let Some(uids) = uids {
121            event = event.topic0(uids);
122        }
123
124        TimestampedEvent::new(event)
125    }
126}
127
128pub struct TimestampedEvent<M: Middleware + 'static> {
129    inner: Event<Arc<M>, M, TimestampedFilter>,
130}
131
132impl<M: Middleware> TimestampedEvent<M> {
133    pub fn new(filter: Event<Arc<M>, M, TimestampedFilter>) -> Self {
134        Self { inner: filter }
135    }
136
137    pub async fn stream(&self) -> Result<BoxStream<Result<Timestamped, Error<M>>>, Error<M>> {
138        let stream = self
139            .inner
140            .stream()
141            .await
142            .map_err(Error::ContractError)?
143            .map(|e| Ok(Timestamped::from(e?)));
144        Ok(stream.boxed())
145    }
146
147    pub async fn query(&self) -> Result<Vec<Timestamped>, Error<M>> {
148        let logs = self
149            .inner
150            .query()
151            .await
152            .map_err(Error::ContractError)?
153            .iter()
154            .map(|e| Timestamped::from(e.clone()))
155            .collect();
156
157        Ok(logs)
158    }
159}
160
161impl<M: Middleware> TimestampedEvent<M>
162where
163    M::Provider: PubsubClient,
164{
165    pub async fn subscribe(
166        &self,
167    ) -> Result<impl Stream<Item = Result<Timestamped, Error<M>>> + '_, Error<M>> {
168        let stream = self
169            .inner
170            .subscribe()
171            .await
172            .map_err(Error::ContractError)?
173            .map(|e| Ok(Timestamped::from(e?)));
174        Ok(stream)
175    }
176}