commonware_resolver/p2p/engine.rs

use super::{
    config::Config,
    fetcher::{Config as FetcherConfig, Fetcher},
    ingress::{FetchRequest, Mailbox, Message},
    metrics, wire, Producer,
};
use crate::Consumer;
use bytes::Bytes;
use commonware_cryptography::PublicKey;
use commonware_macros::select;
use commonware_p2p::{
    utils::codec::{wrap, WrappedSender},
    Blocker, Manager, Receiver, Recipients, Sender,
};
use commonware_runtime::{
    spawn_cell,
    telemetry::metrics::{
        histogram,
        status::{CounterExt, GaugeExt, Status},
    },
    Clock, ContextCell, Handle, Metrics, Spawner,
};
use commonware_utils::{futures::Pool as FuturesPool, Span};
use futures::{
    channel::{mpsc, oneshot},
    future::{self, Either},
    StreamExt,
};
use rand::Rng;
use std::{collections::HashMap, marker::PhantomData};
use tracing::{debug, error, trace, warn};

/// Represents a pending serve operation.
struct Serve<E: Clock, P: PublicKey> {
    timer: histogram::Timer<E>,
    peer: P,
    id: u64,
    result: Result<Bytes, oneshot::Canceled>,
}

/// Manages incoming and outgoing P2P requests, coordinating fetch and serve operations.
pub struct Engine<
    E: Clock + Spawner + Rng + Metrics,
    P: PublicKey,
    D: Manager<PublicKey = P>,
    B: Blocker<PublicKey = P>,
    Key: Span,
    Con: Consumer<Key = Key, Value = Bytes, Failure = ()>,
    Pro: Producer<Key = Key>,
    NetS: Sender<PublicKey = P>,
    NetR: Receiver<PublicKey = P>,
> {
    /// Context used to spawn tasks, manage time, etc.
    context: ContextCell<E>,

    /// Consumes data that is fetched from the network
    consumer: Con,

    /// Produces data for incoming requests
    producer: Pro,

    /// Manages the list of peers that can be used to fetch data
    manager: D,

    /// The blocker that will be used to block peers that send invalid responses
    blocker: B,

    /// Used to detect changes in the peer set
    last_peer_set_id: Option<u64>,

    /// Mailbox that makes and cancels fetch requests
    mailbox: mpsc::Receiver<Message<Key, P>>,

    /// Manages outgoing fetch requests
    fetcher: Fetcher<E, P, Key, NetS>,

    /// Tracks the start time of each fetch operation
    fetch_timers: HashMap<Key, histogram::Timer<E>>,

    /// Holds futures that resolve once the `Producer` has produced the data.
    /// Once a future resolves, the data (or an error) is sent to the peer.
    /// Has unbounded size; the number of concurrent requests should be limited
    /// by the `Producer`, which may drop requests.
    serves: FuturesPool<Serve<E, P>>,

    /// Whether responses are sent with priority over other network messages
    priority_responses: bool,

    /// Metrics for the peer actor
    metrics: metrics::Metrics<E>,

    /// Phantom data for networking types
    _r: PhantomData<NetR>,
}

impl<
        E: Clock + Spawner + Rng + Metrics,
        P: PublicKey,
        D: Manager<PublicKey = P>,
        B: Blocker<PublicKey = P>,
        Key: Span,
        Con: Consumer<Key = Key, Value = Bytes, Failure = ()>,
        Pro: Producer<Key = Key>,
        NetS: Sender<PublicKey = P>,
        NetR: Receiver<PublicKey = P>,
    > Engine<E, P, D, B, Key, Con, Pro, NetS, NetR>
{
    /// Creates a new [`Engine`] with the given configuration.
    ///
    /// Returns the engine and a mailbox to send messages to it.
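    ///
    /// # Example
    ///
    /// A minimal sketch of wiring the engine together (assumes `context`, `cfg`, and the
    /// network `sender`/`receiver` pair are constructed elsewhere):
    ///
    /// ```ignore
    /// let (engine, mailbox) = Engine::new(context, cfg);
    /// let handle = engine.start((sender, receiver));
    /// // Fetch and cancel requests are issued through `mailbox`.
    /// ```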
    pub fn new(context: E, cfg: Config<P, D, B, Key, Con, Pro>) -> (Self, Mailbox<Key, P>) {
        let (sender, receiver) = mpsc::channel(cfg.mailbox_size);

        // TODO(#1833): Metrics should use the post-start context
        let metrics = metrics::Metrics::init(context.clone());
        let fetcher = Fetcher::new(
            context.with_label("fetcher"),
            FetcherConfig {
                me: cfg.me,
                initial: cfg.initial,
                timeout: cfg.timeout,
                retry_timeout: cfg.fetch_retry_timeout,
                priority_requests: cfg.priority_requests,
            },
        );
        (
            Self {
                context: ContextCell::new(context),
                consumer: cfg.consumer,
                producer: cfg.producer,
                manager: cfg.manager,
                blocker: cfg.blocker,
                last_peer_set_id: None,
                mailbox: receiver,
                fetcher,
                serves: FuturesPool::default(),
                priority_responses: cfg.priority_responses,
                metrics,
                fetch_timers: HashMap::new(),
                _r: PhantomData,
            },
            Mailbox::new(sender),
        )
    }

    /// Runs the actor until the context is stopped.
    ///
    /// The actor will handle:
    /// - Fetching data from other peers and notifying the `Consumer`
    /// - Serving data to other peers by requesting it from the `Producer`
    pub fn start(mut self, network: (NetS, NetR)) -> Handle<()> {
        spawn_cell!(self.context, self.run(network).await)
    }

    /// Inner run loop called by `start`.
    async fn run(mut self, network: (NetS, NetR)) {
        let mut shutdown = self.context.stopped();
        let peer_set_subscription = &mut self.manager.subscribe().await;

        // Wrap channel
        let (mut sender, mut receiver) = wrap((), network.0, network.1);

        loop {
            // Update metrics
            let _ = self
                .metrics
                .fetch_pending
                .try_set(self.fetcher.len_pending());
            let _ = self.metrics.fetch_active.try_set(self.fetcher.len_active());
            let _ = self
                .metrics
                .peers_blocked
                .try_set(self.fetcher.len_blocked());
            let _ = self.metrics.serve_processing.try_set(self.serves.len());

            // Get retry timeout (if any)
            let deadline_pending = match self.fetcher.get_pending_deadline() {
                Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                None => Either::Right(future::pending()),
            };

            // Get requester timeout (if any)
            let deadline_active = match self.fetcher.get_active_deadline() {
                Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                None => Either::Right(future::pending()),
            };

            // Handle shutdown signal
            select! {
                _ = &mut shutdown => {
                    debug!("shutdown");
                    self.serves.cancel_all();
                    return;
                },

                // Handle peer set updates
                peer_set_update = peer_set_subscription.next() => {
                    let Some((id, _, all)) = peer_set_update else {
                        debug!("peer set subscription closed");
                        return;
                    };

                    // Instead of directing our requests exclusively at the latest peer set
                    // (which may still be syncing), we reconcile with all tracked peers.
                    if self.last_peer_set_id < Some(id) {
                        self.last_peer_set_id = Some(id);
                        self.fetcher.reconcile(all.as_ref());
                    }
                },

                // Handle active deadline
                _ = deadline_active => {
                    if let Some(key) = self.fetcher.pop_active() {
                        debug!(?key, "requester timeout");
                        self.metrics.fetch.inc(Status::Failure);
                        self.fetcher.add_retry(key);
                    }
                },

                // Handle pending deadline
                _ = deadline_pending => {
                    self.fetcher.fetch(&mut sender).await;
                },

                // Handle mailbox messages
                msg = self.mailbox.next() => {
                    let Some(msg) = msg else {
                        error!("mailbox closed");
                        return;
                    };
                    match msg {
                        Message::Fetch(requests) => {
                            for FetchRequest { key, targets } in requests {
                                trace!(?key, "mailbox: fetch");

                                // Check if the fetch is already in progress
                                let is_new = !self.fetch_timers.contains_key(&key);

                                // Update targets
                                match targets {
                                    Some(targets) => {
                                        // Only add targets if this is a new fetch OR the existing
                                        // fetch already has targets. Don't restrict an "all" fetch
                                        // (no targets) to specific targets.
                                        if is_new || self.fetcher.has_targets(&key) {
                                            self.fetcher.add_targets(key.clone(), targets);
                                        }
                                    }
                                    None => self.fetcher.clear_targets(&key),
                                }

                                // Only start new fetch if not already in progress
                                if is_new {
                                    self.fetch_timers.insert(key.clone(), self.metrics.fetch_duration.timer());
                                    self.fetcher.add_ready(key);
                                } else {
                                    trace!(?key, "updated targets for existing fetch");
                                }
                            }
                        }
                        Message::Cancel { key } => {
                            trace!(?key, "mailbox: cancel");
                            let mut guard = self.metrics.cancel.guard(Status::Dropped);
                            if self.fetcher.cancel(&key) {
                                guard.set(Status::Success);
                                self.fetch_timers.remove(&key).unwrap().cancel(); // must exist, don't record metric
                                self.consumer.failed(key.clone(), ()).await;
                            }
                        }
                        Message::Retain { predicate } => {
                            trace!("mailbox: retain");

                            // Remove from fetcher
                            self.fetcher.retain(&predicate);

                            // Clean up timers and notify consumer
                            let before = self.fetch_timers.len();
                            let removed = self.fetch_timers.extract_if(|k, _| !predicate(k)).collect::<Vec<_>>();
                            for (key, timer) in removed {
                                timer.cancel();
                                self.consumer.failed(key, ()).await;
                            }

                            // Metrics
                            let removed = (before - self.fetch_timers.len()) as u64;
                            if removed == 0 {
                                self.metrics.cancel.inc(Status::Dropped);
                            } else {
                                self.metrics.cancel.inc_by(Status::Success, removed);
                            }
                        }
                        Message::Clear => {
                            trace!("mailbox: clear");

                            // Clear fetcher
                            self.fetcher.clear();

                            // Drain timers and notify consumer
                            let removed = self.fetch_timers.len() as u64;
                            for (key, timer) in self.fetch_timers.drain() {
                                timer.cancel();
                                self.consumer.failed(key, ()).await;
                            }

                            // Metrics
                            if removed == 0 {
                                self.metrics.cancel.inc(Status::Dropped);
                            } else {
                                self.metrics.cancel.inc_by(Status::Success, removed);
                            }
                        }
                    }
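                    // Invariant: every fetch tracked by the fetcher has a matching duration timer.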
                    assert_eq!(self.fetcher.len(), self.fetch_timers.len());
                },

                // Handle completed server requests
                serve = self.serves.next_completed() => {
                    let Serve { timer, peer, id, result } = serve;

                    // Metrics and logs
                    match result {
                        Ok(_) => {
                            self.metrics.serve.inc(Status::Success);
                        }
                        Err(err) => {
                            debug!(?err, ?peer, ?id, "serve failed");
                            timer.cancel();
                            self.metrics.serve.inc(Status::Failure);
                        }
                    }

                    // Send response to peer
                    self.handle_serve(&mut sender, peer, id, result, self.priority_responses).await;
                },

                // Handle network messages
                msg = receiver.recv() => {
                    // Break if the receiver is closed
                    let (peer, msg) = match msg {
                        Ok(msg) => msg,
                        Err(err) => {
                            error!(?err, "receiver closed");
                            return;
                        }
                    };

                    // Skip if there is a decoding error
                    let msg = match msg {
                        Ok(msg) => msg,
                        Err(err) => {
                            trace!(?err, ?peer, "decode failed");
                            continue;
                        }
                    };
                    match msg.payload {
                        wire::Payload::Request(key) => self.handle_network_request(peer, msg.id, key).await,
                        wire::Payload::Response(response) => self.handle_network_response(peer, msg.id, response).await,
                        wire::Payload::Error => self.handle_network_error_response(peer, msg.id).await,
                    };
                },
            }
        }
    }

    /// Handles the case where the application responds to a request from an external peer.
    async fn handle_serve(
        &mut self,
        sender: &mut WrappedSender<NetS, wire::Message<Key>>,
        peer: P,
        id: u64,
        response: Result<Bytes, oneshot::Canceled>,
        priority: bool,
    ) {
        // Encode message
        let payload: wire::Payload<Key> =
            response.map_or_else(|_| wire::Payload::Error, wire::Payload::Response);
        let msg = wire::Message { id, payload };

        // Send message to peer
        let result = sender
            .send(Recipients::One(peer.clone()), msg, priority)
            .await;

        // Log result, but do not handle errors
        match result {
            Err(err) => error!(?err, ?peer, ?id, "serve send failed"),
            Ok(to) if to.is_empty() => warn!(?peer, ?id, "serve send failed"),
            Ok(_) => trace!(?peer, ?id, "serve sent"),
        };
    }

    /// Handle a network request from a peer.
    async fn handle_network_request(&mut self, peer: P, id: u64, key: Key) {
        // Serve the request
        trace!(?peer, ?id, "peer request");
        let mut producer = self.producer.clone();
        let timer = self.metrics.serve_duration.timer();
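        // Produce the data in the background; the pool yields a `Serve` with the
        // result once the producer responds (or drops the request).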
        self.serves.push(async move {
            let receiver = producer.produce(key).await;
            let result = receiver.await;
            Serve {
                timer,
                peer,
                id,
                result,
            }
        });
    }

    /// Handle a network response from a peer.
    async fn handle_network_response(&mut self, peer: P, id: u64, response: Bytes) {
        trace!(?peer, ?id, "peer response: data");

        // Get the key associated with the response, if any
        let Some(key) = self.fetcher.pop_by_id(id, &peer, true) else {
            // It's possible that the key does not exist if the request was canceled
            return;
        };

        // The peer had the data, so we can deliver it to the consumer
        if self.consumer.deliver(key.clone(), response).await {
            // Record metrics
            self.metrics.fetch.inc(Status::Success);
            self.fetch_timers.remove(&key).unwrap(); // must exist in the map, records metric on drop

            // Clear all targets for this key
            self.fetcher.clear_targets(&key);
            return;
        }

        // If the data is invalid, we need to block the peer and try again
        // (blocking the peer also removes any targets associated with it)
        self.blocker.block(peer.clone()).await;
        self.fetcher.block(peer);
        self.metrics.fetch.inc(Status::Failure);
        self.fetcher.add_retry(key);
    }

    /// Handle a network response from a peer that did not have the data.
    async fn handle_network_error_response(&mut self, peer: P, id: u64) {
        trace!(?peer, ?id, "peer response: error");

        // Get the key associated with the response, if any
        let Some(key) = self.fetcher.pop_by_id(id, &peer, false) else {
            // It's possible that the key does not exist if the request was canceled
            return;
        };

        // The peer did not have the data, so we need to try again
        self.metrics.fetch.inc(Status::Failure);
        self.fetcher.add_retry(key);
    }
}