use super::{
    config::Config,
    fetcher::Fetcher,
    ingress::{Mailbox, Message},
    metrics, wire, Coordinator, Producer,
};
use crate::Consumer;
use bytes::Bytes;
use commonware_cryptography::PublicKey;
use commonware_macros::select;
use commonware_p2p::{
    utils::codec::{wrap, WrappedSender},
    Receiver, Recipients, Sender,
};
use commonware_runtime::{
    telemetry::metrics::{
        histogram,
        status::{CounterExt, Status},
    },
    Clock, Handle, Metrics, Spawner,
};
use commonware_utils::{futures::Pool as FuturesPool, Span};
use futures::{
    channel::{mpsc, oneshot},
    future::{self, Either},
    StreamExt,
};
use governor::clock::Clock as GClock;
use rand::Rng;
use std::{collections::HashMap, marker::PhantomData};
use tracing::{debug, error, trace, warn};

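/// The result of serving a peer's request, produced by a task in the serves pool.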
struct Serve<E: Clock, P: PublicKey> {
    /// Measures how long the serve took.
    timer: histogram::Timer<E>,
    /// The peer that made the request.
    peer: P,
    /// The ID of the request (scoped to the peer).
    id: u64,
    /// The produced data, or an error if production was canceled.
    result: Result<Bytes, oneshot::Canceled>,
}

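/// Fetches data by key from other peers and serves data to them on request.
///
/// Fetch requests arrive through the [Mailbox] and resolved values are handed
/// to the [Consumer]; incoming peer requests are answered via the [Producer].
///
/// A minimal wiring sketch (`context`, `config`, and the network halves come
/// from your own setup; the `fetch` call assumes the [Mailbox] exposes a
/// helper for [Message::Fetch]):
///
/// ```ignore
/// let (engine, mut mailbox) = Engine::new(context, config);
/// engine.start((net_sender, net_receiver));
/// mailbox.fetch(key).await;
/// ```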
pub struct Engine<
    E: Clock + GClock + Spawner + Rng + Metrics,
    P: PublicKey,
    D: Coordinator<PublicKey = P>,
    Key: Span,
    Con: Consumer<Key = Key, Value = Bytes, Failure = ()>,
    Pro: Producer<Key = Key>,
    NetS: Sender<PublicKey = P>,
    NetR: Receiver<PublicKey = P>,
> {
    /// The runtime context.
    context: E,

    /// Receives fetched data.
    consumer: Con,

    /// Produces data for incoming requests.
    producer: Pro,

    /// Tracks the current peer set.
    coordinator: D,

    /// The peer set ID most recently reconciled with the fetcher.
    last_peer_set_id: Option<u64>,

    /// Receives messages from the [Mailbox].
    mailbox: mpsc::Receiver<Message<Key>>,

    /// Manages pending and active fetch requests.
    fetcher: Fetcher<E, P, Key, NetS>,

    /// Measures the duration of each outstanding fetch, keyed by request key.
    fetch_timers: HashMap<Key, histogram::Timer<E>>,

    /// Outstanding tasks serving peer requests.
    serves: FuturesPool<Serve<E, P>>,

    /// Whether responses are sent over the network with priority.
    priority_responses: bool,

    /// Metrics for the engine.
    metrics: metrics::Metrics<E>,

    /// Phantom data for the network sender and receiver types.
    _s: PhantomData<NetS>,
    _r: PhantomData<NetR>,
}

impl<
        E: Clock + GClock + Spawner + Rng + Metrics,
        P: PublicKey,
        D: Coordinator<PublicKey = P>,
        Key: Span,
        Con: Consumer<Key = Key, Value = Bytes, Failure = ()>,
        Pro: Producer<Key = Key>,
        NetS: Sender<PublicKey = P>,
        NetR: Receiver<PublicKey = P>,
    > Engine<E, P, D, Key, Con, Pro, NetS, NetR>
{
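    /// Creates a new engine from the given configuration.
    ///
    /// Returns the engine and a [Mailbox] for sending it messages.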
    pub fn new(context: E, cfg: Config<P, D, Key, Con, Pro>) -> (Self, Mailbox<Key>) {
        let (sender, receiver) = mpsc::channel(cfg.mailbox_size);
        let metrics = metrics::Metrics::init(context.clone());
        let fetcher = Fetcher::new(
            context.with_label("fetcher"),
            cfg.requester_config,
            cfg.fetch_retry_timeout,
            cfg.priority_requests,
        );
        (
            Self {
                context,
                consumer: cfg.consumer,
                producer: cfg.producer,
                coordinator: cfg.coordinator,
                last_peer_set_id: None,
                mailbox: receiver,
                fetcher,
                serves: FuturesPool::default(),
                priority_responses: cfg.priority_responses,
                metrics,
                fetch_timers: HashMap::new(),
                _s: PhantomData,
                _r: PhantomData,
            },
            Mailbox::new(sender),
        )
    }

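    /// Starts the engine's event loop, consuming the engine.
    ///
    /// Returns a [Handle] that resolves when the loop exits.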
    pub fn start(mut self, network: (NetS, NetR)) -> Handle<()> {
        self.context.spawn_ref()(self.run(network))
    }

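    /// The main event loop.
    ///
    /// On each iteration, updates gauges, reconciles peer-set changes, and then
    /// waits on shutdown, mailbox messages, completed serves, network messages,
    /// or fetch deadlines.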
    async fn run(mut self, network: (NetS, NetR)) {
        let mut shutdown = self.context.stopped();

        // Wrap the network sender and receiver with the wire codec.
        let (mut sender, mut receiver) = wrap((), network.0, network.1);

        // Perform an initial reconciliation against the current peer set.
        self.last_peer_set_id = Some(self.coordinator.peer_set_id());
        self.fetcher.reconcile(self.coordinator.peers());

        loop {
            // Update gauges for the size of each data structure.
            self.metrics
                .fetch_pending
                .set(self.fetcher.len_pending() as i64);
            self.metrics
                .fetch_active
                .set(self.fetcher.len_active() as i64);
            self.metrics
                .peers_blocked
                .set(self.fetcher.len_blocked() as i64);
            self.metrics.serve_processing.set(self.serves.len() as i64);

            // Reconcile the fetcher if the peer set has changed.
            let peer_set_id = self.coordinator.peer_set_id();
            if self.last_peer_set_id != Some(peer_set_id) {
                self.last_peer_set_id = Some(peer_set_id);
                self.fetcher.reconcile(self.coordinator.peers());
            }

            // Compute the next deadline for retrying a pending fetch.
            let deadline_pending = match self.fetcher.get_pending_deadline() {
                Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                None => Either::Right(future::pending()),
            };

            // Compute the next deadline for timing out an active fetch.
            let deadline_active = match self.fetcher.get_active_deadline() {
                Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                None => Either::Right(future::pending()),
            };

            select! {
                // Handle shutdown.
                _ = &mut shutdown => {
                    debug!("shutdown");
                    self.serves.cancel_all();
                    return;
                },

                // Handle mailbox messages.
                msg = self.mailbox.next() => {
                    let Some(msg) = msg else {
                        error!("mailbox closed");
                        return;
                    };
                    match msg {
                        Message::Fetch { key } => {
                            trace!(?key, "mailbox: fetch");

                            // Drop the request if the key is already being fetched.
                            if self.fetch_timers.contains_key(&key) {
                                trace!(?key, "duplicate fetch");
                                self.metrics.fetch.inc(Status::Dropped);
                                continue;
                            }

                            // Start timing the fetch and issue the request.
                            self.fetch_timers.insert(key.clone(), self.metrics.fetch_duration.timer());
                            self.fetcher.fetch(&mut sender, key, true).await;
                        }
                        Message::Cancel { key } => {
                            trace!(?key, "mailbox: cancel");
                            let mut guard = self.metrics.cancel.guard(Status::Dropped);
                            if self.fetcher.cancel(&key) {
                                guard.set(Status::Success);
                                // The timer must exist for any in-flight fetch.
                                self.fetch_timers.remove(&key).unwrap().cancel();
                                self.consumer.failed(key, ()).await;
                            }
                        }
                        Message::Retain { predicate } => {
                            trace!("mailbox: retain");
                            let before = self.fetcher.len();
                            self.fetcher.retain(predicate);
                            let after = self.fetcher.len();
                            if before == after {
                                self.metrics.cancel.inc(Status::Dropped);
                            } else {
                                self.metrics.cancel.inc_by(Status::Success, before.checked_sub(after).unwrap() as u64);
                            }
                        }
                        Message::Clear => {
                            trace!("mailbox: clear");
                            let before = self.fetcher.len();
                            self.fetcher.clear();
                            let after = self.fetcher.len();
                            if before == after {
                                self.metrics.cancel.inc(Status::Dropped);
                            } else {
                                self.metrics.cancel.inc_by(Status::Success, before.checked_sub(after).unwrap() as u64);
                            }
                        }
                    }
                },

                // Handle completed serves.
                serve = self.serves.next_completed() => {
                    let Serve { timer, peer, id, result } = serve;

                    // Record metrics: on success the timer records its duration
                    // when dropped; on failure, cancel it.
                    match result {
                        Ok(_) => {
                            self.metrics.serve.inc(Status::Success);
                        }
                        Err(err) => {
                            debug!(?err, ?peer, ?id, "serve failed");
                            timer.cancel();
                            self.metrics.serve.inc(Status::Failure);
                        }
                    }

                    // Send the response (or an error) to the peer.
                    self.handle_serve(&mut sender, peer, id, result, self.priority_responses).await;
                },

                // Handle network messages.
                msg = receiver.recv() => {
                    // Exit if the receiver is closed.
                    let (peer, msg) = match msg {
                        Ok(msg) => msg,
                        Err(err) => {
                            error!(?err, "receiver closed");
                            return;
                        }
                    };

                    // Skip messages that fail to decode.
                    let msg = match msg {
                        Ok(msg) => msg,
                        Err(err) => {
                            trace!(?err, ?peer, "decode failed");
                            continue;
                        }
                    };
                    match msg.payload {
                        wire::Payload::Request(key) => self.handle_network_request(peer, msg.id, key).await,
                        wire::Payload::Response(response) => self.handle_network_response(&mut sender, peer, msg.id, response).await,
                        wire::Payload::ErrorResponse => self.handle_network_error_response(&mut sender, peer, msg.id).await,
                    };
                },

                // A pending fetch hit its retry deadline.
                _ = deadline_pending => {
                    let key = self.fetcher.pop_pending();
                    debug!(?key, "retrying");
                    self.metrics.fetch.inc(Status::Failure);
                    self.fetcher.fetch(&mut sender, key, false).await;
                },

                // An active fetch timed out waiting for a response.
                _ = deadline_active => {
                    if let Some(key) = self.fetcher.pop_active() {
                        debug!(?key, "requester timeout");
                        self.metrics.fetch.inc(Status::Failure);
                        self.fetcher.fetch(&mut sender, key, false).await;
                    }
                },
            }
        }
    }

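    /// Sends the result of a serve (data or an error response) back to the peer.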
    async fn handle_serve(
        &mut self,
        sender: &mut WrappedSender<NetS, wire::Message<Key>>,
        peer: P,
        id: u64,
        response: Result<Bytes, oneshot::Canceled>,
        priority: bool,
    ) {
        // Encode the result as a response or an error response.
        let payload: wire::Payload<Key> = match response {
            Ok(data) => wire::Payload::Response(data),
            Err(_) => wire::Payload::ErrorResponse,
        };
        let msg = wire::Message { id, payload };

        // Send the message to the peer.
        let result = sender
            .send(Recipients::One(peer.clone()), msg, priority)
            .await;

        // Log, but otherwise ignore, the outcome. An empty recipient list means
        // the message was not delivered to the peer.
        match result {
            Err(err) => error!(?err, ?peer, ?id, "serve send failed"),
            Ok(to) if to.is_empty() => warn!(?peer, ?id, "serve send failed"),
            Ok(_) => trace!(?peer, ?id, "serve sent"),
        };
    }

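    /// Handles a data request from a peer by spawning a task to produce the data.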
    async fn handle_network_request(&mut self, peer: P, id: u64, key: Key) {
        trace!(?peer, ?id, "peer request");

        // Produce the data asynchronously; the serve completes when the
        // producer resolves (or cancels) the request.
        let mut producer = self.producer.clone();
        let timer = self.metrics.serve_duration.timer();
        self.serves.push(async move {
            let receiver = producer.produce(key).await;
            let result = receiver.await;
            Serve {
                timer,
                peer,
                id,
                result,
            }
        });
    }

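    /// Handles a data response from a peer, delivering the data to the consumer.
    ///
    /// If the consumer rejects the data, the peer is blocked and the fetch is
    /// retried elsewhere.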
    async fn handle_network_response(
        &mut self,
        sender: &mut WrappedSender<NetS, wire::Message<Key>>,
        peer: P,
        id: u64,
        response: Bytes,
    ) {
        trace!(?peer, ?id, "peer response: data");

        // Drop the response if we weren't waiting on this ID from this peer.
        let Some(key) = self.fetcher.pop_by_id(id, &peer, true) else {
            return;
        };

        // If the consumer accepts the data, the fetch is complete.
        if self.consumer.deliver(key.clone(), response).await {
            self.metrics.fetch.inc(Status::Success);
            // Dropping the timer records the fetch duration.
            self.fetch_timers.remove(&key).unwrap();
            return;
        }

        // The consumer rejected the data: block the peer and retry the fetch.
        self.fetcher.block(peer);
        self.metrics.fetch.inc(Status::Failure);
        self.fetcher.fetch(sender, key, false).await;
    }

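    /// Handles an error response from a peer by retrying the fetch elsewhere.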
    async fn handle_network_error_response(
        &mut self,
        sender: &mut WrappedSender<NetS, wire::Message<Key>>,
        peer: P,
        id: u64,
    ) {
        trace!(?peer, ?id, "peer response: error");

        // Drop the response if we weren't waiting on this ID from this peer.
        let Some(key) = self.fetcher.pop_by_id(id, &peer, false) else {
            return;
        };

        // Retry the fetch with another peer.
        self.metrics.fetch.inc(Status::Failure);
        self.fetcher.fetch(sender, key, false).await;
    }
}