pipegate/middleware/stream_payment/
verify.rs

1use alloy::{
2    hex::{self},
3    primitives::Signed,
4    providers::ProviderBuilder,
5    sol,
6};
7use reqwest::Client;
8use serde_json::json;
9
10use crate::{
11    error::AuthError,
12    middleware::stream_payment::{
13        types::{SignedStream, StreamsConfig},
14        utils::create_stream_message,
15    },
16};
17
18sol!(
19    #[allow(missing_docs)]
20    #[sol(rpc)]
21    CFAv1Forwarder,
22    "src/abi/CFAv1Forwarder.json"
23);
24
25#[allow(dead_code)]
26pub(crate) async fn verify_stream_via_indexer(
27    stream: SignedStream,
28    config: StreamsConfig,
29) -> Result<bool, AuthError> {
30    // Creating the message
31    let reconstructed_message = create_stream_message(stream.sender);
32    println!("Message: 0x{}", hex::encode(&reconstructed_message));
33
34    let signature = stream.signature;
35    println!("Signature: 0x{}", hex::encode(&signature.as_bytes()));
36
37    // Recovering the address from the signature
38    let recovered = match signature.recover_address_from_msg(reconstructed_message) {
39        Ok(address) => address,
40        Err(_) => return Err(AuthError::InvalidSignature),
41    };
42    println!("Recovered address: {}", recovered);
43
44    // Verify the recovered address against the sender for the stream
45    if recovered != stream.sender {
46        println!("Failed: Recovered address mismatch");
47        return Err(AuthError::InvalidSignature);
48    }
49
50    let client = Client::new();
51
52    let url = "https://subgraph-endpoints.superfluid.dev/base-sepolia/protocol-v1";
53
54    let query = json!({
55        "query": "query ($recipient: String!, $sender: String!, $amount: String!, $token: String!) {
56            account(id: $recipient) {
57                inflows(where: { sender: $sender, currentFlowRate: $amount, token: $token }) {
58                    currentFlowRate
59                    createdAtTimestamp
60                    id
61                }
62            }
63        }",
64        "variables": {
65            "recipient": config.recipient.to_string().to_lowercase(),
66            "sender": stream.sender.to_string().to_lowercase(),
67            "amount": config.amount.to_string(),
68            "token": config.token_address.to_string().to_lowercase(),
69        }
70    });
71
72    let response = client
73        .post(url)
74        .header("Content-Type", "application/json")
75        .json(&query)
76        .send()
77        .await
78        .map_err(|e| {
79            println!("Failed: Network error: {}", e);
80            AuthError::NetworkError("Failed to fetch stream data from indexer".to_string())
81        })?
82        .json::<serde_json::Value>()
83        .await
84        .map_err(|e| {
85            println!("Failed: JSON parse error: {}", e);
86            AuthError::NetworkError("Failed to parse stream data from indexer".to_string())
87        })?;
88
89    if let Some(inflows) = response["data"]["account"]["inflows"].as_array() {
90        if !inflows.is_empty() {
91            println!("✅ Stream is active! Inflow record found.");
92        } else {
93            println!("❌ No active inflow detected.");
94            return Err(AuthError::InvalidStream(
95                "No active inflow detected".to_string(),
96            ));
97        }
98    } else {
99        println!("❌ No inflow data found.");
100        return Err(AuthError::InvalidStream("No inflow data found".to_string()));
101    }
102
103    Ok(true)
104}
105
106pub async fn verify_stream(stream: SignedStream, config: StreamsConfig) -> Result<bool, AuthError> {
107    // Creating the message
108    let reconstructed_message = create_stream_message(stream.sender);
109    println!("Message: 0x{}", hex::encode(&reconstructed_message));
110
111    let signature = stream.signature;
112    println!("Signature: 0x{}", hex::encode(&signature.as_bytes()));
113
114    // Recovering the address from the signature
115    let recovered = match signature.recover_address_from_msg(reconstructed_message) {
116        Ok(address) => address,
117        Err(_) => return Err(AuthError::InvalidSignature),
118    };
119    println!("Recovered address: {}", recovered);
120
121    // Verify the recovered address against the sender for the stream
122    if recovered != stream.sender {
123        println!("Failed: Recovered address mismatch");
124        return Err(AuthError::InvalidSignature);
125    }
126
127    let provider = ProviderBuilder::new().on_http(config.rpc_url.parse().unwrap());
128
129    let cfav1_forwarder = CFAv1Forwarder::new(config.cfa_forwarder, provider);
130
131    // Fetch the stream flow from sender to recipient, if it exists, using CFAv1Forwarder
132    let flow_info = cfav1_forwarder
133        .getFlowInfo(config.token_address, stream.sender, config.recipient)
134        .call()
135        .await
136        .map_err(|e| AuthError::ContractError(e.to_string()))?;
137
138    // Check if the flow exists
139    if flow_info.flowrate == Signed::ZERO {
140        println!("Failed: No stream flow found");
141        return Err(AuthError::InvalidStream("No stream flow found".to_string()));
142    } else {
143        println!("Stream flow found");
144        println!("Flow rate: {}", flow_info.flowrate);
145        // check the flowRate matches with what recipient expects
146        if flow_info.flowrate != config.amount {
147            println!("Failed: Invalid stream flow rate");
148            return Err(AuthError::InvalidStream(
149                "Invalid stream flow rate".to_string(),
150            ));
151        }
152    }
153
154    Ok(true)
155}