alto_chain/
indexer.rs

1use alto_types::{Activity, Block, Finalized, Identity, Notarized, Scheme, Seed, Seedable};
2use commonware_consensus::{marshal, Reporter, Viewable};
3use commonware_runtime::{Metrics, Spawner};
4use std::future::Future;
5#[cfg(test)]
6use std::{sync::atomic::AtomicBool, sync::Arc};
7use tracing::{debug, warn};
8
9/// Trait for interacting with an indexer.
10pub trait Indexer: Clone + Send + Sync + 'static {
11    type Error: std::error::Error + Send + Sync + 'static;
12
13    /// Create a new indexer with the given URI and public key.
14    fn new(uri: &str, public: Identity) -> Self;
15
16    /// Upload a seed to the indexer.
17    fn seed_upload(&self, seed: Seed) -> impl Future<Output = Result<(), Self::Error>> + Send;
18
19    /// Upload a notarization to the indexer.
20    fn notarized_upload(
21        &self,
22        notarized: Notarized,
23    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
24
25    /// Upload a finalization to the indexer.
26    fn finalized_upload(
27        &self,
28        finalized: Finalized,
29    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
30}
31
32/// A mock indexer implementation for testing.
33#[cfg(test)]
34#[derive(Clone)]
35pub struct Mock {
36    pub seed_seen: Arc<AtomicBool>,
37    pub notarization_seen: Arc<AtomicBool>,
38    pub finalization_seen: Arc<AtomicBool>,
39}
40
41#[cfg(test)]
42impl Indexer for Mock {
43    type Error = std::io::Error;
44
45    fn new(_: &str, _: Identity) -> Self {
46        Mock {
47            seed_seen: Arc::new(AtomicBool::new(false)),
48            notarization_seen: Arc::new(AtomicBool::new(false)),
49            finalization_seen: Arc::new(AtomicBool::new(false)),
50        }
51    }
52
53    async fn seed_upload(&self, _: Seed) -> Result<(), Self::Error> {
54        self.seed_seen
55            .store(true, std::sync::atomic::Ordering::Relaxed);
56        Ok(())
57    }
58
59    async fn notarized_upload(&self, _: Notarized) -> Result<(), Self::Error> {
60        self.notarization_seen
61            .store(true, std::sync::atomic::Ordering::Relaxed);
62        Ok(())
63    }
64
65    async fn finalized_upload(&self, _: Finalized) -> Result<(), Self::Error> {
66        self.finalization_seen
67            .store(true, std::sync::atomic::Ordering::Relaxed);
68        Ok(())
69    }
70}
71
72impl Indexer for alto_client::Client {
73    type Error = alto_client::Error;
74
75    fn new(uri: &str, identity: Identity) -> Self {
76        Self::new(uri, identity)
77    }
78
79    fn seed_upload(&self, seed: Seed) -> impl Future<Output = Result<(), Self::Error>> + Send {
80        self.seed_upload(seed)
81    }
82
83    fn notarized_upload(
84        &self,
85        notarized: Notarized,
86    ) -> impl Future<Output = Result<(), Self::Error>> + Send {
87        self.notarized_upload(notarized)
88    }
89
90    fn finalized_upload(
91        &self,
92        finalized: Finalized,
93    ) -> impl Future<Output = Result<(), Self::Error>> + Send {
94        self.finalized_upload(finalized)
95    }
96}
97
98/// An implementation of [Indexer] for the [Reporter] trait.
99#[derive(Clone)]
100pub struct Pusher<E: Spawner + Metrics, I: Indexer> {
101    context: E,
102    indexer: I,
103    marshal: marshal::Mailbox<Scheme, Block>,
104}
105
106impl<E: Spawner + Metrics, I: Indexer> Pusher<E, I> {
107    /// Create a new [Pusher].
108    pub fn new(context: E, indexer: I, marshal: marshal::Mailbox<Scheme, Block>) -> Self {
109        Self {
110            context,
111            indexer,
112            marshal,
113        }
114    }
115}
116
117impl<E: Spawner + Metrics, I: Indexer> Reporter for Pusher<E, I> {
118    type Activity = Activity;
119
120    async fn report(&mut self, activity: Self::Activity) {
121        match activity {
122            Activity::Notarization(notarization) => {
123                // Upload seed to indexer
124                let view = notarization.view();
125                self.context.with_label("notarized_seed").spawn({
126                    let indexer = self.indexer.clone();
127                    let seed = notarization.seed();
128                    move |_| async move {
129                        let result = indexer.seed_upload(seed).await;
130                        if let Err(e) = result {
131                            warn!(?e, "failed to upload seed");
132                            return;
133                        }
134                        debug!(view, "seed uploaded to indexer");
135                    }
136                });
137
138                // Upload block to indexer (once we have it)
139                self.context.with_label("notarized_block").spawn({
140                    let indexer = self.indexer.clone();
141                    let mut marshal = self.marshal.clone();
142                    move |_| async move {
143                        // Wait for block
144                        let block = marshal
145                            .subscribe(Some(notarization.round()), notarization.proposal.payload)
146                            .await
147                            .await;
148                        let Ok(block) = block else {
149                            warn!(view, "subscription for block cancelled");
150                            return;
151                        };
152
153                        // Upload to indexer once we have it
154                        let notarization = Notarized::new(notarization, block);
155                        let result = indexer.notarized_upload(notarization).await;
156                        if let Err(e) = result {
157                            warn!(?e, "failed to upload notarization");
158                            return;
159                        }
160                        debug!(view, "notarization uploaded to indexer");
161                    }
162                });
163            }
164            Activity::Finalization(finalization) => {
165                // Upload seed to indexer
166                let view = finalization.view();
167                self.context.with_label("finalized_seed").spawn({
168                    let indexer = self.indexer.clone();
169                    let seed = finalization.seed();
170                    move |_| async move {
171                        let result = indexer.seed_upload(seed).await;
172                        if let Err(e) = result {
173                            warn!(?e, "failed to upload seed");
174                            return;
175                        }
176                        debug!(view, "seed uploaded to indexer");
177                    }
178                });
179
180                // Upload block to indexer (once we have it)
181                self.context.with_label("finalized_block").spawn({
182                    let indexer = self.indexer.clone();
183                    let mut marshal = self.marshal.clone();
184                    move |_| async move {
185                        let block = marshal
186                            .subscribe(Some(finalization.round()), finalization.proposal.payload)
187                            .await
188                            .await;
189                        let Ok(block) = block else {
190                            warn!(view, "subscription for block cancelled");
191                            return;
192                        };
193
194                        // Upload to indexer once we have it
195                        let finalization = Finalized::new(finalization, block);
196                        let result = indexer.finalized_upload(finalization).await;
197                        if let Err(e) = result {
198                            warn!(?e, "failed to upload finalization");
199                            return;
200                        }
201                        debug!(view, "finalization uploaded to indexer");
202                    }
203                });
204            }
205            _ => {}
206        }
207    }
208}