picodata_plugin/transport/rpc/
client.rs1use super::Request;
2use super::Response;
3use crate::internal::ffi;
4use crate::plugin::interface::PicoContext;
5use crate::util::FfiSafeBytes;
6use crate::util::FfiSafeStr;
7use crate::util::RegionGuard;
8use std::marker::PhantomData;
9use std::mem::MaybeUninit;
10use std::time::Duration;
11use tarantool::error::BoxError;
12use tarantool::error::TarantoolErrorCode;
13use tarantool::fiber;
14use tarantool::time::Instant;
15
16#[derive(Debug)]
21pub struct RequestBuilder<'a> {
22 target: FfiSafeRpcTargetSpecifier,
23 plugin_service: Option<(&'a str, &'a str)>,
24 version: Option<&'a str>,
25 path: Option<&'a str>,
26 input: Option<Request<'a>>,
27 timeout: Option<Duration>,
28}
29
30impl<'a> RequestBuilder<'a> {
31 #[inline]
32 pub fn new(target: RequestTarget<'a>) -> Self {
33 let target = match target {
34 RequestTarget::Any => FfiSafeRpcTargetSpecifier::Any,
35 RequestTarget::InstanceName(instance_name) => {
36 FfiSafeRpcTargetSpecifier::InstanceName(instance_name.into())
37 }
38 RequestTarget::BucketId(bucket_id, to_master) => FfiSafeRpcTargetSpecifier::BucketId {
39 bucket_id,
40 to_master,
41 },
42 RequestTarget::TierAndBucketId(tier, bucket_id, to_master) => {
43 FfiSafeRpcTargetSpecifier::TierAndBucketId {
44 tier: tier.into(),
45 bucket_id,
46 to_master,
47 }
48 }
49 RequestTarget::ReplicasetName(replicaset_name, to_master) => {
50 FfiSafeRpcTargetSpecifier::Replicaset {
51 replicaset_name: replicaset_name.into(),
52 to_master,
53 }
54 }
55 };
56 Self {
57 target,
58 plugin_service: None,
59 version: None,
60 path: None,
61 input: None,
62 timeout: None,
63 }
64 }
65
66 #[inline]
69 #[track_caller]
70 pub fn pico_context(self, context: &'a PicoContext) -> Self {
71 self.plugin_service(context.plugin_name(), context.service_name())
72 .plugin_version(context.plugin_version())
73 }
74
75 #[inline]
77 pub fn plugin_service(mut self, plugin: &'a str, service: &'a str) -> Self {
78 let new = (plugin, service);
79 if let Some(old) = self.plugin_service.take() {
80 #[rustfmt::skip]
81 tarantool::say_warn!("RequestBuilder plugin.service is silently changed from {old:?} to {new:?}");
82 }
83 self.plugin_service = Some(new);
84 self
85 }
86
87 #[inline]
89 pub fn plugin_version(mut self, version: &'a str) -> Self {
90 if let Some(old) = self.version.take() {
91 #[rustfmt::skip]
92 tarantool::say_warn!("RequestBuilder service version is silently changed from {old:?} to {version:?}");
93 }
94 self.version = Some(version);
95 self
96 }
97
98 #[inline]
100 pub fn path(mut self, path: &'a str) -> Self {
101 if let Some(old) = self.path.take() {
102 #[rustfmt::skip]
103 tarantool::say_warn!("RequestBuilder path is silently changed from {old:?} to {path:?}");
104 }
105 self.path = Some(path);
106 self
107 }
108
109 #[inline]
111 pub fn input(mut self, input: Request<'a>) -> Self {
112 if let Some(old) = self.input.take() {
113 #[rustfmt::skip]
114 tarantool::say_warn!("RequestBuilder input is silently changed from {old:?} to {input:?}");
115 }
116 self.input = Some(input.into());
117 self
118 }
119
120 #[inline]
122 pub fn timeout(mut self, timeout: Duration) -> Self {
123 if let Some(old) = self.timeout.take() {
124 #[rustfmt::skip]
125 tarantool::say_warn!("RequestBuilder timeout is silently changed from {old:?} to {timeout:?}");
126 }
127 self.timeout = Some(timeout);
128 self
129 }
130
131 #[inline(always)]
133 pub fn deadline(self, deadline: Instant) -> Self {
134 self.timeout(deadline.duration_since(fiber::clock()))
135 }
136
137 #[track_caller]
138 fn to_ffi(&self) -> Result<FfiSafeRpcRequestArguments<'a>, BoxError> {
139 let Some((plugin, service)) = self.plugin_service else {
140 #[rustfmt::skip]
141 return Err(BoxError::new(TarantoolErrorCode::IllegalParams, "plugin.service must be specified for RPC request"));
142 };
143
144 let Some(version) = self.version else {
145 #[rustfmt::skip]
146 return Err(BoxError::new(TarantoolErrorCode::IllegalParams, "service version must be specified for RPC request"));
147 };
148
149 let Some(path) = self.path else {
150 #[rustfmt::skip]
151 return Err(BoxError::new(TarantoolErrorCode::IllegalParams, "path must be specified for RPC request"));
152 };
153
154 let Some(input) = &self.input else {
155 #[rustfmt::skip]
156 return Err(BoxError::new(TarantoolErrorCode::IllegalParams, "input must be specified for RPC request"));
157 };
158
159 let target = self.target;
160
161 Ok(FfiSafeRpcRequestArguments {
162 plugin: plugin.into(),
163 service: service.into(),
164 version: version.into(),
165 target,
166 path: path.into(),
167 input: input.as_bytes().into(),
168 _marker: PhantomData,
169 })
170 }
171
172 #[inline]
177 #[track_caller]
178 pub fn send(&self) -> Result<Response, BoxError> {
179 let arguments = self.to_ffi()?;
180 let res = send_rpc_request(&arguments, self.timeout)?;
181 Ok(res)
182 }
183}
184
/// Destination of an RPC request, as accepted by [`RequestBuilder::new`].
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it.
#[derive(Default, Debug, Clone, Copy)]
#[non_exhaustive]
pub enum RequestTarget<'a> {
    /// No particular destination is requested. This is the default variant.
    #[default]
    Any,

    /// Address the instance with the given name.
    InstanceName(&'a str),

    /// Address by bucket id. Fields are `(bucket_id, to_master)`.
    ///
    /// NOTE(review): `to_master` presumably asks for the master of the
    /// replicaset owning the bucket rather than any replica — confirm.
    BucketId(u64, bool),

    /// Address by bucket id within a tier. Fields are
    /// `(tier, bucket_id, to_master)`.
    ///
    /// NOTE(review): `to_master` semantics presumably match those of
    /// [`RequestTarget::BucketId`] — confirm.
    TierAndBucketId(&'a str, u64, bool),

    /// Address a replicaset by name. Fields are `(replicaset_name, to_master)`.
    ///
    /// NOTE(review): `to_master` presumably asks for the replicaset master
    /// rather than any replica — confirm.
    ReplicasetName(&'a str, bool),
}
217
218fn send_rpc_request(
224 arguments: &FfiSafeRpcRequestArguments,
225 timeout: Option<Duration>,
226) -> Result<Response, BoxError> {
227 let mut output = MaybeUninit::uninit();
228
229 let _guard = RegionGuard::new();
230
231 let rc = unsafe {
233 ffi::pico_ffi_rpc_request(
234 arguments,
235 timeout.unwrap_or(tarantool::clock::INFINITY).as_secs_f64(),
236 output.as_mut_ptr(),
237 )
238 };
239 if rc == -1 {
240 return Err(BoxError::last());
241 }
242
243 let output = unsafe { output.assume_init().as_bytes() };
244 Ok(Response::new_owned(output))
245}
246
247#[derive(Debug, Clone)]
251#[repr(C)]
252pub struct FfiSafeRpcRequestArguments<'a> {
253 pub plugin: FfiSafeStr,
254 pub service: FfiSafeStr,
255 pub version: FfiSafeStr,
256 pub target: FfiSafeRpcTargetSpecifier,
257 pub path: FfiSafeStr,
258 pub input: FfiSafeBytes,
259 _marker: PhantomData<&'a ()>,
260}
261
262#[derive(Debug, Clone, Copy)]
266#[repr(C)]
267pub enum FfiSafeRpcTargetSpecifier {
268 Any,
269 InstanceName(FfiSafeStr),
270 Replicaset {
271 replicaset_name: FfiSafeStr,
272 to_master: bool,
273 },
274 BucketId {
275 bucket_id: u64,
276 to_master: bool,
277 },
278 TierAndBucketId {
279 tier: FfiSafeStr,
280 bucket_id: u64,
281 to_master: bool,
282 },
283}