pipegate/middleware/stream_payment/
verify.rs1use alloy::{
2 hex::{self},
3 primitives::Signed,
4 providers::ProviderBuilder,
5 sol,
6};
7use reqwest::Client;
8use serde_json::json;
9
10use crate::{
11 error::AuthError,
12 middleware::stream_payment::{
13 types::{SignedStream, StreamsConfig},
14 utils::create_stream_message,
15 },
16};
17
// Generates the `CFAv1Forwarder` contract bindings from the JSON ABI file named
// below. `verify_stream` uses the generated type through
// `CFAv1Forwarder::new(address, provider)` and its `getFlowInfo` call.
//
// `#[sol(rpc)]` is what asks the macro to emit the provider-backed contract
// instance (see the alloy `sol!` documentation).
// `#[allow(missing_docs)]` is needed because the generated items carry no doc
// comments of their own.
// NOTE(review): the ABI path is presumably resolved relative to the crate root
// (`CARGO_MANIFEST_DIR`) — confirm if the crate layout changes.
sol!(
    #[allow(missing_docs)]
    #[sol(rpc)]
    CFAv1Forwarder,
    "src/abi/CFAv1Forwarder.json"
);
24
25#[allow(dead_code)]
26pub(crate) async fn verify_stream_via_indexer(
27 stream: SignedStream,
28 config: StreamsConfig,
29) -> Result<bool, AuthError> {
30 let reconstructed_message = create_stream_message(stream.sender);
32 println!("Message: 0x{}", hex::encode(&reconstructed_message));
33
34 let signature = stream.signature;
35 println!("Signature: 0x{}", hex::encode(&signature.as_bytes()));
36
37 let recovered = match signature.recover_address_from_msg(reconstructed_message) {
39 Ok(address) => address,
40 Err(_) => return Err(AuthError::InvalidSignature),
41 };
42 println!("Recovered address: {}", recovered);
43
44 if recovered != stream.sender {
46 println!("Failed: Recovered address mismatch");
47 return Err(AuthError::InvalidSignature);
48 }
49
50 let client = Client::new();
51
52 let url = "https://subgraph-endpoints.superfluid.dev/base-sepolia/protocol-v1";
53
54 let query = json!({
55 "query": "query ($recipient: String!, $sender: String!, $amount: String!, $token: String!) {
56 account(id: $recipient) {
57 inflows(where: { sender: $sender, currentFlowRate: $amount, token: $token }) {
58 currentFlowRate
59 createdAtTimestamp
60 id
61 }
62 }
63 }",
64 "variables": {
65 "recipient": config.recipient.to_string().to_lowercase(),
66 "sender": stream.sender.to_string().to_lowercase(),
67 "amount": config.amount.to_string(),
68 "token": config.token_address.to_string().to_lowercase(),
69 }
70 });
71
72 let response = client
73 .post(url)
74 .header("Content-Type", "application/json")
75 .json(&query)
76 .send()
77 .await
78 .map_err(|e| {
79 println!("Failed: Network error: {}", e);
80 AuthError::NetworkError("Failed to fetch stream data from indexer".to_string())
81 })?
82 .json::<serde_json::Value>()
83 .await
84 .map_err(|e| {
85 println!("Failed: JSON parse error: {}", e);
86 AuthError::NetworkError("Failed to parse stream data from indexer".to_string())
87 })?;
88
89 if let Some(inflows) = response["data"]["account"]["inflows"].as_array() {
90 if !inflows.is_empty() {
91 println!("✅ Stream is active! Inflow record found.");
92 } else {
93 println!("❌ No active inflow detected.");
94 return Err(AuthError::InvalidStream(
95 "No active inflow detected".to_string(),
96 ));
97 }
98 } else {
99 println!("❌ No inflow data found.");
100 return Err(AuthError::InvalidStream("No inflow data found".to_string()));
101 }
102
103 Ok(true)
104}
105
106pub async fn verify_stream(stream: SignedStream, config: StreamsConfig) -> Result<bool, AuthError> {
107 let reconstructed_message = create_stream_message(stream.sender);
109 println!("Message: 0x{}", hex::encode(&reconstructed_message));
110
111 let signature = stream.signature;
112 println!("Signature: 0x{}", hex::encode(&signature.as_bytes()));
113
114 let recovered = match signature.recover_address_from_msg(reconstructed_message) {
116 Ok(address) => address,
117 Err(_) => return Err(AuthError::InvalidSignature),
118 };
119 println!("Recovered address: {}", recovered);
120
121 if recovered != stream.sender {
123 println!("Failed: Recovered address mismatch");
124 return Err(AuthError::InvalidSignature);
125 }
126
127 let provider = ProviderBuilder::new().on_http(config.rpc_url.parse().unwrap());
128
129 let cfav1_forwarder = CFAv1Forwarder::new(config.cfa_forwarder, provider);
130
131 let flow_info = cfav1_forwarder
133 .getFlowInfo(config.token_address, stream.sender, config.recipient)
134 .call()
135 .await
136 .map_err(|e| AuthError::ContractError(e.to_string()))?;
137
138 if flow_info.flowrate == Signed::ZERO {
140 println!("Failed: No stream flow found");
141 return Err(AuthError::InvalidStream("No stream flow found".to_string()));
142 } else {
143 println!("Stream flow found");
144 println!("Flow rate: {}", flow_info.flowrate);
145 if flow_info.flowrate != config.amount {
147 println!("Failed: Invalid stream flow rate");
148 return Err(AuthError::InvalidStream(
149 "Invalid stream flow rate".to_string(),
150 ));
151 }
152 }
153
154 Ok(true)
155}