1use super::{
2 config::Config,
3 fetcher::{Config as FetcherConfig, Fetcher},
4 inflight::Inflight,
5 ingress::{FetchKey, Mailbox, Message},
6 metrics, wire, Producer,
7};
8use crate::{subscribers, Consumer, Delivery};
9use bytes::Bytes;
10use commonware_actor::mailbox;
11use commonware_cryptography::PublicKey;
12use commonware_macros::select_loop;
13use commonware_p2p::{
14 utils::codec::{wrap, WrappedSender},
15 Blocker, Provider, Receiver, Recipients, Sender,
16};
17use commonware_runtime::{
18 spawn_cell,
19 telemetry::metrics::{histogram, status::Status, GaugeExt},
20 BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner,
21};
22use commonware_utils::{channel::oneshot, futures::Pool as FuturesPool, Span};
23use futures::future::{self, Either};
24use rand::Rng;
25use std::marker::PhantomData;
26use tracing::{debug, error, trace, warn};
27
/// Result of one serve operation: a peer request that was handed to the producer,
/// together with everything needed to reply to the peer and record metrics.
struct Serve<P: PublicKey> {
    /// Timer taken from the `serve_duration` metric when the request was received.
    timer: histogram::Timer,
    /// Peer that sent the request (and that the reply is sent to).
    peer: P,
    /// Identifier of the request message; reused as the identifier of the reply.
    id: u64,
    /// Data received from the producer, or the receive error if the producer's
    /// channel did not yield a value.
    result: Result<Bytes, oneshot::error::RecvError>,
}
35
/// Actor that fetches data for keys from peers on behalf of subscribers and
/// serves peer requests using data obtained from the producer.
pub struct Engine<E, P, D, B, Key, Con, Pro, NetS, NetR>
where
    E: BufferPooler + Clock + Spawner + Rng + Metrics,
    P: PublicKey,
    D: Provider<PublicKey = P>,
    B: Blocker<PublicKey = P>,
    Key: Span,
    Con: Consumer<Key = Key, Value = Bytes>,
    Pro: Producer<Key = Key>,
    NetS: Sender<PublicKey = P>,
    NetR: Receiver<PublicKey = P>,
    Con::Subscriber: Eq,
{
    /// Runtime context: used to spawn the actor, sleep until deadlines, and time metrics.
    context: ContextCell<E>,

    /// Produces the data that is served in response to peer requests.
    producer: Pro,

    /// Source of peer set updates (subscribed to when the actor runs).
    peer_provider: D,

    /// Used to block peers that deliver invalid data.
    blocker: B,

    /// Index of the latest peer set update applied to the fetcher; `None` until the
    /// first update is applied.
    last_peer_set_id: Option<u64>,

    /// Receives `Fetch` and `Retain` messages sent through the [`Mailbox`].
    mailbox: mailbox::Receiver<Message<Key, P, Con::Subscriber>>,

    /// Tracks outbound requests (pending, active, per-key targets, blocked peers)
    /// and sends them over the network.
    fetcher: Fetcher<E, P, Key, NetS>,

    /// Tracks fetches in progress (one duration timer per key) and the deliveries
    /// made for them; constructed from the configured consumer.
    inflight: Inflight<Con, P>,

    /// Subscribers registered for each key being fetched.
    subscribers: subscribers::Tracker<Key, Con::Subscriber>,

    /// Pool of serve operations waiting on the producer.
    serves: FuturesPool<Serve<P>>,

    /// Priority flag passed to the sender when replying to peer requests.
    priority_responses: bool,

    /// Metrics recorded by the engine.
    metrics: metrics::Metrics,

    /// Marker for the receiver type, which is only used in method signatures.
    _r: PhantomData<NetR>,
}
92
93impl<E, P, D, B, Key, Con, Pro, NetS, NetR> Engine<E, P, D, B, Key, Con, Pro, NetS, NetR>
94where
95 E: BufferPooler + Clock + Spawner + Rng + Metrics,
96 P: PublicKey,
97 D: Provider<PublicKey = P>,
98 B: Blocker<PublicKey = P>,
99 Key: Span,
100 Con: Consumer<Key = Key, Value = Bytes>,
101 Pro: Producer<Key = Key>,
102 NetS: Sender<PublicKey = P>,
103 NetR: Receiver<PublicKey = P>,
104 Con::Subscriber: Clone + Ord + Send + 'static,
105{
106 pub fn new(
110 context: E,
111 cfg: Config<P, D, B, Key, Con, Pro>,
112 ) -> (Self, Mailbox<Key, P, Con::Subscriber>) {
113 let (sender, receiver) = mailbox::new(context.child("mailbox"), cfg.mailbox_size);
114
115 let metrics = metrics::Metrics::init(&context);
116 let fetcher = Fetcher::new(
117 context.child("fetcher"),
118 FetcherConfig {
119 me: cfg.me,
120 initial: cfg.initial,
121 timeout: cfg.timeout,
122 retry_timeout: cfg.fetch_retry_timeout,
123 priority_requests: cfg.priority_requests,
124 },
125 );
126 (
127 Self {
128 context: ContextCell::new(context),
129 producer: cfg.producer,
130 peer_provider: cfg.peer_provider,
131 blocker: cfg.blocker,
132 last_peer_set_id: None,
133 mailbox: receiver,
134 fetcher,
135 inflight: Inflight::new(cfg.consumer),
136 subscribers: subscribers::Tracker::new(),
137 serves: FuturesPool::default(),
138 priority_responses: cfg.priority_responses,
139 metrics,
140 _r: PhantomData,
141 },
142 Mailbox::new(sender),
143 )
144 }
145
    /// Consumes the engine and spawns its run loop through `spawn_cell!` on the
    /// stored context, returning the handle of the spawned task.
    ///
    /// `network` is the `(sender, receiver)` pair used to exchange messages with peers.
    pub fn start(mut self, network: (NetS, NetR)) -> Handle<()> {
        // NOTE(review): `run` is an `async fn`, so `self.run(network)` evaluates to a
        // future. Confirm that `spawn_cell!` awaits its body expression; if it does
        // not, this should read `self.run(network).await`.
        spawn_cell!(self.context, self.run(network))
    }
154
    /// Event loop of the engine.
    ///
    /// Reacts to peer set updates, fetch timeouts and retries, mailbox messages
    /// (`Fetch` / `Retain`), completed deliveries, completed serve operations, and
    /// messages arriving from the network. Returns when the peer set subscription,
    /// the mailbox, or the network receiver closes.
    async fn run(mut self, network: (NetS, NetR)) {
        // Wrap the raw channels; the wrapped sender carries `wire::Message`s and the
        // wrapped receiver reports decode failures per message.
        let (mut sender, mut receiver) = wrap(
            (),
            self.context.network_buffer_pool().clone(),
            network.0,
            network.1,
        );
        let mut peer_set_subscription = self.peer_provider.subscribe().await;

        select_loop! {
            self.context,
            // NOTE(review): `on_start` presumably runs before each round of selection,
            // since the deadlines it builds are consumed by the arms below; confirm
            // against `select_loop!`.
            on_start => {
                // Refresh gauges (failures to set a gauge are ignored)
                let _ = self
                    .metrics
                    .fetch_pending
                    .try_set(self.fetcher.len_pending());
                let _ = self.metrics.fetch_active.try_set(self.fetcher.len_active());
                let _ = self
                    .metrics
                    .peers_blocked
                    .try_set(self.fetcher.len_blocked());
                let _ = self.metrics.serve_processing.try_set(self.serves.len());

                // Sleep until the fetcher's pending deadline, or never fire if there is none
                let deadline_pending = match self.fetcher.get_pending_deadline() {
                    Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                    None => Either::Right(future::pending()),
                };

                // Sleep until the fetcher's active deadline, or never fire if there is none
                let deadline_active = match self.fetcher.get_active_deadline() {
                    Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                    None => Either::Right(future::pending()),
                };
            },
            // Shutdown: drop all tracked work
            on_stopped => {
                debug!("shutdown");
                self.inflight.drain();
                self.subscribers.clear();
                self.serves.cancel_all();
            },
            // Peer set update; the loop exits if the subscription closes
            Some(update) = peer_set_subscription.recv() else {
                debug!("peer set subscription closed");
                return;
            } => {
                // Only apply updates with a strictly larger index than the last one
                // applied (`None` compares less than any `Some`, so the first update
                // is always applied)
                if self.last_peer_set_id < Some(update.index) {
                    self.last_peer_set_id = Some(update.index);
                    self.fetcher.reconcile(update.latest.primary.as_ref());
                }
            },
            // Active deadline reached: count the request as failed and retry the key
            _ = deadline_active => {
                if let Some(key) = self.fetcher.pop_active() {
                    debug!(?key, "requester timeout");
                    self.metrics.fetch.inc(Status::Failure);
                    self.fetcher.add_retry(key);
                }
            },
            // Pending deadline reached: let the fetcher send requests
            _ = deadline_pending => {
                self.fetcher.fetch(&mut sender);
            },
            // Mailbox message; the loop exits if the mailbox closes
            Some(msg) = self.mailbox.recv() else {
                error!("mailbox closed");
                return;
            } => {
                match msg {
                    Message::Fetch(keys) => {
                        for FetchKey {
                            key,
                            subscribers,
                            metadata: targets,
                        } in keys
                        {
                            trace!(?key, "mailbox: fetch");

                            // A key is new if no fetch for it is currently in flight
                            let is_new = !self.inflight.contains(&key);
                            self.subscribers.insert(key.clone(), subscribers);

                            // Update targets
                            match targets {
                                Some(targets) => {
                                    // Targets are added only for a new fetch or one that
                                    // already has targets (presumably so an existing
                                    // untargeted fetch is not narrowed)
                                    if is_new || self.fetcher.has_targets(&key) {
                                        self.fetcher.add_targets(key.clone(), targets);
                                    }
                                }
                                None => self.fetcher.clear_targets(&key),
                            }

                            // Start a fetch (and its duration timer) only if one is not
                            // already in progress
                            if is_new {
                                self.inflight.insert(
                                    key.clone(),
                                    self.metrics.fetch_duration.timer(self.context.as_ref()),
                                );
                                self.fetcher.add_ready(key);
                            } else {
                                trace!(?key, "updated targets for existing fetch");
                            }
                        }
                    }
                    Message::Retain { predicate } => {
                        trace!("mailbox: retain");

                        // Filter subscribers with the predicate, then keep only the
                        // fetcher and in-flight entries whose key is still tracked by
                        // the subscriber tracker
                        self.subscribers
                            .retain(|key, subscriber| predicate(key, subscriber));
                        let subscribers = &self.subscribers;
                        self.fetcher.retain(|key| subscribers.contains(key));
                        let count =
                            self.inflight.retain(|key| subscribers.contains(key)) as u64;
                        self.record_cancellations(count);
                    }
                }
            },
            // A delivery finished; errors from `next_delivery` are skipped
            delivery = self.inflight.next_delivery() => {
                let (peer, delivery, result) = match delivery {
                    Ok(delivery) => delivery,
                    Err(_) => continue,
                };
                self.handle_delivery(peer, delivery, result);
            },
            // A serve operation finished
            serve = self.serves.next_completed() => {
                let Serve {
                    timer,
                    peer,
                    id,
                    result,
                } = serve;

                // Metrics and logs (the duration is only observed on success)
                match result {
                    Ok(_) => {
                        timer.observe(self.context.as_ref());
                        self.metrics.serve.inc(Status::Success);
                    }
                    Err(ref err) => {
                        debug!(?err, ?peer, ?id, "serve failed");
                        self.metrics.serve.inc(Status::Failure);
                    }
                }

                // Reply to the peer with either the data or an error
                self.handle_serve(&mut sender, peer, id, result, self.priority_responses);
            },
            // A message arrived from the network
            msg = receiver.recv() => {
                // The loop exits if the receiver fails
                let (peer, msg) = match msg {
                    Ok(msg) => msg,
                    Err(err) => {
                        error!(?err, "receiver closed");
                        return;
                    }
                };

                // Messages that fail to decode are skipped
                let msg = match msg {
                    Ok(msg) => msg,
                    Err(err) => {
                        trace!(?err, ?peer, "decode failed");
                        continue;
                    }
                };
                match msg.payload {
                    wire::Payload::Request(key) => self.handle_network_request(peer, msg.id, key),
                    wire::Payload::Response(response) => {
                        self.handle_network_response(peer, msg.id, response)
                    }
                    wire::Payload::Error => self.handle_network_error_response(peer, msg.id),
                };
            },
        }
    }
341
342 fn record_cancellations(&mut self, count: u64) {
344 if count == 0 {
345 self.metrics.cancel.inc(Status::Dropped);
346 } else {
347 self.metrics.cancel.inc_by(Status::Success, count);
348 }
349 }
350
351 fn handle_serve(
353 &mut self,
354 sender: &mut WrappedSender<NetS, wire::Message<Key>>,
355 peer: P,
356 id: u64,
357 response: Result<Bytes, oneshot::error::RecvError>,
358 priority: bool,
359 ) {
360 let payload: wire::Payload<Key> = response.map_or_else(
362 |_| wire::Payload::Error,
363 |data| wire::Payload::Response(data),
364 );
365 let msg = wire::Message { id, payload };
366
367 let result = sender.send(Recipients::One(peer.clone()), msg, priority);
369
370 if result.is_empty() {
372 warn!(?peer, ?id, "serve send failed");
373 } else {
374 trace!(?peer, ?id, "serve sent");
375 };
376 }
377
378 fn handle_network_request(&mut self, peer: P, id: u64, key: Key) {
380 trace!(?peer, ?id, "peer request");
382 let mut producer = self.producer.clone();
383 let timer = self.metrics.serve_duration.timer(self.context.as_ref());
384 let receiver = producer.produce(key);
385 self.serves.push(async move {
386 let result = receiver.await;
387 Serve {
388 timer,
389 peer,
390 id,
391 result,
392 }
393 });
394 }
395
396 fn handle_network_response(&mut self, peer: P, id: u64, response: Bytes) {
398 trace!(?peer, ?id, "peer response: data");
399
400 let Some(key) = self.fetcher.pop_by_id(id, &peer, true) else {
402 return;
404 };
405
406 let Some(subscribers) = self.subscribers.pending(&key) else {
407 warn!(?key, "response for fetch with no subscribers");
408 self.inflight.cancel(&key);
409 return;
410 };
411 let delivery = Delivery {
412 key: key.clone(),
413 subscribers,
414 };
415
416 self.inflight.deliver(delivery, peer, response);
418 }
419
420 fn handle_delivery(&mut self, peer: P, delivery: Delivery<Key, Con::Subscriber>, valid: bool) {
422 let Delivery {
423 key,
424 subscribers: delivered,
425 } = delivery;
426
427 if valid {
428 let already_accepted = self.inflight.response_accepted(&key);
429
430 let remaining = self.subscribers.remove_delivered(&key, delivered);
434
435 if let Some(subscribers) = remaining {
436 if !already_accepted {
437 self.metrics.fetch.inc(Status::Success);
438 self.inflight.accept_response(&key, self.context.as_ref());
439 }
440 self.inflight.redeliver(Delivery { key, subscribers });
441 } else {
442 if !already_accepted {
445 self.metrics.fetch.inc(Status::Success);
446 }
447 self.inflight.complete(self.context.as_ref(), &key);
448 self.fetcher.clear_targets(&key);
449 }
450 return;
451 }
452
453 if self.inflight.response_accepted(&key) {
454 warn!(
455 ?key,
456 "previously accepted response was rejected during local redelivery"
457 );
458 self.metrics.fetch.inc(Status::Failure);
459 self.inflight.complete(self.context.as_ref(), &key);
460 self.subscribers.remove(&key);
461 self.fetcher.clear_targets(&key);
462 return;
463 }
464
465 commonware_p2p::block!(self.blocker, peer.clone(), "invalid data received");
468 self.fetcher.block(peer);
469 self.metrics.fetch.inc(Status::Failure);
470 self.inflight.discard_response(&key);
471 self.fetcher.add_retry(key);
472 }
473
474 fn handle_network_error_response(&mut self, peer: P, id: u64) {
476 trace!(?peer, ?id, "peer response: error");
477
478 let Some(key) = self.fetcher.pop_by_id(id, &peer, false) else {
480 return;
482 };
483
484 self.metrics.fetch.inc(Status::Failure);
486 self.fetcher.add_retry(key);
487 }
488}