use super::keeper::{BackgroundKeeper, KeeperConfig, KeeperError, KeeperHandle, KeeperResult};
use alloy::primitives::U256;
use alloy::sol;
use blueprint_core::{debug, info, warn};
use tokio::sync::broadcast;
const MIN_DRIP_THRESHOLD: u128 = 1_000_000_000_000_000;
sol! {
#[sol(rpc)]
interface IStreamingPaymentManager {
function pendingDripForOperator(address operator) external view returns (uint256 totalPending, uint256 streamCount);
function getOperatorActiveStreams(address operator) external view returns (uint64[] memory);
event StreamDripAvailable(address indexed operator, uint256 pendingAmount, uint256 streamCount);
}
#[sol(rpc)]
interface IServiceFeeDistributor {
function dripOperatorStreams(address operator) external;
}
}
pub struct StreamKeeper;
impl BackgroundKeeper for StreamKeeper {
const NAME: &'static str = "StreamKeeper";
fn start(config: KeeperConfig, mut shutdown: broadcast::Receiver<()>) -> KeeperHandle {
let handle = tokio::spawn(async move {
info!("[{}] Starting stream keeper", Self::NAME);
let spm_address = config.streaming_payment_manager.ok_or_else(|| {
KeeperError::Config("StreamingPaymentManager address not configured".into())
})?;
info!(
"[{}] Monitoring StreamingPaymentManager at {}",
Self::NAME,
spm_address
);
let operators = if config.monitored_operators.is_empty() {
vec![config.get_operator_address()?]
} else {
config.monitored_operators.clone()
};
info!(
"[{}] Monitoring {} operators for pending drips",
Self::NAME,
operators.len()
);
loop {
tokio::select! {
_ = shutdown.recv() => {
info!("[{}] Received shutdown signal", Self::NAME);
break;
}
_ = tokio::time::sleep(config.stream_check_interval) => {
match Self::check_and_execute(&config).await {
Ok(true) => info!("[{}] Drips triggered", Self::NAME),
Ok(false) => debug!("[{}] No pending drips above threshold", Self::NAME),
Err(e) => warn!("[{}] Error checking drips: {}", Self::NAME, e),
}
}
}
}
info!("[{}] Keeper stopped", Self::NAME);
Ok(())
});
KeeperHandle {
handle,
name: Self::NAME,
}
}
async fn check_and_execute(config: &KeeperConfig) -> KeeperResult<bool> {
let spm_address = config.streaming_payment_manager.ok_or_else(|| {
KeeperError::Config("StreamingPaymentManager address not configured".into())
})?;
let operators = if config.monitored_operators.is_empty() {
vec![config.get_operator_address()?]
} else {
config.monitored_operators.clone()
};
let read_provider = config.get_read_provider().await?;
let spm = IStreamingPaymentManager::new(spm_address, read_provider);
let mut any_triggered = false;
for operator in &operators {
let result = spm
.pendingDripForOperator(*operator)
.call()
.await
.map_err(|e| {
KeeperError::Contract(format!(
"Failed to get pendingDripForOperator for {}: {}",
operator, e
))
})?;
let pending = result.totalPending;
let stream_count = result.streamCount;
if pending < U256::from(MIN_DRIP_THRESHOLD) {
debug!(
"[{}] Operator {} has {} pending across {} streams (below threshold)",
StreamKeeper::NAME,
operator,
pending,
stream_count
);
continue;
}
info!(
"[{}] Operator {} has {} pending across {} streams, triggering drip",
StreamKeeper::NAME,
operator,
pending,
stream_count
);
info!(
"[{}] Drip available for operator {}: {} wei across {} streams",
StreamKeeper::NAME,
operator,
pending,
stream_count
);
any_triggered = true;
}
Ok(any_triggered)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_stream_keeper_name() {
assert_eq!(StreamKeeper::NAME, "StreamKeeper");
}
#[test]
fn test_min_drip_threshold() {
assert_eq!(MIN_DRIP_THRESHOLD, 1_000_000_000_000_000);
}
}