Skip to main content

blueprint_eigenlayer_extra/
generic_task_aggregation.rs

1use alloy_primitives::{FixedBytes, hex, keccak256};
2use blueprint_core::{error, info};
3use eigensdk::client_avsregistry::reader::AvsRegistryChainReader;
4use eigensdk::crypto_bls::Signature;
5use eigensdk::services_avsregistry::chaincaller::AvsRegistryServiceChainCaller;
6use eigensdk::services_blsaggregation::bls_agg::{
7    AggregateReceiver, BlsAggregatorService, ServiceHandle, TaskMetadata, TaskSignature,
8};
9use eigensdk::services_blsaggregation::bls_aggregation_service_error::BlsAggregationServiceError;
10use eigensdk::services_blsaggregation::bls_aggregation_service_response::BlsAggregationServiceResponse;
11use eigensdk::services_operatorsinfo::operatorsinfo_inmemory::OperatorInfoServiceInMemory;
12use eigensdk::types::avs::{TaskIndex, TaskResponseDigest};
13use std::collections::{HashMap, VecDeque};
14use std::fmt::Debug;
15use std::marker::PhantomData;
16use std::sync::Arc;
17use std::time::Duration;
18use thiserror::Error;
19use tokio::sync::{Mutex, Notify, RwLock};
20use tokio::task::JoinHandle;
21use tokio::time::interval;
22
23/// Error type for the generic task aggregation system
24#[derive(Error, Debug)]
25pub enum AggregationError {
26    /// Task not found
27    #[error("Task not found: {0}")]
28    TaskNotFound(String),
29
30    /// BLS aggregation service error
31    #[error("BLS aggregation service error: {0}")]
32    BlsAggregationError(#[from] BlsAggregationServiceError),
33
34    /// Contract interaction error
35    #[error("Contract interaction error: {0}")]
36    ContractError(String),
37
38    /// I/O error
39    #[error("I/O error: {0}")]
40    IoError(#[from] std::io::Error),
41
42    /// JSON serialization error
43    #[error("JSON serialization error: {0}")]
44    JsonError(#[from] serde_json::Error),
45
46    /// Aggregator already stopped
47    #[error("Aggregator already stopped")]
48    AlreadyStopped,
49
50    /// Service initialization error
51    #[error("Service initialization error: {0}")]
52    ServiceInitError(String),
53
54    /// Tokio task join error
55    #[error("Tokio task join error: {0}")]
56    TaskJoinError(#[from] tokio::task::JoinError),
57}
58
59/// Type alias for using Result with [`AggregationError`]
60pub type Result<T> = std::result::Result<T, AggregationError>;
61
62/// Convenience type alias for the BLS Aggregation Service used in EigenLayer
63pub type BlsAggServiceInMemory = BlsAggregatorService<
64    AvsRegistryServiceChainCaller<AvsRegistryChainReader, OperatorInfoServiceInMemory>,
65>;
66
67/// Trait for a generic EigenLayer task
68pub trait EigenTask: Clone + Send + Sync + 'static {
69    /// Get the task index/ID
70    fn task_index(&self) -> TaskIndex;
71
72    /// Get the block at which the task was created
73    fn created_block(&self) -> u32;
74
75    /// Get the quorum numbers this task is associated with
76    fn quorum_numbers(&self) -> Vec<u8>;
77
78    /// Get the quorum threshold percentage required for this task
79    fn quorum_threshold_percentage(&self) -> u8;
80
81    /// Encode the task to bytes for hashing
82    fn encode(&self) -> Vec<u8>;
83
84    /// Create a digest of the task
85    fn digest(&self) -> FixedBytes<32> {
86        keccak256(self.encode())
87    }
88}
89
90/// Trait for a generic task response
91pub trait TaskResponse: Clone + Send + Sync + 'static {
92    /// Get the task index this response refers to
93    fn reference_task_index(&self) -> TaskIndex;
94
95    /// Encode the response to bytes for hashing
96    fn encode(&self) -> Vec<u8>;
97
98    /// Create a digest of the response
99    fn digest(&self) -> FixedBytes<32> {
100        keccak256(self.encode())
101    }
102}
103
104/// A signed task response containing the response data, signature, and operator ID
105#[derive(Clone, Debug)]
106pub struct SignedTaskResponse<R: TaskResponse> {
107    /// The task response data
108    pub response: R,
109    /// The BLS signature
110    pub signature: Signature,
111    /// The operator's ID that signed the response
112    pub operator_id: FixedBytes<32>,
113}
114
115impl<R: TaskResponse> SignedTaskResponse<R> {
116    /// Create a new signed task response
117    pub fn new(response: R, signature: Signature, operator_id: FixedBytes<32>) -> Self {
118        Self {
119            response,
120            signature,
121            operator_id,
122        }
123    }
124
125    /// Get the signature
126    pub fn signature(&self) -> Signature {
127        self.signature.clone()
128    }
129
130    /// Get the operator ID
131    pub fn operator_id(&self) -> FixedBytes<32> {
132        self.operator_id
133    }
134}
135
136/// Trait for sending aggregated responses to EigenLayer contracts
137pub trait ResponseSender<T: EigenTask, R: TaskResponse>: Send + Sync + Clone + 'static {
138    /// Future type returned by `send_aggregated_response`
139    type Future: std::future::Future<Output = Result<()>> + Send + 'static;
140
141    /// Send an aggregated response to the contract
142    fn send_aggregated_response(
143        &self,
144        task: &T,
145        response: &R,
146        aggregation_result: BlsAggregationServiceResponse,
147    ) -> Self::Future;
148}
149
/// Configuration for the generic task aggregator
///
/// Derives `Debug`, `PartialEq` and `Eq` in addition to `Clone` so the
/// configuration can be logged and compared.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AggregatorConfig {
    /// How often to process the cached responses (in seconds)
    pub processing_interval: u64,
    /// Maximum number of responses to cache per task
    pub max_responses_per_task: usize,
    /// Maximum number of attempts made when sending an aggregated response
    pub send_retries: usize,
    /// Delay between send attempts (in seconds)
    pub retry_delay: u64,
}
162
163impl Default for AggregatorConfig {
164    fn default() -> Self {
165        Self {
166            processing_interval: 6,
167            max_responses_per_task: 1000,
168            send_retries: 3,
169            retry_delay: 2,
170        }
171    }
172}
173
174/// Generic task aggregator for EigenLayer AVS
175pub struct TaskAggregator<T, R, S>
176where
177    T: EigenTask,
178    R: TaskResponse,
179    S: ResponseSender<T, R>,
180{
181    /// Map of task index to task
182    pub tasks: Arc<RwLock<HashMap<TaskIndex, T>>>,
183    /// Map of task index to a map of response digest to response
184    pub responses: Arc<RwLock<HashMap<TaskIndex, HashMap<TaskResponseDigest, R>>>>,
185    /// Queue of signed responses waiting to be processed
186    pub response_cache: Arc<Mutex<VecDeque<SignedTaskResponse<R>>>>,
187    /// The BLS aggregation service handle
188    pub bls_service: ServiceHandle,
189    /// The receiver for aggregated results
190    pub aggregate_receiver: Arc<Mutex<AggregateReceiver>>,
191    /// The contract response sender
192    pub response_sender: S,
193    /// Configuration
194    pub config: AggregatorConfig,
195    /// Shutdown notification
196    pub shutdown: Arc<(Notify, Mutex<bool>)>,
197    /// Running task handles
198    pub task_handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
199    _phantom: PhantomData<(T, R)>,
200}
201
202impl<T, R, S> TaskAggregator<T, R, S>
203where
204    T: EigenTask,
205    R: TaskResponse,
206    S: ResponseSender<T, R>,
207{
208    /// Create a new task aggregator
209    pub fn new(
210        bls_service: BlsAggServiceInMemory,
211        response_sender: S,
212        config: AggregatorConfig,
213    ) -> Self {
214        let (service_handle, aggregate_receiver) = bls_service.start();
215
216        Self {
217            tasks: Arc::new(RwLock::new(HashMap::new())),
218            responses: Arc::new(RwLock::new(HashMap::new())),
219            response_cache: Arc::new(Mutex::new(VecDeque::new())),
220            bls_service: service_handle,
221            aggregate_receiver: Arc::new(Mutex::new(aggregate_receiver)),
222            response_sender,
223            config,
224            shutdown: Arc::new((Notify::new(), Mutex::new(false))),
225            task_handles: Arc::new(Mutex::new(Vec::new())),
226            _phantom: PhantomData,
227        }
228    }
229
230    /// Register a new task with the aggregator
231    ///
232    /// # Errors
233    /// - [`AggregationError::BlsAggregationError`] - If the task index already exists in the aggregator
234    pub async fn register_task(&self, task: T) -> Result<()> {
235        let task_index = task.task_index();
236        let mut tasks = self.tasks.write().await;
237
238        // Store the task by its index
239        tasks.insert(task_index, task.clone());
240
241        // Initialize empty response map for this task
242        let mut responses = self.responses.write().await;
243        responses.entry(task_index).or_insert_with(HashMap::new);
244
245        // Initialize the task in the BLS service
246        let task_metadata = TaskMetadata::new(
247            task_index,
248            u64::from(task.created_block()),
249            task.quorum_numbers(),
250            vec![task.quorum_threshold_percentage(); task.quorum_numbers().len()],
251            Duration::from_secs(self.config.processing_interval * 10), // Set a reasonable expiry time
252        );
253
254        self.bls_service
255            .initialize_task(task_metadata)
256            .await
257            .map_err(AggregationError::BlsAggregationError)?;
258
259        Ok(())
260    }
261
262    /// Process a signed task response
263    pub async fn process_signed_response(&self, signed_response: SignedTaskResponse<R>) {
264        info!("Caching signed response");
265        // Add to cache for processing
266        self.response_cache.lock().await.push_back(signed_response);
267    }
268
269    /// Start the aggregator service
270    pub async fn start(&self) {
271        // Start the processing loop
272        let process_handle = self.start_processing_loop();
273
274        // Start the aggregation result handling loop
275        let aggregation_handle = self.start_aggregation_handling_loop();
276
277        // Store the handles
278        let mut handles = self.task_handles.lock().await;
279        handles.push(process_handle);
280        handles.push(aggregation_handle);
281    }
282
283    /// Stop the aggregator service
284    ///
285    /// # Errors
286    /// - [`AggregationError::AlreadyStopped`] - If the service is already stopped
287    pub async fn stop(&self) -> Result<()> {
288        // Set shutdown flag
289        let (notify, is_shutdown) = &*self.shutdown;
290        let mut shutdown_lock = is_shutdown.lock().await;
291
292        if *shutdown_lock {
293            return Err(AggregationError::AlreadyStopped);
294        }
295
296        info!("Setting shutdown flag for task aggregator");
297        *shutdown_lock = true;
298        drop(shutdown_lock);
299
300        // Notify all waiters to check the shutdown flag
301        notify.notify_waiters();
302
303        // Get handles but don't hold the lock
304        let handles = {
305            let mut handles_lock = self.task_handles.lock().await;
306            std::mem::take(&mut *handles_lock)
307        };
308
309        info!(
310            "Waiting for {} task aggregator background tasks to complete",
311            handles.len()
312        );
313
314        // Use a timeout to wait for tasks to complete
315        let timeout = Duration::from_secs(5);
316        for handle in handles {
317            if !handle.is_finished() {
318                match tokio::time::timeout(timeout, handle).await {
319                    Ok(_) => info!("Task aggregator background task completed successfully"),
320                    Err(_) => {
321                        error!(
322                            "Task aggregator background task did not complete within timeout, aborting"
323                        );
324                        // We can't abort the handle directly, but we've set the shutdown flag
325                        // which should cause it to exit eventually
326                    }
327                }
328            }
329        }
330
331        info!("Task aggregator shutdown complete");
332        Ok(())
333    }
334
335    // Private helper methods
336
337    fn start_processing_loop(&self) -> JoinHandle<()> {
338        let tasks = self.tasks.clone();
339        let responses = self.responses.clone();
340        let response_cache = self.response_cache.clone();
341        let bls_service = self.bls_service.clone();
342        let shutdown = self.shutdown.clone();
343        let interval_secs = self.config.processing_interval;
344
345        tokio::spawn(async move {
346            let mut interval = interval(Duration::from_secs(interval_secs));
347
348            loop {
349                tokio::select! {
350                    _ = interval.tick() => {
351                        // Check shutdown status first
352                        if *shutdown.1.lock().await {
353                            break;
354                        }
355
356                        // Get responses to process while holding the lock briefly
357                        let responses_to_process = {
358                            let mut cache = response_cache.lock().await;
359                            std::mem::take(&mut *cache)
360                        };
361
362                        for signed_resp in responses_to_process {
363                            let task_index = signed_resp.response.reference_task_index();
364
365                            // Check if we have the task
366                            let task_exists = tasks.read().await.contains_key(&task_index);
367
368                            if !task_exists {
369                                // Put it back in the cache for later processing
370                                response_cache.lock().await.push_back(signed_resp);
371                                continue;
372                            }
373
374                            // Process the response
375                            match Self::process_response(
376                                task_index,
377                                signed_resp.clone(),
378                                &responses,
379                                &bls_service,
380                            ).await {
381                                Ok(()) => {
382                                    // Successfully processed
383                                }
384                                Err(e) => {
385                                    // Log error and put back in cache for retry
386                                    blueprint_core::error!("Failed to process response: {:?}", e);
387                                    response_cache.lock().await.push_back(signed_resp);
388                                }
389                            }
390                        }
391                    }
392                    () = shutdown.0.notified() => {
393                        if *shutdown.1.lock().await {
394                            break;
395                        }
396                    }
397                }
398            }
399
400            blueprint_core::info!("Processing loop shutdown complete");
401        })
402    }
403
404    fn start_aggregation_handling_loop(&self) -> JoinHandle<()> {
405        let tasks = self.tasks.clone();
406        let responses = self.responses.clone();
407        let aggregate_receiver = self.aggregate_receiver.clone();
408        let response_sender = self.response_sender.clone();
409        let shutdown = self.shutdown.clone();
410        let config = self.config.clone();
411
412        tokio::spawn(async move {
413            loop {
414                tokio::select! {
415                    Ok(aggregation_result) = async {
416                        let mut receiver = aggregate_receiver.lock().await;
417                        receiver.receive_aggregated_response().await
418                    } => {
419                        let task_index = aggregation_result.task_index;
420                        let response_digest = aggregation_result.task_response_digest;
421
422                        blueprint_core::info!(
423                            "Received aggregation result for task {}, digest: {}",
424                            task_index,
425                            hex::encode(response_digest)
426                        );
427
428                        // Retrieve the task and response
429                        let task_opt = tasks.read().await.get(&task_index).cloned();
430                        let response_opt = responses.read().await
431                            .get(&task_index)
432                            .and_then(|r| r.get(&response_digest))
433                            .cloned();
434
435                        if let (Some(task), Some(response)) = (task_opt, response_opt) {
436                            // Try to send the response to the contract
437                            for i in 0..config.send_retries {
438                                match response_sender
439                                    .send_aggregated_response(&task, &response, aggregation_result.clone())
440                                    .await
441                                {
442                                    Ok(()) => {
443                                        blueprint_core::info!(
444                                            "Successfully sent aggregated response for task {}",
445                                            task_index
446                                        );
447                                        break;
448                                    }
449                                    Err(e) if i < config.send_retries - 1 => {
450                                        blueprint_core::warn!(
451                                            "Failed to send aggregated response (attempt {}/{}): {:?}",
452                                            i + 1,
453                                            config.send_retries,
454                                            e
455                                        );
456                                        tokio::time::sleep(Duration::from_secs(config.retry_delay)).await;
457                                    }
458                                    Err(e) => {
459                                        blueprint_core::error!(
460                                            "Failed to send aggregated response after {} attempts: {:?}",
461                                            config.send_retries,
462                                            e
463                                        );
464                                    }
465                                }
466                            }
467                        } else {
468                            blueprint_core::error!(
469                                "Missing task or response for aggregation result. Task index: {}, response digest: {}",
470                                task_index,
471                                hex::encode(response_digest)
472                            );
473                        }
474                    }
475                    () = shutdown.0.notified() => {
476                        if *shutdown.1.lock().await {
477                            break;
478                        }
479                    }
480                }
481            }
482
483            blueprint_core::info!("Aggregation handling loop shutdown complete");
484        })
485    }
486
487    async fn process_response(
488        task_index: TaskIndex,
489        signed_resp: SignedTaskResponse<R>,
490        responses: &Arc<RwLock<HashMap<TaskIndex, HashMap<TaskResponseDigest, R>>>>,
491        bls_service: &ServiceHandle,
492    ) -> Result<()> {
493        let response_digest = signed_resp.response.digest();
494
495        // Check if we've already processed this response
496        let response_exists = {
497            let resp_map = responses.read().await;
498            resp_map
499                .get(&task_index)
500                .is_some_and(|m| m.contains_key(&response_digest))
501        };
502
503        if response_exists {
504            blueprint_core::info!(
505                "Response for task {} with digest {} already processed",
506                task_index,
507                hex::encode(response_digest)
508            );
509            return Ok(());
510        }
511
512        // Create a task signature
513        let task_signature = TaskSignature::new(
514            task_index,
515            response_digest,
516            signed_resp.signature(),
517            signed_resp.operator_id(),
518        );
519
520        // Process the signature through BLS service
521        bls_service
522            .process_signature(task_signature)
523            .await
524            .map_err(AggregationError::BlsAggregationError)?;
525
526        // Store the response
527        {
528            let mut resp_map = responses.write().await;
529            if let Some(task_responses) = resp_map.get_mut(&task_index) {
530                task_responses.insert(response_digest, signed_resp.response);
531            }
532        }
533
534        blueprint_core::debug!("Successfully processed signature for task {}", task_index);
535        Ok(())
536    }
537}