near_api/common/query/handlers/
transformers.rs1use tracing::trace;
2
3use crate::{
4 advanced::{RpcType, handlers::ResponseHandler},
5 common::query::{QUERY_EXECUTOR_TARGET, ResultWithMethod},
6 errors::QueryError,
7};
8
9#[derive(Clone, Debug)]
10pub struct MultiQueryHandler<Handlers> {
11 handlers: Handlers,
12}
13
14impl<Query, H1, H2, R1, R2> ResponseHandler for MultiQueryHandler<(H1, H2)>
15where
16 Query: RpcType,
17 H1: ResponseHandler<Response = R1, Query = Query>,
18 H2: ResponseHandler<Response = R2, Query = Query>,
19{
20 type Response = (R1, R2);
21 type Query = H1::Query;
22
23 fn process_response(
24 &self,
25 mut responses: Vec<<H1::Query as RpcType>::Response>,
26 ) -> ResultWithMethod<Self::Response, <H1::Query as RpcType>::Error> {
27 let (h1, h2) = &self.handlers;
28
29 let first_response =
30 h1.process_response(responses.drain(0..h1.request_amount()).collect())?;
31 let second_response = h2.process_response(responses)?;
32
33 Ok((first_response, second_response))
34 }
35
36 fn request_amount(&self) -> usize {
37 self.handlers.0.request_amount() + self.handlers.1.request_amount()
38 }
39}
40
41impl<Query, H1, H2, H3, R1, R2, R3> ResponseHandler for MultiQueryHandler<(H1, H2, H3)>
42where
43 Query: RpcType,
44 H1: ResponseHandler<Response = R1, Query = Query>,
45 H2: ResponseHandler<Response = R2, Query = Query>,
46 H3: ResponseHandler<Response = R3, Query = Query>,
47{
48 type Response = (R1, R2, R3);
49 type Query = Query;
50
51 fn process_response(
52 &self,
53 mut responses: Vec<<Query as RpcType>::Response>,
54 ) -> ResultWithMethod<Self::Response, <Query as RpcType>::Error> {
55 let (h1, h2, h3) = &self.handlers;
56
57 let first_response =
58 h1.process_response(responses.drain(0..h1.request_amount()).collect())?;
59 let second_response = h2.process_response(
60 responses
61 .drain(h1.request_amount()..h2.request_amount())
62 .collect(),
63 )?;
64 let third_response = h3.process_response(responses)?;
65
66 Ok((first_response, second_response, third_response))
67 }
68
69 fn request_amount(&self) -> usize {
70 self.handlers.0.request_amount() + self.handlers.1.request_amount()
71 }
72}
73
74impl<Handlers> MultiQueryHandler<Handlers> {
75 pub const fn new(handlers: Handlers) -> Self {
76 Self { handlers }
77 }
78}
79
80impl<Handlers: Default> Default for MultiQueryHandler<Handlers> {
81 fn default() -> Self {
82 Self::new(Default::default())
83 }
84}
85
86pub struct PostprocessHandler<PostProcessed, Handler: ResponseHandler> {
87 post_process: Box<dyn Fn(Handler::Response) -> PostProcessed + Send + Sync>,
88 handler: Handler,
89}
90
91impl<PostProcessed, Handler: ResponseHandler> PostprocessHandler<PostProcessed, Handler> {
92 pub fn new<F>(handler: Handler, post_process: F) -> Self
93 where
94 F: Fn(Handler::Response) -> PostProcessed + Send + Sync + 'static,
95 {
96 Self {
97 post_process: Box::new(post_process),
98 handler,
99 }
100 }
101}
102
103impl<PostProcessed, Handler> ResponseHandler for PostprocessHandler<PostProcessed, Handler>
104where
105 Handler: ResponseHandler,
106{
107 type Response = PostProcessed;
108 type Query = Handler::Query;
109
110 fn process_response(
111 &self,
112 response: Vec<<Self::Query as RpcType>::Response>,
113 ) -> ResultWithMethod<Self::Response, <Self::Query as RpcType>::Error> {
114 trace!(target: QUERY_EXECUTOR_TARGET, "Processing response with postprocessing, response count: {}", response.len());
115 Handler::process_response(&self.handler, response).map(|data| {
116 trace!(target: QUERY_EXECUTOR_TARGET, "Applying postprocessing");
117 (self.post_process)(data)
118 })
119 }
120
121 fn request_amount(&self) -> usize {
122 self.handler.request_amount()
123 }
124}
125
126pub struct AndThenHandler<PostProcessed, Handler: ResponseHandler> {
127 #[allow(clippy::complexity)]
128 post_process: Box<
129 dyn Fn(Handler::Response) -> Result<PostProcessed, Box<dyn std::error::Error + Send + Sync>>
130 + Send
131 + Sync,
132 >,
133 handler: Handler,
134}
135
136impl<PostProcessed, Handler: ResponseHandler> AndThenHandler<PostProcessed, Handler> {
137 pub fn new<F>(handler: Handler, post_process: F) -> Self
138 where
139 F: Fn(Handler::Response) -> Result<PostProcessed, Box<dyn std::error::Error + Send + Sync>>
140 + Send
141 + Sync
142 + 'static,
143 {
144 Self {
145 post_process: Box::new(post_process),
146 handler,
147 }
148 }
149}
150
151impl<PostProcessed, Handler> ResponseHandler for AndThenHandler<PostProcessed, Handler>
152where
153 Handler: ResponseHandler,
154{
155 type Response = PostProcessed;
156 type Query = Handler::Query;
157
158 fn process_response(
159 &self,
160 response: Vec<<Self::Query as RpcType>::Response>,
161 ) -> ResultWithMethod<Self::Response, <Self::Query as RpcType>::Error> {
162 Handler::process_response(&self.handler, response)
163 .map(|data| (self.post_process)(data))?
164 .map_err(QueryError::ConversionError)
165 }
166}