use super::keeper::{BackgroundKeeper, KeeperConfig, KeeperError, KeeperHandle, KeeperResult};
use alloy::primitives::Address;
use alloy::sol;
use blueprint_core::{debug, info, warn};
use std::time::Instant;
use tokio::sync::broadcast;
sol! {
#[sol(rpc)]
interface ITangle {
function serviceCount() external view returns (uint64);
function getService(uint64 serviceId) external view returns (Service memory);
function getBillableServices(uint64[] calldata serviceIds) external view returns (uint64[] memory);
function billSubscriptionBatch(uint64[] calldata serviceIds) external returns (uint256 billed, uint256 failed);
struct Service {
uint64 blueprintId;
address owner;
uint64 createdAt;
uint64 ttl;
uint64 terminatedAt;
uint64 lastPaymentAt;
uint32 operatorCount;
uint32 minOperators;
uint32 maxOperators;
uint8 membership;
uint8 pricing;
uint8 status;
}
}
}
const PRICING_SUBSCRIPTION: u8 = 1;
const STATUS_ACTIVE: u8 = 1;
pub struct SubscriptionBillingKeeper;
impl BackgroundKeeper for SubscriptionBillingKeeper {
const NAME: &'static str = "SubscriptionBillingKeeper";
fn start(config: KeeperConfig, mut shutdown: broadcast::Receiver<()>) -> KeeperHandle {
let handle = tokio::spawn(async move {
info!("[{}] Starting subscription billing keeper", Self::NAME);
let tangle_address = config.tangle_contract.ok_or_else(|| {
KeeperError::Config("Tangle contract address not configured".into())
})?;
info!(
"[{}] Monitoring Tangle contract at {}",
Self::NAME,
tangle_address
);
let mut tracked_service_ids: Vec<u64> = Vec::new();
let mut last_rescan = Instant::now()
.checked_sub(config.billing_rescan_interval)
.unwrap_or_else(Instant::now);
loop {
tokio::select! {
_ = shutdown.recv() => {
info!("[{}] Received shutdown signal", Self::NAME);
break;
}
_ = tokio::time::sleep(config.billing_check_interval) => {
if last_rescan.elapsed() >= config.billing_rescan_interval {
match rescan_services(&config, tangle_address).await {
Ok(ids) => {
if ids.len() != tracked_service_ids.len() {
info!(
"[{}] Tracking {} subscription services (was {})",
Self::NAME, ids.len(), tracked_service_ids.len()
);
}
tracked_service_ids = ids;
last_rescan = Instant::now();
}
Err(e) => {
warn!("[{}] Error rescanning services: {}", Self::NAME, e);
}
}
}
if tracked_service_ids.is_empty() {
debug!("[{}] No subscription services to bill", Self::NAME);
continue;
}
match check_and_bill(
&config,
tangle_address,
&tracked_service_ids,
).await {
Ok(true) => info!("[{}] Billing executed successfully", Self::NAME),
Ok(false) => debug!("[{}] No services due for billing", Self::NAME),
Err(e) => warn!("[{}] Error during billing check: {}", Self::NAME, e),
}
}
}
}
info!("[{}] Keeper stopped", Self::NAME);
Ok(())
});
KeeperHandle {
handle,
name: Self::NAME,
}
}
async fn check_and_execute(config: &KeeperConfig) -> KeeperResult<bool> {
let tangle_address = config
.tangle_contract
.ok_or_else(|| KeeperError::Config("Tangle contract address not configured".into()))?;
let tracked = rescan_services(config, tangle_address).await?;
if tracked.is_empty() {
return Ok(false);
}
check_and_bill(config, tangle_address, &tracked).await
}
}
async fn rescan_services(config: &KeeperConfig, tangle_address: Address) -> KeeperResult<Vec<u64>> {
let provider = config.get_read_provider().await?;
let tangle = ITangle::new(tangle_address, provider);
let count = tangle
.serviceCount()
.call()
.await
.map_err(|e| KeeperError::Contract(format!("Failed to get service count: {}", e)))?;
let total = count;
let mut subscription_ids = Vec::new();
for id in 0..total {
match tangle.getService(id).call().await {
Ok(svc) => {
if svc.pricing == PRICING_SUBSCRIPTION && svc.status == STATUS_ACTIVE {
subscription_ids.push(id);
}
}
Err(e) => {
debug!(
"[{}] Failed to get service {}: {}",
SubscriptionBillingKeeper::NAME,
id,
e
);
}
}
}
Ok(subscription_ids)
}
async fn check_and_bill(
config: &KeeperConfig,
tangle_address: Address,
tracked_ids: &[u64],
) -> KeeperResult<bool> {
if tracked_ids.is_empty() {
return Ok(false);
}
let read_provider = config.get_read_provider().await?;
let tangle_read = ITangle::new(tangle_address, read_provider);
let billable = tangle_read
.getBillableServices(tracked_ids.to_vec())
.call()
.await
.map_err(|e| KeeperError::Contract(format!("Failed to get billable services: {}", e)))?;
let billable_ids = billable;
if billable_ids.is_empty() {
return Ok(false);
}
info!(
"[{}] Found {} billable services, submitting billing transaction",
SubscriptionBillingKeeper::NAME,
billable_ids.len()
);
let provider = config.get_provider().await?;
let tangle = ITangle::new(tangle_address, provider);
let mut any_billed = false;
for chunk in billable_ids.chunks(config.billing_max_batch_size) {
let batch: Vec<u64> = chunk.to_vec();
match tangle.billSubscriptionBatch(batch.clone()).send().await {
Ok(pending) => match pending.get_receipt().await {
Ok(receipt) => {
info!(
"[{}] Billed {} services, tx: {:?}",
SubscriptionBillingKeeper::NAME,
batch.len(),
receipt.transaction_hash
);
any_billed = true;
}
Err(e) => {
warn!(
"[{}] Failed to get billing receipt: {}",
SubscriptionBillingKeeper::NAME,
e
);
}
},
Err(e) => {
warn!(
"[{}] Failed to send billing tx: {}",
SubscriptionBillingKeeper::NAME,
e
);
}
}
}
Ok(any_billed)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_billing_keeper_name() {
assert_eq!(SubscriptionBillingKeeper::NAME, "SubscriptionBillingKeeper");
}
}