Skip to main content

commonware_resolver/p2p/
engine.rs

1use super::{
2    config::Config,
3    fetcher::{Config as FetcherConfig, Fetcher},
4    inflight::Inflight,
5    ingress::{FetchKey, Mailbox, Message},
6    metrics, wire, Producer,
7};
8use crate::{subscribers, Consumer, Delivery};
9use bytes::Bytes;
10use commonware_actor::mailbox;
11use commonware_cryptography::PublicKey;
12use commonware_macros::select_loop;
13use commonware_p2p::{
14    utils::codec::{wrap, WrappedSender},
15    Blocker, Provider, Receiver, Recipients, Sender,
16};
17use commonware_runtime::{
18    spawn_cell,
19    telemetry::metrics::{histogram, status::Status, GaugeExt},
20    BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner,
21};
22use commonware_utils::{channel::oneshot, futures::Pool as FuturesPool, Span};
23use futures::future::{self, Either};
24use rand::Rng;
25use std::marker::PhantomData;
26use tracing::{debug, error, trace, warn};
27
28/// Represents a pending serve operation.
29struct Serve<P: PublicKey> {
30    timer: histogram::Timer,
31    peer: P,
32    id: u64,
33    result: Result<Bytes, oneshot::error::RecvError>,
34}
35
36/// Manages incoming and outgoing P2P requests, coordinating fetch and serve operations.
37pub struct Engine<E, P, D, B, Key, Con, Pro, NetS, NetR>
38where
39    E: BufferPooler + Clock + Spawner + Rng + Metrics,
40    P: PublicKey,
41    D: Provider<PublicKey = P>,
42    B: Blocker<PublicKey = P>,
43    Key: Span,
44    Con: Consumer<Key = Key, Value = Bytes>,
45    Pro: Producer<Key = Key>,
46    NetS: Sender<PublicKey = P>,
47    NetR: Receiver<PublicKey = P>,
48    Con::Subscriber: Eq,
49{
50    /// Context used to spawn tasks, manage time, etc.
51    context: ContextCell<E>,
52
53    /// Produces data for incoming requests
54    producer: Pro,
55
56    /// Manages the list of peers that can be used to fetch data
57    peer_provider: D,
58
59    /// The blocker that will be used to block peers that send invalid responses
60    blocker: B,
61
62    /// Used to detect changes in the peer set
63    last_peer_set_id: Option<u64>,
64
65    /// Mailbox that makes and prunes fetches
66    mailbox: mailbox::Receiver<Message<Key, P, Con::Subscriber>>,
67
68    /// Manages outgoing fetch requests
69    fetcher: Fetcher<E, P, Key, NetS>,
70
71    /// Tracks all in-flight fetch state
72    inflight: Inflight<Con, P>,
73
74    /// Subscribers that keep each fetch alive.
75    subscribers: subscribers::Tracker<Key, Con::Subscriber>,
76
77    /// Holds futures that resolve once the `Producer` has produced the data.
78    /// Once the future is resolved, the data (or an error) is sent to the peer.
79    /// Has unbounded size; the number of concurrent requests should be limited
80    /// by the `Producer` which may drop requests.
81    serves: FuturesPool<Serve<P>>,
82
83    /// Whether responses are sent with priority over other network messages
84    priority_responses: bool,
85
86    /// Metrics for the peer actor
87    metrics: metrics::Metrics,
88
89    /// Phantom data for networking types
90    _r: PhantomData<NetR>,
91}
92
93impl<E, P, D, B, Key, Con, Pro, NetS, NetR> Engine<E, P, D, B, Key, Con, Pro, NetS, NetR>
94where
95    E: BufferPooler + Clock + Spawner + Rng + Metrics,
96    P: PublicKey,
97    D: Provider<PublicKey = P>,
98    B: Blocker<PublicKey = P>,
99    Key: Span,
100    Con: Consumer<Key = Key, Value = Bytes>,
101    Pro: Producer<Key = Key>,
102    NetS: Sender<PublicKey = P>,
103    NetR: Receiver<PublicKey = P>,
104    Con::Subscriber: Clone + Ord + Send + 'static,
105{
106    /// Creates a new `Actor` with the given configuration.
107    ///
108    /// Returns the actor and a mailbox to send messages to it.
109    pub fn new(
110        context: E,
111        cfg: Config<P, D, B, Key, Con, Pro>,
112    ) -> (Self, Mailbox<Key, P, Con::Subscriber>) {
113        let (sender, receiver) = mailbox::new(context.child("mailbox"), cfg.mailbox_size);
114
115        let metrics = metrics::Metrics::init(&context);
116        let fetcher = Fetcher::new(
117            context.child("fetcher"),
118            FetcherConfig {
119                me: cfg.me,
120                initial: cfg.initial,
121                timeout: cfg.timeout,
122                retry_timeout: cfg.fetch_retry_timeout,
123                priority_requests: cfg.priority_requests,
124            },
125        );
126        (
127            Self {
128                context: ContextCell::new(context),
129                producer: cfg.producer,
130                peer_provider: cfg.peer_provider,
131                blocker: cfg.blocker,
132                last_peer_set_id: None,
133                mailbox: receiver,
134                fetcher,
135                inflight: Inflight::new(cfg.consumer),
136                subscribers: subscribers::Tracker::new(),
137                serves: FuturesPool::default(),
138                priority_responses: cfg.priority_responses,
139                metrics,
140                _r: PhantomData,
141            },
142            Mailbox::new(sender),
143        )
144    }
145
146    /// Runs the actor until the context is stopped.
147    ///
148    /// The actor will handle:
149    /// - Fetching data from other peers and notifying the `Consumer`
150    /// - Serving data to other peers by requesting it from the `Producer`
151    pub fn start(mut self, network: (NetS, NetR)) -> Handle<()> {
152        spawn_cell!(self.context, self.run(network))
153    }
154
155    /// Inner run loop called by `start`.
156    async fn run(mut self, network: (NetS, NetR)) {
157        // Wrap channel
158        let (mut sender, mut receiver) = wrap(
159            (),
160            self.context.network_buffer_pool().clone(),
161            network.0,
162            network.1,
163        );
164        let mut peer_set_subscription = self.peer_provider.subscribe().await;
165
166        select_loop! {
167            self.context,
168            on_start => {
169                // Update metrics
170                let _ = self
171                    .metrics
172                    .fetch_pending
173                    .try_set(self.fetcher.len_pending());
174                let _ = self.metrics.fetch_active.try_set(self.fetcher.len_active());
175                let _ = self
176                    .metrics
177                    .peers_blocked
178                    .try_set(self.fetcher.len_blocked());
179                let _ = self.metrics.serve_processing.try_set(self.serves.len());
180
181                // Get retry timeout (if any)
182                let deadline_pending = match self.fetcher.get_pending_deadline() {
183                    Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
184                    None => Either::Right(future::pending()),
185                };
186
187                // Get requester timeout (if any)
188                let deadline_active = match self.fetcher.get_active_deadline() {
189                    Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
190                    None => Either::Right(future::pending()),
191                };
192            },
193            on_stopped => {
194                debug!("shutdown");
195                self.inflight.drain();
196                self.subscribers.clear();
197                self.serves.cancel_all();
198            },
199            // Handle peer set updates
200            Some(update) = peer_set_subscription.recv() else {
201                debug!("peer set subscription closed");
202                return;
203            } => {
204                if self.last_peer_set_id < Some(update.index) {
205                    self.last_peer_set_id = Some(update.index);
206                    self.fetcher.reconcile(update.latest.primary.as_ref());
207                }
208            },
209            // Handle active deadline
210            _ = deadline_active => {
211                if let Some(key) = self.fetcher.pop_active() {
212                    debug!(?key, "requester timeout");
213                    self.metrics.fetch.inc(Status::Failure);
214                    self.fetcher.add_retry(key);
215                }
216            },
217            // Handle pending deadline
218            _ = deadline_pending => {
219                self.fetcher.fetch(&mut sender);
220            },
221            // Handle mailbox messages
222            Some(msg) = self.mailbox.recv() else {
223                error!("mailbox closed");
224                return;
225            } => {
226                match msg {
227                    Message::Fetch(keys) => {
228                        for FetchKey {
229                            key,
230                            subscribers,
231                            metadata: targets,
232                        } in keys
233                        {
234                            trace!(?key, "mailbox: fetch");
235
236                            // Check if the fetch is already in progress
237                            let is_new = !self.inflight.contains(&key);
238                            self.subscribers.insert(key.clone(), subscribers);
239
240                            // Update targets
241                            match targets {
242                                Some(targets) => {
243                                    // Only add targets if this is a new fetch OR the existing
244                                    // fetch already has targets. Don't restrict an "all" fetch
245                                    // (no targets) to specific targets.
246                                    if is_new || self.fetcher.has_targets(&key) {
247                                        self.fetcher.add_targets(key.clone(), targets);
248                                    }
249                                }
250                                None => self.fetcher.clear_targets(&key),
251                            }
252
253                            // Only start new fetch if not already in progress
254                            if is_new {
255                                self.inflight.insert(
256                                    key.clone(),
257                                    self.metrics.fetch_duration.timer(self.context.as_ref()),
258                                );
259                                self.fetcher.add_ready(key);
260                            } else {
261                                trace!(?key, "updated targets for existing fetch");
262                            }
263                        }
264                    }
265                    Message::Retain { predicate } => {
266                        trace!("mailbox: retain");
267
268                        self.subscribers
269                            .retain(|key, subscriber| predicate(key, subscriber));
270                        let subscribers = &self.subscribers;
271                        self.fetcher.retain(|key| subscribers.contains(key));
272                        let count =
273                            self.inflight.retain(|key| subscribers.contains(key)) as u64;
274                        self.record_cancellations(count);
275                    }
276                }
277            },
278            // Handle completed consumer deliveries
279            delivery = self.inflight.next_delivery() => {
280                // If the delivery was aborted, its inflight entry was dropped (via
281                // Retain or shutdown) before the consumer finished validating.
282                let (peer, delivery, result) = match delivery {
283                    Ok(delivery) => delivery,
284                    Err(_) => continue,
285                };
286                self.handle_delivery(peer, delivery, result);
287            },
288            // Handle completed server requests
289            serve = self.serves.next_completed() => {
290                let Serve {
291                    timer,
292                    peer,
293                    id,
294                    result,
295                } = serve;
296
297                // Metrics and logs
298                match result {
299                    Ok(_) => {
300                        timer.observe(self.context.as_ref());
301                        self.metrics.serve.inc(Status::Success);
302                    }
303                    Err(ref err) => {
304                        debug!(?err, ?peer, ?id, "serve failed");
305                        self.metrics.serve.inc(Status::Failure);
306                    }
307                }
308
309                // Send response to peer
310                self.handle_serve(&mut sender, peer, id, result, self.priority_responses);
311            },
312            // Handle network messages
313            msg = receiver.recv() => {
314                // Break if the receiver is closed
315                let (peer, msg) = match msg {
316                    Ok(msg) => msg,
317                    Err(err) => {
318                        error!(?err, "receiver closed");
319                        return;
320                    }
321                };
322
323                // Skip if there is a decoding error
324                let msg = match msg {
325                    Ok(msg) => msg,
326                    Err(err) => {
327                        trace!(?err, ?peer, "decode failed");
328                        continue;
329                    }
330                };
331                match msg.payload {
332                    wire::Payload::Request(key) => self.handle_network_request(peer, msg.id, key),
333                    wire::Payload::Response(response) => {
334                        self.handle_network_response(peer, msg.id, response)
335                    }
336                    wire::Payload::Error => self.handle_network_error_response(peer, msg.id),
337                };
338            },
339        }
340    }
341
342    /// Record cancellation metrics for a retain-style operation.
343    fn record_cancellations(&mut self, count: u64) {
344        if count == 0 {
345            self.metrics.cancel.inc(Status::Dropped);
346        } else {
347            self.metrics.cancel.inc_by(Status::Success, count);
348        }
349    }
350
351    /// Handles the case where the application responds to a request from an external peer.
352    fn handle_serve(
353        &mut self,
354        sender: &mut WrappedSender<NetS, wire::Message<Key>>,
355        peer: P,
356        id: u64,
357        response: Result<Bytes, oneshot::error::RecvError>,
358        priority: bool,
359    ) {
360        // Encode message
361        let payload: wire::Payload<Key> = response.map_or_else(
362            |_| wire::Payload::Error,
363            |data| wire::Payload::Response(data),
364        );
365        let msg = wire::Message { id, payload };
366
367        // Send message to peer
368        let result = sender.send(Recipients::One(peer.clone()), msg, priority);
369
370        // Log result, but do not handle errors.
371        if result.is_empty() {
372            warn!(?peer, ?id, "serve send failed");
373        } else {
374            trace!(?peer, ?id, "serve sent");
375        };
376    }
377
378    /// Handle a network request from a peer.
379    fn handle_network_request(&mut self, peer: P, id: u64, key: Key) {
380        // Serve the request
381        trace!(?peer, ?id, "peer request");
382        let mut producer = self.producer.clone();
383        let timer = self.metrics.serve_duration.timer(self.context.as_ref());
384        let receiver = producer.produce(key);
385        self.serves.push(async move {
386            let result = receiver.await;
387            Serve {
388                timer,
389                peer,
390                id,
391                result,
392            }
393        });
394    }
395
396    /// Handle a network response from a peer.
397    fn handle_network_response(&mut self, peer: P, id: u64, response: Bytes) {
398        trace!(?peer, ?id, "peer response: data");
399
400        // Get the key associated with the response, if any
401        let Some(key) = self.fetcher.pop_by_id(id, &peer, true) else {
402            // It's possible that the key does not exist if the request was pruned.
403            return;
404        };
405
406        let Some(subscribers) = self.subscribers.pending(&key) else {
407            warn!(?key, "response for fetch with no subscribers");
408            self.inflight.cancel(&key);
409            return;
410        };
411        let delivery = Delivery {
412            key: key.clone(),
413            subscribers,
414        };
415
416        // The peer had the data, so deliver it to the consumer without blocking the engine.
417        self.inflight.deliver(delivery, peer, response);
418    }
419
420    /// Handle completed delivery to the consumer.
421    fn handle_delivery(&mut self, peer: P, delivery: Delivery<Key, Con::Subscriber>, valid: bool) {
422        let Delivery {
423            key,
424            subscribers: delivered,
425        } = delivery;
426
427        if valid {
428            let already_accepted = self.inflight.response_accepted(&key);
429
430            // Remove only the subscribers that accepted this response. If other
431            // subscribers still need the key, deliver the same accepted response
432            // locally with the remaining annotations.
433            let remaining = self.subscribers.remove_delivered(&key, delivered);
434
435            if let Some(subscribers) = remaining {
436                if !already_accepted {
437                    self.metrics.fetch.inc(Status::Success);
438                    self.inflight.accept_response(&key, self.context.as_ref());
439                }
440                self.inflight.redeliver(Delivery { key, subscribers });
441            } else {
442                // All subscribers observed a valid response; clear any targeting
443                // state retained for this key.
444                if !already_accepted {
445                    self.metrics.fetch.inc(Status::Success);
446                }
447                self.inflight.complete(self.context.as_ref(), &key);
448                self.fetcher.clear_targets(&key);
449            }
450            return;
451        }
452
453        if self.inflight.response_accepted(&key) {
454            warn!(
455                ?key,
456                "previously accepted response was rejected during local redelivery"
457            );
458            self.metrics.fetch.inc(Status::Failure);
459            self.inflight.complete(self.context.as_ref(), &key);
460            self.subscribers.remove(&key);
461            self.fetcher.clear_targets(&key);
462            return;
463        }
464
465        // If the data is invalid, block the peer and try again. Blocking the
466        // peer also removes any targets associated with it.
467        commonware_p2p::block!(self.blocker, peer.clone(), "invalid data received");
468        self.fetcher.block(peer);
469        self.metrics.fetch.inc(Status::Failure);
470        self.inflight.discard_response(&key);
471        self.fetcher.add_retry(key);
472    }
473
474    /// Handle a network response from a peer that did not have the data.
475    fn handle_network_error_response(&mut self, peer: P, id: u64) {
476        trace!(?peer, ?id, "peer response: error");
477
478        // Get the key associated with the response, if any
479        let Some(key) = self.fetcher.pop_by_id(id, &peer, false) else {
480            // It's possible that the key does not exist if the request was pruned.
481            return;
482        };
483
484        // The peer did not have the data, so we need to try again
485        self.metrics.fetch.inc(Status::Failure);
486        self.fetcher.add_retry(key);
487    }
488}