battleware_node/aggregator/
actor.rs

1use crate::{
2    aggregator::{ingress::Mailbox, Config, Message},
3    indexer::Indexer,
4};
5use battleware_types::{
6    api::Summary,
7    execution::{Output, Progress, Value},
8    genesis_digest,
9};
10use bytes::{Buf, BufMut};
11use commonware_codec::{
12    DecodeExt, Encode, EncodeSize, FixedSize, Read, ReadExt, ReadRangeExt, Write,
13};
14use commonware_consensus::aggregation::types::{Certificate, Index, Item};
15use commonware_cryptography::{
16    bls12381::primitives::variant::{MinSig, Variant},
17    ed25519::PublicKey,
18    sha256::Digest,
19    Digestible,
20};
21use commonware_p2p::{Receiver, Sender};
22use commonware_resolver::{p2p, Resolver};
23use commonware_runtime::{Clock, Handle, Metrics, Spawner, Storage};
24use commonware_storage::{
25    cache,
26    journal::fixed,
27    mmr::verification::Proof,
28    ordinal::{self, Ordinal},
29    rmap::RMap,
30    store::operation::{Keyless, Variable},
31};
32use commonware_utils::sequence::U64;
33use futures::{
34    channel::{mpsc, oneshot},
35    join, StreamExt,
36};
37use governor::clock::Clock as GClock;
38use prometheus_client::metrics::gauge::Gauge;
39use rand::RngCore;
40use std::{
41    collections::{BTreeMap, BTreeSet},
42    time::Duration,
43};
44use tracing::{debug, info, warn};
45
/// Maximum number of missing certificate indices to enqueue with the resolver at once.
const BATCH_ENQUEUE: usize = 20;
/// Delay between retries when a summary upload to the indexer fails.
const RETRY_DELAY: Duration = Duration::from_secs(10);
48
/// State and event proofs (with their covered operations) persisted for a
/// single block height, later bundled into a [Summary] for upload to the
/// indexer.
pub struct Proofs {
    // MMR proof over state.
    pub state_proof: Proof<Digest>,
    // Operations covered by `state_proof`.
    pub state_proof_ops: Vec<Variable<Digest, Value>>,
    // MMR proof over events.
    pub events_proof: Proof<Digest>,
    // Operations covered by `events_proof`.
    pub events_proof_ops: Vec<Keyless<Output>>,
}
55
impl Write for Proofs {
    /// Serializes all four fields in a fixed order; this order is the wire
    /// format and must mirror the decode order in the `Read` implementation.
    fn write(&self, buf: &mut impl BufMut) {
        self.state_proof.write(buf);
        self.state_proof_ops.write(buf);
        self.events_proof.write(buf);
        self.events_proof_ops.write(buf);
    }
}
64
65impl Read for Proofs {
66    type Cfg = ();
67
68    fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, commonware_codec::Error> {
69        let state_proof = Proof::<Digest>::read_cfg(reader, &500)?;
70        let state_proof_ops = Vec::read_range(reader, 0..=500)?;
71        let events_proof = Proof::<Digest>::read_cfg(reader, &500)?;
72        let events_proof_ops = Vec::read_range(reader, 0..=500)?;
73        Ok(Self {
74            state_proof,
75            state_proof_ops,
76            events_proof,
77            events_proof_ops,
78        })
79    }
80}
81
82impl EncodeSize for Proofs {
83    fn encode_size(&self) -> usize {
84        self.state_proof.encode_size()
85            + self.state_proof_ops.encode_size()
86            + self.events_proof.encode_size()
87            + self.events_proof_ops.encode_size()
88    }
89}
90
/// A fixed-size certificate that can be used to store in an ordinal.
///
/// Flattens the nested `item` of a [Certificate] so the encoding has a
/// constant byte length (see the `FixedSize` implementation below).
pub struct FixedCertificate {
    // Aggregation index the certificate attests to.
    pub index: Index,
    // Digest of the certified item.
    pub digest: Digest,
    // BLS (MinSig variant) signature carried over from the source certificate.
    pub signature: <MinSig as Variant>::Signature,
}
97
impl Write for FixedCertificate {
    /// Serializes the three fields in a fixed order; must mirror the decode
    /// order in the `Read` implementation.
    fn write(&self, buf: &mut impl BufMut) {
        self.index.write(buf);
        self.digest.write(buf);
        self.signature.write(buf);
    }
}
105
106impl Read for FixedCertificate {
107    type Cfg = ();
108
109    fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, commonware_codec::Error> {
110        let index = Index::read(reader)?;
111        let digest = Digest::read(reader)?;
112        let signature = <MinSig as Variant>::Signature::read(reader)?;
113        Ok(Self {
114            index,
115            digest,
116            signature,
117        })
118    }
119}
120
impl FixedSize for FixedCertificate {
    // Encoded length is the concatenation of the three fixed-size fields.
    const SIZE: usize = Index::SIZE + Digest::SIZE + <MinSig as Variant>::Signature::SIZE;
}
124
125impl From<Certificate<MinSig, Digest>> for FixedCertificate {
126    fn from(certificate: Certificate<MinSig, Digest>) -> Self {
127        Self {
128            index: certificate.item.index,
129            digest: certificate.item.digest,
130            signature: certificate.signature,
131        }
132    }
133}
134
135impl From<FixedCertificate> for Certificate<MinSig, Digest> {
136    fn from(fixed_certificate: FixedCertificate) -> Self {
137        Self {
138            item: Item {
139                index: fixed_certificate.index,
140                digest: fixed_certificate.digest,
141            },
142            signature: fixed_certificate.signature,
143        }
144    }
145}
146
/// Aggregation actor: persists execution results and proofs, collects
/// certificates (backfilling gaps via a resolver), and uploads summaries
/// to the configured indexer.
pub struct Actor<R: Storage + Metrics + Clock + Spawner + GClock + RngCore, I: Indexer> {
    // Runtime context used for storage, metrics, spawning, and time.
    context: R,
    // Actor configuration (partitions, quotas, indexer, keys, etc.).
    config: Config<I>,
    // Handle to our own mailbox; cloned for the resolver and upload tasks.
    inbound: Mailbox,
    // Receiving end of the actor's message queue.
    mailbox: mpsc::Receiver<Message>,

