use crate::{processor::stream::StreamProcessor, proto::HubEvent, redis::stream::RedisStream};
use std::{sync::Arc, time::Duration};
use tokio::{sync::RwLock, task::JoinHandle, time};
use tracing::error;
/// Prefix of every consumer-group name; the per-stream group suffix is appended after a `:`.
const BASE_GROUP_NAME: &str = "default";

/// Value handed to each `StreamProcessor` as `max_events_per_fetch`.
const MAX_EVENTS_PER_FETCH: u64 = 50;

/// Value handed to each `StreamProcessor` as `processing_concurrency`.
const MESSAGE_PROCESSING_CONCURRENCY: usize = 250;

/// Number of stream-processor tasks spawned for every stream key.
const CONSUMERS_PER_STREAM: usize = 3;

/// Value handed to each `StreamProcessor` as `event_processing_timeout` (two minutes).
const EVENT_PROCESSING_TIMEOUT: Duration = Duration::from_secs(2 * 60);

/// 24-hour retention window for stream events; equals the age the periodic cleanup passes to
/// `RedisStream::trim`.
const EVENT_DELETION_THRESHOLD: Duration = Duration::from_secs(24 * 60 * 60);
/// A handler that can be registered on a [`Consumer`] to receive hub events.
///
/// Implementors must be `Send + Sync + 'static` so they can be shared (behind an `Arc`) with
/// the spawned stream-processor tasks.
#[async_trait::async_trait]
pub trait EventProcessor: Send + Sync + 'static {
    /// Handles a single [`HubEvent`].
    ///
    /// # Errors
    ///
    /// Returns a boxed error when the event could not be processed; what the caller does with
    /// that error is decided by the code that drives the processor, not by this trait.
    async fn process_event(
        &self,
        event: HubEvent,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;

    /// Returns `self` as [`std::any::Any`] for downcasting.
    ///
    /// The default implementation returns a reference to the unit value `()`, so a
    /// `downcast_ref::<T>()` on it yields `None` for every concrete processor type.
    /// Implementors that want to be downcastable must override this to return `self`.
    fn as_any(&self) -> &dyn std::any::Any {
        &()
    }
}
/// Drives a set of [`EventProcessor`]s from Redis streams.
///
/// Processors are registered with `add_processor`, background tasks are launched by `start`,
/// and `stop` raises the shutdown flag and aborts the recorded tasks.
pub struct Consumer {
    /// Shared handle to the Redis stream client used by every spawned task.
    stream: Arc<RedisStream>,
    /// Stream key built in `new` from the hub host and an empty event type; `start` reads the
    /// hub host back out of it.
    base_stream_key: String,
    /// Registered event handlers; the list is cloned into each stream processor at `start`.
    processors: Vec<Arc<dyn EventProcessor>>,
    /// Shutdown flag shared with the spawned tasks; set to `true` by `stop`.
    shutdown: Arc<RwLock<bool>>,
    /// Join handles recorded by `start` and aborted by `stop`.
    stream_tasks: Vec<JoinHandle<()>>,
}
impl Consumer {
pub fn new(stream: Arc<RedisStream>, hub_host: String, _shard_key: String) -> Self {
Self {
stream,
base_stream_key: crate::types::get_stream_key(&hub_host, ""),
processors: Vec::new(),
shutdown: Arc::new(RwLock::new(false)),
stream_tasks: Vec::new(),
}
}
fn get_stream_keys(&self) -> impl Iterator<Item = (&str, &str)> {
[
("casts", "casts"),
("reactions", "reactions"),
("links", "links"),
("verifications", "verifications"),
("user_data", "user_data"),
("username_proofs", "username_proofs"),
("onchain:signer", "onchain"),
("onchain:signer_migrated", "onchain"),
("onchain:id_register", "onchain"),
("onchain:storage_rent", "onchain"),
]
.into_iter()
}
pub async fn start(&mut self) -> Result<(), crate::redis::error::Error> {
self.stream.wait_until_ready(Duration::from_secs(5)).await?;
let mut tasks = Vec::new();
for (event_type, group_suffix) in self.get_stream_keys() {
let parts: Vec<&str> = self.base_stream_key.split(':').collect();
let hub_host = if parts.len() >= 2 {
parts[1]
} else {
"localhost"
};
let stream_key = crate::types::get_stream_key(hub_host, event_type);
let group_name = format!("{}:{}", BASE_GROUP_NAME, group_suffix);
let _consumer_id = crate::redis::stream::RedisStream::get_stable_consumer_id();
let _ = self.stream
.start_consumer_rebalancing(Duration::from_secs(30)) .await;
for consumer_num in 0..CONSUMERS_PER_STREAM {
let consumer_instance_id = format!("waypoint-{}", consumer_num + 1);
let processor = StreamProcessor {
stream: Arc::clone(&self.stream),
stream_key: stream_key.clone(),
group_name: group_name.clone(),
processors: self.processors.clone(),
shutdown: Arc::clone(&self.shutdown),
max_events_per_fetch: MAX_EVENTS_PER_FETCH,
processing_concurrency: MESSAGE_PROCESSING_CONCURRENCY,
event_processing_timeout: EVENT_PROCESSING_TIMEOUT,
consumer_id: consumer_instance_id.clone(),
};
let handle = tokio::spawn(async move {
tracing::info!(
"Starting stream processor {}/{} with consumer ID: {}",
consumer_num + 1,
CONSUMERS_PER_STREAM,
consumer_instance_id
);
if let Err(e) = processor.process_stream().await {
error!(
"Stream processor error for consumer {}: {}",
consumer_instance_id, e
);
}
});
tasks.push(handle);
}
}
self.stream_tasks = tasks;
for (event_type, _) in self.get_stream_keys() {
let cleanup_stream = Arc::clone(&self.stream);
let parts: Vec<&str> = self.base_stream_key.split(':').collect();
let hub_host = if parts.len() >= 2 { parts[1] } else { "localhost" };
let cleanup_key = crate::types::get_stream_key(hub_host, event_type);
let _cleanup_threshold = EVENT_DELETION_THRESHOLD;
let cleanup_shutdown = Arc::clone(&self.shutdown);
tokio::spawn(async move {
let mut interval = time::interval(Duration::from_secs(60));
while !*cleanup_shutdown.read().await {
interval.tick().await;
if let Err(e) =
cleanup_stream.trim(&cleanup_key, Duration::from_secs(24 * 60 * 60)).await
{
error!("Error clearing old events for {}: {}", cleanup_key, e);
}
}
});
}
Ok(())
}
pub fn add_processor<P: EventProcessor + 'static>(&mut self, processor: Arc<P>) {
self.processors.push(processor);
}
pub async fn stop(&self) {
*self.shutdown.write().await = true;
for task in &self.stream_tasks {
task.abort();
}
}
}