alloy_pubsub/managers/
in_flight.rs

1use alloy_json_rpc::{Response, ResponsePayload, SerializedRequest, SubId};
2use alloy_transport::{TransportError, TransportResult};
3use std::fmt;
4use tokio::sync::oneshot;
5
6/// An in-flight JSON-RPC request.
7///
8/// This struct contains the request that was sent, as well as a channel to
9/// receive the response on.
10pub struct InFlight {
11    /// The request
12    pub request: SerializedRequest,
13
14    /// The number of items to buffer in the subscription channel.
15    pub channel_size: usize,
16
17    /// The channel to send the response on.
18    pub tx: oneshot::Sender<TransportResult<Response>>,
19}
20
21impl fmt::Debug for InFlight {
22    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
23        f.debug_struct("InFlight")
24            .field("request", &self.request)
25            .field("channel_size", &self.channel_size)
26            .field("tx_is_closed", &self.tx.is_closed())
27            .finish()
28    }
29}
30
31impl InFlight {
32    /// Create a new in-flight request.
33    pub fn new(
34        request: SerializedRequest,
35        channel_size: usize,
36    ) -> (Self, oneshot::Receiver<TransportResult<Response>>) {
37        let (tx, rx) = oneshot::channel();
38
39        (Self { request, channel_size, tx }, rx)
40    }
41
42    /// Check if the request is a subscription.
43    pub fn is_subscription(&self) -> bool {
44        self.request.is_subscription()
45    }
46
47    /// Get a reference to the serialized request.
48    ///
49    /// This is used to (re-)send the request over the transport.
50    pub const fn request(&self) -> &SerializedRequest {
51        &self.request
52    }
53
54    /// Fulfill the request with a response. This consumes the in-flight
55    /// request. If the request is a subscription and the response is not an
56    /// error, the subscription ID and the in-flight request are returned.
57    pub fn fulfill(self, resp: Response) -> Option<(SubId, Self)> {
58        if self.is_subscription() {
59            if let ResponsePayload::Success(val) = resp.payload {
60                let sub_id: serde_json::Result<SubId> = serde_json::from_str(val.get());
61                return match sub_id {
62                    Ok(alias) => Some((alias, self)),
63                    Err(e) => {
64                        let _ = self.tx.send(Err(TransportError::deser_err(e, val.get())));
65                        None
66                    }
67                };
68            }
69        }
70
71        let _ = self.tx.send(Ok(resp));
72        None
73    }
74}