picodata_plugin/transport/rpc/
client.rs

1use super::Request;
2use super::Response;
3use crate::internal::ffi;
4use crate::plugin::interface::PicoContext;
5use crate::util::FfiSafeBytes;
6use crate::util::FfiSafeStr;
7use crate::util::RegionGuard;
8use std::marker::PhantomData;
9use std::mem::MaybeUninit;
10use std::time::Duration;
11use tarantool::error::BoxError;
12use tarantool::error::TarantoolErrorCode;
13use tarantool::fiber;
14use tarantool::time::Instant;
15
16////////////////////////////////////////////////////////////////////////////////
17// RequestBuilder
18////////////////////////////////////////////////////////////////////////////////
19
20#[derive(Debug)]
21pub struct RequestBuilder<'a> {
22    target: FfiSafeRpcTargetSpecifier,
23    plugin_service: Option<(&'a str, &'a str)>,
24    version: Option<&'a str>,
25    path: Option<&'a str>,
26    input: Option<Request<'a>>,
27    timeout: Option<Duration>,
28}
29
30impl<'a> RequestBuilder<'a> {
31    #[inline]
32    pub fn new(target: RequestTarget<'a>) -> Self {
33        let target = match target {
34            RequestTarget::Any => FfiSafeRpcTargetSpecifier::Any,
35            RequestTarget::InstanceName(instance_name) => {
36                FfiSafeRpcTargetSpecifier::InstanceName(instance_name.into())
37            }
38            RequestTarget::BucketId(bucket_id, to_master) => FfiSafeRpcTargetSpecifier::BucketId {
39                bucket_id,
40                to_master,
41            },
42            RequestTarget::TierAndBucketId(tier, bucket_id, to_master) => {
43                FfiSafeRpcTargetSpecifier::TierAndBucketId {
44                    tier: tier.into(),
45                    bucket_id,
46                    to_master,
47                }
48            }
49            RequestTarget::ReplicasetName(replicaset_name, to_master) => {
50                FfiSafeRpcTargetSpecifier::Replicaset {
51                    replicaset_name: replicaset_name.into(),
52                    to_master,
53                }
54            }
55        };
56        Self {
57            target,
58            plugin_service: None,
59            version: None,
60            path: None,
61            input: None,
62            timeout: None,
63        }
64    }
65
66    /// Use service info from `context`.
67    /// The request will be sent to an endpoint registered by the specified service.
68    #[inline]
69    #[track_caller]
70    pub fn pico_context(self, context: &'a PicoContext) -> Self {
71        self.plugin_service(context.plugin_name(), context.service_name())
72            .plugin_version(context.plugin_version())
73    }
74
75    /// The request will be sent to an endpoint registered by the specified service.
76    #[inline]
77    pub fn plugin_service(mut self, plugin: &'a str, service: &'a str) -> Self {
78        let new = (plugin, service);
79        if let Some(old) = self.plugin_service.take() {
80            #[rustfmt::skip]
81            tarantool::say_warn!("RequestBuilder plugin.service is silently changed from {old:?} to {new:?}");
82        }
83        self.plugin_service = Some(new);
84        self
85    }
86
87    /// The request will be sent to an endpoint registered by the specified service.
88    #[inline]
89    pub fn plugin_version(mut self, version: &'a str) -> Self {
90        if let Some(old) = self.version.take() {
91            #[rustfmt::skip]
92            tarantool::say_warn!("RequestBuilder service version is silently changed from {old:?} to {version:?}");
93        }
94        self.version = Some(version);
95        self
96    }
97
98    /// The request will be sent to the endpoint at the given `path`.
99    #[inline]
100    pub fn path(mut self, path: &'a str) -> Self {
101        if let Some(old) = self.path.take() {
102            #[rustfmt::skip]
103            tarantool::say_warn!("RequestBuilder path is silently changed from {old:?} to {path:?}");
104        }
105        self.path = Some(path);
106        self
107    }
108
109    /// Specify request arguments.
110    #[inline]
111    pub fn input(mut self, input: Request<'a>) -> Self {
112        if let Some(old) = self.input.take() {
113            #[rustfmt::skip]
114            tarantool::say_warn!("RequestBuilder input is silently changed from {old:?} to {input:?}");
115        }
116        self.input = Some(input.into());
117        self
118    }
119
120    /// Specify request timeout.
121    #[inline]
122    pub fn timeout(mut self, timeout: Duration) -> Self {
123        if let Some(old) = self.timeout.take() {
124            #[rustfmt::skip]
125            tarantool::say_warn!("RequestBuilder timeout is silently changed from {old:?} to {timeout:?}");
126        }
127        self.timeout = Some(timeout);
128        self
129    }
130
131    /// Specify request deadline.
132    #[inline(always)]
133    pub fn deadline(self, deadline: Instant) -> Self {
134        self.timeout(deadline.duration_since(fiber::clock()))
135    }
136
137    #[track_caller]
138    fn to_ffi(&self) -> Result<FfiSafeRpcRequestArguments<'a>, BoxError> {
139        let Some((plugin, service)) = self.plugin_service else {
140            #[rustfmt::skip]
141            return Err(BoxError::new(TarantoolErrorCode::IllegalParams, "plugin.service must be specified for RPC request"));
142        };
143
144        let Some(version) = self.version else {
145            #[rustfmt::skip]
146            return Err(BoxError::new(TarantoolErrorCode::IllegalParams, "service version must be specified for RPC request"));
147        };
148
149        let Some(path) = self.path else {
150            #[rustfmt::skip]
151            return Err(BoxError::new(TarantoolErrorCode::IllegalParams, "path must be specified for RPC request"));
152        };
153
154        let Some(input) = &self.input else {
155            #[rustfmt::skip]
156            return Err(BoxError::new(TarantoolErrorCode::IllegalParams, "input must be specified for RPC request"));
157        };
158
159        let target = self.target;
160
161        Ok(FfiSafeRpcRequestArguments {
162            plugin: plugin.into(),
163            service: service.into(),
164            version: version.into(),
165            target,
166            path: path.into(),
167            input: input.as_bytes().into(),
168            _marker: PhantomData,
169        })
170    }
171
172    /// Send the request with the current paramters. This will block the current
173    /// fiber until the response is received or the timeout is reached.
174    ///
175    /// Returns an error if some of the parameters are invalid.
176    #[inline]
177    #[track_caller]
178    pub fn send(&self) -> Result<Response, BoxError> {
179        let arguments = self.to_ffi()?;
180        let res = send_rpc_request(&arguments, self.timeout)?;
181        Ok(res)
182    }
183}
184
185/// An enumeration of possible target specifiers for RPC requests.
186/// Determines which instance in the picodata cluster the request should be sent to.
187#[derive(Default, Debug, Clone, Copy)]
188#[non_exhaustive]
189pub enum RequestTarget<'a> {
190    /// Any instance running the corresponding service.
191    #[default]
192    Any,
193
194    /// The specific instance with a given instance name.
195    InstanceName(&'a str),
196
197    /// An instance in the replicaset in tier of target instance which currently stores the bucket with
198    /// the specified id.
199    ///
200    /// If the boolean parameter is `true`, then send the request to the replicaset master,
201    /// otherwise any replica.
202    BucketId(u64, bool),
203
204    /// An instance in the replicaset in the tier which currently stores the bucket with
205    /// the specified id.
206    ///
207    /// If the boolean parameter is `true`, then send the request to the replicaset master,
208    /// otherwise any replica.
209    TierAndBucketId(&'a str, u64, bool),
210
211    /// An instance in the replicaset determined by the explicit replicaset name.
212    ///
213    /// If the boolean parameter is `true`, then send the request to the replicaset master,
214    /// otherwise any replica.
215    ReplicasetName(&'a str, bool),
216}
217
218////////////////////////////////////////////////////////////////////////////////
219// ffi wrappers
220////////////////////////////////////////////////////////////////////////////////
221
222/// **For internal use**.
223fn send_rpc_request(
224    arguments: &FfiSafeRpcRequestArguments,
225    timeout: Option<Duration>,
226) -> Result<Response, BoxError> {
227    let mut output = MaybeUninit::uninit();
228
229    let _guard = RegionGuard::new();
230
231    // SAFETY: always safe to call picodata FFI
232    let rc = unsafe {
233        ffi::pico_ffi_rpc_request(
234            arguments,
235            timeout.unwrap_or(tarantool::clock::INFINITY).as_secs_f64(),
236            output.as_mut_ptr(),
237        )
238    };
239    if rc == -1 {
240        return Err(BoxError::last());
241    }
242
243    let output = unsafe { output.assume_init().as_bytes() };
244    Ok(Response::new_owned(output))
245}
246
247/// **For internal use**.
248///
249/// Use [`RequestBuilder`] instead.
250#[derive(Debug, Clone)]
251#[repr(C)]
252pub struct FfiSafeRpcRequestArguments<'a> {
253    pub plugin: FfiSafeStr,
254    pub service: FfiSafeStr,
255    pub version: FfiSafeStr,
256    pub target: FfiSafeRpcTargetSpecifier,
257    pub path: FfiSafeStr,
258    pub input: FfiSafeBytes,
259    _marker: PhantomData<&'a ()>,
260}
261
262/// **For internal use**.
263///
264/// Use [`RequestTarget`] instead.
265#[derive(Debug, Clone, Copy)]
266#[repr(C)]
267pub enum FfiSafeRpcTargetSpecifier {
268    Any,
269    InstanceName(FfiSafeStr),
270    Replicaset {
271        replicaset_name: FfiSafeStr,
272        to_master: bool,
273    },
274    BucketId {
275        bucket_id: u64,
276        to_master: bool,
277    },
278    TierAndBucketId {
279        tier: FfiSafeStr,
280        bucket_id: u64,
281        to_master: bool,
282    },
283}