use super::Request;
use super::Response;
use crate::internal::ffi;
use crate::plugin::interface::PicoContext;
#[allow(unused_imports)]
use crate::transport::rpc::server::RouteBuilder;
use crate::util::FfiSafeBytes;
use crate::util::FfiSafeStr;
use crate::util::RegionGuard;
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::time::Duration;
use tarantool::error::BoxError;
use tarantool::error::TarantoolErrorCode;
use tarantool::fiber;
use tarantool::time::Instant;
/// Builder that collects the parameters of an outgoing RPC request.
///
/// A builder is created with [`RequestBuilder::new`], filled in through the
/// chained setter methods and finally dispatched with [`RequestBuilder::send`].
/// All string and payload parameters are borrowed for the lifetime `'a`.
#[derive(Debug)]
pub struct RequestBuilder<'a> {
/// Destination of the request, stored in its FFI-safe representation.
target: FfiSafeRpcTargetSpecifier,
/// `(plugin, service)` name pair; must be set before the request is sent.
plugin_service: Option<(&'a str, &'a str)>,
/// Version of the service; must be set before the request is sent.
version: Option<&'a str>,
/// Path of the RPC endpoint; must be set before the request is sent.
path: Option<&'a str>,
/// Request payload; must be set before the request is sent.
input: Option<Request<'a>>,
/// Optional timeout. When unset, `tarantool::clock::INFINITY` is used at send time.
timeout: Option<Duration>,
}
impl<'a> RequestBuilder<'a> {
#[inline]
pub fn new(target: RequestTarget<'a>) -> Self {
let target = match target {
RequestTarget::Any => FfiSafeRpcTargetSpecifier::Any,
RequestTarget::InstanceName(instance_name) => {
FfiSafeRpcTargetSpecifier::InstanceName(instance_name.into())
}
RequestTarget::BucketId(bucket_id, to_master) => FfiSafeRpcTargetSpecifier::BucketId {
bucket_id,
to_master,
},
RequestTarget::TierAndBucketId(tier, bucket_id, to_master) => {
FfiSafeRpcTargetSpecifier::TierAndBucketId {
tier: tier.into(),
bucket_id,
to_master,
}
}
RequestTarget::ReplicasetName(replicaset_name, to_master) => {
FfiSafeRpcTargetSpecifier::Replicaset {
replicaset_name: replicaset_name.into(),
to_master,
}
}
};
Self {
target,
plugin_service: None,
version: None,
path: None,
input: None,
timeout: None,
}
}
#[inline]
#[track_caller]
pub fn pico_context(self, context: &'a PicoContext) -> Self {
self.plugin_service(context.plugin_name(), context.service_name())
.plugin_version(context.plugin_version())
}
#[inline]
pub fn plugin_service(mut self, plugin: &'a str, service: &'a str) -> Self {
let new = (plugin, service);
if let Some(old) = self.plugin_service.take() {
#[rustfmt::skip]
tarantool::say_warn!("RequestBuilder plugin.service is silently changed from {old:?} to {new:?}");
}
self.plugin_service = Some(new);
self
}
#[inline]
pub fn plugin_version(mut self, version: &'a str) -> Self {
if let Some(old) = self.version.take() {
#[rustfmt::skip]
tarantool::say_warn!("RequestBuilder service version is silently changed from {old:?} to {version:?}");
}
self.version = Some(version);
self
}
#[inline]
pub fn path(mut self, path: &'a str) -> Self {
if let Some(old) = self.path.take() {
#[rustfmt::skip]
tarantool::say_warn!("RequestBuilder path is silently changed from {old:?} to {path:?}");
}
self.path = Some(path);
self
}
#[inline]
pub fn input(mut self, input: Request<'a>) -> Self {
if let Some(old) = self.input.take() {
#[rustfmt::skip]
tarantool::say_warn!("RequestBuilder input is silently changed from {old:?} to {input:?}");
}
self.input = Some(input);
self
}
#[inline]
pub fn timeout(mut self, timeout: Duration) -> Self {
if let Some(old) = self.timeout.take() {
#[rustfmt::skip]
tarantool::say_warn!("RequestBuilder timeout is silently changed from {old:?} to {timeout:?}");
}
self.timeout = Some(timeout);
self
}
#[inline(always)]
pub fn deadline(self, deadline: Instant) -> Self {
self.timeout(deadline.duration_since(fiber::clock()))
}
#[track_caller]
fn to_ffi(&self) -> Result<FfiSafeRpcRequestArguments<'a>, BoxError> {
let Some((plugin, service)) = self.plugin_service else {
#[rustfmt::skip]
return Err(BoxError::new(TarantoolErrorCode::IllegalParams, "plugin.service must be specified for RPC request"));
};
let Some(version) = self.version else {
#[rustfmt::skip]
return Err(BoxError::new(TarantoolErrorCode::IllegalParams, "service version must be specified for RPC request"));
};
let Some(path) = self.path else {
#[rustfmt::skip]
return Err(BoxError::new(TarantoolErrorCode::IllegalParams, "path must be specified for RPC request"));
};
let Some(input) = &self.input else {
#[rustfmt::skip]
return Err(BoxError::new(TarantoolErrorCode::IllegalParams, "input must be specified for RPC request"));
};
let target = self.target;
Ok(FfiSafeRpcRequestArguments {
plugin: plugin.into(),
service: service.into(),
version: version.into(),
target,
path: path.into(),
input: input.as_bytes().into(),
_marker: PhantomData,
})
}
#[inline]
#[track_caller]
pub fn send(&self) -> Result<Response, BoxError> {
let arguments = self.to_ffi()?;
let res = send_rpc_request(&arguments, self.timeout)?;
Ok(res)
}
}
/// Destination of an RPC request, as specified by the caller.
///
/// [`RequestBuilder::new`] converts this value into the FFI-safe
/// [`FfiSafeRpcTargetSpecifier`]. The `bool` carried by some variants is
/// forwarded unchanged as the `to_master` flag of the FFI representation.
// NOTE(review): `to_master` presumably selects the master of the target
// replicaset — confirm against the RPC implementation.
#[derive(Default, Debug, Clone, Copy)]
#[non_exhaustive]
pub enum RequestTarget<'a> {
/// No specific destination is named. This is the default variant.
#[default]
Any,
/// Target identified by an instance name.
InstanceName(&'a str),
/// Target identified by a bucket id, plus the `to_master` flag.
BucketId(u64, bool),
/// Target identified by a tier name and a bucket id, plus the `to_master` flag.
TierAndBucketId(&'a str, u64, bool),
/// Target identified by a replicaset name, plus the `to_master` flag.
ReplicasetName(&'a str, bool),
}
/// Performs the RPC request through the picodata FFI and wraps the reply.
///
/// `timeout` is passed to the FFI as a number of seconds; when it is `None`,
/// `tarantool::clock::INFINITY` is used instead.
///
/// # Errors
///
/// Returns `BoxError::last()` when the FFI call reports failure by returning `-1`.
fn send_rpc_request(
    arguments: &FfiSafeRpcRequestArguments,
    timeout: Option<Duration>,
) -> Result<Response, BoxError> {
    let timeout_secs = timeout
        .unwrap_or(tarantool::clock::INFINITY)
        .as_secs_f64();

    // The guard is created before the FFI call and stays alive until the
    // function returns, i.e. until after the output has been handed to
    // `Response::new_owned`.
    // NOTE(review): presumably the output bytes are region-allocated and are
    // copied by `new_owned` before the guard is dropped — confirm.
    let _guard = RegionGuard::new();

    let mut raw_output = MaybeUninit::uninit();
    // SAFETY: `arguments` and the output pointer are valid for the duration of the call.
    let rc = unsafe {
        ffi::pico_ffi_rpc_request(arguments, timeout_secs, raw_output.as_mut_ptr())
    };
    if rc == -1 {
        return Err(BoxError::last());
    }

    // SAFETY: assumes the FFI call initializes the output whenever it does not
    // return -1, and that the bytes stay valid while `_guard` is alive.
    let reply = unsafe { raw_output.assume_init().as_bytes() };
    Ok(Response::new_owned(reply))
}
/// Arguments of an RPC request in the `#[repr(C)]` form passed to
/// `ffi::pico_ffi_rpc_request`.
///
/// Field order is part of the C layout and must not be changed.
#[derive(Debug, Clone)]
#[repr(C)]
pub struct FfiSafeRpcRequestArguments<'a> {
/// Name of the plugin the request is addressed to.
pub plugin: FfiSafeStr,
/// Name of the service the request is addressed to.
pub service: FfiSafeStr,
/// Version of the service the request is addressed to.
pub version: FfiSafeStr,
/// Destination of the request.
pub target: FfiSafeRpcTargetSpecifier,
/// Path of the RPC endpoint.
pub path: FfiSafeStr,
/// Raw bytes of the request payload.
pub input: FfiSafeBytes,
/// Carries the lifetime `'a` without storing data.
// NOTE(review): presumably this keeps the struct from outliving the borrowed
// data the FFI-safe fields were created from — confirm.
_marker: PhantomData<&'a ()>,
}
/// `#[repr(C)]` counterpart of [`RequestTarget`], built by [`RequestBuilder::new`].
///
/// Variant and field order is part of the C layout and must not be changed.
/// Note that the variant order here differs from the order in [`RequestTarget`].
#[derive(Debug, Clone, Copy)]
#[repr(C)]
pub enum FfiSafeRpcTargetSpecifier {
/// No specific destination is named.
Any,
/// Target identified by an instance name.
InstanceName(FfiSafeStr),
/// Target identified by a replicaset name.
Replicaset {
replicaset_name: FfiSafeStr,
// NOTE(review): presumably selects the replicaset master — confirm.
to_master: bool,
},
/// Target identified by a bucket id.
BucketId {
bucket_id: u64,
to_master: bool,
},
/// Target identified by a tier name and a bucket id.
TierAndBucketId {
tier: FfiSafeStr,
bucket_id: u64,
to_master: bool,
},
}