1use std::sync::Arc;
2
3pub use alloy_provider;
4use alloy_rpc_types::TransactionReceipt;
5use anyhow::Context;
6
7use crate::network::{PodNetwork, PodTransactionRequest};
8use alloy_json_rpc::{RpcError, RpcRecv, RpcSend};
9use alloy_network::{EthereumWallet, Network, NetworkWallet, TransactionBuilder};
10use alloy_provider::{
11 fillers::{JoinFill, RecommendedFillers, TxFiller, WalletFiller},
12 Identity, PendingTransactionBuilder, Provider, ProviderBuilder, ProviderLayer, RootProvider,
13 SendableTx,
14};
15use alloy_pubsub::Subscription;
16use async_trait::async_trait;
17
18use alloy_transport::{TransportError, TransportResult};
19use pod_types::{
20 consensus::Committee,
21 ledger::log::VerifiableLog,
22 metadata::{MetadataWrappedItem, RegularReceiptMetadata},
23 pagination::{ApiPaginatedResult, CursorPaginationRequest},
24 rpc::filter::LogFilter,
25};
26
27use alloy_primitives::{Address, U256};
28use pod_types::Timestamp;
29
30pub struct PodProviderBuilder<L, F>(ProviderBuilder<L, F, PodNetwork>);
31
32impl
33 PodProviderBuilder<
34 Identity,
35 JoinFill<Identity, <PodNetwork as RecommendedFillers>::RecommendedFillers>,
36 >
37{
38 pub fn with_recommended_settings() -> Self {
45 Self(PodProviderBuilder::default().0.with_recommended_fillers())
46 }
47}
48
49impl Default for PodProviderBuilder<Identity, Identity> {
50 fn default() -> Self {
51 Self(ProviderBuilder::<_, _, PodNetwork>::default())
52 }
53}
54
55impl PodProviderBuilder<Identity, Identity> {
56 pub fn new() -> Self {
57 Self::default()
58 }
59}
60
61impl<L, F> PodProviderBuilder<L, F> {
62 pub async fn on_url<U: AsRef<str>>(self, url: U) -> Result<PodProvider, TransportError>
66 where
67 L: ProviderLayer<RootProvider<PodNetwork>, PodNetwork>,
68 F: TxFiller<PodNetwork> + ProviderLayer<L::Provider, PodNetwork>,
69 F::Provider: 'static,
70 {
71 let alloy_provider = self.0.connect(url.as_ref()).await?;
72 Ok(PodProvider::new(alloy_provider))
73 }
74
75 pub fn wallet<W>(self, wallet: W) -> PodProviderBuilder<L, JoinFill<F, WalletFiller<W>>>
77 where
78 W: NetworkWallet<PodNetwork>,
79 {
80 PodProviderBuilder::<_, _>(self.0.wallet(wallet))
81 }
82
83 pub fn with_private_key(
84 self,
85 key: crate::SigningKey,
86 ) -> PodProviderBuilder<L, JoinFill<F, WalletFiller<EthereumWallet>>> {
87 let signer = crate::PrivateKeySigner::from_signing_key(key);
88
89 self.wallet(crate::EthereumWallet::new(signer))
90 }
91
92 pub async fn from_env(self) -> anyhow::Result<PodProvider>
99 where
100 L: ProviderLayer<RootProvider<PodNetwork>, PodNetwork>,
101 F: TxFiller<PodNetwork> + ProviderLayer<L::Provider, PodNetwork> + 'static,
102 L::Provider: 'static,
103 {
104 const PK_ENV: &str = "POD_PRIVATE_KEY";
105 fn load_private_key() -> anyhow::Result<crate::SigningKey> {
106 let pk_string = std::env::var(PK_ENV)?;
107 let pk_bytes = hex::decode(pk_string)?;
108 let pk = crate::SigningKey::from_slice(&pk_bytes)?;
109 Ok(pk)
110 }
111 let private_key = load_private_key()
112 .with_context(|| format!("{PK_ENV} env should contain hex-encoded ECDSA signer key"))?;
113
114 let rpc_url = std::env::var("POD_RPC_URL").unwrap_or("ws://127.0.0.1:8545".to_string());
115
116 let provider = self
117 .with_private_key(private_key)
118 .on_url(rpc_url.clone())
119 .await
120 .with_context(|| format!("attaching provider to URL {rpc_url}"))?;
121
122 Ok(provider)
123 }
124}
125
126pub struct PodProvider {
129 inner: Arc<dyn Provider<PodNetwork>>,
130}
131
132impl Clone for PodProvider {
133 fn clone(&self) -> Self {
134 Self {
135 inner: self.inner.clone(),
136 }
137 }
138}
139
140#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
141#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
142impl Provider<PodNetwork> for PodProvider {
143 fn root(&self) -> &RootProvider<PodNetwork> {
144 self.inner.root()
145 }
146
147 async fn send_transaction_internal(
152 &self,
153 tx: SendableTx<PodNetwork>,
154 ) -> TransportResult<PendingTransactionBuilder<PodNetwork>> {
155 self.inner.send_transaction_internal(tx).await
156 }
157}
158
159impl PodProvider {
160 pub fn new(provider: impl Provider<PodNetwork> + 'static) -> Self {
162 Self {
163 inner: Arc::new(provider),
164 }
165 }
166
167 pub async fn get_committee(&self) -> TransportResult<Committee> {
169 self.client().request_noparams("pod_getCommittee").await
170 }
171
172 pub async fn get_verifiable_logs(
173 &self,
174 filter: &LogFilter,
175 ) -> TransportResult<Vec<VerifiableLog>> {
176 self.client().request("eth_getLogs", (filter,)).await
177 }
178
179 pub async fn websocket_subscribe<Params, Resp>(
180 &self,
181 method: &str,
182 params: Params,
183 ) -> TransportResult<Subscription<Resp>>
184 where
185 Params: RpcSend,
186 Resp: RpcRecv,
187 {
188 let id = self
189 .client()
190 .request("eth_subscribe", (method, params))
191 .await?;
192 self.root().get_subscription(id).await
193 }
194
195 pub async fn subscribe_verifiable_logs(
196 &self,
197 filter: &LogFilter,
198 ) -> TransportResult<Subscription<VerifiableLog>> {
199 self.websocket_subscribe("logs", filter).await
200 }
201
202 pub async fn wait_past_perfect_time(&self, timestamp: Timestamp) -> TransportResult<()> {
203 const INVALID_PARAMS_CODE: i64 = -32602;
204 const PPT_TOO_FAR_MSG: &str = "Requested PPT is too far in the future";
205 const MAX_RETRIES: u32 = 100;
206
207 const SLEEP_DURATION_MILLIS: u64 = 100;
208
209 let mut retries = 0;
210 loop {
211 let result = self
212 .client()
213 .request::<_, String>("pod_waitPastPerfectTime", (timestamp.as_micros() as u64,))
214 .await;
215
216 match &result {
217 Err(e)
218 if retries < MAX_RETRIES
219 && e.as_error_resp().is_some_and(|r| {
220 r.code == INVALID_PARAMS_CODE && r.message == PPT_TOO_FAR_MSG
221 }) =>
222 {
223 retries += 1;
224 tokio::time::sleep(std::time::Duration::from_millis(SLEEP_DURATION_MILLIS))
225 .await;
226 continue;
227 }
228 _ => return Ok(()),
229 }
230 }
231 }
232
233 pub async fn subscribe_receipts(
238 &self,
239 address: Option<Address>,
240 since: Timestamp,
241 ) -> TransportResult<
242 Subscription<MetadataWrappedItem<TransactionReceipt, RegularReceiptMetadata>>,
243 > {
244 self.websocket_subscribe("pod_receipts", (address, since))
245 .await
246 }
247
248 pub async fn get_receipts(
249 &self,
250 address: Option<Address>,
251 since_micros: u64,
252 paginator: Option<CursorPaginationRequest>,
253 ) -> TransportResult<ApiPaginatedResult<<PodNetwork as Network>::ReceiptResponse>> {
254 self.client()
255 .request("pod_listReceipts", &(address, since_micros, paginator))
256 .await
257 }
258
259 pub async fn transfer(
261 &self,
262 to: Address,
263 amount: U256,
264 ) -> Result<<PodNetwork as Network>::ReceiptResponse, Box<dyn std::error::Error>> {
265 let tx = PodTransactionRequest::default()
266 .with_to(to)
267 .with_value(amount);
268
269 let pending_tx = self.send_transaction(tx).await?;
270
271 let receipt = pending_tx.get_receipt().await?;
272
273 Ok(receipt)
274 }
275
276 pub async fn past_perfect_time(&self, contract: Address) -> TransportResult<Timestamp> {
277 let micros_str: String = self
278 .client()
279 .request("pod_pastPerfectTime", (contract,)) .await?;
281
282 let micros: u128 = micros_str.parse().map_err(|e| {
283 RpcError::local_usage_str(&format!("invalid micros from pod_pastPerfectTime: {e}"))
284 })?;
285
286 Ok(Timestamp::from_micros(micros))
287 }
288}