flow_access_api/
client.rs

1#![allow(unused)]
2use crate::client::Error::*;
3use crate::flow::access::access_api_client::AccessApiClient;
4use crate::flow::access::{
5    ExecuteScriptAtLatestBlockRequest, GetAccountAtLatestBlockRequest,
6    GetEventsForHeightRangeRequest, GetLatestBlockRequest,
7    SendAndSubscribeTransactionStatusesRequest,
8};
9use crate::flow::entities::{transaction, Account, Event, Transaction, TransactionStatus};
10use crate::keys::hex_to_bytes;
11use crate::network::Network;
12use crate::transactions;
13use crate::transactions::hash_transaction;
14use derive_more::From;
15use futures::StreamExt;
16use rlp::RlpStream;
17use secp256k1::{Message, Secp256k1, SecretKey};
18use serde_cadence::{to_cadence_value, CadenceValue, ToCadenceValue};
19use sha3::{Digest, Sha3_256};
20use std::str::FromStr;
21use std::sync::Arc;
22use std::time::Duration;
23use anyhow::anyhow;
24use tokio::select;
25use tonic::transport::{Channel, Uri};
26use tonic::Request;
27
28#[derive(Debug, From)]
29pub enum Error {
30    #[from]
31    TonicError(tonic::transport::Error),
32    #[from]
33    InvalidEndpoint(tonic::codegen::http::uri::InvalidUri),
34    CantGetAccount {
35        address: String,
36    },
37    #[from]
38    TonicStatusError(tonic::Status),
39    TransactionExpired,
40    TransactionTimeoutExceeded,
41    TransactionStreamClosedUnexpectedly,
42    DigestLenError(Vec<u8>),
43    #[from]
44    TransactionsError(transactions::Error),
45    NoBlockReturned,
46    #[from]
47    KeysError(crate::keys::Error),
48    #[from]
49    CadenceJsonError(serde_cadence::Error),
50    #[from]
51    SerdeError(serde_json::Error),
52    NoKeyAtIndex {
53        idx: u32,
54    },
55    #[from]
56    ResultUTF8Error(std::string::FromUtf8Error),
57}
58
59pub type Result<T> = std::result::Result<T, Error>;
60
61impl From<Error> for anyhow::Error {
62    fn from(value: Error) -> Self {
63        anyhow!(format!("{:?}", value))
64    }
65}
66
67
68#[derive(Clone)]
69pub struct FlowRcpClient {
70    access_client: AccessApiClient<Channel>,
71}
72
73unsafe impl Send for FlowRcpClient {
74
75}
76
77unsafe impl Sync for FlowRcpClient {
78
79}
80
81
82impl FlowRcpClient {
83    pub async fn try_new(network: Network) -> Result<FlowRcpClient> {
84        let endpoint: String = network.into();
85
86        let channel = Channel::builder(Uri::from_str(&endpoint)?)
87            .connect()
88            .await?;
89
90        let client = FlowRcpClient {
91            access_client: AccessApiClient::new(channel),
92        };
93
94        Ok(client)
95    }
96
97    pub async fn create_transaction_with_params(
98        &mut self,
99        script: &str,
100        params: &[&dyn ToCadenceValue],
101        sender_address_hex: &str,
102        gas_limit: u64,
103    ) -> Result<(Transaction, Vec<u8>)> {
104        let reference_block_id = self.get_reference_block_id().await?;
105        let account_address = hex_to_bytes(sender_address_hex)?;
106        let account = self.get_account(account_address.clone()).await?;
107
108        let key_index = 0u32;
109
110        let sequence_number = account
111            .keys
112            .get(key_index as usize)
113            .ok_or(NoKeyAtIndex { idx: key_index })?
114            .sequence_number;
115
116        let mut tx = Transaction {
117            script: script.as_bytes().to_vec(),
118            arguments: vec![], // We'll populate this with parameters
119            reference_block_id,
120            gas_limit,
121            proposal_key: Some(transaction::ProposalKey {
122                address: account_address.clone(),
123                key_id: key_index,
124                sequence_number: sequence_number.into(),
125            }),
126            payer: account_address.clone(),
127            authorizers: vec![account_address.clone()],
128            payload_signatures: vec![],
129            envelope_signatures: vec![],
130        };
131
132        for &param in params {
133            let cadence_message = to_cadence_value(param)?;
134            // Flow API requires JSON serialization for transaction arguments
135            let encoded_message = serde_json::to_string(&cadence_message)?.into_bytes();
136            tx.arguments.push(encoded_message);
137        }
138
139        // Use SHA3-256 instead of SHA2-256
140        let mut hasher = Sha3_256::new();
141
142        // Add domain tag
143        hasher.update(hex_to_bytes(transactions::TRANSACTION_DOMAIN_TAG)?);
144
145        // Use RLP encoding to ensure canonical format
146        let mut rlp: RlpStream = RlpStream::new_list(2);
147        rlp.begin_list(9); // Transaction has 9 fields
148
149        // 1. Script
150        rlp.append(&tx.script);
151
152        // 2. Arguments
153        rlp.begin_list(tx.arguments.len());
154        for arg in &tx.arguments {
155            rlp.append(&arg.as_slice());
156        }
157
158        // 3. Reference Block ID
159        rlp.append(&tx.reference_block_id);
160
161        // 4. Gas Limit
162        rlp.append(&tx.gas_limit);
163
164        // 5. Proposal Key
165        if let Some(pk) = &tx.proposal_key {
166            // No need to begin a list here as per the fix
167            rlp.append(&pk.address);
168            rlp.append(&pk.key_id);
169            rlp.append(&pk.sequence_number);
170        } else {
171            rlp.begin_list(0);
172        }
173
174        // 6. Payer
175        rlp.append(&tx.payer);
176
177        // 7. Authorizers
178        rlp.begin_list(tx.authorizers.len());
179        for auth in &tx.authorizers {
180            rlp.append(&auth.as_slice());
181        }
182
183        // 8. Payload Signatures
184        rlp.begin_list(tx.payload_signatures.len());
185        for sig in &tx.payload_signatures {
186            rlp.begin_list(3);
187            rlp.append(&sig.address);
188            rlp.append(&sig.key_id);
189            rlp.append(&sig.signature);
190        }
191
192        // Finish RLP encoding
193        let encoded = rlp.out();
194
195        // Hash the encoded transaction with SHA3-256
196        hasher.update(&encoded);
197
198        // Return the hash
199        Ok((tx, hasher.finalize().to_vec()))
200    }
201
202    /// Send a transaction and subscribe to status updates.
203    pub async fn send_transaction_and_subscribe(
204        &mut self,
205        transaction: Transaction,
206        target_status: TransactionStatus,
207        timeout: Duration,
208    ) -> Result<Vec<u8>> {
209        // Subscribe to transaction status updates
210        let request = Request::new(SendAndSubscribeTransactionStatusesRequest {
211            event_encoding_version: 1,
212            transaction: Some(transaction),
213        });
214
215        let mut stream = self
216            .access_client
217            .send_and_subscribe_transaction_statuses(request)
218            .await?
219            .into_inner();
220
221        // Set up a timeout for the subscription
222        let timeout = tokio::time::sleep(timeout);
223
224        tokio::pin!(timeout);
225
226        loop {
227            select! {
228                // Wait for the next status update or timeout
229                result = stream.next() => {
230                    match result {
231                        Some(Ok(status_response)) => {
232                            if let Some(response) = status_response.transaction_results {
233                                let status = TransactionStatus::try_from(response.status)
234                                    .unwrap_or(TransactionStatus::Unknown);
235
236                                println!("Transaction status update: {:?}", status);
237
238                                if status == target_status {
239                                    return Ok(response.transaction_id);
240                                }
241
242                                match status {
243                                    TransactionStatus::Expired => {
244                                        return Err(TransactionExpired);
245                                    }
246                                    _ => continue,
247                                }
248                            } else {
249                                continue;
250                            }
251                        }
252                        Some(Err(e)) => {
253                            return Err(e.into());
254                        }
255                        None => {
256                            return Err(TransactionStreamClosedUnexpectedly);
257                        }
258                    }
259                }
260                // Handle timeout
261                _ = &mut timeout => {
262                    return Err(TransactionTimeoutExceeded);
263                }
264            }
265        }
266    }
267
268    /// Execute a script at the latest block and return the result
269    pub async fn execute_script(
270        &mut self,
271        script: &str,
272        arguments: &[&dyn ToCadenceValue],
273    ) -> Result<CadenceValue> {
274        let script = script.as_bytes().to_vec();
275        let mut cadence_arguments = vec![];
276
277        for arg in arguments {
278            cadence_arguments.push(serde_json::to_vec(&arg.to_cadence_value()?)?)
279        }
280
281        let request = Request::new(ExecuteScriptAtLatestBlockRequest {
282            script,
283            arguments: cadence_arguments,
284        });
285
286        let response = self
287            .access_client
288            .execute_script_at_latest_block(request)
289            .await?;
290
291        let result = response.into_inner().value;
292
293        let result_string = String::from_utf8(result)?;
294
295        let result: CadenceValue = serde_cadence::from_str(&result_string)?;
296
297        Ok(result)
298    }
299
300    /// Get events for a specific height range.
301    async fn get_events_for_height_range(
302        &mut self,
303        event_type: String,
304        start_height: u64,
305        end_height: u64,
306    ) -> std::result::Result<Vec<Event>, Box<dyn std::error::Error>> {
307        let request = Request::new(GetEventsForHeightRangeRequest {
308            r#type: event_type,
309            start_height,
310            end_height,
311            event_encoding_version: 0,
312        });
313
314        let response = self
315            .access_client
316            .get_events_for_height_range(request)
317            .await?;
318        let results = response.into_inner().results;
319
320        let mut events = Vec::new();
321        for block_events in results {
322            events.extend(block_events.events);
323        }
324
325        Ok(events)
326    }
327
328    /// Get account details by address.
329    pub async fn get_account(&mut self, address: Vec<u8>) -> Result<Account> {
330        let request = Request::new(GetAccountAtLatestBlockRequest {
331            address: address.clone(),
332        });
333
334        let response = self
335            .access_client
336            .get_account_at_latest_block(request)
337            .await?;
338        let account = response.into_inner().account.ok_or(CantGetAccount {
339            address: hex::encode(&address),
340        })?;
341
342        Ok(account)
343    }
344
345    pub async fn get_reference_block_id(&mut self) -> Result<Vec<u8>> {
346        let request = Request::new(GetLatestBlockRequest {
347            full_block_response: false,
348            is_sealed: true,
349        });
350
351        let response = self.access_client.get_latest_block(request).await?;
352        let block = response.into_inner().block.ok_or(NoBlockReturned {})?;
353
354        Ok(block.id)
355    }
356}