picodata_plugin/transport/rpc/
client.rs1use super::Request;
2use super::Response;
3use crate::internal::ffi;
4use crate::plugin::interface::PicoContext;
5#[allow(unused_imports)]
6use crate::transport::rpc::server::RouteBuilder;
7use crate::util::FfiSafeBytes;
8use crate::util::FfiSafeStr;
9use crate::util::RegionGuard;
10use std::marker::PhantomData;
11use std::mem::MaybeUninit;
12use std::time::Duration;
13use tarantool::error::BoxError;
14use tarantool::error::TarantoolErrorCode;
15use tarantool::fiber;
16use tarantool::time::Instant;
17
18#[derive(Debug)]
26pub struct RequestBuilder<'a> {
27 target: FfiSafeRpcTargetSpecifier,
28 plugin_service: Option<(&'a str, &'a str)>,
29 version: Option<&'a str>,
30 path: Option<&'a str>,
31 input: Option<Request<'a>>,
32 timeout: Option<Duration>,
33}
34
35impl<'a> RequestBuilder<'a> {
36 #[inline]
37 pub fn new(target: RequestTarget<'a>) -> Self {
38 let target = match target {
39 RequestTarget::Any => FfiSafeRpcTargetSpecifier::Any,
40 RequestTarget::InstanceName(instance_name) => {
41 FfiSafeRpcTargetSpecifier::InstanceName(instance_name.into())
42 }
43 RequestTarget::BucketId(bucket_id, to_master) => FfiSafeRpcTargetSpecifier::BucketId {
44 bucket_id,
45 to_master,
46 },
47 RequestTarget::TierAndBucketId(tier, bucket_id, to_master) => {
48 FfiSafeRpcTargetSpecifier::TierAndBucketId {
49 tier: tier.into(),
50 bucket_id,
51 to_master,
52 }
53 }
54 RequestTarget::ReplicasetName(replicaset_name, to_master) => {
55 FfiSafeRpcTargetSpecifier::Replicaset {
56 replicaset_name: replicaset_name.into(),
57 to_master,
58 }
59 }
60 };
61 Self {
62 target,
63 plugin_service: None,
64 version: None,
65 path: None,
66 input: None,
67 timeout: None,
68 }
69 }
70
71 #[inline]
74 #[track_caller]
75 pub fn pico_context(self, context: &'a PicoContext) -> Self {
76 self.plugin_service(context.plugin_name(), context.service_name())
77 .plugin_version(context.plugin_version())
78 }
79
80 #[inline]
82 pub fn plugin_service(mut self, plugin: &'a str, service: &'a str) -> Self {
83 let new = (plugin, service);
84 if let Some(old) = self.plugin_service.take() {
85 #[rustfmt::skip]
86 tarantool::say_warn!("RequestBuilder plugin.service is silently changed from {old:?} to {new:?}");
87 }
88 self.plugin_service = Some(new);
89 self
90 }
91
92 #[inline]
94 pub fn plugin_version(mut self, version: &'a str) -> Self {
95 if let Some(old) = self.version.take() {
96 #[rustfmt::skip]
97 tarantool::say_warn!("RequestBuilder service version is silently changed from {old:?} to {version:?}");
98 }
99 self.version = Some(version);
100 self
101 }
102
103 #[inline]
105 pub fn path(mut self, path: &'a str) -> Self {
106 if let Some(old) = self.path.take() {
107 #[rustfmt::skip]
108 tarantool::say_warn!("RequestBuilder path is silently changed from {old:?} to {path:?}");
109 }
110 self.path = Some(path);
111 self
112 }
113
114 #[inline]
116 pub fn input(mut self, input: Request<'a>) -> Self {
117 if let Some(old) = self.input.take() {
118 #[rustfmt::skip]
119 tarantool::say_warn!("RequestBuilder input is silently changed from {old:?} to {input:?}");
120 }
121 self.input = Some(input);
122 self
123 }
124
125 #[inline]
127 pub fn timeout(mut self, timeout: Duration) -> Self {
128 if let Some(old) = self.timeout.take() {
129 #[rustfmt::skip]
130 tarantool::say_warn!("RequestBuilder timeout is silently changed from {old:?} to {timeout:?}");
131 }
132 self.timeout = Some(timeout);
133 self
134 }
135
136 #[inline(always)]
138 pub fn deadline(self, deadline: Instant) -> Self {
139 self.timeout(deadline.duration_since(fiber::clock()))
140 }
141
142 #[track_caller]
143 fn to_ffi(&self) -> Result<FfiSafeRpcRequestArguments<'a>, BoxError> {
144 let Some((plugin, service)) = self.plugin_service else {
145 #[rustfmt::skip]
146 return Err(BoxError::new(TarantoolErrorCode::IllegalParams, "plugin.service must be specified for RPC request"));
147 };
148
149 let Some(version) = self.version else {
150 #[rustfmt::skip]
151 return Err(BoxError::new(TarantoolErrorCode::IllegalParams, "service version must be specified for RPC request"));
152 };
153
154 let Some(path) = self.path else {
155 #[rustfmt::skip]
156 return Err(BoxError::new(TarantoolErrorCode::IllegalParams, "path must be specified for RPC request"));
157 };
158
159 let Some(input) = &self.input else {
160 #[rustfmt::skip]
161 return Err(BoxError::new(TarantoolErrorCode::IllegalParams, "input must be specified for RPC request"));
162 };
163
164 let target = self.target;
165
166 Ok(FfiSafeRpcRequestArguments {
167 plugin: plugin.into(),
168 service: service.into(),
169 version: version.into(),
170 target,
171 path: path.into(),
172 input: input.as_bytes().into(),
173 _marker: PhantomData,
174 })
175 }
176
177 #[inline]
191 #[track_caller]
192 pub fn send(&self) -> Result<Response, BoxError> {
193 let arguments = self.to_ffi()?;
194 let res = send_rpc_request(&arguments, self.timeout)?;
195 Ok(res)
196 }
197}
198
199#[derive(Default, Debug, Clone, Copy)]
202#[non_exhaustive]
203pub enum RequestTarget<'a> {
204 #[default]
206 Any,
207
208 InstanceName(&'a str),
210
211 BucketId(u64, bool),
217
218 TierAndBucketId(&'a str, u64, bool),
224
225 ReplicasetName(&'a str, bool),
230}
231
232fn send_rpc_request(
238 arguments: &FfiSafeRpcRequestArguments,
239 timeout: Option<Duration>,
240) -> Result<Response, BoxError> {
241 let mut output = MaybeUninit::uninit();
242
243 let _guard = RegionGuard::new();
244
245 let rc = unsafe {
247 ffi::pico_ffi_rpc_request(
248 arguments,
249 timeout.unwrap_or(tarantool::clock::INFINITY).as_secs_f64(),
250 output.as_mut_ptr(),
251 )
252 };
253 if rc == -1 {
254 return Err(BoxError::last());
255 }
256
257 let output = unsafe { output.assume_init().as_bytes() };
258 Ok(Response::new_owned(output))
259}
260
261#[derive(Debug, Clone)]
265#[repr(C)]
266pub struct FfiSafeRpcRequestArguments<'a> {
267 pub plugin: FfiSafeStr,
268 pub service: FfiSafeStr,
269 pub version: FfiSafeStr,
270 pub target: FfiSafeRpcTargetSpecifier,
271 pub path: FfiSafeStr,
272 pub input: FfiSafeBytes,
273 _marker: PhantomData<&'a ()>,
274}
275
276#[derive(Debug, Clone, Copy)]
280#[repr(C)]
281pub enum FfiSafeRpcTargetSpecifier {
282 Any,
283 InstanceName(FfiSafeStr),
284 Replicaset {
285 replicaset_name: FfiSafeStr,
286 to_master: bool,
287 },
288 BucketId {
289 bucket_id: u64,
290 to_master: bool,
291 },
292 TierAndBucketId {
293 tier: FfiSafeStr,
294 bucket_id: u64,
295 to_master: bool,
296 },
297}