1use crate::eth::{
2 get_attestation_details, get_balance, get_schema_details, Attestation, AttestationSchema,
3 GetBalanceArgs,
4};
5use crate::runtime::{PaymentRuntime, SharedState, TransferArgs, TransferType};
6use crate::server::ws::event_stream_websocket_endpoint;
7use crate::setup::{ChainSetup, PaymentSetup};
8use crate::transaction::create_token_transfer;
9use actix_files::NamedFile;
10use actix_web::dev::{ServiceRequest, ServiceResponse};
11use actix_web::error::ErrorBadRequest;
12use actix_web::http::header::HeaderValue;
13use actix_web::http::{header, StatusCode};
14use actix_web::web::Data;
15use actix_web::{web, HttpRequest, HttpResponse, Responder, Scope};
16use chrono::{DateTime, Utc};
17use erc20_payment_lib_common::model::DepositId;
18use erc20_payment_lib_common::ops::*;
19use erc20_payment_lib_common::{export_metrics_to_prometheus, FaucetData};
20use erc20_rpc_pool::VerifyEndpointResult;
21use serde::{Deserialize, Serialize};
22use serde_json::json;
23use sqlx::SqlitePool;
24use std::collections::BTreeMap;
25use std::str::FromStr;
26use std::sync::Arc;
27use std::time::Duration;
28use tokio::sync::Mutex;
29use web3::ethabi;
30use web3::types::{Address, H256, U256};
31
32pub struct ServerData {
33 pub shared_state: Arc<std::sync::Mutex<SharedState>>,
34 pub db_connection: Arc<Mutex<SqlitePool>>,
35 pub payment_setup: PaymentSetup,
36 pub payment_runtime: PaymentRuntime,
37}
38
39macro_rules! return_on_error {
40 ( $e:expr ) => {
41 match $e {
42 Ok(x) => x,
43 Err(err) => {
44 return web::Json(json!({
45 "error": err.to_string()
46 }));
47 }
48 }
49 }
50}
51
52pub async fn tx_details(data: Data<Box<ServerData>>, req: HttpRequest) -> impl Responder {
53 let tx_id = req
54 .match_info()
55 .get("tx_id")
56 .map(|tx_id| i64::from_str(tx_id).ok())
57 .unwrap_or(None);
58
59 let tx_id = match tx_id {
60 Some(tx_id) => tx_id,
61 None => return web::Json(json!({"error": "failed to parse tx_id"})),
62 };
63
64 let tx = {
65 let db_conn = data.db_connection.lock().await;
66 match get_transaction(&*db_conn, tx_id).await {
67 Ok(allowances) => allowances,
68 Err(err) => {
69 return web::Json(json!({
70 "error": err.to_string()
71 }));
72 }
74 }
75 };
76
77 web::Json(json!({
106 "tx": tx,
107 }))
108}
109
110pub async fn rpc_pool(data: Data<Box<ServerData>>, _req: HttpRequest) -> impl Responder {
111 let my_data = data.shared_state.lock().unwrap();
112 let web3_rpc_pool_info = my_data
127 .web3_pool_ref
128 .lock()
129 .unwrap()
130 .iter()
131 .map(|(k, v)| {
132 (
133 *k,
134 v.try_lock_for(Duration::from_secs(5))
135 .unwrap()
136 .iter()
137 .map(|pair| pair.1.clone())
138 .collect::<Vec<_>>(),
139 )
140 })
141 .collect::<BTreeMap<_, _>>();
142
143 let mut array = Vec::with_capacity(web3_rpc_pool_info.len());
144
145 for (idx, val) in web3_rpc_pool_info {
146 let val = val
147 .iter()
148 .map(|v| json!(*v.try_read_for(Duration::from_secs(5)).unwrap()))
149 .collect::<Vec<_>>();
150 let chain_network = data
151 .payment_setup
152 .chain_setup
153 .get(&idx)
154 .map(|s| s.network.clone())
155 .unwrap_or("unknown".to_string());
156 array.push(json!(
157 {
158 "chainId": idx,
159 "chainNetwork": chain_network,
160 "endpoints": val,
161 }
162 ));
163 }
164 web::Json(json!({
165 "networks": array,
166 }))
167}
168
169struct MetricGroup {
170 metric_help: String,
171 metric_type: String,
172 metrics: Vec<Metric>,
173}
174
175struct Metric {
176 name: String,
177 params: Vec<(String, String)>,
178 value: String,
179}
180
181pub async fn rpc_pool_metrics(data: Data<Box<ServerData>>, _req: HttpRequest) -> impl Responder {
182 let pool_ref = data
183 .shared_state
184 .lock()
185 .unwrap()
186 .web3_pool_ref
187 .lock()
188 .unwrap()
189 .clone();
190
191 let mut metrics = Vec::with_capacity(100);
192
193 metrics.push(MetricGroup {
194 metric_help: "# HELP rpc_endpoint_effective_score Effective score of selected rpc endpoint"
195 .to_string(),
196 metric_type: "# TYPE rpc_endpoint_effective_score gauge".to_string(),
197 metrics: Vec::new(),
198 });
199 metrics.push(MetricGroup {
200 metric_help:
201 "# HELP rpc_endpoint_score_validation Score (from validation) of selected rpc endpoint"
202 .to_string(),
203 metric_type: "# TYPE rpc_endpoint_score_validation gauge".to_string(),
204 metrics: Vec::new(),
205 });
206 metrics.push(MetricGroup {
207 metric_help: "# HELP rpc_endpoint_error_count Number of error requests".to_string(),
208 metric_type: "# TYPE rpc_endpoint_error_count counter".to_string(),
209 metrics: Vec::new(),
210 });
211 metrics.push(MetricGroup {
212 metric_help: "# HELP rpc_endpoint_success_count Number of succeeded requests".to_string(),
213 metric_type: "# TYPE rpc_endpoint_success_count counter".to_string(),
214 metrics: Vec::new(),
215 });
216 metrics.push(MetricGroup {
217 metric_help: "# HELP rpc_endpoint_ms Endpoint validation time".to_string(),
218 metric_type: "# TYPE rpc_endpoint_ms gauge".to_string(),
219 metrics: Vec::new(),
220 });
221 metrics.push(MetricGroup {
222 metric_help: "# HELP rpc_endpoint_block_delay Time since last block head".to_string(),
223 metric_type: "# TYPE rpc_endpoint_block_delay gauge".to_string(),
224 metrics: Vec::new(),
225 });
226
227 for (_idx, vec) in pool_ref {
228 for (_idx, endpoint) in vec.try_lock_for(Duration::from_secs(5)).unwrap().iter() {
229 let endpoint = endpoint
230 .try_read_for(std::time::Duration::from_secs(5))
231 .unwrap();
232 let params = vec![
233 (
234 "chain_id".to_string(),
235 endpoint.web3_rpc_params.chain_id.to_string(),
236 ),
237 ("name".to_string(), endpoint.web3_rpc_params.name.clone()),
238 ];
239 let new_metric = Metric {
240 name: "rpc_endpoint_effective_score".into(),
241 params: params.clone(),
242 value: (endpoint.get_score()).to_string(),
243 };
244 metrics[0].metrics.push(new_metric);
245
246 let new_metric = Metric {
247 name: "rpc_endpoint_score_validation".into(),
248 params: params.clone(),
249 value: (endpoint.get_validation_score()).to_string(),
250 };
251 metrics[1].metrics.push(new_metric);
252
253 let new_metric = Metric {
254 name: "rpc_endpoint_error_count".into(),
255 params: params.clone(),
256 value: endpoint
257 .web3_rpc_info
258 .web3_rpc_stats
259 .request_count_total_error
260 .to_string(),
261 };
262 metrics[2].metrics.push(new_metric);
263
264 let new_metric = Metric {
265 name: "rpc_endpoint_success_count".into(),
266 params: params.clone(),
267 value: endpoint
268 .web3_rpc_info
269 .web3_rpc_stats
270 .request_count_total_succeeded
271 .to_string(),
272 };
273 metrics[3].metrics.push(new_metric);
274
275 let (head_behind, check_time_ms) = match &endpoint.web3_rpc_info.verify_result {
276 Some(VerifyEndpointResult::Ok(res)) => {
277 (res.head_seconds_behind as i64, res.check_time_ms as i64)
278 }
279 _ => (-1, -1),
280 };
281
282 let new_metric = Metric {
283 name: "rpc_endpoint_ms".into(),
284 params: params.clone(),
285 value: check_time_ms.to_string(),
286 };
287 metrics[4].metrics.push(new_metric);
288
289 let new_metric = Metric {
290 name: "rpc_endpoint_block_delay".into(),
291 params: params.clone(),
292 value: head_behind.to_string(),
293 };
294 metrics[5].metrics.push(new_metric);
295 }
296 }
297
298 let mut resp: String = String::with_capacity(1024 * 1024);
299 for metric_group in metrics {
300 resp += &format!("{}\n", metric_group.metric_help);
301 resp += &format!("{}\n", metric_group.metric_type);
302 for metric in metric_group.metrics {
303 resp += &format!("{}{{", metric.name);
304 for (idx, param) in metric.params.iter().enumerate() {
305 resp += &format!(
306 "{}=\"{}\"{}",
307 param.0,
308 param.1,
309 if idx < metric.params.len() - 1 {
310 ","
311 } else {
312 ""
313 }
314 );
315 }
316 resp += &format!("}} {}\n", metric.value);
317 }
318 resp += "\n";
319 }
320
321 resp
322}
323
324pub async fn allowances(data: Data<Box<ServerData>>, _req: HttpRequest) -> impl Responder {
325 data.shared_state.lock().unwrap().inserted += 1;
326 let allowances = {
327 let db_conn = data.db_connection.lock().await;
328 match get_all_allowances(&db_conn).await {
329 Ok(allowances) => allowances,
330 Err(err) => {
331 return web::Json(json!({
332 "error": err.to_string()
333 }));
334 }
336 }
337 };
338
339 web::Json(json!({
340 "allowances": allowances,
341 }))
342}
343
344pub async fn transactions_count(data: Data<Box<ServerData>>, _req: HttpRequest) -> impl Responder {
345 let queued_tx_count = {
346 let db_conn = data.db_connection.lock().await;
347 return_on_error!(get_transaction_count(&db_conn, Some(TRANSACTION_FILTER_QUEUED)).await)
348 };
349 let done_tx_count = {
350 let db_conn = data.db_connection.lock().await;
351 return_on_error!(get_transaction_count(&db_conn, Some(TRANSACTION_FILTER_DONE)).await)
352 };
353
354 let queued_transfer_count = {
355 let db_conn = data.db_connection.lock().await;
356 return_on_error!(
357 get_transfer_count(&db_conn, Some(TRANSFER_FILTER_QUEUED), None, None).await
358 )
359 };
360 let processed_transfer_count = {
361 let db_conn = data.db_connection.lock().await;
362 return_on_error!(
363 get_transfer_count(&db_conn, Some(TRANSFER_FILTER_PROCESSING), None, None).await
364 )
365 };
366 let done_transfer_count = {
367 let db_conn = data.db_connection.lock().await;
368 return_on_error!(get_transfer_count(&db_conn, Some(TRANSFER_FILTER_DONE), None, None).await)
369 };
370
371 web::Json(json!({
372 "transfersQueued": queued_transfer_count,
373 "transfersProcessing": processed_transfer_count,
374 "transfersDone": done_transfer_count,
375 "txQueued": queued_tx_count,
376 "txDone": done_tx_count,
377 }))
378}
379
380pub async fn config_endpoint(data: Data<Box<ServerData>>) -> impl Responder {
381 let payment_setup = data.payment_setup.clone();
382
383 web::Json(json!({
384 "config": payment_setup,
385 }))
386}
387
388pub async fn debug_endpoint(data: Data<Box<ServerData>>) -> impl Responder {
389 let shared_state = data.shared_state.lock().unwrap().clone();
390
391 web::Json(json!({
392 "sharedState": shared_state,
393 }))
394}
395
396pub async fn transactions(data: Data<Box<ServerData>>, _req: HttpRequest) -> impl Responder {
397 let txs = {
399 let db_conn = data.db_connection.lock().await;
400 return_on_error!(get_transactions(&*db_conn, None, None, None, None, None).await)
401 };
402 web::Json(json!({
403 "txs": txs,
404 }))
405}
406
407pub async fn skip_pending_operation(
408 data: Data<Box<ServerData>>,
409 req: HttpRequest,
410) -> impl Responder {
411 let tx_id = req
412 .match_info()
413 .get("tx_id")
414 .map(|tx_id| i64::from_str(tx_id).ok())
415 .unwrap_or(None);
416 if let Some(tx_id) = tx_id {
417 if data.shared_state.lock().unwrap().skip_tx(tx_id) {
418 web::Json(json!({
419 "success": "true",
420 }))
421 } else {
422 web::Json(json!({
423 "error": "Tx not found",
424 }))
425 }
426 } else {
427 web::Json(json!({
428 "error": "failed to parse tx_id",
429 }))
430 }
431}
432
433pub async fn transactions_next(data: Data<Box<ServerData>>, req: HttpRequest) -> impl Responder {
434 let limit = req
435 .match_info()
436 .get("count")
437 .map(|tx_id| i64::from_str(tx_id).ok())
438 .unwrap_or(Some(10));
439
440 let txs = {
441 let db_conn = data.db_connection.lock().await;
442 return_on_error!(
443 get_transactions(
444 &*db_conn,
445 None,
446 Some(TRANSACTION_FILTER_QUEUED),
447 limit,
448 Some(TRANSACTION_ORDER_BY_CREATE_DATE),
449 None
450 )
451 .await
452 )
453 };
454 web::Json(json!({
455 "txs": txs,
456 }))
457}
458
459pub async fn transactions_current(
460 data: Data<Box<ServerData>>,
461 _req: HttpRequest,
462) -> impl Responder {
463 let txs = {
464 let db_conn = data.db_connection.lock().await;
465 return_on_error!(
466 get_transactions(
467 &*db_conn,
468 None,
469 Some(TRANSACTION_FILTER_PROCESSING),
470 None,
471 Some(TRANSACTION_ORDER_BY_CREATE_DATE),
472 None
473 )
474 .await
475 )
476 };
477 web::Json(json!({
478 "txs": txs,
479 }))
480}
481
482pub async fn transactions_last_processed(
483 data: Data<Box<ServerData>>,
484 req: HttpRequest,
485) -> impl Responder {
486 let limit = req
487 .match_info()
488 .get("count")
489 .map(|tx_id| i64::from_str(tx_id).ok())
490 .unwrap_or(Some(10));
491
492 let txs = {
493 let db_conn = data.db_connection.lock().await;
494 return_on_error!(
495 get_transactions(
496 &*db_conn,
497 None,
498 Some(TRANSACTION_FILTER_DONE),
499 limit,
500 Some(TRANSACTION_ORDER_BY_FIRST_PROCESSED_DATE_DESC),
501 None
502 )
503 .await
504 )
505 };
506 web::Json(json!({
507 "txs": txs,
508 }))
509}
510
511pub async fn transactions_feed(data: Data<Box<ServerData>>, req: HttpRequest) -> impl Responder {
512 let limit_prev = req
513 .match_info()
514 .get("prev")
515 .map(|tx_id| i64::from_str(tx_id).ok())
516 .unwrap_or(Some(10));
517 let limit_next = req
518 .match_info()
519 .get("next")
520 .map(|tx_id| i64::from_str(tx_id).ok())
521 .unwrap_or(Some(10));
522 let mut txs = {
523 let db_conn = data.db_connection.lock().await;
524 let mut db_transaction = return_on_error!(db_conn.begin().await);
525 let mut txs = return_on_error!(
526 get_transactions(
527 &mut *db_transaction,
528 None,
529 Some(TRANSACTION_FILTER_DONE),
530 limit_prev,
531 Some(TRANSACTION_ORDER_BY_FIRST_PROCESSED_DATE_DESC),
532 None
533 )
534 .await
535 );
536 let txs_current = return_on_error!(
537 get_transactions(
538 &mut *db_transaction,
539 None,
540 Some(TRANSACTION_FILTER_PROCESSING),
541 None,
542 Some(TRANSACTION_ORDER_BY_CREATE_DATE),
543 None
544 )
545 .await
546 );
547 let tx_next = return_on_error!(
548 get_transactions(
549 &mut *db_transaction,
550 None,
551 Some(TRANSACTION_FILTER_QUEUED),
552 limit_next,
553 Some(TRANSACTION_ORDER_BY_CREATE_DATE),
554 None
555 )
556 .await
557 );
558 return_on_error!(db_transaction.commit().await);
559 txs.reverse();
561 txs.extend(txs_current);
562 txs.extend(tx_next);
563 txs
564 };
565
566 let current_tx = data.shared_state.lock().unwrap().current_tx_info.clone();
567 for tx in txs.iter_mut() {
568 if let Some(tx_info) = current_tx.get(&tx.id) {
569 tx.engine_error.clone_from(&tx_info.error);
570 tx.engine_message = Some(tx_info.message.clone());
571 }
572 }
573
574 web::Json(json!({
575 "txs": txs,
576 "current": current_tx,
577 }))
578}
579
580#[derive(Debug, Deserialize)]
581#[serde(rename_all = "camelCase")]
582struct TransactionRequest {
583 from: String,
584 to: String,
585 token: Option<String>,
586 amount: String,
587 chain: i64,
588 due_date: Option<String>,
589 payment_id: Option<String>,
590 deposit_id: Option<DepositId>,
591}
592
593async fn new_transfer(
594 data: Data<Box<ServerData>>,
595 _req: HttpRequest,
596 new_transfer: web::Json<TransactionRequest>,
597) -> actix_web::Result<String> {
598 let chain = data
601 .payment_setup
602 .chain_setup
603 .get(&new_transfer.chain)
604 .ok_or(actix_web::error::ErrorBadRequest("No config found"))?
605 .clone();
606
607 let tx_type = if let Some(_token) = &new_transfer.token {
608 TransferType::Token
609 } else {
610 TransferType::Gas
611 };
612
613 let due_date = if let Some(due_date) = &new_transfer.due_date {
614 Some(
615 chrono::DateTime::parse_from_rfc3339(due_date)
616 .map_err(|err| {
617 actix_web::error::ErrorBadRequest(format!("Invalid due_date: {}", err))
618 })?
619 .naive_utc()
620 .and_utc(),
621 )
622 } else {
623 None
624 };
625
626 let payment_id = if let Some(payment_id) = &new_transfer.payment_id {
627 payment_id.clone()
628 } else {
629 uuid::Uuid::new_v4().to_string()
630 };
631
632 let transfer_args = TransferArgs {
633 network: chain.network,
634 from: Address::from_str(&new_transfer.from).unwrap(),
635 receiver: Address::from_str(&new_transfer.to).unwrap(),
636 tx_type,
637 amount: U256::from_dec_str(&new_transfer.amount).unwrap(),
638 payment_id,
639 deadline: due_date,
640 deposit_id: new_transfer.deposit_id,
641 };
642
643 let account = match data
644 .shared_state
645 .lock()
646 .unwrap()
647 .accounts
648 .iter()
649 .find(|acc| acc.address == transfer_args.from)
650 {
651 Some(acc) => acc.clone(),
652 None => {
653 return Err(actix_web::error::ErrorBadRequest(format!(
654 "Account not found: {:#x}",
655 transfer_args.from
656 )));
657 }
658 };
659
660 if let Err(err) = data
661 .payment_runtime
662 .transfer_with_account(&account, transfer_args.clone())
663 .await
664 {
665 return Err(actix_web::error::ErrorInternalServerError(format!(
666 "Failed to create transfer: {}",
667 err
668 )));
669 };
670 log::warn!("Created transfer: {:?}", transfer_args);
671
672 Ok("success".to_string())
673}
674
675#[derive(Deserialize)]
676pub struct StatsTransferRequest {
677 receiver: Option<String>,
678 from: Option<String>,
679 to: Option<String>,
680 chain: Option<String>,
681}
682
683#[derive(Debug, Serialize)]
684pub struct StatsTransferResult {
685 request_time: f64,
686 transfers: Vec<ChainTransferRespObj>,
687}
688
689#[derive(Serialize, sqlx::FromRow, Debug, Clone)]
690#[serde(rename_all = "camelCase")]
691pub struct ChainTransferRespObj {
692 pub id: i64,
693 pub from_addr: String,
694 pub receiver_addr: String,
695 pub chain_id: i64,
696 pub token_addr: Option<String>,
697 pub token_amount: String,
698 pub tx_hash: String,
699 pub block_number: i64,
700 pub fee_paid: Option<String>,
701 pub block_date: DateTime<Utc>,
702 pub block_timestamp: i64,
703 pub to_addr: String,
704 pub caller_addr: String,
705}
706
707pub async fn stats_transfers(
708 data: Data<Box<ServerData>>,
709 info: web::Query<StatsTransferRequest>,
710) -> actix_web::Result<web::Json<StatsTransferResult>> {
711 let time_start = std::time::Instant::now();
712 let receiver = if info.receiver.clone() == Some("all".to_string()) {
713 None
714 } else {
715 let account = Address::from_str(
716 &info
717 .receiver
718 .clone()
719 .ok_or(actix_web::error::ErrorBadRequest("account not found"))?,
720 )
721 .map_err(|err| {
722 actix_web::error::ErrorBadRequest(format!("account has to be valid address {err}"))
723 })?;
724 Some(account)
725 };
726 let account_str = receiver.map(|account| format!("{:#x}", account));
727
728 let from = chrono::DateTime::from_timestamp(
729 i64::from_str(
730 &info
731 .from
732 .clone()
733 .ok_or(actix_web::error::ErrorBadRequest("From not found"))?,
734 )
735 .map_err(|err| {
736 actix_web::error::ErrorBadRequest(format!("From is not a valid timestamp {err}"))
737 })?,
738 0,
739 )
740 .ok_or(actix_web::error::ErrorBadRequest(
741 "From is not a valid timestamp.",
742 ))?;
743 let to = chrono::DateTime::from_timestamp(
744 i64::from_str(
745 &info
746 .to
747 .clone()
748 .ok_or(actix_web::error::ErrorBadRequest("To not found"))?,
749 )
750 .map_err(|err| {
751 actix_web::error::ErrorBadRequest(format!("To is not a valid timestamp {err}"))
752 })?,
753 0,
754 )
755 .ok_or(actix_web::error::ErrorBadRequest(
756 "To is not a valid timestamp.",
757 ))?;
758
759 let chain_id = i64::from_str(
760 &info
761 .chain
762 .clone()
763 .ok_or(actix_web::error::ErrorBadRequest("Chain id not found"))?,
764 )
765 .map_err(|err| actix_web::error::ErrorBadRequest(format!("Chain id a valid {err}")))?;
766
767 let conn = data.db_connection.lock().await.clone();
768 let transf = if let Some(receiver) = account_str.as_ref() {
769 let transf =
770 get_all_chain_transfers_by_receiver_ext(&conn, chain_id, from, to, receiver, None)
771 .await;
772 transf.map_err(|err| {
773 actix_web::error::ErrorBadRequest(format!("Unknown server error: {}", err))
774 })?
775 } else {
776 let transf = get_all_chain_transfers_ext(&conn, chain_id, from, to, None).await;
777 transf.map_err(|err| {
778 actix_web::error::ErrorBadRequest(format!("Unknown server error: {}", err))
779 })?
780 };
781
782 let mut resp = Vec::new();
783 for trans in transf.into_iter() {
784 let Some(blockchain_date) = trans.blockchain_date else {
785 continue;
786 };
787
788 if blockchain_date < from {
789 continue;
790 }
791 if let Some(account_str) = account_str.as_ref() {
792 if trans.receiver_addr != *account_str {
793 continue;
794 }
795 }
796
797 resp.push(ChainTransferRespObj {
798 id: trans.id,
799 from_addr: trans.from_addr,
800 receiver_addr: trans.receiver_addr,
801 chain_id: trans.chain_id,
802 token_addr: trans.token_addr,
803 token_amount: trans.token_amount,
804 tx_hash: trans.tx_hash,
805 block_number: trans.block_number,
806 fee_paid: trans.fee_paid,
807 block_date: blockchain_date,
808 block_timestamp: blockchain_date.timestamp(),
809 to_addr: trans.to_addr,
810 caller_addr: trans.caller_addr,
811 })
812 }
813
814 let time_end = time_start.elapsed().as_secs_f64();
815 Ok(web::Json(StatsTransferResult {
817 request_time: time_end,
818 transfers: resp,
819 }))
820}
821
822pub async fn transfers(data: Data<Box<ServerData>>, req: HttpRequest) -> impl Responder {
823 let tx_id = req
824 .match_info()
825 .get("tx_id")
826 .map(|tx_id| i64::from_str(tx_id).ok())
827 .unwrap_or(None);
828
829 let transfers = {
832 let db_conn = data.db_connection.lock().await;
833 if let Some(tx_id) = tx_id {
834 match get_token_transfers_by_tx(&*db_conn, tx_id).await {
835 Ok(allowances) => allowances,
836 Err(err) => {
837 return web::Json(json!({
838 "error": err.to_string()
839 }));
840 }
841 }
842 } else {
843 match get_all_token_transfers(&db_conn, None).await {
844 Ok(allowances) => allowances,
845 Err(err) => {
846 return web::Json(json!({
847 "error": err.to_string()
848 }));
849 }
850 }
851 }
852 };
853
854 web::Json(json!({
872 "transfers": transfers,
873 }))
874}
875
876#[derive(Serialize)]
877#[serde(rename_all = "camelCase")]
878struct AccountBalanceResponse {
879 network_id: i64,
880 account: String,
881 gas_balance: String,
882 token_balance: String,
883 block_number: u64,
884 block_date: chrono::DateTime<chrono::Utc>,
885}
886
887async fn account_balance(
888 data: Data<Box<ServerData>>,
889 req: HttpRequest,
890) -> actix_web::Result<web::Json<AccountBalanceResponse>> {
891 let account = Address::from_str(
892 req.match_info()
893 .get("account")
894 .ok_or(actix_web::error::ErrorBadRequest("account not found"))?,
895 )
896 .map_err(|err| {
897 actix_web::error::ErrorBadRequest(format!("account has to be valid address {err}"))
898 })?;
899 let network_id = i64::from_str(
900 req.match_info()
901 .get("chain")
902 .ok_or(actix_web::error::ErrorBadRequest("chain-id not found"))?,
903 )
904 .map_err(|err| actix_web::error::ErrorBadRequest(format!("chain-id has to be int {err}")))?;
905
906 let chain = data
907 .payment_setup
908 .chain_setup
909 .get(&network_id)
910 .ok_or(actix_web::error::ErrorBadRequest("No config found"))?;
911
912 let args = GetBalanceArgs {
913 address: Default::default(),
914 token_address: Some(chain.glm_address),
915 call_with_details: chain.wrapper_contract_address,
916 block_number: None,
917 chain_id: Some(chain.chain_id as u64),
918 };
919 let balance_result = get_balance(chain.provider.clone(), args)
920 .await
921 .map_err(|err| {
922 actix_web::error::ErrorInternalServerError(format!("Failed to get balance {err}"))
923 })?;
924
925 Ok(web::Json(AccountBalanceResponse {
926 network_id,
927 account: format!("{:#x}", account),
928 gas_balance: balance_result
929 .gas_balance
930 .map(|b| b.to_string())
931 .unwrap_or("0".to_string()),
932 token_balance: balance_result
933 .token_balance
934 .map(|b| b.to_string())
935 .unwrap_or("0".to_string()),
936 block_number: balance_result.block_number,
937 block_date: balance_result.block_datetime,
938 }))
939}
940
941pub async fn accounts(data: Data<Box<ServerData>>, _req: HttpRequest) -> impl Responder {
942 let public_addr = data
947 .shared_state
948 .lock()
949 .unwrap()
950 .accounts
951 .iter()
952 .map(|sk| format!("{:#x}", sk.address))
953 .collect::<Vec<String>>();
954
955 web::Json(json!({
956 "publicAddr": public_addr
957 }))
958}
959
960pub async fn account_payments_in(data: Data<Box<ServerData>>, req: HttpRequest) -> impl Responder {
961 let account = return_on_error!(req.match_info().get("account").ok_or("No account provided"));
962 let web3_account = return_on_error!(Address::from_str(account));
963 let account = format!("{web3_account:#x}");
964
965 let transfers_in = {
966 let db_conn = data.db_connection.lock().await;
967 return_on_error!(get_account_transfers_in(&db_conn, &account, None).await)
968 };
969 web::Json(json!({
975 "transfersIn": transfers_in,
976 }))
978}
979
980pub async fn account_details(data: Data<Box<ServerData>>, req: HttpRequest) -> impl Responder {
981 let account = return_on_error!(req.match_info().get("account").ok_or("No account provided"));
982
983 let web3_account = return_on_error!(Address::from_str(account));
984
985 let account = format!("{web3_account:#x}");
986
987 let is_sender = if let Some(addr) = data
988 .shared_state
989 .lock()
990 .unwrap()
991 .accounts
992 .iter()
993 .map(|acc| format!("{:#x}", acc.address))
994 .find(|addr| addr == &account)
995 {
996 log::debug!("Found account: {}", addr);
997 true
998 } else {
999 false
1000 };
1001 let allowances = {
1002 let db_conn = data.db_connection.lock().await;
1003 return_on_error!(get_allowances_by_owner(&db_conn, &account).await)
1004 };
1005
1006 let mut queued_transfer_count = 0;
1007 let mut processed_transfer_count = 0;
1008 let mut done_transfer_count = 0;
1009
1010 if is_sender {
1011 queued_transfer_count = {
1012 let db_conn = data.db_connection.lock().await;
1013 return_on_error!(
1014 get_transfer_count(&db_conn, Some(TRANSFER_FILTER_QUEUED), Some(&account), None)
1015 .await
1016 )
1017 };
1018 processed_transfer_count = {
1019 let db_conn = data.db_connection.lock().await;
1020 return_on_error!(
1021 get_transfer_count(
1022 &db_conn,
1023 Some(TRANSFER_FILTER_PROCESSING),
1024 Some(&account),
1025 None
1026 )
1027 .await
1028 )
1029 };
1030 done_transfer_count = {
1031 let db_conn = data.db_connection.lock().await;
1032 return_on_error!(
1033 get_transfer_count(&db_conn, Some(TRANSFER_FILTER_DONE), Some(&account), None)
1034 .await
1035 )
1036 };
1037 }
1038 let received_transfer_count = {
1039 let db_conn = data.db_connection.lock().await;
1040
1041 return_on_error!(
1042 get_transfer_count(&db_conn, Some(TRANSFER_FILTER_ALL), None, Some(&account)).await
1043 )
1044 };
1045
1046 web::Json(json!({
1047 "account": account,
1048 "allowances": allowances,
1049 "transfersQueued": queued_transfer_count,
1050 "transfersProcessing": processed_transfer_count,
1051 "transfersDone": done_transfer_count,
1052 "receivedTransfers": received_transfer_count,
1053 }))
1054}
1055
1056pub async fn redirect_to_slash(req: HttpRequest) -> impl Responder {
1057 let mut response = HttpResponse::Ok();
1058 let target = match HeaderValue::from_str(&(req.uri().to_string() + "/")) {
1059 Ok(target) => target,
1060 Err(_err) => {
1061 return HttpResponse::InternalServerError().body("Failed to create redirect target");
1062 }
1063 };
1064
1065 response
1066 .status(StatusCode::PERMANENT_REDIRECT)
1067 .append_header((header::LOCATION, target))
1068 .finish()
1069}
1070
1071pub async fn metrics(_req: HttpRequest) -> impl Responder {
1072 export_metrics_to_prometheus().unwrap_or_else(|err| {
1073 log::error!("Failed to export metrics: {}", err);
1074 format!("Failed to export metrics: {}", err)
1075 })
1076}
1077
1078pub async fn greet(_req: HttpRequest) -> impl Responder {
1079 const VERSION: &str = env!("CARGO_PKG_VERSION");
1080 web::Json(json!({
1081 "name": "erc20_payment_lib",
1082 "version": VERSION,
1083 }))
1084}
1085
1086pub async fn faucet(data: Data<Box<ServerData>>, req: HttpRequest) -> impl Responder {
1087 let target_addr = req.match_info().get("addr").unwrap_or("");
1088 let chain_id = req.match_info().get("chain").unwrap_or("");
1089 if !target_addr.is_empty() {
1090 let receiver_addr = return_on_error!(web3::types::Address::from_str(target_addr));
1091
1092 let chain_id = return_on_error!(i64::from_str(chain_id));
1093
1094 let chain: &ChainSetup = return_on_error!(data
1095 .payment_setup
1096 .chain_setup
1097 .get(&(chain_id))
1098 .ok_or("No config for given chain id"));
1099 let faucet_event_idx = format!("{receiver_addr:#x}_{chain_id}");
1100
1101 {
1102 let mut shared_state = data.shared_state.lock().unwrap();
1103 let faucet_data = match shared_state.faucet {
1104 Some(ref mut faucet_data) => faucet_data,
1105 None => {
1106 shared_state.faucet = Some(FaucetData {
1107 faucet_events: BTreeMap::new(),
1108 last_cleanup: chrono::Utc::now(),
1109 });
1110 shared_state
1111 .faucet
1112 .as_mut()
1113 .expect("Faucet data should be set here")
1114 }
1115 };
1116
1117 const MIN_SECONDS: i64 = 120;
1118 if let Some(el) = faucet_data.faucet_events.get(&faucet_event_idx) {
1119 let ago = (chrono::Utc::now().time() - el.time()).num_seconds();
1120 if ago < MIN_SECONDS {
1121 return web::Json(json!({
1122 "error": format!("Already sent to this address {ago} seconds ago. Try again after {MIN_SECONDS} seconds")
1123 }));
1124 } else {
1125 faucet_data
1126 .faucet_events
1127 .insert(faucet_event_idx, chrono::Utc::now());
1128 }
1129 } else {
1130 faucet_data
1131 .faucet_events
1132 .insert(faucet_event_idx, chrono::Utc::now());
1133 }
1134
1135 const FAUCET_CLEANUP_AFTER: i64 = 120;
1137 let curr_time = chrono::Utc::now();
1138 if (curr_time.time() - faucet_data.last_cleanup.time()).num_seconds()
1139 > FAUCET_CLEANUP_AFTER
1140 {
1141 faucet_data.last_cleanup = curr_time;
1142 faucet_data
1143 .faucet_events
1144 .retain(|_, v| (curr_time.time() - v.time()).num_seconds() < MIN_SECONDS);
1145 }
1146 }
1147
1148 let glm_address = chain.glm_address;
1149
1150 let from = data
1151 .shared_state
1152 .lock()
1153 .unwrap()
1154 .accounts
1155 .first()
1156 .unwrap()
1157 .address;
1158
1159 let faucet_eth_amount = return_on_error!(chain
1160 .faucet_eth_amount
1161 .ok_or("Faucet amount not set on chain"));
1162 let faucet_glm_amount = return_on_error!(chain
1163 .faucet_glm_amount
1164 .ok_or("Faucet GLM amount not set on chain"));
1165
1166 let token_transfer_eth = {
1167 let tt = create_token_transfer(
1168 from,
1169 receiver_addr,
1170 chain_id,
1171 Some(&uuid::Uuid::new_v4().to_string()),
1172 None,
1173 faucet_eth_amount,
1174 None,
1175 );
1176 let db_conn = data.db_connection.lock().await;
1177 return_on_error!(insert_token_transfer(&*db_conn, &tt).await)
1178 };
1179 let token_transfer_glm = {
1180 let tt = create_token_transfer(
1181 from,
1182 receiver_addr,
1183 chain_id,
1184 Some(&uuid::Uuid::new_v4().to_string()),
1185 Some(glm_address),
1186 faucet_glm_amount,
1187 None,
1188 );
1189 let db_conn = data.db_connection.lock().await;
1190 return_on_error!(insert_token_transfer(&*db_conn, &tt).await)
1191 };
1192
1193 return web::Json(json!({
1194 "transfer_gas_id": token_transfer_eth.id,
1195 "transfer_gas_payment_id": token_transfer_eth.payment_id,
1196 "transfer_glm_id": token_transfer_glm.id,
1197 "transfer_glm_payment_id": token_transfer_glm.payment_id,
1198 }));
1199 }
1200
1201 web::Json(json!({
1202 "status": "faucet enabled"
1203 }))
1204}
1205
1206#[derive(Debug, Serialize)]
1207struct AttestationItemInfo {
1208 name: String,
1209 #[serde(rename = "type")]
1210 typ: String,
1211 value: serde_json::Value,
1212}
1213
1214#[derive(Debug, Serialize)]
1215#[serde(rename_all = "camelCase")]
1216pub struct AttestationCheckResult {
1217 chain_id: u64,
1218 chain: String,
1219 attestation: Attestation,
1220 schema: AttestationSchema,
1221 params: Vec<AttestationItemInfo>,
1222}
1223
1224fn ethabi_token_to_json(token: ðabi::Token) -> serde_json::Value {
1225 match token {
1226 ethabi::Token::Address(addr) => serde_json::Value::String(format!("{:#x}", addr)),
1227 ethabi::Token::FixedBytes(bytes) => {
1228 serde_json::Value::String(format!("0x{}", hex::encode(bytes)))
1229 }
1230 ethabi::Token::Int(int) => {
1231 if int <= &U256::from(2147483647) {
1232 serde_json::Value::Number(serde_json::Number::from(int.as_u32()))
1233 } else {
1234 serde_json::Value::String(format!("{}", int))
1235 }
1236 }
1237 ethabi::Token::Uint(uint) => {
1238 if uint <= &U256::from(2147483647) {
1239 serde_json::Value::Number(serde_json::Number::from(uint.as_u32()))
1240 } else {
1241 serde_json::Value::String(format!("{}", uint))
1242 }
1243 }
1244 ethabi::Token::Bool(b) => serde_json::Value::Bool(*b),
1245 ethabi::Token::String(s) => serde_json::Value::String(s.clone()),
1246 ethabi::Token::Bytes(bytes) => {
1247 serde_json::Value::String(format!("0x{}", hex::encode(bytes)))
1248 }
1249 ethabi::Token::Array(vec) | ethabi::Token::FixedArray(vec) | ethabi::Token::Tuple(vec) => {
1250 serde_json::Value::Array(
1251 vec.iter()
1252 .map(ethabi_token_to_json)
1253 .collect::<Vec<serde_json::Value>>(),
1254 )
1255 }
1256 }
1257}
1258
1259pub async fn check_attestation(
1260 data: Data<Box<ServerData>>,
1261 req: HttpRequest,
1262) -> actix_web::Result<web::Json<AttestationCheckResult>> {
1263 let attestation_uid = req.match_info().get("uid").unwrap_or("");
1264 let chain_name = req.match_info().get("chain").unwrap_or("");
1265 let chain: &ChainSetup = data
1266 .payment_setup
1267 .chain_setup
1268 .iter()
1269 .find(|(_, chain)| chain.network == chain_name)
1270 .ok_or(actix_web::error::ErrorBadRequest(format!(
1271 "No config found for network {}",
1272 chain_name
1273 )))?
1274 .1;
1275
1276 let web3 = data
1277 .payment_setup
1278 .get_provider(chain.chain_id)
1279 .map_err(|e| ErrorBadRequest(format!("Failed to get provider: {}", e)))?;
1280
1281 let decoded_bytes = match hex::decode(attestation_uid.replace("0x", "")) {
1282 Ok(bytes) => bytes,
1283 Err(e) => {
1284 return Err(ErrorBadRequest(format!(
1285 "Failed to decode attestation id: {}",
1286 e
1287 )));
1288 }
1289 };
1290
1291 let contract = chain
1292 .eas_contract_settings
1293 .clone()
1294 .ok_or(ErrorBadRequest(format!(
1295 "No contract settings found for chain {}",
1296 chain_name
1297 )))?;
1298
1299 let schema_contract = chain
1300 .eas_schema_registry_settings
1301 .clone()
1302 .ok_or(ErrorBadRequest(format!(
1303 "No schema contract settings found for chain {}",
1304 chain_name
1305 )))?;
1306
1307 let uid = ethabi::Bytes::from(decoded_bytes);
1308
1309 let uid = if uid.len() != 32 {
1310 return Err(ErrorBadRequest(format!(
1311 "Invalid attestation id length: {}, expected 32",
1312 uid.len()
1313 )));
1314 } else {
1315 H256::from_slice(uid.as_slice())
1316 };
1317 log::info!("Querying attestation contract: {:#x}", contract.address);
1318
1319 let attestation = match get_attestation_details(web3.clone(), uid, contract.address).await {
1320 Ok(Some(attestation)) => attestation,
1321 Ok(None) => {
1322 return Err(ErrorBadRequest(format!(
1323 "Attestation with uid: {:#x} not found on chain {}",
1324 uid, chain_name
1325 )));
1326 }
1327 Err(e) => {
1328 log::error!("Failed to get attestation details: {}", e);
1329 return Err(ErrorBadRequest(format!(
1330 "Failed to get attestation details: {}",
1331 e
1332 )));
1333 }
1334 };
1335
1336 let attestation_schema =
1337 match get_schema_details(web3, attestation.schema, schema_contract.address).await {
1338 Ok(attestation_schema) => attestation_schema,
1339 Err(e) => {
1340 log::error!("Failed to get attestation details: {}", e);
1341 return Err(ErrorBadRequest(format!(
1342 "Failed to get attestation details: {}",
1343 e
1344 )));
1345 }
1346 };
1347
1348 log::info!("Querying schema contract: {:#x}", schema_contract.address);
1349
1350 println!(
1351 "attestation: {}",
1352 serde_json::to_string_pretty(&attestation).map_err(|e| ErrorBadRequest(format!(
1353 "Failed to serialize attestation details: {}",
1354 e
1355 )))?
1356 );
1357
1358 println!(
1359 "schema: {}",
1360 serde_json::to_string_pretty(&attestation_schema).map_err(|e| ErrorBadRequest(format!(
1361 "Failed to serialize attestation details: {}",
1362 e
1363 )))?
1364 );
1365
1366 let items = attestation_schema.schema.split(',').collect::<Vec<&str>>();
1367 log::debug!("There are {} items in the schema", items.len());
1368 let mut param_types = Vec::new();
1369 let mut param_names = Vec::new();
1370
1371 for item in items {
1372 let items2 = item.trim().split(' ').collect::<Vec<&str>>();
1373 if items2.len() != 2 {
1374 log::error!("Invalid item in schema: {}", item);
1375 return Err(ErrorBadRequest(format!("Invalid item in schema: {}", item)));
1376 }
1377 let item_type = items2[0].trim();
1378 let item_name = items2[1].trim();
1379
1380 log::debug!("Item name: {}, Item type: {}", item_name, item_type);
1381 let param_type = ethabi::param_type::Reader::read(item_type)
1382 .map_err(|e| ErrorBadRequest(format!("Failed to read param type: {}", e)))?;
1383 param_types.push(param_type);
1384 param_names.push(item_name);
1385 }
1386
1387 let decoded_tokens = ethabi::decode(¶m_types, &attestation.data.0)
1388 .map_err(|e| ErrorBadRequest(format!("Failed to decode attestation data: {}", e)))?;
1389
1390 let mut decoded_items = Vec::new();
1391 for ((token, token_name), token_type) in decoded_tokens
1392 .iter()
1393 .zip(param_names.iter())
1394 .zip(param_types.iter())
1395 {
1396 println!("Token {}: {}", token_name, token);
1397 decoded_items.push(AttestationItemInfo {
1398 name: token_name.to_string(),
1399 typ: token_type.to_string(),
1400 value: ethabi_token_to_json(token),
1401 });
1402 }
1403
1404 Ok(web::Json(AttestationCheckResult {
1405 chain_id: chain.chain_id as u64,
1406 chain: chain_name.to_string(),
1407 attestation,
1408 schema: attestation_schema,
1409 params: decoded_items,
1410 }))
1411}
1412
1413pub fn runtime_web_scope(
1414 scope: Scope,
1415 server_data: Data<Box<ServerData>>,
1416 enable_faucet: bool,
1417 enable_transfers: bool,
1418 debug: bool,
1419 frontend: bool,
1420) -> Scope {
1421 let api_scope = Scope::new("/api");
1422 let mut api_scope = api_scope
1423 .app_data(server_data)
1424 .route(
1425 "/attestation/{chain}/{uid}",
1426 web::get().to(check_attestation),
1427 )
1428 .route("/allowances", web::get().to(allowances))
1429 .route("/balance/{account}/{chain}", web::get().to(account_balance))
1430 .route("/rpc_pool", web::get().to(rpc_pool))
1431 .route("/rpc_pool/metrics", web::get().to(rpc_pool_metrics))
1432 .route("/config", web::get().to(config_endpoint))
1433 .route("/stats/transfers", web::get().to(stats_transfers))
1434 .route("/transactions", web::get().to(transactions))
1435 .route("/transactions/count", web::get().to(transactions_count))
1436 .route("/transactions/next", web::get().to(transactions_next))
1437 .route(
1438 "/transactions/feed/{prev}/{next}",
1439 web::get().to(transactions_feed),
1440 )
1441 .route(
1442 "/transactions/next/{count}",
1443 web::get().to(transactions_next),
1444 )
1445 .route("/transactions/current", web::get().to(transactions_current))
1446 .route(
1447 "/transactions/last",
1448 web::get().to(transactions_last_processed),
1449 )
1450 .route(
1451 "/transactions/last/{count}",
1452 web::get().to(transactions_last_processed),
1453 )
1454 .route("/tx/skip/{tx_id}", web::post().to(skip_pending_operation))
1455 .route("/tx/{tx_id}", web::get().to(tx_details))
1456 .route("/transfers", web::get().to(transfers))
1457 .route("/transfers/{tx_id}", web::get().to(transfers))
1458 .route("/accounts", web::get().to(accounts))
1459 .route("/account/{account}", web::get().to(account_details))
1460 .route("/account/{account}/in", web::get().to(account_payments_in))
1461 .route("/metrics", web::get().to(metrics))
1462 .route("/", web::get().to(greet))
1463 .route(
1464 "/event_stream",
1465 web::get().to(event_stream_websocket_endpoint),
1466 )
1467 .route("/version", web::get().to(greet));
1468
1469 if enable_transfers {
1470 api_scope = api_scope.route("/transfers/new", web::post().to(new_transfer))
1471 }
1472 if enable_faucet {
1473 log::info!("Faucet endpoints enabled");
1474 api_scope = api_scope.route("/faucet", web::get().to(faucet));
1475 api_scope = api_scope.route("/faucet/{chain}/{addr}", web::get().to(faucet));
1476 }
1477 if debug {
1478 log::info!("Debug endpoints enabled");
1479 api_scope = api_scope.route("/debug", web::get().to(debug_endpoint));
1480 }
1481
1482 let scope = scope.route("/api", web::get().to(greet));
1484 let mut scope = scope.service(api_scope);
1485
1486 if frontend {
1487 log::info!("Frontend endpoint enabled");
1488 let static_files = actix_files::Files::new("/frontend", "./frontend/dist")
1490 .index_file("index.html")
1491 .default_handler(|req: ServiceRequest| {
1492 let (http_req, _payload) = req.into_parts();
1493
1494 async {
1495 let response = NamedFile::open("./frontend/dist/index.html")
1496 .unwrap()
1497 .into_response(&http_req);
1498 Ok(ServiceResponse::new(http_req, response))
1499 }
1500 });
1501
1502 scope = scope.route("/frontend", web::get().to(redirect_to_slash));
1503 scope = scope.service(static_files);
1504 }
1505 scope
1506}