commonware_resolver/p2p/engine.rs

use super::{
    config::Config,
    fetcher::{Config as FetcherConfig, Fetcher},
    ingress::{FetchRequest, Mailbox, Message},
    metrics, wire, Producer,
};
use crate::Consumer;
use bytes::Bytes;
use commonware_cryptography::PublicKey;
use commonware_macros::select_loop;
use commonware_p2p::{
    utils::codec::{wrap, WrappedSender},
    Blocker, Provider, Receiver, Recipients, Sender,
};
use commonware_runtime::{
    spawn_cell,
    telemetry::metrics::{
        histogram,
        status::{CounterExt, GaugeExt, Status},
    },
    BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner,
};
use commonware_utils::{
    channel::{mpsc, oneshot},
    futures::Pool as FuturesPool,
    Span,
};
use futures::future::{self, Either};
use rand::Rng;
use std::{collections::HashMap, marker::PhantomData};
use tracing::{debug, error, trace, warn};

/// Represents a pending serve operation.
struct Serve<E: Clock, P: PublicKey> {
    /// Measures how long the serve takes; canceled on failure.
    timer: histogram::Timer<E>,
    /// The peer that made the request.
    peer: P,
    /// The wire ID of the request being served.
    id: u64,
    /// The produced data, or an error if the `Producer` dropped the request.
    result: Result<Bytes, oneshot::error::RecvError>,
}

/// Manages incoming and outgoing P2P requests, coordinating fetch and serve operations.
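///
/// In short (a summary of the run loop below): fetch requests arriving via the
/// `Mailbox` are issued to peers by the `Fetcher`, and verified responses are
/// delivered to the `Consumer`; incoming peer requests are forwarded to the
/// `Producer`, whose output (or an error) is sent back to the requester.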
pub struct Engine<
    E: BufferPooler + Clock + Spawner + Rng + Metrics,
    P: PublicKey,
    D: Provider<PublicKey = P>,
    B: Blocker<PublicKey = P>,
    Key: Span,
    Con: Consumer<Key = Key, Value = Bytes, Failure = ()>,
    Pro: Producer<Key = Key>,
    NetS: Sender<PublicKey = P>,
    NetR: Receiver<PublicKey = P>,
> {
    /// Context used to spawn tasks, manage time, etc.
    context: ContextCell<E>,

    /// Consumes data that is fetched from the network
    consumer: Con,

    /// Produces data for incoming requests
    producer: Pro,

    /// Manages the list of peers that can be used to fetch data
    peer_provider: D,

    /// The blocker that will be used to block peers that send invalid responses
    blocker: B,

    /// Used to detect changes in the peer set
    last_peer_set_id: Option<u64>,

    /// Mailbox that makes and cancels fetch requests
    mailbox: mpsc::Receiver<Message<Key, P>>,

    /// Manages outgoing fetch requests
    fetcher: Fetcher<E, P, Key, NetS>,

    /// Track the start time of fetch operations
    fetch_timers: HashMap<Key, histogram::Timer<E>>,

    /// Holds futures that resolve once the `Producer` has produced the data.
    /// Once the future is resolved, the data (or an error) is sent to the peer.
    /// Has unbounded size; the number of concurrent requests should be limited
    /// by the `Producer` which may drop requests.
    serves: FuturesPool<Serve<E, P>>,

    /// Whether responses are sent with priority over other network messages
    priority_responses: bool,

    /// Metrics for the peer actor
    metrics: metrics::Metrics<E>,

    /// Phantom data for networking types
    _r: PhantomData<NetR>,
}

impl<
        E: BufferPooler + Clock + Spawner + Rng + Metrics,
        P: PublicKey,
        D: Provider<PublicKey = P>,
        B: Blocker<PublicKey = P>,
        Key: Span,
        Con: Consumer<Key = Key, Value = Bytes, Failure = ()>,
        Pro: Producer<Key = Key>,
        NetS: Sender<PublicKey = P>,
        NetR: Receiver<PublicKey = P>,
    > Engine<E, P, D, B, Key, Con, Pro, NetS, NetR>
{
    /// Creates a new `Engine` with the given configuration.
    ///
    /// Returns the engine and a mailbox to send messages to it.
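    ///
    /// # Example
    ///
    /// A minimal construction sketch; `context` and the `config` value are
    /// assumed to be set up elsewhere:
    ///
    /// ```ignore
    /// let (engine, mailbox) = Engine::new(context, config);
    /// ```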
    pub fn new(context: E, cfg: Config<P, D, B, Key, Con, Pro>) -> (Self, Mailbox<Key, P>) {
        let (sender, receiver) = mpsc::channel(cfg.mailbox_size);

        // TODO(#1833): Metrics should use the post-start context
        let metrics = metrics::Metrics::init(context.clone());
        let fetcher = Fetcher::new(
            context.with_label("fetcher"),
            FetcherConfig {
                me: cfg.me,
                initial: cfg.initial,
                timeout: cfg.timeout,
                retry_timeout: cfg.fetch_retry_timeout,
                priority_requests: cfg.priority_requests,
            },
        );
        (
            Self {
                context: ContextCell::new(context),
                consumer: cfg.consumer,
                producer: cfg.producer,
                peer_provider: cfg.peer_provider,
                blocker: cfg.blocker,
                last_peer_set_id: None,
                mailbox: receiver,
                fetcher,
                serves: FuturesPool::default(),
                priority_responses: cfg.priority_responses,
                metrics,
                fetch_timers: HashMap::new(),
                _r: PhantomData,
            },
            Mailbox::new(sender),
        )
    }

    /// Runs the actor until the context is stopped.
    ///
    /// The actor will handle:
    /// - Fetching data from other peers and notifying the `Consumer`
    /// - Serving data to other peers by requesting it from the `Producer`
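    ///
    /// # Example
    ///
    /// A lifecycle sketch, assuming a registered channel pair (`net_sender`,
    /// `net_receiver`) obtained from the p2p layer:
    ///
    /// ```ignore
    /// let (engine, mailbox) = Engine::new(context, config);
    /// let handle = engine.start((net_sender, net_receiver));
    /// // Drive fetches through `mailbox`; `handle` resolves once the
    /// // context is stopped.
    /// ```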
    pub fn start(mut self, network: (NetS, NetR)) -> Handle<()> {
        spawn_cell!(self.context, self.run(network).await)
    }

    /// Inner run loop called by `start`.
    async fn run(mut self, network: (NetS, NetR)) {
        // Wrap channel
        let (mut sender, mut receiver) = wrap(
            (),
            self.context.network_buffer_pool().clone(),
            network.0,
            network.1,
        );
        let peer_set_subscription = &mut self.peer_provider.subscribe().await;

        select_loop! {
            self.context,
            on_start => {
                // Update metrics
                let _ = self
                    .metrics
                    .fetch_pending
                    .try_set(self.fetcher.len_pending());
                let _ = self.metrics.fetch_active.try_set(self.fetcher.len_active());
                let _ = self
                    .metrics
                    .peers_blocked
                    .try_set(self.fetcher.len_blocked());
                let _ = self.metrics.serve_processing.try_set(self.serves.len());

                // Get retry timeout (if any); with no deadline, fall back to
                // a future that never resolves so the branch never fires
                let deadline_pending = match self.fetcher.get_pending_deadline() {
                    Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                    None => Either::Right(future::pending()),
                };

                // Get requester timeout (if any), with the same fallback
                let deadline_active = match self.fetcher.get_active_deadline() {
                    Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                    None => Either::Right(future::pending()),
                };
            },
            on_stopped => {
                debug!("shutdown");
                self.serves.cancel_all();
            },
            // Handle peer set updates
            Some(update) = peer_set_subscription.recv() else {
                debug!("peer set subscription closed");
                return;
            } => {
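                // `None < Some(_)` under `Option`'s ordering, so the first
                // update always triggers a reconcile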
                if self.last_peer_set_id < Some(update.index) {
                    self.last_peer_set_id = Some(update.index);
                    self.fetcher.reconcile(update.latest.primary.as_ref());
                }
            },
            // Handle active deadline
            _ = deadline_active => {
                if let Some(key) = self.fetcher.pop_active() {
                    debug!(?key, "requester timeout");
                    self.metrics.fetch.inc(Status::Failure);
                    self.fetcher.add_retry(key);
                }
            },
            // Handle pending deadline
            _ = deadline_pending => {
                self.fetcher.fetch(&mut sender).await;
            },
            // Handle mailbox messages
            Some(msg) = self.mailbox.recv() else {
                error!("mailbox closed");
                return;
            } => {
                match msg {
                    Message::Fetch(requests) => {
                        for FetchRequest { key, targets } in requests {
                            trace!(?key, "mailbox: fetch");

                            // Check if the fetch is already in progress
                            let is_new = !self.fetch_timers.contains_key(&key);

                            // Update targets
                            match targets {
                                Some(targets) => {
                                    // Only add targets if this is a new fetch OR the existing
                                    // fetch already has targets. Don't restrict an "all" fetch
                                    // (no targets) to specific targets.
                                    if is_new || self.fetcher.has_targets(&key) {
                                        self.fetcher.add_targets(key.clone(), targets);
                                    }
                                }
                                None => self.fetcher.clear_targets(&key),
                            }

                            // Only start new fetch if not already in progress
                            if is_new {
                                self.fetch_timers
                                    .insert(key.clone(), self.metrics.fetch_duration.timer());
                                self.fetcher.add_ready(key);
                            } else {
                                trace!(?key, "updated targets for existing fetch");
                            }
                        }
                    }
                    Message::Cancel { key } => {
                        trace!(?key, "mailbox: cancel");
                        let mut guard = self.metrics.cancel.guard(Status::Dropped);
                        if self.fetcher.cancel(&key) {
                            guard.set(Status::Success);
                            self.fetch_timers.remove(&key).unwrap().cancel(); // must exist, don't record metric
                            self.consumer.failed(key.clone(), ()).await;
                        }
                    }
                    Message::Retain { predicate } => {
                        trace!("mailbox: retain");

                        // Remove from fetcher
                        self.fetcher.retain(&predicate);

                        // Clean up timers and notify consumer
                        let before = self.fetch_timers.len();
                        let removed = self
                            .fetch_timers
                            .extract_if(|k, _| !predicate(k))
                            .collect::<Vec<_>>();
                        for (key, timer) in removed {
                            timer.cancel();
                            self.consumer.failed(key, ()).await;
                        }

                        // Metrics
                        let removed = (before - self.fetch_timers.len()) as u64;
                        if removed == 0 {
                            self.metrics.cancel.inc(Status::Dropped);
                        } else {
                            self.metrics.cancel.inc_by(Status::Success, removed);
                        }
                    }
                    Message::Clear => {
                        trace!("mailbox: clear");

                        // Clear fetcher
                        self.fetcher.clear();

                        // Drain timers and notify consumer
                        let removed = self.fetch_timers.len() as u64;
                        for (key, timer) in self.fetch_timers.drain() {
                            timer.cancel();
                            self.consumer.failed(key, ()).await;
                        }

                        // Metrics
                        if removed == 0 {
                            self.metrics.cancel.inc(Status::Dropped);
                        } else {
                            self.metrics.cancel.inc_by(Status::Success, removed);
                        }
                    }
                }
                assert_eq!(self.fetcher.len(), self.fetch_timers.len());
            },
            // Handle completed serve requests
            serve = self.serves.next_completed() => {
                let Serve {
                    timer,
                    peer,
                    id,
                    result,
                } = serve;

                // Metrics and logs
                match result {
                    Ok(_) => {
                        self.metrics.serve.inc(Status::Success);
                    }
                    Err(ref err) => {
                        debug!(?err, ?peer, ?id, "serve failed");
                        timer.cancel();
                        self.metrics.serve.inc(Status::Failure);
                    }
                }

                // Send response to peer
                self.handle_serve(&mut sender, peer, id, result, self.priority_responses)
                    .await;
            },
            // Handle network messages
            msg = receiver.recv() => {
                // Break if the receiver is closed
                let (peer, msg) = match msg {
                    Ok(msg) => msg,
                    Err(err) => {
                        error!(?err, "receiver closed");
                        return;
                    }
                };

                // Skip if there is a decoding error
                let msg = match msg {
                    Ok(msg) => msg,
                    Err(err) => {
                        trace!(?err, ?peer, "decode failed");
                        continue;
                    }
                };
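                // Dispatch on the payload: the peer is either requesting data
                // from us, answering one of our fetches, or reporting that it
                // did not have the data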
                match msg.payload {
                    wire::Payload::Request(key) => self.handle_network_request(peer, msg.id, key),
                    wire::Payload::Response(response) => {
                        self.handle_network_response(peer, msg.id, response).await
                    }
                    wire::Payload::Error => self.handle_network_error_response(peer, msg.id),
                };
            },
        }
    }

    /// Handles the case where the application responds to a request from an external peer.
    async fn handle_serve(
        &mut self,
        sender: &mut WrappedSender<NetS, wire::Message<Key>>,
        peer: P,
        id: u64,
        response: Result<Bytes, oneshot::error::RecvError>,
        priority: bool,
    ) {
        // Encode message
        let payload: wire::Payload<Key> =
            response.map_or_else(|_| wire::Payload::Error, wire::Payload::Response);
        let msg = wire::Message { id, payload };

        // Send message to peer
        let result = sender
            .send(Recipients::One(peer.clone()), msg, priority)
            .await;

        // Log result, but do not handle errors
        match result {
            Err(err) => error!(?err, ?peer, ?id, "serve send failed"),
            Ok(to) if to.is_empty() => warn!(?peer, ?id, "serve send failed"),
            Ok(_) => trace!(?peer, ?id, "serve sent"),
        };
    }

    /// Handle a network request from a peer.
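    ///
    /// The request is handed to the `Producer`; the pushed future completes
    /// when the producer fulfills (or drops) the returned oneshot, and the
    /// result is then sent back to the peer by the run loop.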
    fn handle_network_request(&mut self, peer: P, id: u64, key: Key) {
        // Serve the request
        trace!(?peer, ?id, "peer request");
        let mut producer = self.producer.clone();
        let timer = self.metrics.serve_duration.timer();
        self.serves.push(async move {
            let receiver = producer.produce(key).await;
            let result = receiver.await;
            Serve {
                timer,
                peer,
                id,
                result,
            }
        });
    }

    /// Handle a network response from a peer.
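    ///
    /// Valid data is delivered to the `Consumer`; if the `Consumer` rejects
    /// the data, the sending peer is blocked and the fetch is queued for
    /// retry.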
    async fn handle_network_response(&mut self, peer: P, id: u64, response: Bytes) {
        trace!(?peer, ?id, "peer response: data");

        // Get the key associated with the response, if any
        let Some(key) = self.fetcher.pop_by_id(id, &peer, true) else {
            // It's possible that the key does not exist if the request was canceled
            return;
        };

        // The peer had the data, so we can deliver it to the consumer
        if self.consumer.deliver(key.clone(), response).await {
            // Record metrics
            self.metrics.fetch.inc(Status::Success);
            self.fetch_timers.remove(&key).unwrap(); // must exist in the map, records metric on drop

            // Clear all targets for this key
            self.fetcher.clear_targets(&key);
            return;
        }

        // If the data is invalid, we need to block the peer and try again
        // (blocking the peer also removes any targets associated with it)
        commonware_p2p::block!(self.blocker, peer.clone(), "invalid data received");
        self.fetcher.block(peer);
        self.metrics.fetch.inc(Status::Failure);
        self.fetcher.add_retry(key);
    }

    /// Handle a network response from a peer that did not have the data.
    fn handle_network_error_response(&mut self, peer: P, id: u64) {
        trace!(?peer, ?id, "peer response: error");

        // Get the key associated with the response, if any
        let Some(key) = self.fetcher.pop_by_id(id, &peer, false) else {
            // It's possible that the key does not exist if the request was canceled
            return;
        };

        // The peer did not have the data, so we need to try again
        self.metrics.fetch.inc(Status::Failure);
        self.fetcher.add_retry(key);
    }
}