// flow_access_api/client.rs
#![allow(unused)]
2use crate::client::Error::*;
3use crate::flow::access::access_api_client::AccessApiClient;
4use crate::flow::access::{
5 ExecuteScriptAtLatestBlockRequest, GetAccountAtLatestBlockRequest,
6 GetEventsForHeightRangeRequest, GetLatestBlockRequest,
7 SendAndSubscribeTransactionStatusesRequest,
8};
9use crate::flow::entities::{transaction, Account, Event, Transaction, TransactionStatus};
10use crate::keys::hex_to_bytes;
11use crate::network::Network;
12use crate::transactions;
13use crate::transactions::hash_transaction;
14use derive_more::From;
15use futures::StreamExt;
16use rlp::RlpStream;
17use secp256k1::{Message, Secp256k1, SecretKey};
18use serde_cadence::{to_cadence_value, CadenceValue, ToCadenceValue};
19use sha3::{Digest, Sha3_256};
20use std::str::FromStr;
21use std::sync::Arc;
22use std::time::Duration;
23use anyhow::anyhow;
24use tokio::select;
25use tonic::transport::{Channel, Uri};
26use tonic::Request;
27
28#[derive(Debug, From)]
29pub enum Error {
30 #[from]
31 TonicError(tonic::transport::Error),
32 #[from]
33 InvalidEndpoint(tonic::codegen::http::uri::InvalidUri),
34 CantGetAccount {
35 address: String,
36 },
37 #[from]
38 TonicStatusError(tonic::Status),
39 TransactionExpired,
40 TransactionTimeoutExceeded,
41 TransactionStreamClosedUnexpectedly,
42 DigestLenError(Vec<u8>),
43 #[from]
44 TransactionsError(transactions::Error),
45 NoBlockReturned,
46 #[from]
47 KeysError(crate::keys::Error),
48 #[from]
49 CadenceJsonError(serde_cadence::Error),
50 #[from]
51 SerdeError(serde_json::Error),
52 NoKeyAtIndex {
53 idx: u32,
54 },
55 #[from]
56 ResultUTF8Error(std::string::FromUtf8Error),
57}
58
59pub type Result<T> = std::result::Result<T, Error>;
60
61impl From<Error> for anyhow::Error {
62 fn from(value: Error) -> Self {
63 anyhow!(format!("{:?}", value))
64 }
65}
66
67
68#[derive(Clone)]
69pub struct FlowRcpClient {
70 access_client: AccessApiClient<Channel>,
71}
72
73unsafe impl Send for FlowRcpClient {
74
75}
76
77unsafe impl Sync for FlowRcpClient {
78
79}
80
81
82impl FlowRcpClient {
83 pub async fn try_new(network: Network) -> Result<FlowRcpClient> {
84 let endpoint: String = network.into();
85
86 let channel = Channel::builder(Uri::from_str(&endpoint)?)
87 .connect()
88 .await?;
89
90 let client = FlowRcpClient {
91 access_client: AccessApiClient::new(channel),
92 };
93
94 Ok(client)
95 }
96
97 pub async fn create_transaction_with_params(
98 &mut self,
99 script: &str,
100 params: &[&dyn ToCadenceValue],
101 sender_address_hex: &str,
102 gas_limit: u64,
103 ) -> Result<(Transaction, Vec<u8>)> {
104 let reference_block_id = self.get_reference_block_id().await?;
105 let account_address = hex_to_bytes(sender_address_hex)?;
106 let account = self.get_account(account_address.clone()).await?;
107
108 let key_index = 0u32;
109
110 let sequence_number = account
111 .keys
112 .get(key_index as usize)
113 .ok_or(NoKeyAtIndex { idx: key_index })?
114 .sequence_number;
115
116 let mut tx = Transaction {
117 script: script.as_bytes().to_vec(),
118 arguments: vec![], reference_block_id,
120 gas_limit,
121 proposal_key: Some(transaction::ProposalKey {
122 address: account_address.clone(),
123 key_id: key_index,
124 sequence_number: sequence_number.into(),
125 }),
126 payer: account_address.clone(),
127 authorizers: vec![account_address.clone()],
128 payload_signatures: vec![],
129 envelope_signatures: vec![],
130 };
131
132 for ¶m in params {
133 let cadence_message = to_cadence_value(param)?;
134 let encoded_message = serde_json::to_string(&cadence_message)?.into_bytes();
136 tx.arguments.push(encoded_message);
137 }
138
139 let mut hasher = Sha3_256::new();
141
142 hasher.update(hex_to_bytes(transactions::TRANSACTION_DOMAIN_TAG)?);
144
145 let mut rlp: RlpStream = RlpStream::new_list(2);
147 rlp.begin_list(9); rlp.append(&tx.script);
151
152 rlp.begin_list(tx.arguments.len());
154 for arg in &tx.arguments {
155 rlp.append(&arg.as_slice());
156 }
157
158 rlp.append(&tx.reference_block_id);
160
161 rlp.append(&tx.gas_limit);
163
164 if let Some(pk) = &tx.proposal_key {
166 rlp.append(&pk.address);
168 rlp.append(&pk.key_id);
169 rlp.append(&pk.sequence_number);
170 } else {
171 rlp.begin_list(0);
172 }
173
174 rlp.append(&tx.payer);
176
177 rlp.begin_list(tx.authorizers.len());
179 for auth in &tx.authorizers {
180 rlp.append(&auth.as_slice());
181 }
182
183 rlp.begin_list(tx.payload_signatures.len());
185 for sig in &tx.payload_signatures {
186 rlp.begin_list(3);
187 rlp.append(&sig.address);
188 rlp.append(&sig.key_id);
189 rlp.append(&sig.signature);
190 }
191
192 let encoded = rlp.out();
194
195 hasher.update(&encoded);
197
198 Ok((tx, hasher.finalize().to_vec()))
200 }
201
202 pub async fn send_transaction_and_subscribe(
204 &mut self,
205 transaction: Transaction,
206 target_status: TransactionStatus,
207 timeout: Duration,
208 ) -> Result<Vec<u8>> {
209 let request = Request::new(SendAndSubscribeTransactionStatusesRequest {
211 event_encoding_version: 1,
212 transaction: Some(transaction),
213 });
214
215 let mut stream = self
216 .access_client
217 .send_and_subscribe_transaction_statuses(request)
218 .await?
219 .into_inner();
220
221 let timeout = tokio::time::sleep(timeout);
223
224 tokio::pin!(timeout);
225
226 loop {
227 select! {
228 result = stream.next() => {
230 match result {
231 Some(Ok(status_response)) => {
232 if let Some(response) = status_response.transaction_results {
233 let status = TransactionStatus::try_from(response.status)
234 .unwrap_or(TransactionStatus::Unknown);
235
236 println!("Transaction status update: {:?}", status);
237
238 if status == target_status {
239 return Ok(response.transaction_id);
240 }
241
242 match status {
243 TransactionStatus::Expired => {
244 return Err(TransactionExpired);
245 }
246 _ => continue,
247 }
248 } else {
249 continue;
250 }
251 }
252 Some(Err(e)) => {
253 return Err(e.into());
254 }
255 None => {
256 return Err(TransactionStreamClosedUnexpectedly);
257 }
258 }
259 }
260 _ = &mut timeout => {
262 return Err(TransactionTimeoutExceeded);
263 }
264 }
265 }
266 }
267
268 pub async fn execute_script(
270 &mut self,
271 script: &str,
272 arguments: &[&dyn ToCadenceValue],
273 ) -> Result<CadenceValue> {
274 let script = script.as_bytes().to_vec();
275 let mut cadence_arguments = vec![];
276
277 for arg in arguments {
278 cadence_arguments.push(serde_json::to_vec(&arg.to_cadence_value()?)?)
279 }
280
281 let request = Request::new(ExecuteScriptAtLatestBlockRequest {
282 script,
283 arguments: cadence_arguments,
284 });
285
286 let response = self
287 .access_client
288 .execute_script_at_latest_block(request)
289 .await?;
290
291 let result = response.into_inner().value;
292
293 let result_string = String::from_utf8(result)?;
294
295 let result: CadenceValue = serde_cadence::from_str(&result_string)?;
296
297 Ok(result)
298 }
299
300 async fn get_events_for_height_range(
302 &mut self,
303 event_type: String,
304 start_height: u64,
305 end_height: u64,
306 ) -> std::result::Result<Vec<Event>, Box<dyn std::error::Error>> {
307 let request = Request::new(GetEventsForHeightRangeRequest {
308 r#type: event_type,
309 start_height,
310 end_height,
311 event_encoding_version: 0,
312 });
313
314 let response = self
315 .access_client
316 .get_events_for_height_range(request)
317 .await?;
318 let results = response.into_inner().results;
319
320 let mut events = Vec::new();
321 for block_events in results {
322 events.extend(block_events.events);
323 }
324
325 Ok(events)
326 }
327
328 pub async fn get_account(&mut self, address: Vec<u8>) -> Result<Account> {
330 let request = Request::new(GetAccountAtLatestBlockRequest {
331 address: address.clone(),
332 });
333
334 let response = self
335 .access_client
336 .get_account_at_latest_block(request)
337 .await?;
338 let account = response.into_inner().account.ok_or(CantGetAccount {
339 address: hex::encode(&address),
340 })?;
341
342 Ok(account)
343 }
344
345 pub async fn get_reference_block_id(&mut self) -> Result<Vec<u8>> {
346 let request = Request::new(GetLatestBlockRequest {
347 full_block_response: false,
348 is_sealed: true,
349 });
350
351 let response = self.access_client.get_latest_block(request).await?;
352 let block = response.into_inner().block.ok_or(NoBlockReturned {})?;
353
354 Ok(block.id)
355 }
356}