// commonware_resolver/p2p/engine.rs

1use super::{
2    config::Config,
3    fetcher::{Config as FetcherConfig, Fetcher},
4    ingress::{FetchRequest, Mailbox, Message},
5    metrics, wire, Producer,
6};
7use crate::Consumer;
8use bytes::Bytes;
9use commonware_cryptography::PublicKey;
10use commonware_macros::select_loop;
11use commonware_p2p::{
12    utils::codec::{wrap, WrappedSender},
13    Blocker, Provider, Receiver, Recipients, Sender,
14};
15use commonware_runtime::{
16    spawn_cell,
17    telemetry::metrics::{
18        histogram,
19        status::{CounterExt, GaugeExt, Status},
20    },
21    BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner,
22};
23use commonware_utils::{
24    channel::{mpsc, oneshot},
25    futures::Pool as FuturesPool,
26    Span,
27};
28use futures::future::{self, Either};
29use rand::Rng;
30use std::{collections::HashMap, marker::PhantomData};
31use tracing::{debug, error, trace, warn};
32
/// Represents a pending serve operation.
///
/// Resolved by the serve futures pool once the `Producer` has either produced
/// the data or dropped the request.
struct Serve<E: Clock, P: PublicKey> {
    /// Measures the duration of the serve; canceled on failure so that only
    /// successful serves are recorded.
    timer: histogram::Timer<E>,
    /// The peer that made the request.
    peer: P,
    /// The request id, echoed back to the peer in the response.
    id: u64,
    /// The produced data, or a receive error if the `Producer` dropped the
    /// request before responding.
    result: Result<Bytes, oneshot::error::RecvError>,
}
40
/// Manages incoming and outgoing P2P requests, coordinating fetch and serve operations.
pub struct Engine<
    E: BufferPooler + Clock + Spawner + Rng + Metrics,
    P: PublicKey,
    D: Provider<PublicKey = P>,
    B: Blocker<PublicKey = P>,
    Key: Span,
    Con: Consumer<Key = Key, Value = Bytes, Failure = ()>,
    Pro: Producer<Key = Key>,
    NetS: Sender<PublicKey = P>,
    NetR: Receiver<PublicKey = P>,
