Skip to main content

exoware_simplex/
resolver.rs

1use std::{future::Future, marker::PhantomData, num::NonZeroUsize, time::Duration};
2
3use bytes::Bytes;
4use commonware_actor::Feedback;
5use commonware_consensus::marshal::resolver::handler::{
6    self, Annotation, Handler as MarshalHandler, Key as MarshalKey,
7};
8use commonware_cryptography::{Digest, PublicKey};
9use commonware_resolver::{opaque, Fetch, Resolver, TargetedResolver};
10use commonware_runtime::{Clock, Metrics, Spawner};
11use commonware_utils::vec::NonEmptyVec;
12
13use crate::SimplexClient;
14
/// Retry delay (50 ms) handed to `opaque::init` when a resolver is built.
const RETRY_DELAY: Duration = Duration::from_millis(50);
16
17#[derive(Clone)]
18struct MarshalFetcher<D: Digest> {
19    client: SimplexClient,
20    _marker: PhantomData<D>,
21}
22
23impl<D: Digest> MarshalFetcher<D> {
24    const fn new(client: SimplexClient) -> Self {
25        Self {
26            client,
27            _marker: PhantomData,
28        }
29    }
30}
31
32impl<D> opaque::Fetcher for MarshalFetcher<D>
33where
34    D: Digest + Send + 'static,
35{
36    type Key = MarshalKey<D>;
37    type Value = Bytes;
38
39    fn fetch(&self, key: Self::Key) -> impl Future<Output = Option<Self::Value>> + Send {
40        let client = self.client.clone();
41        async move {
42            match fetch_value(&client, key).await {
43                Ok(value) => value,
44                Err(error) => {
45                    tracing::debug!(%error, "failed to resolve marshal value");
46                    None
47                }
48            }
49        }
50    }
51}
52
53/// Store-backed resolver for Commonware marshal requests.
54///
55/// Unlike the P2P resolver, this resolves marshal backfill requests directly
56/// from an Exoware Simplex store and delivers responses into Marshal's local
57/// resolver handler.
58#[derive(Clone)]
59pub struct MarshalResolver<D: Digest, P: PublicKey> {
60    client: SimplexClient,
61    inner: opaque::Resolver<MarshalKey<D>, Annotation, P>,
62}
63
64impl<D, P> MarshalResolver<D, P>
65where
66    D: Digest + Send + 'static,
67    P: PublicKey,
68{
69    pub fn init<E>(
70        context: E,
71        mailbox_size: NonZeroUsize,
72        client: SimplexClient,
73    ) -> (handler::Receiver<D>, Self)
74    where
75        E: Clock + Metrics + Spawner,
76    {
77        let (receiver, handler) = handler::init(context.child("handler"), mailbox_size);
78        (receiver, Self::new(context, mailbox_size, client, handler))
79    }
80
81    pub fn new<E>(
82        context: E,
83        mailbox_size: NonZeroUsize,
84        client: SimplexClient,
85        handler: MarshalHandler<D>,
86    ) -> Self
87    where
88        E: Clock + Metrics + Spawner,
89    {
90        let fetcher = MarshalFetcher::new(client.clone());
91        let inner = opaque::init(
92            context.child("opaque"),
93            fetcher,
94            handler,
95            mailbox_size,
96            RETRY_DELAY,
97        );
98        Self { client, inner }
99    }
100
101    pub const fn client(&self) -> &SimplexClient {
102        &self.client
103    }
104
105    pub fn into_client(self) -> SimplexClient {
106        self.client
107    }
108}
109
110impl<D, P> Resolver for MarshalResolver<D, P>
111where
112    D: Digest + Send + 'static,
113    P: PublicKey,
114{
115    type Key = MarshalKey<D>;
116    type Subscriber = Annotation;
117
118    fn fetch<F>(&mut self, key: F) -> Feedback
119    where
120        F: Into<Fetch<Self::Key, Self::Subscriber>> + Send,
121    {
122        self.inner.fetch(key)
123    }
124
125    fn fetch_all<F>(&mut self, keys: Vec<F>) -> Feedback
126    where
127        F: Into<Fetch<Self::Key, Self::Subscriber>> + Send,
128    {
129        self.inner.fetch_all(keys)
130    }
131
132    fn retain(
133        &mut self,
134        predicate: impl Fn(&Self::Key, &Self::Subscriber) -> bool + Send + 'static,
135    ) -> Feedback {
136        self.inner.retain(predicate)
137    }
138}
139
140impl<D, P> TargetedResolver for MarshalResolver<D, P>
141where
142    D: Digest + Send + 'static,
143    P: PublicKey,
144{
145    type PublicKey = P;
146
147    fn fetch_targeted(
148        &mut self,
149        key: impl Into<Fetch<Self::Key, Self::Subscriber>> + Send,
150        targets: NonEmptyVec<Self::PublicKey>,
151    ) -> Feedback {
152        self.inner.fetch_targeted(key, targets)
153    }
154
155    fn fetch_all_targeted<F>(&mut self, keys: Vec<(F, NonEmptyVec<Self::PublicKey>)>) -> Feedback
156    where
157        F: Into<Fetch<Self::Key, Self::Subscriber>> + Send,
158    {
159        self.inner.fetch_all_targeted(keys)
160    }
161}
162
163async fn fetch_value<D: Digest>(
164    client: &SimplexClient,
165    key: MarshalKey<D>,
166) -> Result<Option<Bytes>, crate::SimplexError> {
167    match key {
168        MarshalKey::Block(commitment) => client.get_header_raw(&commitment).await,
169        MarshalKey::Finalized { height } => client.get_finalized_by_height_raw(height).await,
170        MarshalKey::Notarized { round } => client.get_notarized_raw(round.view()).await,
171    }
172}