exoware_simplex/
resolver.rs1use std::{future::Future, marker::PhantomData, num::NonZeroUsize, time::Duration};
2
3use bytes::Bytes;
4use commonware_actor::Feedback;
5use commonware_consensus::marshal::resolver::handler::{
6 self, Annotation, Handler as MarshalHandler, Key as MarshalKey,
7};
8use commonware_cryptography::{Digest, PublicKey};
9use commonware_resolver::{opaque, Fetch, Resolver, TargetedResolver};
10use commonware_runtime::{Clock, Metrics, Spawner};
11use commonware_utils::vec::NonEmptyVec;
12
13use crate::SimplexClient;
14
/// Delay (50 ms) handed to `opaque::init` when a [`MarshalResolver`] is built.
///
/// NOTE(review): presumably the pause between fetch attempts for a key that has not
/// resolved yet — confirm against the `commonware_resolver::opaque` documentation.
const RETRY_DELAY: Duration = Duration::from_millis(50);
16
/// Adapter that implements `opaque::Fetcher` on top of a [`SimplexClient`].
///
/// `D` is the digest type carried by the marshal keys this fetcher accepts.
#[derive(Clone)]
struct MarshalFetcher<D: Digest> {
    /// Client used to look up the raw bytes for each requested key.
    client: SimplexClient,
    /// Zero-sized marker: `D` appears only in the key type, never in stored data.
    _marker: PhantomData<D>,
}
22
23impl<D: Digest> MarshalFetcher<D> {
24 const fn new(client: SimplexClient) -> Self {
25 Self {
26 client,
27 _marker: PhantomData,
28 }
29 }
30}
31
32impl<D> opaque::Fetcher for MarshalFetcher<D>
33where
34 D: Digest + Send + 'static,
35{
36 type Key = MarshalKey<D>;
37 type Value = Bytes;
38
39 fn fetch(&self, key: Self::Key) -> impl Future<Output = Option<Self::Value>> + Send {
40 let client = self.client.clone();
41 async move {
42 match fetch_value(&client, key).await {
43 Ok(value) => value,
44 Err(error) => {
45 tracing::debug!(%error, "failed to resolve marshal value");
46 None
47 }
48 }
49 }
50 }
51}
52
/// Resolver for marshal keys that is backed by a [`SimplexClient`].
///
/// All resolver requests are delegated to an inner `opaque::Resolver` keyed by
/// `MarshalKey<D>`, with `Annotation` subscribers and `P` as the public key type.
#[derive(Clone)]
pub struct MarshalResolver<D: Digest, P: PublicKey> {
    /// Client this resolver was built with; exposed through `client` / `into_client`.
    client: SimplexClient,
    /// Resolver that receives every delegated request.
    inner: opaque::Resolver<MarshalKey<D>, Annotation, P>,
}
63
64impl<D, P> MarshalResolver<D, P>
65where
66 D: Digest + Send + 'static,
67 P: PublicKey,
68{
69 pub fn init<E>(
70 context: E,
71 mailbox_size: NonZeroUsize,
72 client: SimplexClient,
73 ) -> (handler::Receiver<D>, Self)
74 where
75 E: Clock + Metrics + Spawner,
76 {
77 let (receiver, handler) = handler::init(context.child("handler"), mailbox_size);
78 (receiver, Self::new(context, mailbox_size, client, handler))
79 }
80
81 pub fn new<E>(
82 context: E,
83 mailbox_size: NonZeroUsize,
84 client: SimplexClient,
85 handler: MarshalHandler<D>,
86 ) -> Self
87 where
88 E: Clock + Metrics + Spawner,
89 {
90 let fetcher = MarshalFetcher::new(client.clone());
91 let inner = opaque::init(
92 context.child("opaque"),
93 fetcher,
94 handler,
95 mailbox_size,
96 RETRY_DELAY,
97 );
98 Self { client, inner }
99 }
100
101 pub const fn client(&self) -> &SimplexClient {
102 &self.client
103 }
104
105 pub fn into_client(self) -> SimplexClient {
106 self.client
107 }
108}
109
110impl<D, P> Resolver for MarshalResolver<D, P>
111where
112 D: Digest + Send + 'static,
113 P: PublicKey,
114{
115 type Key = MarshalKey<D>;
116 type Subscriber = Annotation;
117
118 fn fetch<F>(&mut self, key: F) -> Feedback
119 where
120 F: Into<Fetch<Self::Key, Self::Subscriber>> + Send,
121 {
122 self.inner.fetch(key)
123 }
124
125 fn fetch_all<F>(&mut self, keys: Vec<F>) -> Feedback
126 where
127 F: Into<Fetch<Self::Key, Self::Subscriber>> + Send,
128 {
129 self.inner.fetch_all(keys)
130 }
131
132 fn retain(
133 &mut self,
134 predicate: impl Fn(&Self::Key, &Self::Subscriber) -> bool + Send + 'static,
135 ) -> Feedback {
136 self.inner.retain(predicate)
137 }
138}
139
140impl<D, P> TargetedResolver for MarshalResolver<D, P>
141where
142 D: Digest + Send + 'static,
143 P: PublicKey,
144{
145 type PublicKey = P;
146
147 fn fetch_targeted(
148 &mut self,
149 key: impl Into<Fetch<Self::Key, Self::Subscriber>> + Send,
150 targets: NonEmptyVec<Self::PublicKey>,
151 ) -> Feedback {
152 self.inner.fetch_targeted(key, targets)
153 }
154
155 fn fetch_all_targeted<F>(&mut self, keys: Vec<(F, NonEmptyVec<Self::PublicKey>)>) -> Feedback
156 where
157 F: Into<Fetch<Self::Key, Self::Subscriber>> + Send,
158 {
159 self.inner.fetch_all_targeted(keys)
160 }
161}
162
163async fn fetch_value<D: Digest>(
164 client: &SimplexClient,
165 key: MarshalKey<D>,
166) -> Result<Option<Bytes>, crate::SimplexError> {
167 match key {
168 MarshalKey::Block(commitment) => client.get_header_raw(&commitment).await,
169 MarshalKey::Finalized { height } => client.get_finalized_by_height_raw(height).await,
170 MarshalKey::Notarized { round } => client.get_notarized_raw(round.view()).await,
171 }
172}