use alloy::{
hex::{self},
primitives::Signed,
providers::ProviderBuilder,
sol,
};
use reqwest::Client;
use serde_json::json;
use crate::{
error::AuthError,
middleware::stream_payment::{
types::{SignedStream, StreamsConfig},
utils::create_stream_message,
},
};
sol!(
#[allow(missing_docs)]
#[sol(rpc)]
CFAv1Forwarder,
"src/abi/CFAv1Forwarder.json"
);
#[allow(dead_code)]
pub(crate) async fn verify_stream_via_indexer(
stream: SignedStream,
config: StreamsConfig,
) -> Result<bool, AuthError> {
let reconstructed_message = create_stream_message(stream.sender);
println!("Message: 0x{}", hex::encode(&reconstructed_message));
let signature = stream.signature;
println!("Signature: 0x{}", hex::encode(&signature.as_bytes()));
let recovered = match signature.recover_address_from_msg(reconstructed_message) {
Ok(address) => address,
Err(_) => return Err(AuthError::InvalidSignature),
};
println!("Recovered address: {}", recovered);
if recovered != stream.sender {
println!("Failed: Recovered address mismatch");
return Err(AuthError::InvalidSignature);
}
let client = Client::new();
let url = "https://subgraph-endpoints.superfluid.dev/base-sepolia/protocol-v1";
let query = json!({
"query": "query ($recipient: String!, $sender: String!, $amount: String!, $token: String!) {
account(id: $recipient) {
inflows(where: { sender: $sender, currentFlowRate: $amount, token: $token }) {
currentFlowRate
createdAtTimestamp
id
}
}
}",
"variables": {
"recipient": config.recipient.to_string().to_lowercase(),
"sender": stream.sender.to_string().to_lowercase(),
"amount": config.amount.to_string(),
"token": config.token_address.to_string().to_lowercase(),
}
});
let response = client
.post(url)
.header("Content-Type", "application/json")
.json(&query)
.send()
.await
.map_err(|e| {
println!("Failed: Network error: {}", e);
AuthError::NetworkError("Failed to fetch stream data from indexer".to_string())
})?
.json::<serde_json::Value>()
.await
.map_err(|e| {
println!("Failed: JSON parse error: {}", e);
AuthError::NetworkError("Failed to parse stream data from indexer".to_string())
})?;
if let Some(inflows) = response["data"]["account"]["inflows"].as_array() {
if !inflows.is_empty() {
println!("✅ Stream is active! Inflow record found.");
} else {
println!("❌ No active inflow detected.");
return Err(AuthError::InvalidStream(
"No active inflow detected".to_string(),
));
}
} else {
println!("❌ No inflow data found.");
return Err(AuthError::InvalidStream("No inflow data found".to_string()));
}
Ok(true)
}
pub async fn verify_stream(stream: SignedStream, config: StreamsConfig) -> Result<bool, AuthError> {
let reconstructed_message = create_stream_message(stream.sender);
println!("Message: 0x{}", hex::encode(&reconstructed_message));
let signature = stream.signature;
println!("Signature: 0x{}", hex::encode(&signature.as_bytes()));
let recovered = match signature.recover_address_from_msg(reconstructed_message) {
Ok(address) => address,
Err(_) => return Err(AuthError::InvalidSignature),
};
println!("Recovered address: {}", recovered);
if recovered != stream.sender {
println!("Failed: Recovered address mismatch");
return Err(AuthError::InvalidSignature);
}
let provider = ProviderBuilder::new().on_http(config.rpc_url.parse().unwrap());
let cfav1_forwarder = CFAv1Forwarder::new(config.cfa_forwarder, provider);
let flow_info = cfav1_forwarder
.getFlowInfo(config.token_address, stream.sender, config.recipient)
.call()
.await
.map_err(|e| AuthError::ContractError(e.to_string()))?;
if flow_info.flowrate == Signed::ZERO {
println!("Failed: No stream flow found");
return Err(AuthError::InvalidStream("No stream flow found".to_string()));
} else {
println!("Stream flow found");
println!("Flow rate: {}", flow_info.flowrate);
if flow_info.flowrate != config.amount {
println!("Failed: Invalid stream flow rate");
return Err(AuthError::InvalidStream(
"Invalid stream flow rate".to_string(),
));
}
}
Ok(true)
}