picodata_plugin/transport/rpc/
client.rs

1use super::Request;
2use super::Response;
3use crate::internal::ffi;
4use crate::plugin::interface::PicoContext;
5#[allow(unused_imports)]
6use crate::transport::rpc::server::RouteBuilder;
7use crate::util::FfiSafeBytes;
8use crate::util::FfiSafeStr;
9use crate::util::RegionGuard;
10use std::marker::PhantomData;
11use std::mem::MaybeUninit;
12use std::time::Duration;
13use tarantool::error::BoxError;
14use tarantool::error::TarantoolErrorCode;
15use tarantool::fiber;
16use tarantool::time::Instant;
17
18////////////////////////////////////////////////////////////////////////////////
19// RequestBuilder
20////////////////////////////////////////////////////////////////////////////////
21
22/// A helper struct for sending RPC requests.
23///
24/// See also [`RouteBuilder`] for the server side of the RPC communication.
25#[derive(Debug)]
26pub struct RequestBuilder<'a> {
27    target: FfiSafeRpcTargetSpecifier,
28    plugin_service: Option<(&'a str, &'a str)>,
29    version: Option<&'a str>,
30    path: Option<&'a str>,
31    input: Option<Request<'a>>,
32    timeout: Option<Duration>,
33}
34
35impl<'a> RequestBuilder<'a> {
36    #[inline]
37    pub fn new(target: RequestTarget<'a>) -> Self {
38        let target = match target {
39            RequestTarget::Any => FfiSafeRpcTargetSpecifier::Any,
40            RequestTarget::InstanceName(instance_name) => {
41                FfiSafeRpcTargetSpecifier::InstanceName(instance_name.into())
42            }
43            RequestTarget::BucketId(bucket_id, to_master) => FfiSafeRpcTargetSpecifier::BucketId {
44                bucket_id,
45                to_master,
46            },
47            RequestTarget::TierAndBucketId(tier, bucket_id, to_master) => {
48                FfiSafeRpcTargetSpecifier::TierAndBucketId {
49                    tier: tier.into(),
50                    bucket_id,
51                    to_master,
52                }
53            }
54            RequestTarget::ReplicasetName(replicaset_name, to_master) => {
55                FfiSafeRpcTargetSpecifier::Replicaset {
56                    replicaset_name: replicaset_name.into(),
57                    to_master,
58                }
59            }
60        };
61        Self {
62            target,
63            plugin_service: None,
64            version: None,
65            path: None,
66            input: None,
67            timeout: None,
68        }
69    }
70
71    /// Use service info from `context`.
72    /// The request will be sent to an endpoint registered by the specified service.
73    #[inline]
74    #[track_caller]
75    pub fn pico_context(self, context: &'a PicoContext) -> Self {
76        self.plugin_service(context.plugin_name(), context.service_name())
77            .plugin_version(context.plugin_version())
78    }
79
80    /// The request will be sent to an endpoint registered by the specified service.
81    #[inline]
82    pub fn plugin_service(mut self, plugin: &'a str, service: &'a str) -> Self {
83        let new = (plugin, service);
84        if let Some(old) = self.plugin_service.take() {
85            #[rustfmt::skip]
86            tarantool::say_warn!("RequestBuilder plugin.service is silently changed from {old:?} to {new:?}");
87        }
88        self.plugin_service = Some(new);
89        self
90    }
91
92    /// The request will be sent to an endpoint registered by the specified service.
93    #[inline]
94    pub fn plugin_version(mut self, version: &'a str) -> Self {
95        if let Some(old) = self.version.take() {
96            #[rustfmt::skip]
97            tarantool::say_warn!("RequestBuilder service version is silently changed from {old:?} to {version:?}");
98        }
99        self.version = Some(version);
100        self
101    }
102
103    /// The request will be sent to the endpoint at the given `path`.
104    #[inline]
105    pub fn path(mut self, path: &'a str) -> Self {
106        if let Some(old) = self.path.take() {
107            #[rustfmt::skip]
108            tarantool::say_warn!("RequestBuilder path is silently changed from {old:?} to {path:?}");
109        }
110        self.path = Some(path);
111        self
112    }
113
114    /// Specify request arguments.
115    #[inline]
116    pub fn input(mut self, input: Request<'a>) -> Self {
117        if let Some(old) = self.input.take() {
118            #[rustfmt::skip]
119            tarantool::say_warn!("RequestBuilder input is silently changed from {old:?} to {input:?}");
120        }
121        self.input = Some(input);
122        self
123    }
124
125    /// Specify request timeout.
126    #[inline]
127    pub fn timeout(mut self, timeout: Duration) -> Self {
128        if let Some(old) = self.timeout.take() {
129            #[rustfmt::skip]
130            tarantool::say_warn!("RequestBuilder timeout is silently changed from {old:?} to {timeout:?}");
131        }
132        self.timeout = Some(timeout);
133        self
134    }
135
136    /// Specify request deadline.
137    #[inline(always)]
138    pub fn deadline(self, deadline: Instant) -> Self {
139        self.timeout(deadline.duration_since(fiber::clock()))
140    }
141
142    #[track_caller]
143    fn to_ffi(&self) -> Result<FfiSafeRpcRequestArguments<'a>, BoxError> {
144        let Some((plugin, service)) = self.plugin_service else {
145            #[rustfmt::skip]
146            return Err(BoxError::new(TarantoolErrorCode::IllegalParams, "plugin.service must be specified for RPC request"));
147        };
148
149        let Some(version) = self.version else {
150            #[rustfmt::skip]
151            return Err(BoxError::new(TarantoolErrorCode::IllegalParams, "service version must be specified for RPC request"));
152        };
153
154        let Some(path) = self.path else {
155            #[rustfmt::skip]
156            return Err(BoxError::new(TarantoolErrorCode::IllegalParams, "path must be specified for RPC request"));
157        };
158
159        let Some(input) = &self.input else {
160            #[rustfmt::skip]
161            return Err(BoxError::new(TarantoolErrorCode::IllegalParams, "input must be specified for RPC request"));
162        };
163
164        let target = self.target;
165
166        Ok(FfiSafeRpcRequestArguments {
167            plugin: plugin.into(),
168            service: service.into(),
169            version: version.into(),
170            target,
171            path: path.into(),
172            input: input.as_bytes().into(),
173            _marker: PhantomData,
174        })
175    }
176
177    /// Send the request with the current parameters.
178    ///
179    /// Note that if the request target specification (see [`RequestTarget`])
180    /// matches the current instance then the request will be performed fully
181    /// locally, i.e. the request handler will be executed in the current
182    /// process without switching the fiber (although the fiber's name may
183    /// change).
184    ///
185    /// If the request target doesn't match the current instance, an iproto
186    /// request will be sent out and the current fiber will be blocked
187    /// until the response is received or the timeout is reached.
188    ///
189    /// Returns an error if some of the parameters are invalid.
190    #[inline]
191    #[track_caller]
192    pub fn send(&self) -> Result<Response, BoxError> {
193        let arguments = self.to_ffi()?;
194        let res = send_rpc_request(&arguments, self.timeout)?;
195        Ok(res)
196    }
197}
198
/// Selects which instance of the picodata cluster an RPC request is sent to.
///
/// Passed to [`RequestBuilder::new`]. The default is [`RequestTarget::Any`].
#[derive(Debug, Clone, Copy, Default)]
#[non_exhaustive]
pub enum RequestTarget<'a> {
    /// Any instance running the corresponding service.
    #[default]
    Any,

    /// The specific instance with the given instance name.
    InstanceName(&'a str),

    /// `(bucket_id, to_master)`: an instance of the replicaset which currently
    /// stores the bucket with the specified id. The replicaset is looked up
    /// in the tier of the target instance; use
    /// [`RequestTarget::TierAndBucketId`] to name the tier explicitly.
    ///
    /// If `to_master` is `true`, the request is sent to the replicaset
    /// master, otherwise to any replica.
    BucketId(u64, bool),

    /// `(tier, bucket_id, to_master)`: an instance of the replicaset in the
    /// given tier which currently stores the bucket with the specified id.
    ///
    /// If `to_master` is `true`, the request is sent to the replicaset
    /// master, otherwise to any replica.
    TierAndBucketId(&'a str, u64, bool),

    /// `(replicaset_name, to_master)`: an instance of the replicaset with the
    /// explicitly given name.
    ///
    /// If `to_master` is `true`, the request is sent to the replicaset
    /// master, otherwise to any replica.
    ReplicasetName(&'a str, bool),
}
231
232////////////////////////////////////////////////////////////////////////////////
233// ffi wrappers
234////////////////////////////////////////////////////////////////////////////////
235
236/// **For internal use**.
237fn send_rpc_request(
238    arguments: &FfiSafeRpcRequestArguments,
239    timeout: Option<Duration>,
240) -> Result<Response, BoxError> {
241    let mut output = MaybeUninit::uninit();
242
243    let _guard = RegionGuard::new();
244
245    // SAFETY: always safe to call picodata FFI
246    let rc = unsafe {
247        ffi::pico_ffi_rpc_request(
248            arguments,
249            timeout.unwrap_or(tarantool::clock::INFINITY).as_secs_f64(),
250            output.as_mut_ptr(),
251        )
252    };
253    if rc == -1 {
254        return Err(BoxError::last());
255    }
256
257    let output = unsafe { output.assume_init().as_bytes() };
258    Ok(Response::new_owned(output))
259}
260
261/// **For internal use**.
262///
263/// Use [`RequestBuilder`] instead.
264#[derive(Debug, Clone)]
265#[repr(C)]
266pub struct FfiSafeRpcRequestArguments<'a> {
267    pub plugin: FfiSafeStr,
268    pub service: FfiSafeStr,
269    pub version: FfiSafeStr,
270    pub target: FfiSafeRpcTargetSpecifier,
271    pub path: FfiSafeStr,
272    pub input: FfiSafeBytes,
273    _marker: PhantomData<&'a ()>,
274}
275
276/// **For internal use**.
277///
278/// Use [`RequestTarget`] instead.
279#[derive(Debug, Clone, Copy)]
280#[repr(C)]
281pub enum FfiSafeRpcTargetSpecifier {
282    Any,
283    InstanceName(FfiSafeStr),
284    Replicaset {
285        replicaset_name: FfiSafeStr,
286        to_master: bool,
287    },
288    BucketId {
289        bucket_id: u64,
290        to_master: bool,
291    },
292    TierAndBucketId {
293        tier: FfiSafeStr,
294        bucket_id: u64,
295        to_master: bool,
296    },
297}