1use crate::{
2 aggregator::{ingress::Mailbox, Config, Message},
3 indexer::Indexer,
4};
5use battleware_types::{
6 api::Summary,
7 execution::{Output, Progress, Value},
8 genesis_digest,
9};
10use bytes::{Buf, BufMut};
11use commonware_codec::{
12 DecodeExt, Encode, EncodeSize, FixedSize, Read, ReadExt, ReadRangeExt, Write,
13};
14use commonware_consensus::aggregation::types::{Certificate, Index, Item};
15use commonware_cryptography::{
16 bls12381::primitives::variant::{MinSig, Variant},
17 ed25519::PublicKey,
18 sha256::Digest,
19 Digestible,
20};
21use commonware_p2p::{Receiver, Sender};
22use commonware_resolver::{p2p, Resolver};
23use commonware_runtime::{Clock, Handle, Metrics, Spawner, Storage};
24use commonware_storage::{
25 cache,
26 journal::fixed,
27 mmr::verification::Proof,
28 ordinal::{self, Ordinal},
29 rmap::RMap,
30 store::operation::{Keyless, Variable},
31};
32use commonware_utils::sequence::U64;
33use futures::{
34 channel::{mpsc, oneshot},
35 join, StreamExt,
36};
37use governor::clock::Clock as GClock;
38use prometheus_client::metrics::gauge::Gauge;
39use rand::RngCore;
40use std::{
41 collections::{BTreeMap, BTreeSet},
42 time::Duration,
43};
44use tracing::{debug, info, warn};
45
/// Pause between attempts to re-submit a summary to the indexer after a failed upload.
const RETRY_DELAY: Duration = Duration::from_secs(10);

