near_api/common/query/handlers/
transformers.rs1use tracing::trace;
2
3use crate::{
4 advanced::{handlers::ResponseHandler, RpcType},
5 common::query::{ResultWithMethod, QUERY_EXECUTOR_TARGET},
6 errors::QueryError,
7};
8
9#[derive(Clone, Debug)]
10pub struct MultiQueryHandler<Handlers> {
11 handlers: Handlers,
12}
13
14impl<Query, H1, H2, R1, R2> ResponseHandler for MultiQueryHandler<(H1, H2)>
15where
16 Query: RpcType,
17 H1: ResponseHandler<Response = R1, Query = Query>,
18 H2: ResponseHandler<Response = R2, Query = Query>,
19{
20 type Response = (R1, R2);
21 type Query = H1::Query;
22
23 fn process_response(
24 &self,
25 mut responses: Vec<<H1::Query as RpcType>::Response>,
26 ) -> ResultWithMethod<Self::Response, <H1::Query as RpcType>::Error> {
27 let (h1, h2) = &self.handlers;
28
29 let first_response =
30 h1.process_response(responses.drain(0..h1.request_amount()).collect())?;
31 let second_response = h2.process_response(responses)?;
32
33 Ok((first_response, second_response))
34 }
35
36 fn request_amount(&self) -> usize {
37 self.handlers.0.request_amount() + self.handlers.1.request_amount()
38 }
39}
40
41impl<Query, H1, H2, H3, R1, R2, R3> ResponseHandler for MultiQueryHandler<(H1, H2, H3)>
42where
43 Query: RpcType,
44 H1: ResponseHandler<Response = R1, Query = Query>,
45 H2: ResponseHandler<Response = R2, Query = Query>,
46 H3: ResponseHandler<Response = R3, Query = Query>,
47{
48 type Response = (R1, R2, R3);
49 type Query = Query;
50
51 fn process_response(
52 &self,
53 mut responses: Vec<<Query as RpcType>::Response>,
54 ) -> ResultWithMethod<Self::Response, <Query as RpcType>::Error> {
55 let (h1, h2, h3) = &self.handlers;
56
57 let first_response =
58 h1.process_response(responses.drain(0..h1.request_amount()).collect())?;
59 let second_response =
60 h2.process_response(responses.drain(0..h2.request_amount()).collect())?;
61 let third_response = h3.process_response(responses)?;
62
63 Ok((first_response, second_response, third_response))
64 }
65
66 fn request_amount(&self) -> usize {
67 self.handlers.0.request_amount() + self.handlers.1.request_amount()
68 }
69}
70
71impl<Handlers> MultiQueryHandler<Handlers> {
72 pub const fn new(handlers: Handlers) -> Self {
73 Self { handlers }
74 }
75}
76
77impl<Handlers: Default> Default for MultiQueryHandler<Handlers> {
78 fn default() -> Self {
79 Self::new(Default::default())
80 }
81}
82
83pub struct PostprocessHandler<PostProcessed, Handler: ResponseHandler> {
84 post_process: Box<dyn Fn(Handler::Response) -> PostProcessed + Send + Sync>,
85 handler: Handler,
86}
87
88impl<PostProcessed, Handler: ResponseHandler> PostprocessHandler<PostProcessed, Handler> {
89 pub fn new<F>(handler: Handler, post_process: F) -> Self
90 where
91 F: Fn(Handler::Response) -> PostProcessed + Send + Sync + 'static,
92 {
93 Self {
94 post_process: Box::new(post_process),
95 handler,
96 }
97 }
98}
99
100impl<PostProcessed, Handler> ResponseHandler for PostprocessHandler<PostProcessed, Handler>
101where
102 Handler: ResponseHandler,
103{
104 type Response = PostProcessed;
105 type Query = Handler::Query;
106
107 fn process_response(
108 &self,
109 response: Vec<<Self::Query as RpcType>::Response>,
110 ) -> ResultWithMethod<Self::Response, <Self::Query as RpcType>::Error> {
111 trace!(target: QUERY_EXECUTOR_TARGET, "Processing response with postprocessing, response count: {}", response.len());
112 Handler::process_response(&self.handler, response).map(|data| {
113 trace!(target: QUERY_EXECUTOR_TARGET, "Applying postprocessing");
114 (self.post_process)(data)
115 })
116 }
117
118 fn request_amount(&self) -> usize {
119 self.handler.request_amount()
120 }
121}
122
123pub struct AndThenHandler<PostProcessed, Handler: ResponseHandler> {
124 #[allow(clippy::complexity)]
125 post_process: Box<
126 dyn Fn(Handler::Response) -> Result<PostProcessed, Box<dyn std::error::Error + Send + Sync>>
127 + Send
128 + Sync,
129 >,
130 handler: Handler,
131}
132
133impl<PostProcessed, Handler: ResponseHandler> AndThenHandler<PostProcessed, Handler> {
134 pub fn new<F>(handler: Handler, post_process: F) -> Self
135 where
136 F: Fn(Handler::Response) -> Result<PostProcessed, Box<dyn std::error::Error + Send + Sync>>
137 + Send
138 + Sync
139 + 'static,
140 {
141 Self {
142 post_process: Box::new(post_process),
143 handler,
144 }
145 }
146}
147
148impl<PostProcessed, Handler> ResponseHandler for AndThenHandler<PostProcessed, Handler>
149where
150 Handler: ResponseHandler,
151{
152 type Response = PostProcessed;
153 type Query = Handler::Query;
154
155 fn process_response(
156 &self,
157 response: Vec<<Self::Query as RpcType>::Response>,
158 ) -> ResultWithMethod<Self::Response, <Self::Query as RpcType>::Error> {
159 Handler::process_response(&self.handler, response)
160 .map(|data| (self.post_process)(data))?
161 .map_err(QueryError::ConversionError)
162 }
163}