use super::{
    config::Config,
    fetcher::{Config as FetcherConfig, Fetcher},
    ingress::{FetchRequest, Mailbox, Message},
    metrics, wire, Producer,
};
use crate::Consumer;
use bytes::Bytes;
use commonware_cryptography::PublicKey;
use commonware_macros::select_loop;
use commonware_p2p::{
    utils::codec::{wrap, WrappedSender},
    Blocker, Provider, Receiver, Recipients, Sender,
};
use commonware_runtime::{
    spawn_cell,
    telemetry::metrics::{
        histogram,
        status::{CounterExt, GaugeExt, Status},
    },
    BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner,
};
use commonware_utils::{
    channel::{mpsc, oneshot},
    futures::Pool as FuturesPool,
    Span,
};
use futures::future::{self, Either};
use rand::Rng;
use std::{collections::HashMap, marker::PhantomData};
use tracing::{debug, error, trace, warn};

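/// A completed serve operation: the producer's result for a single peer
/// request, together with the requesting peer, the request id, and the
/// latency timer started when the request arrived.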
struct Serve<E: Clock, P: PublicKey> {
    timer: histogram::Timer<E>,
    peer: P,
    id: u64,
    result: Result<Bytes, oneshot::error::RecvError>,
}

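/// Engine that fetches data by key from peers and serves data requested by
/// peers, delivering fetched values to the [Consumer] and sourcing served
/// values from the [Producer].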
pub struct Engine<
    E: BufferPooler + Clock + Spawner + Rng + Metrics,
    P: PublicKey,
    D: Provider<PublicKey = P>,
    B: Blocker<PublicKey = P>,
    Key: Span,
    Con: Consumer<Key = Key, Value = Bytes, Failure = ()>,
    Pro: Producer<Key = Key>,
    NetS: Sender<PublicKey = P>,
    NetR: Receiver<PublicKey = P>,
> {
    /// Runtime context used to spawn tasks and measure time.
    context: ContextCell<E>,

    /// Consumes fetched values (and failure notifications).
    consumer: Con,

    /// Produces values for requests from other peers.
    producer: Pro,

    /// Provides updates to the set of known peers.
    peer_provider: D,

    /// Blocks peers that send invalid data.
    blocker: B,

    /// The highest peer set id observed so far, if any.
    last_peer_set_id: Option<u64>,

    /// Receives commands from the [Mailbox].
    mailbox: mpsc::Receiver<Message<Key, P>>,

    /// Tracks pending, active, and retryable fetch requests.
    fetcher: Fetcher<E, P, Key, NetS>,

    /// One latency timer per outstanding fetch, keyed by request key.
    fetch_timers: HashMap<Key, histogram::Timer<E>>,

    /// Serve operations currently awaiting a result from the producer.
    serves: FuturesPool<Serve<E, P>>,

    /// Whether responses are sent to peers as priority messages.
    priority_responses: bool,

    /// Metrics for fetches, serves, and cancellations.
    metrics: metrics::Metrics<E>,

    _r: PhantomData<NetR>,
}

impl<
        E: BufferPooler + Clock + Spawner + Rng + Metrics,
        P: PublicKey,
        D: Provider<PublicKey = P>,
        B: Blocker<PublicKey = P>,
        Key: Span,
        Con: Consumer<Key = Key, Value = Bytes, Failure = ()>,
        Pro: Producer<Key = Key>,
        NetS: Sender<PublicKey = P>,
        NetR: Receiver<PublicKey = P>,
    > Engine<E, P, D, B, Key, Con, Pro, NetS, NetR>
{
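    /// Creates a new engine from the given runtime context and configuration.
    /// Returns the engine together with a [Mailbox] for sending it messages.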
    pub fn new(context: E, cfg: Config<P, D, B, Key, Con, Pro>) -> (Self, Mailbox<Key, P>) {
        // Create a channel to receive mailbox messages.
        let (sender, receiver) = mpsc::channel(cfg.mailbox_size);

        // Initialize metrics and the fetcher.
        let metrics = metrics::Metrics::init(context.clone());
        let fetcher = Fetcher::new(
            context.with_label("fetcher"),
            FetcherConfig {
                me: cfg.me,
                initial: cfg.initial,
                timeout: cfg.timeout,
                retry_timeout: cfg.fetch_retry_timeout,
                priority_requests: cfg.priority_requests,
            },
        );
        (
            Self {
                context: ContextCell::new(context),
                consumer: cfg.consumer,
                producer: cfg.producer,
                peer_provider: cfg.peer_provider,
                blocker: cfg.blocker,
                last_peer_set_id: None,
                mailbox: receiver,
                fetcher,
                serves: FuturesPool::default(),
                priority_responses: cfg.priority_responses,
                metrics,
                fetch_timers: HashMap::new(),
                _r: PhantomData,
            },
            Mailbox::new(sender),
        )
    }

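    /// Starts the engine with the given network sender and receiver, returning
    /// a [Handle] that resolves once the engine task exits.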
    pub fn start(mut self, network: (NetS, NetR)) -> Handle<()> {
        spawn_cell!(self.context, self.run(network).await)
    }

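    /// The main loop of the engine: reacts to mailbox commands, fetch
    /// deadlines, completed serves, peer set updates, and network messages.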
    async fn run(mut self, network: (NetS, NetR)) {
        // Wrap the network sender and receiver with the wire codec.
        let (mut sender, mut receiver) = wrap(
            (),
            self.context.network_buffer_pool().clone(),
            network.0,
            network.1,
        );

        // Subscribe to peer set updates so the fetcher can be reconciled.
        let peer_set_subscription = &mut self.peer_provider.subscribe().await;

        select_loop! {
            self.context,
            on_start => {
                // Update metrics that reflect the current engine state.
                let _ = self
                    .metrics
                    .fetch_pending
                    .try_set(self.fetcher.len_pending());
                let _ = self.metrics.fetch_active.try_set(self.fetcher.len_active());
                let _ = self
                    .metrics
                    .peers_blocked
                    .try_set(self.fetcher.len_blocked());
                let _ = self.metrics.serve_processing.try_set(self.serves.len());

                // Compute the deadline (if any) at which the next pending
                // fetch should be sent.
                let deadline_pending = match self.fetcher.get_pending_deadline() {
                    Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                    None => Either::Right(future::pending()),
                };

                // Compute the deadline (if any) at which the oldest active
                // fetch times out.
                let deadline_active = match self.fetcher.get_active_deadline() {
                    Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                    None => Either::Right(future::pending()),
                };
            },
            on_stopped => {
                debug!("shutdown");
                self.serves.cancel_all();
            },
            // Handle peer set updates.
            Some((id, _, all)) = peer_set_subscription.recv() else {
                debug!("peer set subscription closed");
                return;
            } => {
                // Only reconcile against peer sets newer than the last one
                // observed.
                if self.last_peer_set_id < Some(id) {
                    self.last_peer_set_id = Some(id);
                    self.fetcher.reconcile(all.as_ref());
                }
            },
            // Handle the timeout of an active fetch by retrying it.
            _ = deadline_active => {
                if let Some(key) = self.fetcher.pop_active() {
                    debug!(?key, "requester timeout");
                    self.metrics.fetch.inc(Status::Failure);
                    self.fetcher.add_retry(key);
                }
            },
            // Send the next pending fetch.
            _ = deadline_pending => {
                self.fetcher.fetch(&mut sender).await;
            },
            // Handle mailbox messages.
            Some(msg) = self.mailbox.recv() else {
                error!("mailbox closed");
                return;
            } => {
                match msg {
                    Message::Fetch(requests) => {
                        for FetchRequest { key, targets } in requests {
                            trace!(?key, "mailbox: fetch");

                            // A fetch is new if no timer exists for its key.
                            let is_new = !self.fetch_timers.contains_key(&key);

                            match targets {
                                Some(targets) => {
                                    // Only overwrite targets for an existing
                                    // fetch that is already target-restricted.
                                    if is_new || self.fetcher.has_targets(&key) {
                                        self.fetcher.add_targets(key.clone(), targets);
                                    }
                                }
                                None => self.fetcher.clear_targets(&key),
                            }

                            if is_new {
                                // Start a timer and queue the fetch.
                                self.fetch_timers
                                    .insert(key.clone(), self.metrics.fetch_duration.timer());
                                self.fetcher.add_ready(key);
                            } else {
                                trace!(?key, "updated targets for existing fetch");
                            }
                        }
                    }
                    Message::Cancel { key } => {
                        trace!(?key, "mailbox: cancel");
                        let mut guard = self.metrics.cancel.guard(Status::Dropped);
                        if self.fetcher.cancel(&key) {
                            guard.set(Status::Success);
                            self.fetch_timers.remove(&key).unwrap().cancel();
                            self.consumer.failed(key, ()).await;
                        }
                    }
                    Message::Retain { predicate } => {
                        trace!("mailbox: retain");

                        self.fetcher.retain(&predicate);

                        // Cancel the timers of (and notify the consumer
                        // about) all fetches that do not match the predicate.
                        let before = self.fetch_timers.len();
                        let removed = self
                            .fetch_timers
                            .extract_if(|k, _| !predicate(k))
                            .collect::<Vec<_>>();
                        for (key, timer) in removed {
                            timer.cancel();
                            self.consumer.failed(key, ()).await;
                        }

                        // Record the number of canceled fetches.
                        let removed = (before - self.fetch_timers.len()) as u64;
                        if removed == 0 {
                            self.metrics.cancel.inc(Status::Dropped);
                        } else {
                            self.metrics.cancel.inc_by(Status::Success, removed);
                        }
                    }
                    Message::Clear => {
                        trace!("mailbox: clear");

                        self.fetcher.clear();

                        // Cancel all timers and notify the consumer of every
                        // dropped fetch.
                        let removed = self.fetch_timers.len() as u64;
                        for (key, timer) in self.fetch_timers.drain() {
                            timer.cancel();
                            self.consumer.failed(key, ()).await;
                        }

                        if removed == 0 {
                            self.metrics.cancel.inc(Status::Dropped);
                        } else {
                            self.metrics.cancel.inc_by(Status::Success, removed);
                        }
                    }
                }
                assert_eq!(self.fetcher.len(), self.fetch_timers.len());
            },
            // Handle completed serves.
            serve = self.serves.next_completed() => {
                let Serve {
                    timer,
                    peer,
                    id,
                    result,
                } = serve;

                // Record the serve outcome; a failed serve cancels its timer
                // so it is not recorded in the duration histogram.
                match result {
                    Ok(_) => {
                        self.metrics.serve.inc(Status::Success);
                    }
                    Err(ref err) => {
                        debug!(?err, ?peer, ?id, "serve failed");
                        timer.cancel();
                        self.metrics.serve.inc(Status::Failure);
                    }
                }

                self.handle_serve(&mut sender, peer, id, result, self.priority_responses)
                    .await;
            },
            // Handle network messages.
            msg = receiver.recv() => {
                // The engine cannot continue if the receiver is closed.
                let (peer, msg) = match msg {
                    Ok(msg) => msg,
                    Err(err) => {
                        error!(?err, "receiver closed");
                        return;
                    }
                };

                // Drop messages that fail to decode.
                let msg = match msg {
                    Ok(msg) => msg,
                    Err(err) => {
                        trace!(?err, ?peer, "decode failed");
                        continue;
                    }
                };
                match msg.payload {
                    wire::Payload::Request(key) => self.handle_network_request(peer, msg.id, key),
                    wire::Payload::Response(response) => {
                        self.handle_network_response(peer, msg.id, response).await
                    }
                    wire::Payload::Error => self.handle_network_error_response(peer, msg.id),
                };
            },
        }
    }

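    /// Sends the outcome of a serve operation back to the requesting peer.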
    async fn handle_serve(
        &mut self,
        sender: &mut WrappedSender<NetS, wire::Message<Key>>,
        peer: P,
        id: u64,
        response: Result<Bytes, oneshot::error::RecvError>,
        priority: bool,
    ) {
        // Encode the result: a successful produce becomes a response payload,
        // a failed one becomes an error payload.
        let payload: wire::Payload<Key> =
            response.map_or_else(|_| wire::Payload::Error, wire::Payload::Response);
        let msg = wire::Message { id, payload };

        // Send the message to the peer.
        let result = sender
            .send(Recipients::One(peer.clone()), msg, priority)
            .await;

        // An empty recipient list means the peer was unreachable.
        match result {
            Err(err) => error!(?err, ?peer, ?id, "serve send failed"),
            Ok(to) if to.is_empty() => warn!(?peer, ?id, "serve send failed"),
            Ok(_) => trace!(?peer, ?id, "serve sent"),
        };
    }

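    /// Handles a request for data from a peer by asking the producer for the
    /// value and queueing the serve until the producer responds.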
    fn handle_network_request(&mut self, peer: P, id: u64, key: Key) {
        trace!(?peer, ?id, "peer request");

        // Await the producer's result in the serves pool so other work is not
        // blocked while the value is produced.
        let mut producer = self.producer.clone();
        let timer = self.metrics.serve_duration.timer();
        self.serves.push(async move {
            let receiver = producer.produce(key).await;
            let result = receiver.await;
            Serve {
                timer,
                peer,
                id,
                result,
            }
        });
    }

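    /// Handles a response from a peer that contains data.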
    async fn handle_network_response(&mut self, peer: P, id: u64, response: Bytes) {
        trace!(?peer, ?id, "peer response: data");

        // Ignore the response unless it matches an active fetch to this peer.
        let Some(key) = self.fetcher.pop_by_id(id, &peer, true) else {
            return;
        };

        // If the consumer accepts the data, the fetch is complete.
        if self.consumer.deliver(key.clone(), response).await {
            self.metrics.fetch.inc(Status::Success);
            self.fetch_timers.remove(&key).unwrap();
            self.fetcher.clear_targets(&key);
            return;
        }

        // The consumer rejected the data: block the peer and retry the fetch
        // elsewhere.
        commonware_p2p::block!(self.blocker, peer.clone(), "invalid data received");
        self.fetcher.block(peer);
        self.metrics.fetch.inc(Status::Failure);
        self.fetcher.add_retry(key);
    }

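    /// Handles a response from a peer that indicates it cannot serve the
    /// requested data.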
    fn handle_network_error_response(&mut self, peer: P, id: u64) {
        trace!(?peer, ?id, "peer response: error");

        // Ignore the response unless it matches an active fetch to this peer.
        let Some(key) = self.fetcher.pop_by_id(id, &peer, false) else {
            return;
        };

        // The peer explicitly declined, so retry with another peer.
        self.metrics.fetch.inc(Status::Failure);
        self.fetcher.add_retry(key);
    }
}