/// Batch size handed to `missing_items` each time missing certificates are queued for fetching.
const BATCH_ENQUEUE: usize = 20;
48
49pub struct Proofs {
50 pub state_proof: Proof<Digest>,
51 pub state_proof_ops: Vec<Variable<Digest, Value>>,
52 pub events_proof: Proof<Digest>,
53 pub events_proof_ops: Vec<Keyless<Output>>,
54}
55
56impl Write for Proofs {
57 fn write(&self, buf: &mut impl BufMut) {
58 self.state_proof.write(buf);
59 self.state_proof_ops.write(buf);
60 self.events_proof.write(buf);
61 self.events_proof_ops.write(buf);
62 }
63}
64
65impl Read for Proofs {
66 type Cfg = ();
67
68 fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, commonware_codec::Error> {
69 let state_proof = Proof::<Digest>::read_cfg(reader, &500)?;
70 let state_proof_ops = Vec::read_range(reader, 0..=500)?;
71 let events_proof = Proof::<Digest>::read_cfg(reader, &500)?;
72 let events_proof_ops = Vec::read_range(reader, 0..=500)?;
73 Ok(Self {
74 state_proof,
75 state_proof_ops,
76 events_proof,
77 events_proof_ops,
78 })
79 }
80}
81
82impl EncodeSize for Proofs {
83 fn encode_size(&self) -> usize {
84 self.state_proof.encode_size()
85 + self.state_proof_ops.encode_size()
86 + self.events_proof.encode_size()
87 + self.events_proof_ops.encode_size()
88 }
89}
90
91pub struct FixedCertificate {
93 pub index: Index,
94 pub digest: Digest,
95 pub signature: <MinSig as Variant>::Signature,
96}
97
98impl Write for FixedCertificate {
99 fn write(&self, buf: &mut impl BufMut) {
100 self.index.write(buf);
101 self.digest.write(buf);
102 self.signature.write(buf);
103 }
104}
105
106impl Read for FixedCertificate {
107 type Cfg = ();
108
109 fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, commonware_codec::Error> {
110 let index = Index::read(reader)?;
111 let digest = Digest::read(reader)?;
112 let signature = <MinSig as Variant>::Signature::read(reader)?;
113 Ok(Self {
114 index,
115 digest,
116 signature,
117 })
118 }
119}
120
121impl FixedSize for FixedCertificate {
122 const SIZE: usize = Index::SIZE + Digest::SIZE + <MinSig as Variant>::Signature::SIZE;
123}
124
125impl From<Certificate<MinSig, Digest>> for FixedCertificate {
126 fn from(certificate: Certificate<MinSig, Digest>) -> Self {
127 Self {
128 index: certificate.item.index,
129 digest: certificate.item.digest,
130 signature: certificate.signature,
131 }
132 }
133}
134
135impl From<FixedCertificate> for Certificate<MinSig, Digest> {
136 fn from(fixed_certificate: FixedCertificate) -> Self {
137 Self {
138 item: Item {
139 index: fixed_certificate.index,
140 digest: fixed_certificate.digest,
141 },
142 signature: fixed_certificate.signature,
143 }
144 }
145}
146
147pub struct Actor<R: Storage + Metrics + Clock + Spawner + GClock + RngCore, I: Indexer> {
148 context: R,
149 config: Config<I>,
150 inbound: Mailbox,
151 mailbox: mpsc::Receiver<Message>,
152
153 waiting: BTreeSet<u64>,
154 certificates_processed: Gauge,
155}
156
157impl<R: Storage + Metrics + Clock + Spawner + GClock + RngCore, I: Indexer> Actor<R, I> {
158 pub fn new(context: R, config: Config<I>) -> (Self, Mailbox) {
159 let (sender, mailbox) = mpsc::channel(config.mailbox_size);
161 let inbound = Mailbox::new(sender);
162
163 let certificates_processed = Gauge::default();
165 context.register(
166 "certificates_processed",
167 "Number of contiguous certificates processed",
168 certificates_processed.clone(),
169 );
170
171 (
172 Self {
173 context,
174 config,
175 inbound: inbound.clone(),
176 mailbox,
177 waiting: BTreeSet::new(),
178 certificates_processed,
179 },
180 inbound,
181 )
182 }
183
184 pub fn start(
185 mut self,
186 backfill: (
187 impl Sender<PublicKey = PublicKey>,
188 impl Receiver<PublicKey = PublicKey>,
189 ),
190 ) -> Handle<()> {
191 self.context.spawn_ref()(self.run(backfill))
192 }
193
194 async fn run(
195 mut self,
196 backfill: (
197 impl Sender<PublicKey = PublicKey>,
198 impl Receiver<PublicKey = PublicKey>,
199 ),
200 ) {
201 let mut cache = cache::Cache::<_, Proofs>::init(
203 self.context.with_label("cache"),
204 cache::Config {
205 partition: format!("{}-cache", self.config.partition),
206 compression: None,
207 codec_config: (),
208 items_per_blob: self.config.prunable_items_per_blob,
209 write_buffer: self.config.write_buffer,
210 replay_buffer: self.config.replay_buffer,
211 buffer_pool: self.config.buffer_pool.clone(),
212 },
213 )
214 .await
215 .expect("failed to initialize cache");
216 let mut results = fixed::Journal::init(
217 self.context.with_label("results"),
218 fixed::Config {
219 partition: format!("{}-results", self.config.partition),
220 items_per_blob: self.config.persistent_items_per_blob,
221 write_buffer: self.config.write_buffer,
222 buffer_pool: self.config.buffer_pool,
223 },
224 )
225 .await
226 .expect("failed to initialize results storage");
227 let mut certificates = Ordinal::<_, FixedCertificate>::init(
228 self.context.with_label("certificates"),
229 ordinal::Config {
230 partition: format!("{}-certificates", self.config.partition),
231 items_per_blob: self.config.persistent_items_per_blob,
232 write_buffer: self.config.write_buffer,
233 replay_buffer: self.config.replay_buffer,
234 },
235 )
236 .await
237 .expect("failed to initialize certificate storage");
238
239 let (resolver_engine, mut resolver) = p2p::Engine::new(
241 self.context.with_label("resolver"),
242 p2p::Config {
243 coordinator: self.config.supervisor,
244 consumer: self.inbound.clone(),
245 producer: self.inbound.clone(),
246 mailbox_size: self.config.mailbox_size,
247 requester_config: commonware_p2p::utils::requester::Config {
248 public_key: self.config.public_key,
249 rate_limit: self.config.backfill_quota,
250 initial: Duration::from_secs(1),
251 timeout: Duration::from_secs(2),
252 },
253 fetch_retry_timeout: Duration::from_secs(10),
254 priority_requests: false,
255 priority_responses: false,
256 },
257 );
258 resolver_engine.start(backfill);
259
260 let missing = certificates.missing_items(1, BATCH_ENQUEUE);
262 for next in missing {
263 self.waiting.insert(next);
264 resolver.fetch(next.into()).await;
265 }
266
267 let genesis_digest = genesis_digest();
269
270 let mut uploads_outstanding = 0;
272 let mut cursor = cache.first().unwrap_or(1); let mut boundary = cursor;
274 let mut tracked_uploads = RMap::new();
275 info!(cursor, "initial summary cursor");
276
277 let mut proposal_requests: BTreeMap<u64, oneshot::Sender<Digest>> = BTreeMap::new();
279 let mut verify_requests: BTreeMap<u64, (Digest, oneshot::Sender<bool>)> = BTreeMap::new();
280 loop {
281 let Some(message) = self.mailbox.next().await else {
282 warn!("mailbox closed");
283 break;
284 };
285 match message {
286 Message::Uploaded { index } => {
287 uploads_outstanding -= 1;
289
290 tracked_uploads.insert(index);
292
293 let Some(end_region) = tracked_uploads.next_gap(boundary).0 else {
295 continue;
296 };
297 if end_region > boundary {
298 cache
299 .prune(end_region)
300 .await
301 .expect("failed to prune cache");
302 boundary = end_region;
303 info!(boundary, "updated summary upload marker");
304 }
305 }
306 Message::Executed {
307 view,
308 height,
309 commitment,
310 result,
311 state_proof,
312 state_proof_ops,
313 events_proof,
314 events_proof_ops,
315 response,
316 } => {
317 let cache_task = async {
319 let proofs = Proofs {
320 state_proof,
321 state_proof_ops,
322 events_proof,
323 events_proof_ops,
324 };
325 cache.put(height, proofs).await.unwrap(); cache.sync().await.unwrap();
327 };
328
329 let result = Progress::new(
331 view,
332 height,
333 commitment,
334 result.state_root,
335 result.state_start_op,
336 result.state_end_op,
337 result.events_root,
338 result.events_start_op,
339 result.events_end_op,
340 );
341 let result_digest = result.digest();
342 let progress_task = async {
343 if results.size().await.unwrap() == height {
347 warn!(height, "already processed results");
348 return;
349 }
350 results.append(result).await.unwrap();
351 results.sync().await.unwrap();
352 };
353 join!(cache_task, progress_task);
354 info!(
355 height,
356 view,
357 ?result.state_root,
358 result.state_start_op,
359 result.state_end_op,
360 ?result.events_root,
361 result.events_start_op,
362 result.events_end_op,
363 ?result_digest,
364 "processed block"
365 );
366
367 if let Some(request) = proposal_requests.remove(&height) {
369 debug!(height, view, "backfilled aggregation proposal");
370 let _ = request.send(result_digest);
371 }
372 proposal_requests.retain(|index, _| *index > height);
373 if let Some((payload, request)) = verify_requests.remove(&height) {
374 debug!(height, view, "backfilled aggregation verify");
375 let _ = request.send(result_digest == payload);
376 }
377 verify_requests.retain(|index, _| *index > height);
378
379 let _ = response.send(());
381 }
382 Message::Genesis { response } => {
383 let _ = response.send(genesis_digest);
384 }
385 Message::Propose { index, response } => {
386 if index == 0 {
388 let _ = response.send(genesis_digest);
389 continue;
390 }
391 let item = index - 1;
392 if let Ok(result) = results.read(item).await {
393 let _ = response.send(result.digest());
394 continue;
395 };
396
397 proposal_requests.insert(index, response);
399 }
400 Message::Verify {
401 index,
402 payload,
403 response,
404 } => {
405 if index == 0 {
406 let _ = response.send(genesis_digest == payload);
407 continue;
408 }
409 let item = index - 1;
410 if let Ok(result) = results.read(item).await {
411 let _ = response.send(result.digest() == payload);
412 continue;
413 };
414
415 verify_requests.insert(index, (payload, response));
417 }
418 Message::Certified { certificate } => {
419 self.waiting.remove(&certificate.item.index);
420
421 let height = certificate.item.index;
422 if height == 0 {
423 continue;
424 }
425
426 if certificates.has(height) {
428 debug!(height, "already processed");
429 continue;
430 }
431 info!(
432 height = certificate.item.index,
433 digest = ?certificate.item.digest,
434 "certified block"
435 );
436
437 certificates
439 .put(height, certificate.clone().into())
440 .await
441 .unwrap();
442 certificates.sync().await.unwrap();
443
444 if let Some(current_end) = certificates.next_gap(1).0 {
446 self.certificates_processed.set(current_end as i64);
447 let current_end = U64::from(current_end);
448 resolver.retain(move |x| x > ¤t_end).await;
449 }
450
451 let missing = certificates.missing_items(1, BATCH_ENQUEUE);
453 for next in missing {
454 if !self.waiting.insert(next) {
455 continue;
456 }
457 resolver.fetch(next.into()).await;
458 }
459 }
460 Message::Tip { index: tip } => {
461 debug!(tip, "new aggregation tip");
462 }
463 Message::Deliver {
464 index,
465 certificate,
466 response,
467 } => {
468 let Ok(certificate) =
470 Certificate::<MinSig, Digest>::decode(&mut certificate.as_ref())
471 else {
472 response.send(false).expect("failed to send false");
473 continue;
474 };
475 if certificate.item.index != index {
476 response.send(false).expect("failed to send false");
477 continue;
478 }
479
480 if !certificate.verify(&self.config.namespace, &self.config.identity) {
482 response.send(false).expect("failed to send false");
483 continue;
484 }
485
486 self.waiting.remove(&index);
488 certificates
489 .put(index, certificate.clone().into())
490 .await
491 .unwrap();
492 certificates.sync().await.unwrap();
493
494 let missing = certificates.missing_items(1, BATCH_ENQUEUE);
496 for next in missing {
497 if !self.waiting.insert(next) {
498 continue;
499 }
500 resolver.fetch(next.into()).await;
501 }
502 }
503 Message::Produce { index, response } => {
504 let Ok(Some(fixed_certificate)) = certificates.get(index).await else {
506 continue;
507 };
508 let certificate: Certificate<MinSig, Digest> = fixed_certificate.into();
509 response
510 .send(certificate.encode().into())
511 .expect("failed to send certificate");
512 }
513 }
514
515 while uploads_outstanding < self.config.max_uploads_outstanding {
520 if !cache.has(cursor) || !certificates.has(cursor) {
522 break;
523 }
524
525 uploads_outstanding += 1;
527
528 let certificate = certificates
530 .get(cursor)
531 .await
532 .unwrap()
533 .expect("failed to fetch certificate");
534
535 let result = results
537 .read(cursor - 1)
538 .await
539 .expect("failed to fetch result"); let proofs = cache
543 .get(cursor)
544 .await
545 .unwrap()
546 .expect("failed to fetch proofs");
547
548 let summary = Summary {
550 progress: result,
551 certificate: certificate.into(),
552 state_proof: proofs.state_proof,
553 state_proof_ops: proofs.state_proof_ops,
554 events_proof: proofs.events_proof,
555 events_proof_ops: proofs.events_proof_ops,
556 };
557 self.context.with_label("summary_submit").spawn({
558 let indexer = self.config.indexer.clone();
559 let mut channel = self.inbound.clone();
560 move |context| async move {
561 let mut attempts = 1;
562 loop {
563 let Err(e) = indexer.submit_summary(summary.clone()).await else {
564 break;
565 };
566 warn!(?e, attempts, "failed to upload summary");
567 context.sleep(RETRY_DELAY).await;
568 attempts += 1;
569 }
570 debug!(cursor, attempts, "summary uploaded to indexer");
571 channel.uploaded(cursor).await;
572 }
573 });
574
575 cursor += 1;
577 }
578 }
579 }
580}