> {
    /// Context used to spawn tasks, manage time, etc.
    context: ContextCell<E>,

    /// Consumes data that is fetched from the network
    consumer: Con,

    /// Produces data for incoming requests
    producer: Pro,

    /// Manages the list of peers that can be used to fetch data
    peer_provider: D,

    /// The blocker that will be used to block peers that send invalid responses
    blocker: B,

    /// Used to detect changes in the peer set; only updates with a larger id
    /// than the last seen trigger a reconcile.
    last_peer_set_id: Option<u64>,

    /// Mailbox that makes and cancels fetch requests
    mailbox: mpsc::Receiver<Message<Key, P>>,

    /// Manages outgoing fetch requests
    fetcher: Fetcher<E, P, Key, NetS>,

    /// Track the start time of fetch operations.
    ///
    /// Dropping a timer records the fetch-duration metric; `cancel` suppresses
    /// recording. Invariant: contains exactly the keys tracked by `fetcher`.
    fetch_timers: HashMap<Key, histogram::Timer<E>>,

    /// Holds futures that resolve once the `Producer` has produced the data.
    /// Once the future is resolved, the data (or an error) is sent to the peer.
    /// Has unbounded size; the number of concurrent requests should be limited
    /// by the `Producer` which may drop requests.
    serves: FuturesPool<Serve<E, P>>,

    /// Whether responses are sent with priority over other network messages
    priority_responses: bool,

    /// Metrics for the peer actor
    metrics: metrics::Metrics<E>,

    /// Phantom data for networking types
    _r: PhantomData<NetR>,
}
95
96impl<
97        E: BufferPooler + Clock + Spawner + Rng + Metrics,
98        P: PublicKey,
99        D: Provider<PublicKey = P>,
100        B: Blocker<PublicKey = P>,
101        Key: Span,
102        Con: Consumer<Key = Key, Value = Bytes, Failure = ()>,
103        Pro: Producer<Key = Key>,
104        NetS: Sender<PublicKey = P>,
105        NetR: Receiver<PublicKey = P>,
106    > Engine<E, P, D, B, Key, Con, Pro, NetS, NetR>
107{
108    /// Creates a new `Actor` with the given configuration.
109    ///
110    /// Returns the actor and a mailbox to send messages to it.
111    pub fn new(context: E, cfg: Config<P, D, B, Key, Con, Pro>) -> (Self, Mailbox<Key, P>) {
112        let (sender, receiver) = mpsc::channel(cfg.mailbox_size);
113
114        // TODO(#1833): Metrics should use the post-start context
115        let metrics = metrics::Metrics::init(context.clone());
116        let fetcher = Fetcher::new(
117            context.with_label("fetcher"),
118            FetcherConfig {
119                me: cfg.me,
120                initial: cfg.initial,
121                timeout: cfg.timeout,
122                retry_timeout: cfg.fetch_retry_timeout,
123                priority_requests: cfg.priority_requests,
124            },
125        );
126        (
127            Self {
128                context: ContextCell::new(context),
129                consumer: cfg.consumer,
130                producer: cfg.producer,
131                peer_provider: cfg.peer_provider,
132                blocker: cfg.blocker,
133                last_peer_set_id: None,
134                mailbox: receiver,
135                fetcher,
136                serves: FuturesPool::default(),
137                priority_responses: cfg.priority_responses,
138                metrics,
139                fetch_timers: HashMap::new(),
140                _r: PhantomData,
141            },
142            Mailbox::new(sender),
143        )
144    }
145
    /// Runs the actor until the context is stopped.
    ///
    /// The actor will handle:
    /// - Fetching data from other peers and notifying the `Consumer`
    /// - Serving data to other peers by requesting it from the `Producer`
    pub fn start(mut self, network: (NetS, NetR)) -> Handle<()> {
        // Spawn `run` as a task on the cell's context; the returned handle
        // resolves when the task finishes.
        spawn_cell!(self.context, self.run(network).await)
    }
154
    /// Inner run loop called by `start`.
    ///
    /// Multiplexes between peer-set updates, fetch deadlines, mailbox
    /// messages, completed serves, and incoming network messages until the
    /// context is stopped or a required channel closes.
    async fn run(mut self, network: (NetS, NetR)) {
        // Wrap channel with the codec so messages are encoded/decoded on the wire
        let (mut sender, mut receiver) = wrap(
            (),
            self.context.network_buffer_pool().clone(),
            network.0,
            network.1,
        );
        let peer_set_subscription = &mut self.peer_provider.subscribe().await;

        select_loop! {
            self.context,
            // Runs before every select iteration: refresh gauges and compute
            // the deadlines used by the arms below.
            on_start => {
                // Update metrics
                let _ = self
                    .metrics
                    .fetch_pending
                    .try_set(self.fetcher.len_pending());
                let _ = self.metrics.fetch_active.try_set(self.fetcher.len_active());
                let _ = self
                    .metrics
                    .peers_blocked
                    .try_set(self.fetcher.len_blocked());
                let _ = self.metrics.serve_processing.try_set(self.serves.len());

                // Get retry timeout (if any); `pending()` means "never fires"
                let deadline_pending = match self.fetcher.get_pending_deadline() {
                    Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                    None => Either::Right(future::pending()),
                };

                // Get requester timeout (if any)
                let deadline_active = match self.fetcher.get_active_deadline() {
                    Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                    None => Either::Right(future::pending()),
                };
            },
            on_stopped => {
                debug!("shutdown");
                self.serves.cancel_all();
            },
            // Handle peer set updates
            Some((id, _, all)) = peer_set_subscription.recv() else {
                debug!("peer set subscription closed");
                return;
            } => {
                // Instead of directing our requests to exclusively the latest set (which may still be syncing, we
                // reconcile with all tracked peers).
                if self.last_peer_set_id < Some(id) {
                    self.last_peer_set_id = Some(id);
                    self.fetcher.reconcile(all.as_ref());
                }
            },
            // Handle active deadline: an in-flight request timed out, so count
            // the failure and schedule a retry
            _ = deadline_active => {
                if let Some(key) = self.fetcher.pop_active() {
                    debug!(?key, "requester timeout");
                    self.metrics.fetch.inc(Status::Failure);
                    self.fetcher.add_retry(key);
                }
            },
            // Handle pending deadline: time to send out queued fetches
            _ = deadline_pending => {
                self.fetcher.fetch(&mut sender).await;
            },
            // Handle mailbox messages
            Some(msg) = self.mailbox.recv() else {
                error!("mailbox closed");
                return;
            } => {
                match msg {
                    Message::Fetch(requests) => {
                        for FetchRequest { key, targets } in requests {
                            trace!(?key, "mailbox: fetch");

                            // Check if the fetch is already in progress
                            let is_new = !self.fetch_timers.contains_key(&key);

                            // Update targets
                            match targets {
                                Some(targets) => {
                                    // Only add targets if this is a new fetch OR the existing
                                    // fetch already has targets. Don't restrict an "all" fetch
                                    // (no targets) to specific targets.
                                    if is_new || self.fetcher.has_targets(&key) {
                                        self.fetcher.add_targets(key.clone(), targets);
                                    }
                                }
                                None => self.fetcher.clear_targets(&key),
                            }

                            // Only start new fetch if not already in progress
                            if is_new {
                                self.fetch_timers
                                    .insert(key.clone(), self.metrics.fetch_duration.timer());
                                self.fetcher.add_ready(key);
                            } else {
                                trace!(?key, "updated targets for existing fetch");
                            }
                        }
                    }
                    Message::Cancel { key } => {
                        trace!(?key, "mailbox: cancel");
                        let mut guard = self.metrics.cancel.guard(Status::Dropped);
                        if self.fetcher.cancel(&key) {
                            guard.set(Status::Success);
                            self.fetch_timers.remove(&key).unwrap().cancel(); // must exist, don't record metric
                            self.consumer.failed(key.clone(), ()).await;
                        }
                    }
                    Message::Retain { predicate } => {
                        trace!("mailbox: retain");

                        // Remove from fetcher
                        self.fetcher.retain(&predicate);

                        // Clean up timers and notify consumer
                        let before = self.fetch_timers.len();
                        let removed = self
                            .fetch_timers
                            .extract_if(|k, _| !predicate(k))
                            .collect::<Vec<_>>();
                        for (key, timer) in removed {
                            timer.cancel();
                            self.consumer.failed(key, ()).await;
                        }

                        // Metrics
                        let removed = (before - self.fetch_timers.len()) as u64;
                        if removed == 0 {
                            self.metrics.cancel.inc(Status::Dropped);
                        } else {
                            self.metrics.cancel.inc_by(Status::Success, removed);
                        }
                    }
                    Message::Clear => {
                        trace!("mailbox: clear");

                        // Clear fetcher
                        self.fetcher.clear();

                        // Drain timers and notify consumer
                        let removed = self.fetch_timers.len() as u64;
                        for (key, timer) in self.fetch_timers.drain() {
                            timer.cancel();
                            self.consumer.failed(key, ()).await;
                        }

                        // Metrics
                        if removed == 0 {
                            self.metrics.cancel.inc(Status::Dropped);
                        } else {
                            self.metrics.cancel.inc_by(Status::Success, removed);
                        }
                    }
                }
                // Invariant: the fetcher and the timer map track the same keys
                assert_eq!(self.fetcher.len(), self.fetch_timers.len());
            },
            // Handle completed server requests
            serve = self.serves.next_completed() => {
                let Serve {
                    timer,
                    peer,
                    id,
                    result,
                } = serve;

                // Metrics and logs
                match result {
                    Ok(_) => {
                        self.metrics.serve.inc(Status::Success);
                    }
                    Err(ref err) => {
                        debug!(?err, ?peer, ?id, "serve failed");
                        timer.cancel();
                        self.metrics.serve.inc(Status::Failure);
                    }
                }

                // Send response to peer
                self.handle_serve(&mut sender, peer, id, result, self.priority_responses)
                    .await;
            },
            // Handle network messages
            msg = receiver.recv() => {
                // Break if the receiver is closed
                let (peer, msg) = match msg {
                    Ok(msg) => msg,
                    Err(err) => {
                        error!(?err, "receiver closed");
                        return;
                    }
                };

                // Skip if there is a decoding error
                let msg = match msg {
                    Ok(msg) => msg,
                    Err(err) => {
                        trace!(?err, ?peer, "decode failed");
                        continue;
                    }
                };
                match msg.payload {
                    wire::Payload::Request(key) => self.handle_network_request(peer, msg.id, key),
                    wire::Payload::Response(response) => {
                        self.handle_network_response(peer, msg.id, response).await
                    }
                    wire::Payload::Error => self.handle_network_error_response(peer, msg.id),
                };
            },
        }
    }
368
369    /// Handles the case where the application responds to a request from an external peer.
370    async fn handle_serve(
371        &mut self,
372        sender: &mut WrappedSender<NetS, wire::Message<Key>>,
373        peer: P,
374        id: u64,
375        response: Result<Bytes, oneshot::error::RecvError>,
376        priority: bool,
377    ) {
378        // Encode message
379        let payload: wire::Payload<Key> = response.map_or_else(
380            |_| wire::Payload::Error,
381            |data| wire::Payload::Response(data),
382        );
383        let msg = wire::Message { id, payload };
384
385        // Send message to peer
386        let result = sender
387            .send(Recipients::One(peer.clone()), msg, priority)
388            .await;
389
390        // Log result, but do not handle errors
391        match result {
392            Err(err) => error!(?err, ?peer, ?id, "serve send failed"),
393            Ok(to) if to.is_empty() => warn!(?peer, ?id, "serve send failed"),
394            Ok(_) => trace!(?peer, ?id, "serve sent"),
395        };
396    }
397
398    /// Handle a network request from a peer.
399    fn handle_network_request(&mut self, peer: P, id: u64, key: Key) {
400        // Serve the request
401        trace!(?peer, ?id, "peer request");
402        let mut producer = self.producer.clone();
403        let timer = self.metrics.serve_duration.timer();
404        self.serves.push(async move {
405            let receiver = producer.produce(key).await;
406            let result = receiver.await;
407            Serve {
408                timer,
409                peer,
410                id,
411                result,
412            }
413        });
414    }
415
416    /// Handle a network response from a peer.
417    async fn handle_network_response(&mut self, peer: P, id: u64, response: Bytes) {
418        trace!(?peer, ?id, "peer response: data");
419
420        // Get the key associated with the response, if any
421        let Some(key) = self.fetcher.pop_by_id(id, &peer, true) else {
422            // It's possible that the key does not exist if the request was canceled
423            return;
424        };
425
426        // The peer had the data, so we can deliver it to the consumer
427        if self.consumer.deliver(key.clone(), response).await {
428            // Record metrics
429            self.metrics.fetch.inc(Status::Success);
430            self.fetch_timers.remove(&key).unwrap(); // must exist in the map, records metric on drop
431
432            // Clear all targets for this key
433            self.fetcher.clear_targets(&key);
434            return;
435        }
436
437        // If the data is invalid, we need to block the peer and try again
438        // (blocking the peer also removes any targets associated with it)
439        commonware_p2p::block!(self.blocker, peer.clone(), "invalid data received");
440        self.fetcher.block(peer);
441        self.metrics.fetch.inc(Status::Failure);
442        self.fetcher.add_retry(key);
443    }
444
445    /// Handle a network response from a peer that did not have the data.
446    fn handle_network_error_response(&mut self, peer: P, id: u64) {
447        trace!(?peer, ?id, "peer response: error");
448
449        // Get the key associated with the response, if any
450        let Some(key) = self.fetcher.pop_by_id(id, &peer, false) else {
451            // It's possible that the key does not exist if the request was canceled
452            return;
453        };
454
455        // The peer did not have the data, so we need to try again
456        self.metrics.fetch.inc(Status::Failure);
457        self.fetcher.add_retry(key);
458    }
459}