1use alto_types::{Activity, Block, Finalized, Identity, Notarized, Seed};
2use commonware_consensus::{marshal, threshold_simplex::types::Seedable, Reporter, Viewable};
3use commonware_cryptography::bls12381::primitives::variant::MinSig;
4use commonware_runtime::{Metrics, Spawner};
5use std::future::Future;
6#[cfg(test)]
7use std::{sync::atomic::AtomicBool, sync::Arc};
8use tracing::{debug, warn};
9
/// Destination that consensus artifacts (seeds, notarizations and
/// finalizations) can be uploaded to.
///
/// Implementors must be `Clone + Send + Sync + 'static`; within this file,
/// clones of the indexer are moved into spawned upload tasks.
pub trait Indexer: Clone + Send + Sync + 'static {
    /// Error produced when an upload fails.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Builds an indexer from a URI and an [`Identity`].
    ///
    /// NOTE(review): `public` is presumably the network's public identity used
    /// by the indexer client for verification — confirm against implementors.
    fn new(uri: &str, public: Identity) -> Self;

    /// Uploads a [`Seed`].
    ///
    /// The returned future must be `Send` and resolves to `Ok(())` on success
    /// or `Self::Error` on failure.
    fn seed_upload(&self, seed: Seed) -> impl Future<Output = Result<(), Self::Error>> + Send;

    /// Uploads a [`Notarized`] artifact.
    ///
    /// The returned future must be `Send` and resolves to `Ok(())` on success
    /// or `Self::Error` on failure.
    fn notarized_upload(
        &self,
        notarized: Notarized,
    ) -> impl Future<Output = Result<(), Self::Error>> + Send;

    /// Uploads a [`Finalized`] artifact.
    ///
    /// The returned future must be `Send` and resolves to `Ok(())` on success
    /// or `Self::Error` on failure.
    fn finalized_upload(
        &self,
        finalized: Finalized,
    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
}
32
/// Test-only indexer stand-in that records which kinds of upload were invoked.
///
/// Each flag lives behind an `Arc`, so every clone of a `Mock` observes and
/// updates the same set of flags.
#[cfg(test)]
#[derive(Clone, Debug)]
pub struct Mock {
    /// Flag the mock sets to `true` when its `seed_upload` is called.
    pub seed_seen: Arc<AtomicBool>,
    /// Flag the mock sets to `true` when its `notarized_upload` is called.
    pub notarization_seen: Arc<AtomicBool>,
    /// Flag the mock sets to `true` when its `finalized_upload` is called.
    pub finalization_seen: Arc<AtomicBool>,
}
41
/// [`Indexer`] implementation for [`Mock`]: every upload succeeds immediately
/// and only flips the matching "seen" flag.
#[cfg(test)]
impl Indexer for Mock {
    type Error = std::io::Error;

    /// Creates a mock with all flags cleared; both arguments are ignored.
    fn new(_: &str, _: Identity) -> Self {
        let cleared = || Arc::new(AtomicBool::new(false));
        Self {
            seed_seen: cleared(),
            notarization_seen: cleared(),
            finalization_seen: cleared(),
        }
    }

    /// Marks that a seed upload was requested and reports success.
    async fn seed_upload(&self, _: Seed) -> Result<(), Self::Error> {
        use std::sync::atomic::Ordering;

        self.seed_seen.store(true, Ordering::Relaxed);
        Ok(())
    }

    /// Marks that a notarization upload was requested and reports success.
    async fn notarized_upload(&self, _: Notarized) -> Result<(), Self::Error> {
        use std::sync::atomic::Ordering;

        self.notarization_seen.store(true, Ordering::Relaxed);
        Ok(())
    }

