use crate::background;
use crate::background::ServiceWorkerManager;
use crate::error_code::ErrorCode;
use crate::util::FfiSafeStr;
pub use abi_stable;
use abi_stable::pmr::{RErr, RResult, RSlice};
use abi_stable::std_types::{RBox, RHashMap, ROk, ROption, RString, RVec};
use abi_stable::{sabi_trait, RTuple, StableAbi};
use serde::de::DeserializeOwned;
use smol_str::SmolStr;
use std::collections::HashMap;
use std::error::Error;
use std::fmt::Display;
use std::time::Duration;
use tarantool::error::TarantoolErrorCode;
use tarantool::error::{BoxError, IntoBoxError};
#[repr(C)]
#[derive(StableAbi, Debug)]
pub struct PicoContext {
is_master: bool,
pub plugin_name: FfiSafeStr,
pub service_name: FfiSafeStr,
pub plugin_version: FfiSafeStr,
}
impl PicoContext {
#[inline]
pub fn new(is_master: bool) -> PicoContext {
Self {
is_master,
plugin_name: "<unset>".into(),
service_name: "<unset>".into(),
plugin_version: "<unset>".into(),
}
}
#[inline]
pub unsafe fn clone(&self) -> Self {
Self {
is_master: self.is_master,
plugin_name: self.plugin_name,
service_name: self.service_name,
plugin_version: self.plugin_version,
}
}
#[inline]
pub fn is_master(&self) -> bool {
self.is_master
}
#[deprecated = "use `register_job`, `register_tagged_job` or `cancel_background_jobs_by_tag` directly instead"]
pub fn worker_manager(&self) -> ServiceWorkerManager {
ServiceWorkerManager::new(self.make_service_id())
}
#[inline(always)]
pub fn register_metrics_callback(&self, callback: impl Fn() -> String) -> Result<(), BoxError> {
crate::metrics::register_metrics_handler(self, callback)
}
#[inline(always)]
pub fn register_job<F>(&self, job: F) -> Result<(), BoxError>
where
F: FnOnce(background::CancellationToken) + 'static,
{
background::register_job(&self.make_service_id(), job)
}
#[inline(always)]
pub fn register_tagged_job<F>(&self, job: F, tag: &str) -> Result<(), BoxError>
where
F: FnOnce(background::CancellationToken) + 'static,
{
background::register_tagged_job(&self.make_service_id(), job, tag)
}
#[inline(always)]
pub fn cancel_tagged_jobs(&self, tag: &str, timeout: Duration) -> Result<(), BoxError> {
let res = background::cancel_jobs_by_tag(&self.make_service_id(), tag, timeout)?;
if res.n_timeouts != 0 {
#[rustfmt::skip]
return Err(BoxError::new(TarantoolErrorCode::Timeout, format!("some background jobs didn't finish in time (expected: {}, timed out: {})", res.n_total, res.n_timeouts)));
}
Ok(())
}
#[inline(always)]
pub fn set_jobs_shutdown_timeout(&self, timeout: Duration) {
crate::background::set_jobs_shutdown_timeout(
self.plugin_name(),
self.service_name(),
self.plugin_version(),
timeout,
)
}
#[inline(always)]
pub fn make_service_id(&self) -> ServiceId {
ServiceId::new(
self.plugin_name(),
self.service_name(),
self.plugin_version(),
)
}
#[inline]
pub fn plugin_name(&self) -> &str {
unsafe { self.plugin_name.as_str() }
}
#[inline]
pub fn service_name(&self) -> &str {
unsafe { self.service_name.as_str() }
}
#[inline]
pub fn plugin_version(&self) -> &str {
unsafe { self.plugin_version.as_str() }
}
}
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub struct ServiceId {
pub plugin: SmolStr,
pub service: SmolStr,
pub version: SmolStr,
}
impl std::fmt::Display for ServiceId {
#[inline(always)]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}.{}:v{}", self.plugin, self.service, self.version,)
}
}
impl ServiceId {
#[inline(always)]
pub fn new(
plugin: impl Into<SmolStr>,
service: impl Into<SmolStr>,
version: impl Into<SmolStr>,
) -> Self {
Self {
plugin: plugin.into(),
service: service.into(),
version: version.into(),
}
}
#[inline(always)]
pub fn plugin(&self) -> &str {
&self.plugin
}
#[inline(always)]
pub fn service(&self) -> &str {
&self.service
}
#[inline(always)]
pub fn version(&self) -> &str {
&self.version
}
}
pub type ErrorBox = Box<dyn Error>;
pub type CallbackResult<T> = Result<T, ErrorBox>;
pub trait Service {
type Config: DeserializeOwned;
fn on_config_change(
&mut self,
ctx: &PicoContext,
new_config: Self::Config,
old_config: Self::Config,
) -> CallbackResult<()> {
_ = ctx;
_ = new_config;
_ = old_config;
Ok(())
}
fn on_start(&mut self, context: &PicoContext, config: Self::Config) -> CallbackResult<()> {
_ = context;
_ = config;
Ok(())
}
fn on_stop(&mut self, context: &PicoContext) -> CallbackResult<()> {
_ = context;
Ok(())
}
fn on_leader_change(&mut self, context: &PicoContext) -> CallbackResult<()> {
_ = context;
Ok(())
}
fn on_health_check(&self, context: &PicoContext) -> CallbackResult<()> {
_ = context;
Ok(())
}
}
#[sabi_trait]
pub trait ServiceStable {
fn on_health_check(&self, context: &PicoContext) -> RResult<(), ()>;
fn on_start(&mut self, context: &PicoContext, configuration: RSlice<u8>) -> RResult<(), ()>;
fn on_stop(&mut self, context: &PicoContext) -> RResult<(), ()>;
fn on_leader_change(&mut self, context: &PicoContext) -> RResult<(), ()>;
fn on_config_change(
&mut self,
ctx: &PicoContext,
new_config: RSlice<u8>,
old_config: RSlice<u8>,
) -> RResult<(), ()>;
}
pub struct ServiceProxy<C: DeserializeOwned> {
service: Box<dyn Service<Config = C>>,
}
impl<C: DeserializeOwned> ServiceProxy<C> {
pub fn from_service(service: Box<dyn Service<Config = C>>) -> Self {
Self { service }
}
}
fn error_into_tt_error<T>(source: impl Display) -> RResult<T, ()> {
let tt_error = BoxError::new(ErrorCode::PluginError, source.to_string());
tt_error.set_last_error();
RErr(())
}
macro_rules! rtry {
($expr: expr) => {
match $expr {
Ok(k) => k,
Err(e) => return error_into_tt_error(e),
}
};
}
impl<C: DeserializeOwned> ServiceStable for ServiceProxy<C> {
fn on_health_check(&self, context: &PicoContext) -> RResult<(), ()> {
match self.service.on_health_check(context) {
Ok(_) => ROk(()),
Err(e) => error_into_tt_error(e),
}
}
fn on_start(&mut self, context: &PicoContext, configuration: RSlice<u8>) -> RResult<(), ()> {
let configuration: C = rtry!(rmp_serde::from_slice(configuration.as_slice()));
match self.service.on_start(context, configuration) {
Ok(_) => ROk(()),
Err(e) => error_into_tt_error(e),
}
}
fn on_stop(&mut self, context: &PicoContext) -> RResult<(), ()> {
match self.service.on_stop(context) {
Ok(_) => ROk(()),
Err(e) => error_into_tt_error(e),
}
}
fn on_leader_change(&mut self, context: &PicoContext) -> RResult<(), ()> {
match self.service.on_leader_change(context) {
Ok(_) => ROk(()),
Err(e) => error_into_tt_error(e),
}
}
fn on_config_change(
&mut self,
ctx: &PicoContext,
new_config: RSlice<u8>,
old_config: RSlice<u8>,
) -> RResult<(), ()> {
let new_config: C = rtry!(rmp_serde::from_slice(new_config.as_slice()));
let old_config: C = rtry!(rmp_serde::from_slice(old_config.as_slice()));
let res = self.service.on_config_change(ctx, new_config, old_config);
match res {
Ok(_) => ROk(()),
Err(e) => error_into_tt_error(e),
}
}
}
pub type ServiceBox = ServiceStable_TO<'static, RBox<()>>;
pub type FnServiceRegistrar = extern "C" fn(registry: &mut ServiceRegistry);
#[sabi_trait]
trait Factory {
fn make(&self) -> ServiceBox;
}
type FactoryBox = Factory_TO<'static, RBox<()>>;
struct FactoryImpl<S: Service + 'static> {
factory_fn: fn() -> S,
}
impl<S: Service + 'static> Factory for FactoryImpl<S> {
fn make(&self) -> ServiceBox {
let boxed = Box::new((self.factory_fn)());
ServiceBox::from_value(ServiceProxy::from_service(boxed), sabi_trait::TD_Opaque)
}
}
type ServiceIdent = RTuple!(RString, RString);
#[sabi_trait]
pub trait Validator {
fn validate(&self, config: RSlice<u8>) -> RResult<(), ()>;
}
pub type ValidatorBox = Validator_TO<'static, RBox<()>>;
struct ValidatorImpl<CONFIG: DeserializeOwned + 'static> {
func: fn(config: CONFIG) -> CallbackResult<()>,
}
impl<C: DeserializeOwned> Validator for ValidatorImpl<C> {
fn validate(&self, config: RSlice<u8>) -> RResult<(), ()> {
let config: C = rtry!(rmp_serde::from_slice(config.as_slice()));
let res = (self.func)(config);
match res {
Ok(_) => ROk(()),
Err(e) => error_into_tt_error(e),
}
}
}
#[repr(C)]
#[derive(Default, StableAbi)]
pub struct ServiceRegistry {
services: RHashMap<ServiceIdent, RVec<FactoryBox>>,
validators: RHashMap<ServiceIdent, ValidatorBox>,
}
impl ServiceRegistry {
pub fn add<S: Service + 'static>(
&mut self,
name: &str,
plugin_version: &str,
factory: fn() -> S,
) {
let factory_inner = FactoryImpl {
factory_fn: factory,
};
let factory_inner =
FactoryBox::from_value(factory_inner, abi_stable::sabi_trait::TD_Opaque);
let ident = ServiceIdent::from((RString::from(name), RString::from(plugin_version)));
if self.validators.get(&ident).is_none() {
let validator = ValidatorImpl {
func: |_: S::Config| Ok(()),
};
let validator_stable = ValidatorBox::from_value(validator, sabi_trait::TD_Opaque);
self.validators.insert(ident.clone(), validator_stable);
}
let entry = self.services.entry(ident).or_default();
entry.push(factory_inner);
}
#[allow(clippy::result_unit_err)]
#[cfg(feature = "internal")]
pub fn make(&self, service_name: &str, version: &str) -> Result<Option<ServiceBox>, ()> {
let ident = ServiceIdent::from((RString::from(service_name), RString::from(version)));
let maybe_factories = self.services.get(&ident);
match maybe_factories {
None => Ok(None),
Some(factories) if factories.len() == 1 => {
Ok(factories.first().map(|factory| factory.make()))
}
Some(_) => Err(()),
}
}
#[allow(clippy::result_unit_err)]
#[cfg(feature = "internal")]
pub fn contains(&self, service_name: &str, version: &str) -> Result<bool, ()> {
let ident = ServiceIdent::from((RString::from(service_name), RString::from(version)));
match self.services.get(&ident) {
None => Ok(false),
Some(factories) if factories.len() == 1 => Ok(true),
Some(_) => Err(()),
}
}
pub fn add_config_validator<S: Service>(
&mut self,
service_name: &str,
plugin_version: &str,
validator: fn(S::Config) -> CallbackResult<()>,
) where
S::Config: DeserializeOwned + 'static,
{
let ident =
ServiceIdent::from((RString::from(service_name), RString::from(plugin_version)));
let validator = ValidatorImpl { func: validator };
let validator_stable = ValidatorBox::from_value(validator, sabi_trait::TD_Opaque);
self.validators.insert(ident, validator_stable);
}
#[cfg(feature = "internal")]
pub fn remove_config_validator(
&mut self,
service_name: &str,
version: &str,
) -> Option<ValidatorBox> {
let ident = ServiceIdent::from((RString::from(service_name), RString::from(version)));
self.validators.remove(&ident).into_option()
}
#[cfg(feature = "internal")]
pub fn dump(&self) -> Vec<(String, String)> {
self.services
.keys()
.map(|key| {
let service = key.0.to_string();
let version = key.1.to_string();
(service, version)
})
.collect()
}
}
#[sabi_trait]
pub trait MigrationContextValidator {
fn validate(&self, context: RHashMap<RString, RString>) -> RResult<(), ()>;
fn validate_parameter(&self, name: RString, value: RString) -> RResult<(), ()>;
}
pub type MigrationContextValidatorBox = MigrationContextValidator_TO<'static, RBox<()>>;
#[derive(Default)]
struct MigrationContextValidatorImpl {
#[allow(clippy::type_complexity)]
context_validator: Option<fn(context: HashMap<String, String>) -> CallbackResult<()>>,
parameter_validator: Option<fn(name: String, value: String) -> CallbackResult<()>>,
}
impl MigrationContextValidator for MigrationContextValidatorImpl {
fn validate(&self, context: RHashMap<RString, RString>) -> RResult<(), ()> {
let Some(context_validator) = self.context_validator else {
return ROk(());
};
let context = context
.into_iter()
.map(|tuple| (tuple.0.into(), tuple.1.into()))
.collect();
match (context_validator)(context) {
Ok(_) => ROk(()),
Err(e) => error_into_tt_error(e),
}
}
fn validate_parameter(&self, name: RString, value: RString) -> RResult<(), ()> {
let Some(parameter_validator) = self.parameter_validator else {
return ROk(());
};
match (parameter_validator)(name.into(), value.into()) {
Ok(_) => ROk(()),
Err(e) => error_into_tt_error(e),
}
}
}
#[repr(C)]
#[derive(StableAbi, Default)]
pub struct MigrationValidator {
context_validator: ROption<MigrationContextValidatorBox>,
}
impl MigrationValidator {
pub fn set_context_validator(
&mut self,
context_validator_fn: fn(context: HashMap<String, String>) -> CallbackResult<()>,
) {
match &mut self.context_validator {
ROption::RNone => {
let validator_inner = MigrationContextValidatorImpl {
context_validator: Some(context_validator_fn),
parameter_validator: None,
};
let validator_inner = MigrationContextValidatorBox::from_value(
validator_inner,
abi_stable::sabi_trait::TD_CanDowncast,
);
self.context_validator = ROption::RSome(validator_inner);
}
ROption::RSome(mcv) => {
let mcv_impl = mcv
.obj
.downcast_as_mut::<MigrationContextValidatorImpl>()
.expect(
"downcasting should not fail, because it uses the same struct for creating the obj",
);
mcv_impl.context_validator = Some(context_validator_fn);
}
}
}
pub fn set_context_parameter_validator(
&mut self,
parameter_validator_fn: fn(name: String, value: String) -> CallbackResult<()>,
) {
match &mut self.context_validator {
ROption::RNone => {
let validator_inner = MigrationContextValidatorImpl {
context_validator: None,
parameter_validator: Some(parameter_validator_fn),
};
let validator_inner = MigrationContextValidatorBox::from_value(
validator_inner,
abi_stable::sabi_trait::TD_CanDowncast,
);
self.context_validator = ROption::RSome(validator_inner);
}
ROption::RSome(mcv) => {
let mcv_impl = mcv
.obj
.downcast_as_mut::<MigrationContextValidatorImpl>()
.expect(
"downcasting should not fail, because it uses the same struct for creating the obj",
);
mcv_impl.parameter_validator = Some(parameter_validator_fn);
}
}
}
#[cfg(feature = "internal")]
pub fn migration_context_validator(&mut self) -> MigrationContextValidatorBox {
if let ROption::RSome(mcv) = self.context_validator.take() {
return mcv;
}
let validator = MigrationContextValidatorImpl::default();
let validator_stable =
MigrationContextValidatorBox::from_value(validator, sabi_trait::TD_Opaque);
validator_stable
}
}
pub type FnMigrationValidator = extern "C" fn(validator: &mut MigrationValidator);