alloy_pubsub/managers/
in_flight.rs1use alloy_json_rpc::{Response, ResponsePayload, SerializedRequest, SubId};
2use alloy_transport::{TransportError, TransportResult};
3use std::fmt;
4use tokio::sync::oneshot;
5
6pub struct InFlight {
11 pub request: SerializedRequest,
13
14 pub channel_size: usize,
16
17 pub tx: oneshot::Sender<TransportResult<Response>>,
19}
20
21impl fmt::Debug for InFlight {
22 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
23 f.debug_struct("InFlight")
24 .field("request", &self.request)
25 .field("channel_size", &self.channel_size)
26 .field("tx_is_closed", &self.tx.is_closed())
27 .finish()
28 }
29}
30
31impl InFlight {
32 pub fn new(
34 request: SerializedRequest,
35 channel_size: usize,
36 ) -> (Self, oneshot::Receiver<TransportResult<Response>>) {
37 let (tx, rx) = oneshot::channel();
38
39 (Self { request, channel_size, tx }, rx)
40 }
41
42 pub fn is_subscription(&self) -> bool {
44 self.request.is_subscription()
45 }
46
47 pub const fn request(&self) -> &SerializedRequest {
51 &self.request
52 }
53
54 pub fn fulfill(self, resp: Response) -> Option<(SubId, Self)> {
58 if self.is_subscription() {
59 if let ResponsePayload::Success(val) = resp.payload {
60 let sub_id: serde_json::Result<SubId> = serde_json::from_str(val.get());
61 return match sub_id {
62 Ok(alias) => Some((alias, self)),
63 Err(e) => {
64 let _ = self.tx.send(Err(TransportError::deser_err(e, val.get())));
65 None
66 }
67 };
68 }
69 }
70
71 let _ = self.tx.send(Ok(resp));
72 None
73 }
74}