    // Certificate indices with an outstanding resolver fetch.
    waiting: BTreeSet<u64>,
    // Gauge tracking the contiguous prefix of processed certificates.
    certificates_processed: Gauge,
}
156
impl<R: Storage + Metrics + Clock + Spawner + GClock + RngCore, I: Indexer> Actor<R, I> {
    /// Creates a new actor and the [Mailbox] used to send it messages.
    ///
    /// Registers the `certificates_processed` gauge on `context`.
    pub fn new(context: R, config: Config<I>) -> (Self, Mailbox) {
        // Create mailbox
        let (sender, mailbox) = mpsc::channel(config.mailbox_size);
        let inbound = Mailbox::new(sender);

        // Create metrics
        let certificates_processed = Gauge::default();
        context.register(
            "certificates_processed",
            "Number of contiguous certificates processed",
            certificates_processed.clone(),
        );

        (
            Self {
                context,
                config,
                // Keep a clone so the actor can hand its own mailbox to the
                // resolver (as consumer/producer) and to spawned upload tasks.
                inbound: inbound.clone(),
                mailbox,
                waiting: BTreeSet::new(),
                certificates_processed,
            },
            inbound,
        )
    }

    /// Spawns [Self::run] on the runtime, consuming the actor. `backfill`
    /// is the p2p sender/receiver pair handed to the resolver engine.
    pub fn start(
        mut self,
        backfill: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
    ) -> Handle<()> {
        self.context.spawn_ref()(self.run(backfill))
    }

