blueprint_eigenlayer_extra/
generic_task_aggregation.rs1use alloy_primitives::{FixedBytes, hex, keccak256};
2use blueprint_core::{error, info};
3use eigensdk::client_avsregistry::reader::AvsRegistryChainReader;
4use eigensdk::crypto_bls::Signature;
5use eigensdk::services_avsregistry::chaincaller::AvsRegistryServiceChainCaller;
6use eigensdk::services_blsaggregation::bls_agg::{
7 AggregateReceiver, BlsAggregatorService, ServiceHandle, TaskMetadata, TaskSignature,
8};
9use eigensdk::services_blsaggregation::bls_aggregation_service_error::BlsAggregationServiceError;
10use eigensdk::services_blsaggregation::bls_aggregation_service_response::BlsAggregationServiceResponse;
11use eigensdk::services_operatorsinfo::operatorsinfo_inmemory::OperatorInfoServiceInMemory;
12use eigensdk::types::avs::{TaskIndex, TaskResponseDigest};
13use std::collections::{HashMap, VecDeque};
14use std::fmt::Debug;
15use std::marker::PhantomData;
16use std::sync::Arc;
17use std::time::Duration;
18use thiserror::Error;
19use tokio::sync::{Mutex, Notify, RwLock};
20use tokio::task::JoinHandle;
21use tokio::time::interval;
22
23#[derive(Error, Debug)]
25pub enum AggregationError {
26 #[error("Task not found: {0}")]
28 TaskNotFound(String),
29
30 #[error("BLS aggregation service error: {0}")]
32 BlsAggregationError(#[from] BlsAggregationServiceError),
33
34 #[error("Contract interaction error: {0}")]
36 ContractError(String),
37
38 #[error("I/O error: {0}")]
40 IoError(#[from] std::io::Error),
41
42 #[error("JSON serialization error: {0}")]
44 JsonError(#[from] serde_json::Error),
45
46 #[error("Aggregator already stopped")]
48 AlreadyStopped,
49
50 #[error("Service initialization error: {0}")]
52 ServiceInitError(String),
53
54 #[error("Tokio task join error: {0}")]
56 TaskJoinError(#[from] tokio::task::JoinError),
57}
58
59pub type Result<T> = std::result::Result<T, AggregationError>;
61
62pub type BlsAggServiceInMemory = BlsAggregatorService<
64 AvsRegistryServiceChainCaller<AvsRegistryChainReader, OperatorInfoServiceInMemory>,
65>;
66
67pub trait EigenTask: Clone + Send + Sync + 'static {
69 fn task_index(&self) -> TaskIndex;
71
72 fn created_block(&self) -> u32;
74
75 fn quorum_numbers(&self) -> Vec<u8>;
77
78 fn quorum_threshold_percentage(&self) -> u8;
80
81 fn encode(&self) -> Vec<u8>;
83
84 fn digest(&self) -> FixedBytes<32> {
86 keccak256(self.encode())
87 }
88}
89
90pub trait TaskResponse: Clone + Send + Sync + 'static {
92 fn reference_task_index(&self) -> TaskIndex;
94
95 fn encode(&self) -> Vec<u8>;
97
98 fn digest(&self) -> FixedBytes<32> {
100 keccak256(self.encode())
101 }
102}
103
104#[derive(Clone, Debug)]
106pub struct SignedTaskResponse<R: TaskResponse> {
107 pub response: R,
109 pub signature: Signature,
111 pub operator_id: FixedBytes<32>,
113}
114
115impl<R: TaskResponse> SignedTaskResponse<R> {
116 pub fn new(response: R, signature: Signature, operator_id: FixedBytes<32>) -> Self {
118 Self {
119 response,
120 signature,
121 operator_id,
122 }
123 }
124
125 pub fn signature(&self) -> Signature {
127 self.signature.clone()
128 }
129
130 pub fn operator_id(&self) -> FixedBytes<32> {
132 self.operator_id
133 }
134}
135
136pub trait ResponseSender<T: EigenTask, R: TaskResponse>: Send + Sync + Clone + 'static {
138 type Future: std::future::Future<Output = Result<()>> + Send + 'static;
140
141 fn send_aggregated_response(
143 &self,
144 task: &T,
145 response: &R,
146 aggregation_result: BlsAggregationServiceResponse,
147 ) -> Self::Future;
148}
149
150#[derive(Clone)]
152pub struct AggregatorConfig {
153 pub processing_interval: u64,
155 pub max_responses_per_task: usize,
157 pub send_retries: usize,
159 pub retry_delay: u64,
161}
162
163impl Default for AggregatorConfig {
164 fn default() -> Self {
165 Self {
166 processing_interval: 6,
167 max_responses_per_task: 1000,
168 send_retries: 3,
169 retry_delay: 2,
170 }
171 }
172}
173
174pub struct TaskAggregator<T, R, S>
176where
177 T: EigenTask,
178 R: TaskResponse,
179 S: ResponseSender<T, R>,
180{
181 pub tasks: Arc<RwLock<HashMap<TaskIndex, T>>>,
183 pub responses: Arc<RwLock<HashMap<TaskIndex, HashMap<TaskResponseDigest, R>>>>,
185 pub response_cache: Arc<Mutex<VecDeque<SignedTaskResponse<R>>>>,
187 pub bls_service: ServiceHandle,
189 pub aggregate_receiver: Arc<Mutex<AggregateReceiver>>,
191 pub response_sender: S,
193 pub config: AggregatorConfig,
195 pub shutdown: Arc<(Notify, Mutex<bool>)>,
197 pub task_handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
199 _phantom: PhantomData<(T, R)>,
200}
201
202impl<T, R, S> TaskAggregator<T, R, S>
203where
204 T: EigenTask,
205 R: TaskResponse,
206 S: ResponseSender<T, R>,
207{
208 pub fn new(
210 bls_service: BlsAggServiceInMemory,
211 response_sender: S,
212 config: AggregatorConfig,
213 ) -> Self {
214 let (service_handle, aggregate_receiver) = bls_service.start();
215
216 Self {
217 tasks: Arc::new(RwLock::new(HashMap::new())),
218 responses: Arc::new(RwLock::new(HashMap::new())),
219 response_cache: Arc::new(Mutex::new(VecDeque::new())),
220 bls_service: service_handle,
221 aggregate_receiver: Arc::new(Mutex::new(aggregate_receiver)),
222 response_sender,
223 config,
224 shutdown: Arc::new((Notify::new(), Mutex::new(false))),
225 task_handles: Arc::new(Mutex::new(Vec::new())),
226 _phantom: PhantomData,
227 }
228 }
229
230 pub async fn register_task(&self, task: T) -> Result<()> {
235 let task_index = task.task_index();
236 let mut tasks = self.tasks.write().await;
237
238 tasks.insert(task_index, task.clone());
240
241 let mut responses = self.responses.write().await;
243 responses.entry(task_index).or_insert_with(HashMap::new);
244
245 let task_metadata = TaskMetadata::new(
247 task_index,
248 u64::from(task.created_block()),
249 task.quorum_numbers(),
250 vec![task.quorum_threshold_percentage(); task.quorum_numbers().len()],
251 Duration::from_secs(self.config.processing_interval * 10), );
253
254 self.bls_service
255 .initialize_task(task_metadata)
256 .await
257 .map_err(AggregationError::BlsAggregationError)?;
258
259 Ok(())
260 }
261
262 pub async fn process_signed_response(&self, signed_response: SignedTaskResponse<R>) {
264 info!("Caching signed response");
265 self.response_cache.lock().await.push_back(signed_response);
267 }
268
269 pub async fn start(&self) {
271 let process_handle = self.start_processing_loop();
273
274 let aggregation_handle = self.start_aggregation_handling_loop();
276
277 let mut handles = self.task_handles.lock().await;
279 handles.push(process_handle);
280 handles.push(aggregation_handle);
281 }
282
283 pub async fn stop(&self) -> Result<()> {
288 let (notify, is_shutdown) = &*self.shutdown;
290 let mut shutdown_lock = is_shutdown.lock().await;
291
292 if *shutdown_lock {
293 return Err(AggregationError::AlreadyStopped);
294 }
295
296 info!("Setting shutdown flag for task aggregator");
297 *shutdown_lock = true;
298 drop(shutdown_lock);
299
300 notify.notify_waiters();
302
303 let handles = {
305 let mut handles_lock = self.task_handles.lock().await;
306 std::mem::take(&mut *handles_lock)
307 };
308
309 info!(
310 "Waiting for {} task aggregator background tasks to complete",
311 handles.len()
312 );
313
314 let timeout = Duration::from_secs(5);
316 for handle in handles {
317 if !handle.is_finished() {
318 match tokio::time::timeout(timeout, handle).await {
319 Ok(_) => info!("Task aggregator background task completed successfully"),
320 Err(_) => {
321 error!(
322 "Task aggregator background task did not complete within timeout, aborting"
323 );
324 }
327 }
328 }
329 }
330
331 info!("Task aggregator shutdown complete");
332 Ok(())
333 }
334
335 fn start_processing_loop(&self) -> JoinHandle<()> {
338 let tasks = self.tasks.clone();
339 let responses = self.responses.clone();
340 let response_cache = self.response_cache.clone();
341 let bls_service = self.bls_service.clone();
342 let shutdown = self.shutdown.clone();
343 let interval_secs = self.config.processing_interval;
344
345 tokio::spawn(async move {
346 let mut interval = interval(Duration::from_secs(interval_secs));
347
348 loop {
349 tokio::select! {
350 _ = interval.tick() => {
351 if *shutdown.1.lock().await {
353 break;
354 }
355
356 let responses_to_process = {
358 let mut cache = response_cache.lock().await;
359 std::mem::take(&mut *cache)
360 };
361
362 for signed_resp in responses_to_process {
363 let task_index = signed_resp.response.reference_task_index();
364
365 let task_exists = tasks.read().await.contains_key(&task_index);
367
368 if !task_exists {
369 response_cache.lock().await.push_back(signed_resp);
371 continue;
372 }
373
374 match Self::process_response(
376 task_index,
377 signed_resp.clone(),
378 &responses,
379 &bls_service,
380 ).await {
381 Ok(()) => {
382 }
384 Err(e) => {
385 blueprint_core::error!("Failed to process response: {:?}", e);
387 response_cache.lock().await.push_back(signed_resp);
388 }
389 }
390 }
391 }
392 () = shutdown.0.notified() => {
393 if *shutdown.1.lock().await {
394 break;
395 }
396 }
397 }
398 }
399
400 blueprint_core::info!("Processing loop shutdown complete");
401 })
402 }
403
404 fn start_aggregation_handling_loop(&self) -> JoinHandle<()> {
405 let tasks = self.tasks.clone();
406 let responses = self.responses.clone();
407 let aggregate_receiver = self.aggregate_receiver.clone();
408 let response_sender = self.response_sender.clone();
409 let shutdown = self.shutdown.clone();
410 let config = self.config.clone();
411
412 tokio::spawn(async move {
413 loop {
414 tokio::select! {
415 Ok(aggregation_result) = async {
416 let mut receiver = aggregate_receiver.lock().await;
417 receiver.receive_aggregated_response().await
418 } => {
419 let task_index = aggregation_result.task_index;
420 let response_digest = aggregation_result.task_response_digest;
421
422 blueprint_core::info!(
423 "Received aggregation result for task {}, digest: {}",
424 task_index,
425 hex::encode(response_digest)
426 );
427
428 let task_opt = tasks.read().await.get(&task_index).cloned();
430 let response_opt = responses.read().await
431 .get(&task_index)
432 .and_then(|r| r.get(&response_digest))
433 .cloned();
434
435 if let (Some(task), Some(response)) = (task_opt, response_opt) {
436 for i in 0..config.send_retries {
438 match response_sender
439 .send_aggregated_response(&task, &response, aggregation_result.clone())
440 .await
441 {
442 Ok(()) => {
443 blueprint_core::info!(
444 "Successfully sent aggregated response for task {}",
445 task_index
446 );
447 break;
448 }
449 Err(e) if i < config.send_retries - 1 => {
450 blueprint_core::warn!(
451 "Failed to send aggregated response (attempt {}/{}): {:?}",
452 i + 1,
453 config.send_retries,
454 e
455 );
456 tokio::time::sleep(Duration::from_secs(config.retry_delay)).await;
457 }
458 Err(e) => {
459 blueprint_core::error!(
460 "Failed to send aggregated response after {} attempts: {:?}",
461 config.send_retries,
462 e
463 );
464 }
465 }
466 }
467 } else {
468 blueprint_core::error!(
469 "Missing task or response for aggregation result. Task index: {}, response digest: {}",
470 task_index,
471 hex::encode(response_digest)
472 );
473 }
474 }
475 () = shutdown.0.notified() => {
476 if *shutdown.1.lock().await {
477 break;
478 }
479 }
480 }
481 }
482
483 blueprint_core::info!("Aggregation handling loop shutdown complete");
484 })
485 }
486
487 async fn process_response(
488 task_index: TaskIndex,
489 signed_resp: SignedTaskResponse<R>,
490 responses: &Arc<RwLock<HashMap<TaskIndex, HashMap<TaskResponseDigest, R>>>>,
491 bls_service: &ServiceHandle,
492 ) -> Result<()> {
493 let response_digest = signed_resp.response.digest();
494
495 let response_exists = {
497 let resp_map = responses.read().await;
498 resp_map
499 .get(&task_index)
500 .is_some_and(|m| m.contains_key(&response_digest))
501 };
502
503 if response_exists {
504 blueprint_core::info!(
505 "Response for task {} with digest {} already processed",
506 task_index,
507 hex::encode(response_digest)
508 );
509 return Ok(());
510 }
511
512 let task_signature = TaskSignature::new(
514 task_index,
515 response_digest,
516 signed_resp.signature(),
517 signed_resp.operator_id(),
518 );
519
520 bls_service
522 .process_signature(task_signature)
523 .await
524 .map_err(AggregationError::BlsAggregationError)?;
525
526 {
528 let mut resp_map = responses.write().await;
529 if let Some(task_responses) = resp_map.get_mut(&task_index) {
530 task_responses.insert(response_digest, signed_resp.response);
531 }
532 }
533
534 blueprint_core::debug!("Successfully processed signature for task {}", task_index);
535 Ok(())
536 }
537}