1use alto_types::{Activity, Block, Finalized, Identity, Notarized, Scheme, Seed, Seedable};
2use commonware_consensus::{marshal, Reporter, Viewable};
3use commonware_runtime::{Metrics, Spawner};
4use std::future::Future;
5#[cfg(test)]
6use std::{sync::atomic::AtomicBool, sync::Arc};
7use tracing::{debug, warn};
8
9pub trait Indexer: Clone + Send + Sync + 'static {
11 type Error: std::error::Error + Send + Sync + 'static;
12
13 fn new(uri: &str, public: Identity) -> Self;
15
16 fn seed_upload(&self, seed: Seed) -> impl Future<Output = Result<(), Self::Error>> + Send;
18
19 fn notarized_upload(
21 &self,
22 notarized: Notarized,
23 ) -> impl Future<Output = Result<(), Self::Error>> + Send;
24
25 fn finalized_upload(
27 &self,
28 finalized: Finalized,
29 ) -> impl Future<Output = Result<(), Self::Error>> + Send;
30}
31
/// Test-only [`Indexer`] that records which kinds of upload were attempted.
///
/// The flags are shared through `Arc`, so every clone of a `Mock` observes
/// (and updates) the same state.
#[cfg(test)]
#[derive(Clone)]
pub struct Mock {
    /// Set to `true` once `seed_upload` has been called.
    pub seed_seen: Arc<AtomicBool>,
    /// Set to `true` once `notarized_upload` has been called.
    pub notarization_seen: Arc<AtomicBool>,
    /// Set to `true` once `finalized_upload` has been called.
    pub finalization_seen: Arc<AtomicBool>,
}
40
/// [`Indexer`] implementation for [`Mock`]: every upload succeeds and merely
/// flips the matching "seen" flag.
#[cfg(test)]
impl Indexer for Mock {
    type Error = std::io::Error;

    /// Creates a mock with all flags cleared; both arguments are ignored.
    fn new(_uri: &str, _public: Identity) -> Self {
        // Each flag gets its own freshly allocated, initially-false cell.
        let unset = || Arc::new(AtomicBool::new(false));
        Self {
            seed_seen: unset(),
            notarization_seen: unset(),
            finalization_seen: unset(),
        }
    }

    /// Marks `seed_seen` and reports success; the seed itself is discarded.
    async fn seed_upload(&self, _seed: Seed) -> Result<(), Self::Error> {
        use std::sync::atomic::Ordering;

        self.seed_seen.store(true, Ordering::Relaxed);
        Ok(())
    }

    /// Marks `notarization_seen` and reports success; the payload is discarded.
    async fn notarized_upload(&self, _notarized: Notarized) -> Result<(), Self::Error> {
        use std::sync::atomic::Ordering;

        self.notarization_seen.store(true, Ordering::Relaxed);
        Ok(())
    }

    /// Marks `finalization_seen` and reports success; the payload is discarded.
    async fn finalized_upload(&self, _finalized: Finalized) -> Result<(), Self::Error> {
        use std::sync::atomic::Ordering;

        self.finalization_seen.store(true, Ordering::Relaxed);
        Ok(())
    }
}
71
72impl Indexer for alto_client::Client {
73 type Error = alto_client::Error;
74
75 fn new(uri: &str, identity: Identity) -> Self {
76 Self::new(uri, identity)
77 }
78
79 fn seed_upload(&self, seed: Seed) -> impl Future<Output = Result<(), Self::Error>> + Send {
80 self.seed_upload(seed)
81 }
82
83 fn notarized_upload(
84 &self,
85 notarized: Notarized,
86 ) -> impl Future<Output = Result<(), Self::Error>> + Send {
87 self.notarized_upload(notarized)
88 }
89
90 fn finalized_upload(
91 &self,
92 finalized: Finalized,
93 ) -> impl Future<Output = Result<(), Self::Error>> + Send {
94 self.finalized_upload(finalized)
95 }
96}
97
98#[derive(Clone)]
100pub struct Pusher<E: Spawner + Metrics, I: Indexer> {
101 context: E,
102 indexer: I,
103 marshal: marshal::Mailbox<Scheme, Block>,
104}
105
106impl<E: Spawner + Metrics, I: Indexer> Pusher<E, I> {
107 pub fn new(context: E, indexer: I, marshal: marshal::Mailbox<Scheme, Block>) -> Self {
109 Self {
110 context,
111 indexer,
112 marshal,
113 }
114 }
115}
116
117impl<E: Spawner + Metrics, I: Indexer> Reporter for Pusher<E, I> {
118 type Activity = Activity;
119
120 async fn report(&mut self, activity: Self::Activity) {
121 match activity {
122 Activity::Notarization(notarization) => {
123 let view = notarization.view();
125 self.context.with_label("notarized_seed").spawn({
126 let indexer = self.indexer.clone();
127 let seed = notarization.seed();
128 move |_| async move {
129 let result = indexer.seed_upload(seed).await;
130 if let Err(e) = result {
131 warn!(?e, "failed to upload seed");
132 return;
133 }
134 debug!(view, "seed uploaded to indexer");
135 }
136 });
137
138 self.context.with_label("notarized_block").spawn({
140 let indexer = self.indexer.clone();
141 let mut marshal = self.marshal.clone();
142 move |_| async move {
143 let block = marshal
145 .subscribe(Some(notarization.round()), notarization.proposal.payload)
146 .await
147 .await;
148 let Ok(block) = block else {
149 warn!(view, "subscription for block cancelled");
150 return;
151 };
152
153 let notarization = Notarized::new(notarization, block);
155 let result = indexer.notarized_upload(notarization).await;
156 if let Err(e) = result {
157 warn!(?e, "failed to upload notarization");
158 return;
159 }
160 debug!(view, "notarization uploaded to indexer");
161 }
162 });
163 }
164 Activity::Finalization(finalization) => {
165 let view = finalization.view();
167 self.context.with_label("finalized_seed").spawn({
168 let indexer = self.indexer.clone();
169 let seed = finalization.seed();
170 move |_| async move {
171 let result = indexer.seed_upload(seed).await;
172 if let Err(e) = result {
173 warn!(?e, "failed to upload seed");
174 return;
175 }
176 debug!(view, "seed uploaded to indexer");
177 }
178 });
179
180 self.context.with_label("finalized_block").spawn({
182 let indexer = self.indexer.clone();
183 let mut marshal = self.marshal.clone();
184 move |_| async move {
185 let block = marshal
186 .subscribe(Some(finalization.round()), finalization.proposal.payload)
187 .await
188 .await;
189 let Ok(block) = block else {
190 warn!(view, "subscription for block cancelled");
191 return;
192 };
193
194 let finalization = Finalized::new(finalization, block);
196 let result = indexer.finalized_upload(finalization).await;
197 if let Err(e) = result {
198 warn!(?e, "failed to upload finalization");
199 return;
200 }
201 debug!(view, "finalization uploaded to indexer");
202 }
203 });
204 }
205 _ => {}
206 }
207 }
208}