near_api/common/query/handlers/
transformers.rs

1use tracing::trace;
2
3use crate::{
4    advanced::{handlers::ResponseHandler, RpcType},
5    common::query::{ResultWithMethod, QUERY_EXECUTOR_TARGET},
6    errors::QueryError,
7};
8
/// Bundles several response handlers into a single value.
///
/// The `ResponseHandler` implementations in this file cover the cases where
/// `Handlers` is a two- or three-element tuple of handlers sharing one query type.
#[derive(Debug, Clone)]
pub struct MultiQueryHandler<Handlers> {
    /// The wrapped handlers, stored exactly as they were supplied.
    handlers: Handlers,
}
13
14impl<Query, H1, H2, R1, R2> ResponseHandler for MultiQueryHandler<(H1, H2)>
15where
16    Query: RpcType,
17    H1: ResponseHandler<Response = R1, Query = Query>,
18    H2: ResponseHandler<Response = R2, Query = Query>,
19{
20    type Response = (R1, R2);
21    type Query = H1::Query;
22
23    fn process_response(
24        &self,
25        mut responses: Vec<<H1::Query as RpcType>::Response>,
26    ) -> ResultWithMethod<Self::Response, <H1::Query as RpcType>::Error> {
27        let (h1, h2) = &self.handlers;
28
29        let first_response =
30            h1.process_response(responses.drain(0..h1.request_amount()).collect())?;
31        let second_response = h2.process_response(responses)?;
32
33        Ok((first_response, second_response))
34    }
35
36    fn request_amount(&self) -> usize {
37        self.handlers.0.request_amount() + self.handlers.1.request_amount()
38    }
39}
40
41impl<Query, H1, H2, H3, R1, R2, R3> ResponseHandler for MultiQueryHandler<(H1, H2, H3)>
42where
43    Query: RpcType,
44    H1: ResponseHandler<Response = R1, Query = Query>,
45    H2: ResponseHandler<Response = R2, Query = Query>,
46    H3: ResponseHandler<Response = R3, Query = Query>,
47{
48    type Response = (R1, R2, R3);
49    type Query = Query;
50
51    fn process_response(
52        &self,
53        mut responses: Vec<<Query as RpcType>::Response>,
54    ) -> ResultWithMethod<Self::Response, <Query as RpcType>::Error> {
55        let (h1, h2, h3) = &self.handlers;
56
57        let first_response =
58            h1.process_response(responses.drain(0..h1.request_amount()).collect())?;
59        let second_response =
60            h2.process_response(responses.drain(0..h2.request_amount()).collect())?;
61        let third_response = h3.process_response(responses)?;
62
63        Ok((first_response, second_response, third_response))
64    }
65
66    fn request_amount(&self) -> usize {
67        self.handlers.0.request_amount() + self.handlers.1.request_amount()
68    }
69}
70
71impl<Handlers> MultiQueryHandler<Handlers> {
72    pub const fn new(handlers: Handlers) -> Self {
73        Self { handlers }
74    }
75}
76
77impl<Handlers: Default> Default for MultiQueryHandler<Handlers> {
78    fn default() -> Self {
79        Self::new(Default::default())
80    }
81}
82
83pub struct PostprocessHandler<PostProcessed, Handler: ResponseHandler> {
84    post_process: Box<dyn Fn(Handler::Response) -> PostProcessed + Send + Sync>,
85    handler: Handler,
86}
87
88impl<PostProcessed, Handler: ResponseHandler> PostprocessHandler<PostProcessed, Handler> {
89    pub fn new<F>(handler: Handler, post_process: F) -> Self
90    where
91        F: Fn(Handler::Response) -> PostProcessed + Send + Sync + 'static,
92    {
93        Self {
94            post_process: Box::new(post_process),
95            handler,
96        }
97    }
98}
99
100impl<PostProcessed, Handler> ResponseHandler for PostprocessHandler<PostProcessed, Handler>
101where
102    Handler: ResponseHandler,
103{
104    type Response = PostProcessed;
105    type Query = Handler::Query;
106
107    fn process_response(
108        &self,
109        response: Vec<<Self::Query as RpcType>::Response>,
110    ) -> ResultWithMethod<Self::Response, <Self::Query as RpcType>::Error> {
111        trace!(target: QUERY_EXECUTOR_TARGET, "Processing response with postprocessing, response count: {}", response.len());
112        Handler::process_response(&self.handler, response).map(|data| {
113            trace!(target: QUERY_EXECUTOR_TARGET, "Applying postprocessing");
114            (self.post_process)(data)
115        })
116    }
117
118    fn request_amount(&self) -> usize {
119        self.handler.request_amount()
120    }
121}
122
123pub struct AndThenHandler<PostProcessed, Handler: ResponseHandler> {
124    #[allow(clippy::complexity)]
125    post_process: Box<
126        dyn Fn(Handler::Response) -> Result<PostProcessed, Box<dyn std::error::Error + Send + Sync>>
127            + Send
128            + Sync,
129    >,
130    handler: Handler,
131}
132
133impl<PostProcessed, Handler: ResponseHandler> AndThenHandler<PostProcessed, Handler> {
134    pub fn new<F>(handler: Handler, post_process: F) -> Self
135    where
136        F: Fn(Handler::Response) -> Result<PostProcessed, Box<dyn std::error::Error + Send + Sync>>
137            + Send
138            + Sync
139            + 'static,
140    {
141        Self {
142            post_process: Box::new(post_process),
143            handler,
144        }
145    }
146}
147
148impl<PostProcessed, Handler> ResponseHandler for AndThenHandler<PostProcessed, Handler>
149where
150    Handler: ResponseHandler,
151{
152    type Response = PostProcessed;
153    type Query = Handler::Query;
154
155    fn process_response(
156        &self,
157        response: Vec<<Self::Query as RpcType>::Response>,
158    ) -> ResultWithMethod<Self::Response, <Self::Query as RpcType>::Error> {
159        Handler::process_response(&self.handler, response)
160            .map(|data| (self.post_process)(data))?
161            .map_err(QueryError::ConversionError)
162    }
163}