near_api/common/query/handlers/
transformers.rs

1use tracing::trace;
2
3use crate::{
4    advanced::{RpcType, handlers::ResponseHandler},
5    common::query::{QUERY_EXECUTOR_TARGET, ResultWithMethod},
6    errors::QueryError,
7};
8
9#[derive(Clone, Debug)]
10pub struct MultiQueryHandler<Handlers> {
11    handlers: Handlers,
12}
13
14impl<Query, H1, H2, R1, R2> ResponseHandler for MultiQueryHandler<(H1, H2)>
15where
16    Query: RpcType,
17    H1: ResponseHandler<Response = R1, Query = Query>,
18    H2: ResponseHandler<Response = R2, Query = Query>,
19{
20    type Response = (R1, R2);
21    type Query = H1::Query;
22
23    fn process_response(
24        &self,
25        mut responses: Vec<<H1::Query as RpcType>::Response>,
26    ) -> ResultWithMethod<Self::Response, <H1::Query as RpcType>::Error> {
27        let (h1, h2) = &self.handlers;
28
29        let first_response =
30            h1.process_response(responses.drain(0..h1.request_amount()).collect())?;
31        let second_response = h2.process_response(responses)?;
32
33        Ok((first_response, second_response))
34    }
35
36    fn request_amount(&self) -> usize {
37        self.handlers.0.request_amount() + self.handlers.1.request_amount()
38    }
39}
40
41impl<Query, H1, H2, H3, R1, R2, R3> ResponseHandler for MultiQueryHandler<(H1, H2, H3)>
42where
43    Query: RpcType,
44    H1: ResponseHandler<Response = R1, Query = Query>,
45    H2: ResponseHandler<Response = R2, Query = Query>,
46    H3: ResponseHandler<Response = R3, Query = Query>,
47{
48    type Response = (R1, R2, R3);
49    type Query = Query;
50
51    fn process_response(
52        &self,
53        mut responses: Vec<<Query as RpcType>::Response>,
54    ) -> ResultWithMethod<Self::Response, <Query as RpcType>::Error> {
55        let (h1, h2, h3) = &self.handlers;
56
57        let first_response =
58            h1.process_response(responses.drain(0..h1.request_amount()).collect())?;
59        let second_response = h2.process_response(
60            responses
61                .drain(h1.request_amount()..h2.request_amount())
62                .collect(),
63        )?;
64        let third_response = h3.process_response(responses)?;
65
66        Ok((first_response, second_response, third_response))
67    }
68
69    fn request_amount(&self) -> usize {
70        self.handlers.0.request_amount() + self.handlers.1.request_amount()
71    }
72}
73
74impl<Handlers> MultiQueryHandler<Handlers> {
75    pub const fn new(handlers: Handlers) -> Self {
76        Self { handlers }
77    }
78}
79
80impl<Handlers: Default> Default for MultiQueryHandler<Handlers> {
81    fn default() -> Self {
82        Self::new(Default::default())
83    }
84}
85
86pub struct PostprocessHandler<PostProcessed, Handler: ResponseHandler> {
87    post_process: Box<dyn Fn(Handler::Response) -> PostProcessed + Send + Sync>,
88    handler: Handler,
89}
90
91impl<PostProcessed, Handler: ResponseHandler> PostprocessHandler<PostProcessed, Handler> {
92    pub fn new<F>(handler: Handler, post_process: F) -> Self
93    where
94        F: Fn(Handler::Response) -> PostProcessed + Send + Sync + 'static,
95    {
96        Self {
97            post_process: Box::new(post_process),
98            handler,
99        }
100    }
101}
102
103impl<PostProcessed, Handler> ResponseHandler for PostprocessHandler<PostProcessed, Handler>
104where
105    Handler: ResponseHandler,
106{
107    type Response = PostProcessed;
108    type Query = Handler::Query;
109
110    fn process_response(
111        &self,
112        response: Vec<<Self::Query as RpcType>::Response>,
113    ) -> ResultWithMethod<Self::Response, <Self::Query as RpcType>::Error> {
114        trace!(target: QUERY_EXECUTOR_TARGET, "Processing response with postprocessing, response count: {}", response.len());
115        Handler::process_response(&self.handler, response).map(|data| {
116            trace!(target: QUERY_EXECUTOR_TARGET, "Applying postprocessing");
117            (self.post_process)(data)
118        })
119    }
120
121    fn request_amount(&self) -> usize {
122        self.handler.request_amount()
123    }
124}
125
126pub struct AndThenHandler<PostProcessed, Handler: ResponseHandler> {
127    #[allow(clippy::complexity)]
128    post_process: Box<
129        dyn Fn(Handler::Response) -> Result<PostProcessed, Box<dyn std::error::Error + Send + Sync>>
130            + Send
131            + Sync,
132    >,
133    handler: Handler,
134}
135
136impl<PostProcessed, Handler: ResponseHandler> AndThenHandler<PostProcessed, Handler> {
137    pub fn new<F>(handler: Handler, post_process: F) -> Self
138    where
139        F: Fn(Handler::Response) -> Result<PostProcessed, Box<dyn std::error::Error + Send + Sync>>
140            + Send
141            + Sync
142            + 'static,
143    {
144        Self {
145            post_process: Box::new(post_process),
146            handler,
147        }
148    }
149}
150
151impl<PostProcessed, Handler> ResponseHandler for AndThenHandler<PostProcessed, Handler>
152where
153    Handler: ResponseHandler,
154{
155    type Response = PostProcessed;
156    type Query = Handler::Query;
157
158    fn process_response(
159        &self,
160        response: Vec<<Self::Query as RpcType>::Response>,
161    ) -> ResultWithMethod<Self::Response, <Self::Query as RpcType>::Error> {
162        Handler::process_response(&self.handler, response)
163            .map(|data| (self.post_process)(data))?
164            .map_err(QueryError::ConversionError)
165    }
166}