    /// Marks that a finalization upload was requested and reports success.
    async fn finalized_upload(&self, _: Finalized) -> Result<(), Self::Error> {
        use std::sync::atomic::Ordering;

        self.finalization_seen.store(true, Ordering::Relaxed);
        Ok(())
    }
}
72
73impl Indexer for alto_client::Client {
74 type Error = alto_client::Error;
75
76 fn new(uri: &str, identity: Identity) -> Self {
77 Self::new(uri, identity)
78 }
79
80 fn seed_upload(&self, seed: Seed) -> impl Future<Output = Result<(), Self::Error>> + Send {
81 self.seed_upload(seed)
82 }
83
84 fn notarized_upload(
85 &self,
86 notarized: Notarized,
87 ) -> impl Future<Output = Result<(), Self::Error>> + Send {
88 self.notarized_upload(notarized)
89 }
90
91 fn finalized_upload(
92 &self,
93 finalized: Finalized,
94 ) -> impl Future<Output = Result<(), Self::Error>> + Send {
95 self.finalized_upload(finalized)
96 }
97}
98
99#[derive(Clone)]
101pub struct Pusher<E: Spawner + Metrics, I: Indexer> {
102 context: E,
103 indexer: I,
104 marshal: marshal::Mailbox<MinSig, Block>,
105}
106
107impl<E: Spawner + Metrics, I: Indexer> Pusher<E, I> {
108 pub fn new(context: E, indexer: I, marshal: marshal::Mailbox<MinSig, Block>) -> Self {
110 Self {
111 context,
112 indexer,
113 marshal,
114 }
115 }
116}
117
118impl<E: Spawner + Metrics, I: Indexer> Reporter for Pusher<E, I> {
119 type Activity = Activity;
120
121 async fn report(&mut self, activity: Self::Activity) {
122 match activity {
123 Activity::Notarization(notarization) => {
124 let view = notarization.view();
126 self.context.with_label("notarized_seed").spawn({
127 let indexer = self.indexer.clone();
128 let seed = notarization.seed();
129 move |_| async move {
130 let result = indexer.seed_upload(seed).await;
131 if let Err(e) = result {
132 warn!(?e, "failed to upload seed");
133 return;
134 }
135 debug!(view, "seed uploaded to indexer");
136 }
137 });
138
139 self.context.with_label("notarized_block").spawn({
141 let indexer = self.indexer.clone();
142 let mut marshal = self.marshal.clone();
143 move |_| async move {
144 let block = marshal
146 .subscribe(Some(notarization.view()), notarization.proposal.payload)
147 .await
148 .await;
149 let Ok(block) = block else {
150 warn!(view, "subscription for block cancelled");
151 return;
152 };
153
154 let notarization = Notarized::new(notarization, block);
156 let result = indexer.notarized_upload(notarization).await;
157 if let Err(e) = result {
158 warn!(?e, "failed to upload notarization");
159 return;
160 }
161 debug!(view, "notarization uploaded to indexer");
162 }
163 });
164 }
165 Activity::Finalization(finalization) => {
166 let view = finalization.view();
168 self.context.with_label("finalized_seed").spawn({
169 let indexer = self.indexer.clone();
170 let seed = finalization.seed();
171 move |_| async move {
172 let result = indexer.seed_upload(seed).await;
173 if let Err(e) = result {
174 warn!(?e, "failed to upload seed");
175 return;
176 }
177 debug!(view, "seed uploaded to indexer");
178 }
179 });
180
181 self.context.with_label("finalized_block").spawn({
183 let indexer = self.indexer.clone();
184 let mut marshal = self.marshal.clone();
185 move |_| async move {
186 let block = marshal
187 .subscribe(Some(finalization.view()), finalization.proposal.payload)
188 .await
189 .await;
190 let Ok(block) = block else {
191 warn!(view, "subscription for block cancelled");
192 return;
193 };
194
195 let finalization = Finalized::new(finalization, block);
197 let result = indexer.finalized_upload(finalization).await;
198 if let Err(e) = result {
199 warn!(?e, "failed to upload finalization");
200 return;
201 }
202 debug!(view, "finalization uploaded to indexer");
203 }
204 });
205 }
206 _ => {}
207 }
208 }
209}