eas_sdk_rs/eas/
timestamp.rs1use std::sync::Arc;
2
3use crate::contracts::eas::TimestampedFilter;
4use ethers::{
5 contract::{EthLogDecode, Event},
6 providers::{Middleware, PendingTransaction, PubsubClient},
7 types::{BlockNumber, H256},
8};
9use futures::{stream::BoxStream, Stream, StreamExt};
10use tracing::debug;
11
12use crate::transaction::{Error as TxError, PendingTx};
13
14use super::{Error, EAS};
15
16pub type Timestamp = u64;
17
18#[derive(Debug)]
19pub struct Timestamped {
20 pub data: H256,
21 pub timestamp: u64,
22}
23
24impl From<TimestampedFilter> for Timestamped {
25 fn from(event: TimestampedFilter) -> Self {
26 Self {
27 data: H256::from_slice(&event.data),
28 timestamp: event.timestamp,
29 }
30 }
31}
32
33impl<M: Middleware + 'static> EAS<M> {
34 pub async fn get_timestamped(&self, data: H256) -> Result<Timestamp, Error<M>> {
35 let binding = self.contract.get_timestamp(data.into());
36 let timestamp = binding.call().await?;
37 Ok(timestamp)
38 }
39
40 pub async fn timestamp(&self, data: H256) -> Result<PendingTx<Timestamped>, Error<M>> {
41 let binding = self.contract.timestamp(data.into());
42
43 let tx = binding.send().await?;
44 debug!("Transaction sent: {:?}", tx.tx_hash());
45
46 let tx_hash = tx.tx_hash();
48 let provider = self.contract.client().clone();
49
50 let future = async move {
51 let tx = PendingTransaction::new(tx_hash, provider.provider());
52 let receipt = tx.await?;
53 let receipt = receipt.ok_or_else(|| TxError::TransactionDropped(tx_hash))?;
54
55 debug!("Receipt: {:?}", receipt);
56
57 let log = receipt.logs.first().ok_or_else(|| TxError::MissingLogs)?;
58
59 let timestamped = TimestampedFilter::decode_log(&log.clone().into())?;
60
61 Ok(timestamped.into())
62 };
63
64 Ok(PendingTx::new(future))
65 }
66
67 pub async fn multi_timestamp(
68 &self,
69 data: Vec<H256>,
70 ) -> Result<PendingTx<Vec<Result<Timestamped, TxError>>>, Error<M>> {
71 let binding = self
72 .contract
73 .multi_timestamp(data.into_iter().map(|d| d.into()).collect());
74
75 let tx = binding.send().await?;
76 debug!("Transaction sent: {:?}", tx.tx_hash());
77
78 let tx_hash = tx.tx_hash();
80 let provider = self.contract.client().clone();
81
82 let future = async move {
83 let tx = PendingTransaction::new(tx_hash, provider.provider());
84 let receipt = tx.await?;
85 let receipt = receipt.ok_or_else(|| TxError::TransactionDropped(tx_hash))?;
86
87 debug!("Receipt: {:?}", receipt);
88
89 let timestamped: Vec<Result<Timestamped, TxError>> = receipt
90 .logs
91 .iter()
92 .map(|log| {
93 TimestampedFilter::decode_log(&log.clone().into())
94 .map(Into::into)
95 .map_err(TxError::from)
96 })
97 .collect();
98
99 Ok(timestamped)
100 };
101
102 Ok(PendingTx::new(future))
103 }
104
105 pub fn timestamped_event<T>(
106 &self,
107 start_block: T,
108 end_block: Option<T>,
109 uids: Option<Vec<H256>>,
110 ) -> TimestampedEvent<M>
111 where
112 T: Into<BlockNumber>,
113 {
114 let mut event = self.contract.timestamped_filter().from_block(start_block);
115
116 if let Some(end_block) = end_block {
117 event = event.to_block(end_block);
118 }
119
120 if let Some(uids) = uids {
121 event = event.topic0(uids);
122 }
123
124 TimestampedEvent::new(event)
125 }
126}
127
128pub struct TimestampedEvent<M: Middleware + 'static> {
129 inner: Event<Arc<M>, M, TimestampedFilter>,
130}
131
132impl<M: Middleware> TimestampedEvent<M> {
133 pub fn new(filter: Event<Arc<M>, M, TimestampedFilter>) -> Self {
134 Self { inner: filter }
135 }
136
137 pub async fn stream(&self) -> Result<BoxStream<Result<Timestamped, Error<M>>>, Error<M>> {
138 let stream = self
139 .inner
140 .stream()
141 .await
142 .map_err(Error::ContractError)?
143 .map(|e| Ok(Timestamped::from(e?)));
144 Ok(stream.boxed())
145 }
146
147 pub async fn query(&self) -> Result<Vec<Timestamped>, Error<M>> {
148 let logs = self
149 .inner
150 .query()
151 .await
152 .map_err(Error::ContractError)?
153 .iter()
154 .map(|e| Timestamped::from(e.clone()))
155 .collect();
156
157 Ok(logs)
158 }
159}
160
161impl<M: Middleware> TimestampedEvent<M>
162where
163 M::Provider: PubsubClient,
164{
165 pub async fn subscribe(
166 &self,
167 ) -> Result<impl Stream<Item = Result<Timestamped, Error<M>>> + '_, Error<M>> {
168 let stream = self
169 .inner
170 .subscribe()
171 .await
172 .map_err(Error::ContractError)?
173 .map(|e| Ok(Timestamped::from(e?)));
174 Ok(stream)
175 }
176}