use super::Request;
use super::Response;
use crate::internal::ffi;
use crate::plugin::interface::PicoContext;
use crate::plugin::interface::ServiceId;
use crate::transport::context::Context;
use crate::transport::context::FfiSafeContext;
#[allow(unused_imports)]
use crate::transport::rpc::client::RequestBuilder;
use crate::util::FfiSafeBytes;
use crate::util::FfiSafeStr;
use std::mem::MaybeUninit;
use std::ptr::NonNull;
use tarantool::error::BoxError;
use tarantool::error::TarantoolErrorCode;
#[derive(Debug, Clone)]
pub struct RouteBuilder<'a> {
plugin: &'a str,
service: &'a str,
version: &'a str,
path: Option<&'a str>,
}
impl<'a> RouteBuilder<'a> {
#[inline(always)]
pub fn from_pico_context(context: &'a PicoContext) -> Self {
Self {
plugin: context.plugin_name(),
service: context.service_name(),
version: context.plugin_version(),
path: None,
}
}
#[inline(always)]
pub unsafe fn from_service_info(plugin: &'a str, service: &'a str, version: &'a str) -> Self {
Self {
plugin,
service,
version,
path: None,
}
}
#[inline]
pub fn path(mut self, path: &'a str) -> Self {
if let Some(old) = self.path.take() {
#[rustfmt::skip]
tarantool::say_warn!("RouteBuilder path is silently changed from {old:?} to {path:?}");
}
self.path = Some(path);
self
}
#[track_caller]
pub fn register<F>(self, f: F) -> Result<(), BoxError>
where
F: Fn(Request<'_>, &mut Context) -> Result<Response, BoxError> + 'static,
{
let Some(path) = self.path else {
#[rustfmt::skip]
return Err(BoxError::new(TarantoolErrorCode::IllegalParams, "path must be specified for RPC endpoint"));
};
let identifier =
PackedServiceIdentifier::pack(path, self.plugin, self.service, self.version)?;
let handler = FfiRpcHandler::new(identifier, f);
if let Err(e) = register_rpc_handler(handler) {
#[rustfmt::skip]
return Err(BoxError::new(e.error_code(), e.message()));
}
Ok(())
}
}
impl<'a> From<&'a PicoContext> for RouteBuilder<'a> {
#[inline(always)]
fn from(context: &'a PicoContext) -> Self {
Self::from_pico_context(context)
}
}
#[inline]
fn register_rpc_handler(handler: FfiRpcHandler) -> Result<(), BoxError> {
let rc = unsafe { ffi::pico_ffi_register_rpc_handler(handler) };
if rc == -1 {
return Err(BoxError::last());
}
Ok(())
}
type RpcHandlerCallback = extern "C" fn(
handler: *const FfiRpcHandler,
input: FfiSafeBytes,
context: *const FfiSafeContext,
output: *mut FfiSafeBytes,
) -> std::ffi::c_int;
#[repr(C)]
pub struct FfiRpcHandler {
callback: RpcHandlerCallback,
drop: extern "C" fn(*mut FfiRpcHandler),
closure_pointer: *mut (),
pub identifier: PackedServiceIdentifier,
}
impl Drop for FfiRpcHandler {
#[inline(always)]
fn drop(&mut self) {
(self.drop)(self)
}
}
impl FfiRpcHandler {
fn new<F>(identifier: PackedServiceIdentifier, f: F) -> Self
where
F: Fn(Request<'_>, &mut Context) -> Result<Response, BoxError> + 'static,
{
let closure = Box::new(f);
let closure_pointer: *mut F = Box::into_raw(closure);
FfiRpcHandler {
callback: Self::trampoline::<F>,
drop: Self::drop_handler::<F>,
closure_pointer: closure_pointer.cast(),
identifier,
}
}
extern "C" fn trampoline<F>(
handler: *const FfiRpcHandler,
input: FfiSafeBytes,
context: *const FfiSafeContext,
output: *mut FfiSafeBytes,
) -> std::ffi::c_int
where
F: Fn(Request<'_>, &mut Context) -> Result<Response, BoxError> + 'static,
{
let closure_pointer: *const F = unsafe { (*handler).closure_pointer.cast::<F>() };
let closure = unsafe { &*closure_pointer };
let input = unsafe { input.as_bytes() };
let context = unsafe { &*context };
let mut context = Context::new(context);
let request = Request::from_bytes(input);
let result = (|| {
let response = closure(request, &mut context)?;
response.to_region_slice()
})();
match result {
Ok(region_slice) => {
unsafe { std::ptr::write(output, region_slice.into()) }
0
}
Err(e) => {
e.set_last();
-1
}
}
}
extern "C" fn drop_handler<F>(handler: *mut FfiRpcHandler) {
unsafe {
let closure_pointer: *mut F = (*handler).closure_pointer.cast::<F>();
let closure = Box::from_raw(closure_pointer);
drop(closure);
if cfg!(debug_assertions) {
(*handler).closure_pointer = 0xcccccccccccccccc_u64 as _;
}
(*handler).identifier.drop();
}
}
#[inline(always)]
pub fn identity(&self) -> usize {
self.callback as *const RpcHandlerCallback as _
}
#[inline(always)]
#[allow(clippy::result_unit_err)]
pub fn call(&self, input: &[u8], context: &FfiSafeContext) -> Result<&'static [u8], ()> {
let mut output = MaybeUninit::uninit();
let rc = (self.callback)(self, input.into(), context, output.as_mut_ptr());
if rc == -1 {
return Err(());
}
let result = unsafe { output.assume_init().as_bytes() };
Ok(result)
}
}
#[repr(C)]
#[derive(Debug, Default, Clone, Copy)]
pub struct PackedServiceIdentifier {
pub storage: FfiSafeStr,
pub plugin_len: u16,
pub service_len: u16,
pub path_len: u16,
pub version_len: u16,
}
impl PackedServiceIdentifier {
pub(crate) fn pack(
path: &str,
plugin: &str,
service: &str,
version: &str,
) -> Result<Self, BoxError> {
let Ok(plugin_len) = plugin.len().try_into() else {
#[rustfmt::skip]
return Err(BoxError::new(TarantoolErrorCode::IllegalParams, format!("plugin name length must not exceed 65535, got {}", plugin.len())));
};
let Ok(service_len) = service.len().try_into() else {
#[rustfmt::skip]
return Err(BoxError::new(TarantoolErrorCode::IllegalParams, format!("service name length must not exceed 65535, got {}", service.len())));
};
let Ok(path_len) = path.len().try_into() else {
#[rustfmt::skip]
return Err(BoxError::new(TarantoolErrorCode::IllegalParams, format!("route path length must not exceed 65535, got {}", path.len())));
};
let Ok(version_len) = version.len().try_into() else {
#[rustfmt::skip]
return Err(BoxError::new(TarantoolErrorCode::IllegalParams, format!("version string length must not exceed 65535, got {}", version.len())));
};
let total_string_len = plugin_len
+ 1
+ service_len
+ path_len
+ version_len;
let mut string_storage = Vec::with_capacity(total_string_len as _);
string_storage.extend_from_slice(plugin.as_bytes());
string_storage.push(b'.');
string_storage.extend_from_slice(service.as_bytes());
string_storage.extend_from_slice(path.as_bytes());
string_storage.extend_from_slice(version.as_bytes());
let start = string_storage.as_mut_ptr();
let capacity = string_storage.capacity();
let storage =
unsafe { FfiSafeStr::from_raw_parts(NonNull::new_unchecked(start), capacity) };
std::mem::forget(string_storage);
Ok(Self {
storage,
plugin_len,
service_len,
path_len,
version_len,
})
}
#[allow(unreachable_code)]
pub(crate) fn drop(&mut self) {
let (pointer, capacity) = self.storage.into_raw_parts();
if capacity == 0 {
#[cfg(debug_assertions)]
unreachable!("drop should only be called once");
return;
}
unsafe {
let string_storage = Vec::from_raw_parts(pointer, capacity, capacity);
drop(string_storage);
}
self.storage = FfiSafeStr::from("");
}
#[inline(always)]
fn storage_slice(&self, start: u16, len: u16) -> &str {
let storage = unsafe { self.storage.as_str() };
let end = (start + len) as usize;
&storage[start as usize..end]
}
#[inline(always)]
pub fn plugin(&self) -> &str {
self.storage_slice(0, self.plugin_len)
}
#[inline(always)]
pub fn service(&self) -> &str {
self.storage_slice(self.plugin_len + 1, self.service_len)
}
#[inline(always)]
pub fn service_id(&self) -> ServiceId {
ServiceId::new(self.plugin(), self.service(), self.version())
}
#[inline(always)]
pub fn path(&self) -> &str {
self.storage_slice(self.plugin_len + 1 + self.service_len, self.path_len)
}
#[inline(always)]
pub fn route_repr(&self) -> &str {
self.storage_slice(0, self.plugin_len + 1 + self.service_len + self.path_len)
}
#[inline(always)]
pub fn version(&self) -> &str {
self.storage_slice(
self.plugin_len + 1 + self.service_len + self.path_len,
self.version_len,
)
}
}
impl std::fmt::Display for PackedServiceIdentifier {
#[inline(always)]
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
f,
"{}.{}:v{}{}",
self.plugin(),
self.service(),
self.version(),
self.path()
)
}
}