    /// Main event loop: initializes storage (proof cache, results journal,
    /// certificate ordinal) and the backfill resolver, then processes
    /// mailbox messages until the mailbox closes. After every message it
    /// opportunistically uploads any complete summaries to the indexer.
    async fn run(
        mut self,
        backfill: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
    ) {
        // Create storage
        let mut cache = cache::Cache::<_, Proofs>::init(
            self.context.with_label("cache"),
            cache::Config {
                partition: format!("{}-cache", self.config.partition),
                compression: None,
                codec_config: (),
                items_per_blob: self.config.prunable_items_per_blob,
                write_buffer: self.config.write_buffer,
                replay_buffer: self.config.replay_buffer,
                buffer_pool: self.config.buffer_pool.clone(),
            },
        )
        .await
        .expect("failed to initialize cache");
        let mut results = fixed::Journal::init(
            self.context.with_label("results"),
            fixed::Config {
                partition: format!("{}-results", self.config.partition),
                items_per_blob: self.config.persistent_items_per_blob,
                write_buffer: self.config.write_buffer,
                buffer_pool: self.config.buffer_pool,
            },
        )
        .await
        .expect("failed to initialize results storage");
        let mut certificates = Ordinal::<_, FixedCertificate>::init(
            self.context.with_label("certificates"),
            ordinal::Config {
                partition: format!("{}-certificates", self.config.partition),
                items_per_blob: self.config.persistent_items_per_blob,
                write_buffer: self.config.write_buffer,
                replay_buffer: self.config.replay_buffer,
            },
        )
        .await
        .expect("failed to initialize certificate storage");

        // Create resolver
        let (resolver_engine, mut resolver) = p2p::Engine::new(
            self.context.with_label("resolver"),
            p2p::Config {
                coordinator: self.config.supervisor,
                // The actor's own mailbox serves both roles: it consumes
                // delivered certificates and produces them for peers.
                consumer: self.inbound.clone(),
                producer: self.inbound.clone(),
                mailbox_size: self.config.mailbox_size,
                requester_config: commonware_p2p::utils::requester::Config {
                    public_key: self.config.public_key,
                    rate_limit: self.config.backfill_quota,
                    initial: Duration::from_secs(1),
                    timeout: Duration::from_secs(2),
                },
                fetch_retry_timeout: Duration::from_secs(10),
                priority_requests: false,
                priority_responses: false,
            },
        );
        resolver_engine.start(backfill);

        // Start by fetching the first missing certificates
        let missing = certificates.missing_items(1, BATCH_ENQUEUE);
        for next in missing {
            self.waiting.insert(next);
            resolver.fetch(next.into()).await;
        }

        // Compute genesis digest
        let genesis_digest = genesis_digest();

        // Track uploads
        let mut uploads_outstanding = 0;
        let mut cursor = cache.first().unwrap_or(1); // start at height 1
        // `boundary` is the height below which all summaries are known to be
        // uploaded; proofs below it can be pruned from the cache.
        let mut boundary = cursor;
        let mut tracked_uploads = RMap::new();
        info!(cursor, "initial summary cursor");

        // Track pending aggregation work (requests for heights whose results
        // have not yet been executed locally).
        let mut proposal_requests: BTreeMap<u64, oneshot::Sender<Digest>> = BTreeMap::new();
        let mut verify_requests: BTreeMap<u64, (Digest, oneshot::Sender<bool>)> = BTreeMap::new();
        loop {
            let Some(message) = self.mailbox.next().await else {
                warn!("mailbox closed");
                break;
            };
            match message {
                Message::Uploaded { index } => {
                    // Decrement uploads outstanding
                    uploads_outstanding -= 1;

                    // Track uploaded index
                    tracked_uploads.insert(index);

                    // Prune proofs up to the uploaded height (contiguous with the boundary)
                    let Some(end_region) = tracked_uploads.next_gap(boundary).0 else {
                        continue;
                    };
                    if end_region > boundary {
                        cache
                            .prune(end_region)
                            .await
                            .expect("failed to prune cache");
                        boundary = end_region;
                        info!(boundary, "updated summary upload marker");
                    }
                }
                Message::Executed {
                    view,
                    height,
                    commitment,
                    result,
                    state_proof,
                    state_proof_ops,
                    events_proof,
                    events_proof_ops,
                    response,
                } => {
                    // Persist proofs
                    let cache_task = async {
                        let proofs = Proofs {
                            state_proof,
                            state_proof_ops,
                            events_proof,
                            events_proof_ops,
                        };
                        cache.put(height, proofs).await.unwrap(); // ok to call put multiple times
                        cache.sync().await.unwrap();
                    };

                    // Persist progress
                    let result = Progress::new(
                        view,
                        height,
                        commitment,
                        result.state_root,
                        result.state_start_op,
                        result.state_end_op,
                        result.events_root,
                        result.events_start_op,
                        result.events_end_op,
                    );
                    let result_digest = result.digest();
                    let progress_task = async {
                        // Size is the next item to store and the height-th value will be stored at height - 1,
                        // so comparing size() to height is equivalent to checking if the next item stored will be
                        // at height + 1 (i.e. this height has already been processed).
                        if results.size().await.unwrap() == height {
                            warn!(height, "already processed results");
                            return;
                        }
                        results.append(result).await.unwrap();
                        results.sync().await.unwrap();
                    };
                    // Both persists are independent; run them concurrently.
                    join!(cache_task, progress_task);
                    info!(
                        height,
                        view,
                        ?result.state_root,
                        result.state_start_op,
                        result.state_end_op,
                        ?result.events_root,
                        result.events_start_op,
                        result.events_end_op,
                        ?result_digest,
                        "processed block"
                    );

                    // Check if we should clear aggregation requests
                    if let Some(request) = proposal_requests.remove(&height) {
                        debug!(height, view, "backfilled aggregation proposal");
                        let _ = request.send(result_digest);
                    }
                    // Drop any stale requests at or below this height.
                    proposal_requests.retain(|index, _| *index > height);
                    if let Some((payload, request)) = verify_requests.remove(&height) {
                        debug!(height, view, "backfilled aggregation verify");
                        let _ = request.send(result_digest == payload);
                    }
                    verify_requests.retain(|index, _| *index > height);

                    // Continue processing blocks
                    let _ = response.send(());
                }
                Message::Genesis { response } => {
                    let _ = response.send(genesis_digest);
                }
                Message::Propose { index, response } => {
                    // Fetch item from progress (index 0 is genesis)
                    if index == 0 {
                        let _ = response.send(genesis_digest);
                        continue;
                    }
                    // Results are stored 0-indexed: height h lives at h - 1.
                    let item = index - 1;
                    if let Ok(result) = results.read(item).await {
                        let _ = response.send(result.digest());
                        continue;
                    };

                    // This height may not yet be stored, so we'll wait for it to occur
                    proposal_requests.insert(index, response);
                }
                Message::Verify {
                    index,
                    payload,
                    response,
                } => {
                    if index == 0 {
                        let _ = response.send(genesis_digest == payload);
                        continue;
                    }
                    // Results are stored 0-indexed: height h lives at h - 1.
                    let item = index - 1;
                    if let Ok(result) = results.read(item).await {
                        let _ = response.send(result.digest() == payload);
                        continue;
                    };

                    // This height may not yet be stored, so we'll wait for it to occur
                    verify_requests.insert(index, (payload, response));
                }
                Message::Certified { certificate } => {
                    self.waiting.remove(&certificate.item.index);

                    let height = certificate.item.index;
                    if height == 0 {
                        continue;
                    }

                    // Skip if already processed
                    if certificates.has(height) {
                        debug!(height, "already processed");
                        continue;
                    }
                    info!(
                        height = certificate.item.index,
                        digest = ?certificate.item.digest,
                        "certified block"
                    );

                    // Store in certificates
                    certificates
                        .put(height, certificate.clone().into())
                        .await
                        .unwrap();
                    certificates.sync().await.unwrap();

                    // Cancel resolver fetches at or below the new contiguous end
                    if let Some(current_end) = certificates.next_gap(1).0 {
                        self.certificates_processed.set(current_end as i64);
                        let current_end = U64::from(current_end);
                        resolver.retain(move |x| x > &current_end).await;
                    }

                    // Enqueue missing seeds
                    let missing = certificates.missing_items(1, BATCH_ENQUEUE);
                    for next in missing {
                        // Skip indices already being fetched.
                        if !self.waiting.insert(next) {
                            continue;
                        }
                        resolver.fetch(next.into()).await;
                    }
                }
                Message::Tip { index: tip } => {
                    debug!(tip, "new aggregation tip");
                }
                Message::Deliver {
                    index,
                    certificate,
                    response,
                } => {
                    // Decode certificate
                    // NOTE(review): these `expect`s panic if the responder was
                    // dropped, unlike the `let _ =` sends elsewhere — confirm
                    // this asymmetry is intentional.
                    let Ok(certificate) =
                        Certificate::<MinSig, Digest>::decode(&mut certificate.as_ref())
                    else {
                        response.send(false).expect("failed to send false");
                        continue;
                    };
                    if certificate.item.index != index {
                        response.send(false).expect("failed to send false");
                        continue;
                    }

                    // Verify certificate
                    if !certificate.verify(&self.config.namespace, &self.config.identity) {
                        response.send(false).expect("failed to send false");
                        continue;
                    }

                    // Store in certificates
                    // NOTE(review): unlike the `Certified` arm, this path does
                    // not update `certificates_processed` or cancel resolver
                    // fetches below the new contiguous end — confirm whether
                    // that is intentional.
                    self.waiting.remove(&index);
                    certificates
                        .put(index, certificate.clone().into())
                        .await
                        .unwrap();
                    certificates.sync().await.unwrap();

                    // Enqueue missing seeds
                    let missing = certificates.missing_items(1, BATCH_ENQUEUE);
                    for next in missing {
                        if !self.waiting.insert(next) {
                            continue;
                        }
                        resolver.fetch(next.into()).await;
                    }
                }
                Message::Produce { index, response } => {
                    // Fetch item from certificates; dropping `response` without
                    // sending signals we cannot serve this request.
                    let Ok(Some(fixed_certificate)) = certificates.get(index).await else {
                        continue;
                    };
                    let certificate: Certificate<MinSig, Digest> = fixed_certificate.into();
                    response
                        .send(certificate.encode().into())
                        .expect("failed to send certificate");
                }
            }

            // Attempt to upload any certificates
            //
            // We only delete entries in the cache when they cross the section boundary,
            // so we may re-upload the same height again on restart.
            while uploads_outstanding < self.config.max_uploads_outstanding {
                // Get next certificate; stop once either the proofs or the
                // certificate for the cursor height are not yet available.
                if !cache.has(cursor) || !certificates.has(cursor) {
                    break;
                }

                // Increment uploads outstanding
                uploads_outstanding += 1;

                // Get certificate
                let certificate = certificates
                    .get(cursor)
                    .await
                    .unwrap()
                    .expect("failed to fetch certificate");

                // Get result
                let result = results
                    .read(cursor - 1)
                    .await
                    .expect("failed to fetch result"); // offset by 1 because stored by 0th offset

                // Get proofs
                let proofs = cache
                    .get(cursor)
                    .await
                    .unwrap()
                    .expect("failed to fetch proofs");

                // Upload the summary to the indexer
                let summary = Summary {
                    progress: result,
                    certificate: certificate.into(),
                    state_proof: proofs.state_proof,
                    state_proof_ops: proofs.state_proof_ops,
                    events_proof: proofs.events_proof,
                    events_proof_ops: proofs.events_proof_ops,
                };
                // Retry forever in a detached task; completion is reported
                // back through the mailbox as `Message::Uploaded`.
                self.context.with_label("summary_submit").spawn({
                    let indexer = self.config.indexer.clone();
                    let mut channel = self.inbound.clone();
                    move |context| async move {
                        let mut attempts = 1;
                        loop {
                            let Err(e) = indexer.submit_summary(summary.clone()).await else {
                                break;
                            };
                            warn!(?e, attempts, "failed to upload summary");
                            context.sleep(RETRY_DELAY).await;
                            attempts += 1;
                        }
                        debug!(cursor, attempts, "summary uploaded to indexer");
                        channel.uploaded(cursor).await;
                    }
                });

                // Increment cursor
                cursor += 1;
            }
        }
    }
}