alto_chain/
indexer.rs

1use alto_types::{Activity, Block, Finalized, Identity, Notarized, Seed};
2use commonware_consensus::{marshal, threshold_simplex::types::Seedable, Reporter, Viewable};
3use commonware_cryptography::bls12381::primitives::variant::MinSig;
4use commonware_runtime::{Metrics, Spawner};
5use std::future::Future;
6#[cfg(test)]
7use std::{sync::atomic::AtomicBool, sync::Arc};
8use tracing::{debug, warn};
9
10/// Trait for interacting with an indexer.
11pub trait Indexer: Clone + Send + Sync + 'static {
12    type Error: std::error::Error + Send + Sync + 'static;
13
14    /// Create a new indexer with the given URI and public key.
15    fn new(uri: &str, public: Identity) -> Self;
16
17    /// Upload a seed to the indexer.
18    fn seed_upload(&self, seed: Seed) -> impl Future<Output = Result<(), Self::Error>> + Send;
19
20    /// Upload a notarization to the indexer.
21    fn notarized_upload(
22        &self,
23        notarized: Notarized,
24    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
25
26    /// Upload a finalization to the indexer.
27    fn finalized_upload(
28        &self,
29        finalized: Finalized,
30    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
31}
32
33/// A mock indexer implementation for testing.
34#[cfg(test)]
35#[derive(Clone)]
36pub struct Mock {
37    pub seed_seen: Arc<AtomicBool>,
38    pub notarization_seen: Arc<AtomicBool>,
39    pub finalization_seen: Arc<AtomicBool>,
40}
41
42#[cfg(test)]
43impl Indexer for Mock {
44    type Error = std::io::Error;
45
46    fn new(_: &str, _: Identity) -> Self {
47        Mock {
48            seed_seen: Arc::new(AtomicBool::new(false)),
49            notarization_seen: Arc::new(AtomicBool::new(false)),
50            finalization_seen: Arc::new(AtomicBool::new(false)),
51        }
52    }
53
54    async fn seed_upload(&self, _: Seed) -> Result<(), Self::Error> {
55        self.seed_seen
56            .store(true, std::sync::atomic::Ordering::Relaxed);
57        Ok(())
58    }
59
60    async fn notarized_upload(&self, _: Notarized) -> Result<(), Self::Error> {
61        self.notarization_seen
62            .store(true, std::sync::atomic::Ordering::Relaxed);
63        Ok(())
64    }
65
66    async fn finalized_upload(&self, _: Finalized) -> Result<(), Self::Error> {
67        self.finalization_seen
68            .store(true, std::sync::atomic::Ordering::Relaxed);
69        Ok(())
70    }
71}
72
73impl Indexer for alto_client::Client {
74    type Error = alto_client::Error;
75
76    fn new(uri: &str, identity: Identity) -> Self {
77        Self::new(uri, identity)
78    }
79
80    fn seed_upload(&self, seed: Seed) -> impl Future<Output = Result<(), Self::Error>> + Send {
81        self.seed_upload(seed)
82    }
83
84    fn notarized_upload(
85        &self,
86        notarized: Notarized,
87    ) -> impl Future<Output = Result<(), Self::Error>> + Send {
88        self.notarized_upload(notarized)
89    }
90
91    fn finalized_upload(
92        &self,
93        finalized: Finalized,
94    ) -> impl Future<Output = Result<(), Self::Error>> + Send {
95        self.finalized_upload(finalized)
96    }
97}
98
99/// An implementation of [Indexer] for the [Reporter] trait.
100#[derive(Clone)]
101pub struct Pusher<E: Spawner + Metrics, I: Indexer> {
102    context: E,
103    indexer: I,
104    marshal: marshal::Mailbox<MinSig, Block>,
105}
106
107impl<E: Spawner + Metrics, I: Indexer> Pusher<E, I> {
108    /// Create a new [Pusher].
109    pub fn new(context: E, indexer: I, marshal: marshal::Mailbox<MinSig, Block>) -> Self {
110        Self {
111            context,
112            indexer,
113            marshal,
114        }
115    }
116}
117
118impl<E: Spawner + Metrics, I: Indexer> Reporter for Pusher<E, I> {
119    type Activity = Activity;
120
121    async fn report(&mut self, activity: Self::Activity) {
122        match activity {
123            Activity::Notarization(notarization) => {
124                // Upload seed to indexer
125                let view = notarization.view();
126                self.context.with_label("notarized_seed").spawn({
127                    let indexer = self.indexer.clone();
128                    let seed = notarization.seed();
129                    move |_| async move {
130                        let result = indexer.seed_upload(seed).await;
131                        if let Err(e) = result {
132                            warn!(?e, "failed to upload seed");
133                            return;
134                        }
135                        debug!(view, "seed uploaded to indexer");
136                    }
137                });
138
139                // Upload block to indexer (once we have it)
140                self.context.with_label("notarized_block").spawn({
141                    let indexer = self.indexer.clone();
142                    let mut marshal = self.marshal.clone();
143                    move |_| async move {
144                        // Wait for block
145                        let block = marshal
146                            .subscribe(Some(notarization.view()), notarization.proposal.payload)
147                            .await
148                            .await;
149                        let Ok(block) = block else {
150                            warn!(view, "subscription for block cancelled");
151                            return;
152                        };
153
154                        // Upload to indexer once we have it
155                        let notarization = Notarized::new(notarization, block);
156                        let result = indexer.notarized_upload(notarization).await;
157                        if let Err(e) = result {
158                            warn!(?e, "failed to upload notarization");
159                            return;
160                        }
161                        debug!(view, "notarization uploaded to indexer");
162                    }
163                });
164            }
165            Activity::Finalization(finalization) => {
166                // Upload seed to indexer
167                let view = finalization.view();
168                self.context.with_label("finalized_seed").spawn({
169                    let indexer = self.indexer.clone();
170                    let seed = finalization.seed();
171                    move |_| async move {
172                        let result = indexer.seed_upload(seed).await;
173                        if let Err(e) = result {
174                            warn!(?e, "failed to upload seed");
175                            return;
176                        }
177                        debug!(view, "seed uploaded to indexer");
178                    }
179                });
180
181                // Upload block to indexer (once we have it)
182                self.context.with_label("finalized_block").spawn({
183                    let indexer = self.indexer.clone();
184                    let mut marshal = self.marshal.clone();
185                    move |_| async move {
186                        let block = marshal
187                            .subscribe(Some(finalization.view()), finalization.proposal.payload)
188                            .await
189                            .await;
190                        let Ok(block) = block else {
191                            warn!(view, "subscription for block cancelled");
192                            return;
193                        };
194
195                        // Upload to indexer once we have it
196                        let finalization = Finalized::new(finalization, block);
197                        let result = indexer.finalized_upload(finalization).await;
198                        if let Err(e) = result {
199                            warn!(?e, "failed to upload finalization");
200                            return;
201                        }
202                        debug!(view, "finalization uploaded to indexer");
203                    }
204                });
205            }
206            _ => {}
207        }
208    }
209}