commonware_resolver/p2p/
engine.rs

use super::{
    config::Config,
    fetcher::Fetcher,
    ingress::{Mailbox, Message},
    metrics, wire, Coordinator, Producer,
};
use crate::Consumer;
use bytes::Bytes;
use commonware_cryptography::PublicKey;
use commonware_macros::select;
use commonware_p2p::{
    utils::codec::{wrap, WrappedSender},
    Receiver, Recipients, Sender,
};
use commonware_runtime::{
    telemetry::metrics::{
        histogram,
        status::{CounterExt, Status},
    },
    Clock, Handle, Metrics, Spawner,
};
use commonware_utils::{futures::Pool as FuturesPool, Span};
use futures::{
    channel::{mpsc, oneshot},
    future::{self, Either},
    StreamExt,
};
use governor::clock::Clock as GClock;
use rand::Rng;
use std::{collections::HashMap, marker::PhantomData};
use tracing::{debug, error, trace, warn};

/// Represents a pending serve operation.
struct Serve<E: Clock, P: PublicKey> {
    /// Measures how long the serve takes; canceled if production fails.
    timer: histogram::Timer<E>,
    /// The peer that made the request.
    peer: P,
    /// The ID used to pair the response with the original request.
    id: u64,
    /// The produced data, or an error if the `Producer` dropped the request.
    result: Result<Bytes, oneshot::Canceled>,
}

/// Manages incoming and outgoing P2P requests, coordinating fetch and serve operations.
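///
/// Fetches are requested by sending messages through the [`Mailbox`] returned by
/// [`Engine::new`]; fetched data is handed to the configured `Consumer`. A hedged
/// wiring sketch (the `context`, `config`, network halves, and `key` are
/// illustrative placeholders, and it assumes the [`Mailbox`] exposes a `fetch`
/// helper that enqueues [`Message::Fetch`]):
///
/// ```ignore
/// let (engine, mut mailbox) = Engine::new(context, config);
/// engine.start((sender, receiver));
/// // Ask the engine to fetch `key` from the peer set; the `Consumer` is
/// // notified once the fetch succeeds or is canceled.
/// mailbox.fetch(key).await;
/// ```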
pub struct Engine<
    E: Clock + GClock + Spawner + Rng + Metrics,
    P: PublicKey,
    D: Coordinator<PublicKey = P>,
    Key: Span,
    Con: Consumer<Key = Key, Value = Bytes, Failure = ()>,
    Pro: Producer<Key = Key>,
    NetS: Sender<PublicKey = P>,
    NetR: Receiver<PublicKey = P>,
> {
    /// Context used to spawn tasks, manage time, etc.
    context: E,

    /// Consumes data that is fetched from the network
    consumer: Con,

    /// Produces data for incoming requests
    producer: Pro,

    /// Manages the list of peers that can be used to fetch data
    coordinator: D,

    /// Used to detect changes in the peer set
    last_peer_set_id: Option<u64>,

    /// Mailbox that makes and cancels fetch requests
    mailbox: mpsc::Receiver<Message<Key>>,

    /// Manages outgoing fetch requests
    fetcher: Fetcher<E, P, Key, NetS>,

    /// Track the start time of fetch operations
    fetch_timers: HashMap<Key, histogram::Timer<E>>,

    /// Holds futures that resolve once the `Producer` has produced the data.
    /// Once the future is resolved, the data (or an error) is sent to the peer.
    /// Has unbounded size; the number of concurrent requests should be limited
    /// by the `Producer` which may drop requests.
    serves: FuturesPool<Serve<E, P>>,

    /// Whether responses are sent with priority over other network messages
    priority_responses: bool,

    /// Metrics for the peer actor
    metrics: metrics::Metrics<E>,

    /// Phantom data for networking types
    _s: PhantomData<NetS>,
    _r: PhantomData<NetR>,
}

impl<
        E: Clock + GClock + Spawner + Rng + Metrics,
        P: PublicKey,
        D: Coordinator<PublicKey = P>,
        Key: Span,
        Con: Consumer<Key = Key, Value = Bytes, Failure = ()>,
        Pro: Producer<Key = Key>,
        NetS: Sender<PublicKey = P>,
        NetR: Receiver<PublicKey = P>,
    > Engine<E, P, D, Key, Con, Pro, NetS, NetR>
{
    /// Creates a new `Engine` with the given configuration.
    ///
    /// Returns the engine and a mailbox to send messages to it.
    pub fn new(context: E, cfg: Config<P, D, Key, Con, Pro>) -> (Self, Mailbox<Key>) {
        let (sender, receiver) = mpsc::channel(cfg.mailbox_size);
        let metrics = metrics::Metrics::init(context.clone());
        let fetcher = Fetcher::new(
            context.with_label("fetcher"),
            cfg.requester_config,
            cfg.fetch_retry_timeout,
            cfg.priority_requests,
        );
        (
            Self {
                context,
                consumer: cfg.consumer,
                producer: cfg.producer,
                coordinator: cfg.coordinator,
                last_peer_set_id: None,
                mailbox: receiver,
                fetcher,
                serves: FuturesPool::default(),
                priority_responses: cfg.priority_responses,
                metrics,
                fetch_timers: HashMap::new(),
                _s: PhantomData,
                _r: PhantomData,
            },
            Mailbox::new(sender),
        )
    }

    /// Runs the engine until the context is stopped.
    ///
    /// The engine will handle:
    /// - Fetching data from other peers and notifying the `Consumer`
    /// - Serving data to other peers by requesting it from the `Producer`
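    ///
    /// A minimal, hedged sketch (it assumes the runtime's [`Handle`] can be
    /// awaited like a future; the network halves are placeholders):
    ///
    /// ```ignore
    /// let handle = engine.start((sender, receiver));
    /// // The engine runs until the context is stopped; the returned handle
    /// // resolves once the run loop exits.
    /// handle.await;
    /// ```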
    pub fn start(mut self, network: (NetS, NetR)) -> Handle<()> {
        self.context.spawn_ref()(self.run(network))
    }

    /// Inner run loop called by `start`.
    async fn run(mut self, network: (NetS, NetR)) {
        let mut shutdown = self.context.stopped();

        // Wrap channel
        let (mut sender, mut receiver) = wrap((), network.0, network.1);

        // Set initial peer set.
        self.last_peer_set_id = Some(self.coordinator.peer_set_id());
        self.fetcher.reconcile(self.coordinator.peers());

        loop {
            // Update metrics
            self.metrics
                .fetch_pending
                .set(self.fetcher.len_pending() as i64);
            self.metrics
                .fetch_active
                .set(self.fetcher.len_active() as i64);
            self.metrics
                .peers_blocked
                .set(self.fetcher.len_blocked() as i64);
            self.metrics.serve_processing.set(self.serves.len() as i64);

            // Update peer list if-and-only-if it might have changed.
            let peer_set_id = self.coordinator.peer_set_id();
            if self.last_peer_set_id != Some(peer_set_id) {
                self.last_peer_set_id = Some(peer_set_id);
                self.fetcher.reconcile(self.coordinator.peers());
            }

            // Get retry timeout (if any)
            let deadline_pending = match self.fetcher.get_pending_deadline() {
                Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                None => Either::Right(future::pending()),
            };

            // Get requester timeout (if any)
            let deadline_active = match self.fetcher.get_active_deadline() {
                Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                None => Either::Right(future::pending()),
            };

            // Handle shutdown signal
            select! {
                _ = &mut shutdown => {
                    debug!("shutdown");
                    self.serves.cancel_all();
                    return;
                },

                // Handle mailbox messages
                msg = self.mailbox.next() => {
                    let Some(msg) = msg else {
                        error!("mailbox closed");
                        return;
                    };
                    match msg {
                        Message::Fetch { key } => {
                            trace!(?key, "mailbox: fetch");

                            // Check if the fetch is already in progress
                            if self.fetch_timers.contains_key(&key) {
                                trace!(?key, "duplicate fetch");
                                self.metrics.fetch.inc(Status::Dropped);
                                continue;
                            }

                            // Record fetch start time
                            self.fetch_timers.insert(key.clone(), self.metrics.fetch_duration.timer());
                            self.fetcher.fetch(&mut sender, key, true).await;
                        }
                        Message::Cancel { key } => {
                            trace!(?key, "mailbox: cancel");
                            let mut guard = self.metrics.cancel.guard(Status::Dropped);
                            if self.fetcher.cancel(&key) {
                                guard.set(Status::Success);
                                self.fetch_timers.remove(&key).unwrap().cancel(); // must exist, don't record metric
                                self.consumer.failed(key.clone(), ()).await;
                            }
                        }
                        Message::Retain { predicate } => {
                            trace!("mailbox: retain");
                            let before = self.fetcher.len();
                            self.fetcher.retain(predicate);
                            let after = self.fetcher.len();
                            if before == after {
                                self.metrics.cancel.inc(Status::Dropped);
                            } else {
                                self.metrics.cancel.inc_by(Status::Success, before.checked_sub(after).unwrap() as u64);
                            }
                        }
                        Message::Clear => {
                            trace!("mailbox: clear");
                            let before = self.fetcher.len();
                            self.fetcher.clear();
                            let after = self.fetcher.len();
                            if before == after {
                                self.metrics.cancel.inc(Status::Dropped);
                            } else {
                                self.metrics.cancel.inc_by(Status::Success, before.checked_sub(after).unwrap() as u64);
                            }
                        }
                    }
                },

                // Handle completed server requests
                serve = self.serves.next_completed() => {
                    let Serve { timer, peer, id, result } = serve;

                    // Metrics and logs
                    match result {
                        Ok(_) => {
                            self.metrics.serve.inc(Status::Success);
                        }
                        Err(err) => {
                            debug!(?err, ?peer, ?id, "serve failed");
                            timer.cancel();
                            self.metrics.serve.inc(Status::Failure);
                        }
                    }

                    // Send response to peer
                    self.handle_serve(&mut sender, peer, id, result, self.priority_responses).await;
                },

                // Handle network messages
                msg = receiver.recv() => {
                    // Break if the receiver is closed
                    let (peer, msg) = match msg {
                        Ok(msg) => msg,
                        Err(err) => {
                            error!(?err, "receiver closed");
                            return;
                        }
                    };

                    // Skip if there is a decoding error
                    let msg = match msg {
                        Ok(msg) => msg,
                        Err(err) => {
                            trace!(?err, ?peer, "decode failed");
                            continue;
                        }
                    };
                    match msg.payload {
                        wire::Payload::Request(key) => self.handle_network_request(peer, msg.id, key).await,
                        wire::Payload::Response(response) => self.handle_network_response(&mut sender, peer, msg.id, response).await,
                        wire::Payload::ErrorResponse => self.handle_network_error_response(&mut sender, peer, msg.id).await,
                    };
                },

                // Handle pending deadline
                _ = deadline_pending => {
                    let key = self.fetcher.pop_pending();
                    debug!(?key, "retrying");
                    self.metrics.fetch.inc(Status::Failure);
                    self.fetcher.fetch(&mut sender, key, false).await;
                },

                // Handle active deadline
                _ = deadline_active => {
                    if let Some(key) = self.fetcher.pop_active() {
                        debug!(?key, "requester timeout");
                        self.metrics.fetch.inc(Status::Failure);
                        self.fetcher.fetch(&mut sender, key, false).await;
                    }
                },
            }
        }
    }

    /// Handles the case where the application responds to a request from an external peer.
    async fn handle_serve(
        &mut self,
        sender: &mut WrappedSender<NetS, wire::Message<Key>>,
        peer: P,
        id: u64,
        response: Result<Bytes, oneshot::Canceled>,
        priority: bool,
    ) {
        // Encode message
        let payload: wire::Payload<Key> = match response {
            Ok(data) => wire::Payload::Response(data),
            Err(_) => wire::Payload::ErrorResponse,
        };
        let msg = wire::Message { id, payload };

        // Send message to peer
        let result = sender
            .send(Recipients::One(peer.clone()), msg, priority)
            .await;

        // Log result, but do not handle errors
        match result {
            Err(err) => error!(?err, ?peer, ?id, "serve send failed"),
            Ok(to) if to.is_empty() => warn!(?peer, ?id, "serve send failed"),
            Ok(_) => trace!(?peer, ?id, "serve sent"),
        };
    }

    /// Handle a network request from a peer.
    async fn handle_network_request(&mut self, peer: P, id: u64, key: Key) {
        // Serve the request
        trace!(?peer, ?id, "peer request");
        let mut producer = self.producer.clone();
        let timer = self.metrics.serve_duration.timer();
        self.serves.push(async move {
            let receiver = producer.produce(key).await;
            let result = receiver.await;
            Serve {
                timer,
                peer,
                id,
                result,
            }
        });
    }

    /// Handle a network response from a peer.
    async fn handle_network_response(
        &mut self,
        sender: &mut WrappedSender<NetS, wire::Message<Key>>,
        peer: P,
        id: u64,
        response: Bytes,
    ) {
        trace!(?peer, ?id, "peer response: data");

        // Get the key associated with the response, if any
        let Some(key) = self.fetcher.pop_by_id(id, &peer, true) else {
            // It's possible that the key does not exist if the request was canceled
            return;
        };

        // The peer had the data, so we can deliver it to the consumer
        if self.consumer.deliver(key.clone(), response).await {
            // Record metrics
            self.metrics.fetch.inc(Status::Success);
            self.fetch_timers.remove(&key).unwrap(); // must exist in the map, records metric on drop
            return;
        }

        // If the data is invalid, we need to block the peer and try again
        self.fetcher.block(peer);
        self.metrics.fetch.inc(Status::Failure);
        self.fetcher.fetch(sender, key, false).await;
    }

    /// Handle a network response from a peer that did not have the data.
    async fn handle_network_error_response(
        &mut self,
        sender: &mut WrappedSender<NetS, wire::Message<Key>>,
        peer: P,
        id: u64,
    ) {
        trace!(?peer, ?id, "peer response: error");

        // Get the key associated with the response, if any
        let Some(key) = self.fetcher.pop_by_id(id, &peer, false) else {
            // It's possible that the key does not exist if the request was canceled
            return;
        };

        // The peer did not have the data, so we need to try again
        self.metrics.fetch.inc(Status::Failure);
        // Don't reset start time for retries, keep the original
        self.fetcher.fetch(sender, key, false).await;
    }
}