Skip to main content

picodata_plugin/plugin/
interface.rs

1use crate::background;
2use crate::background::ServiceWorkerManager;
3use crate::error_code::ErrorCode;
4use crate::util::FfiSafeStr;
5pub use abi_stable;
6use abi_stable::pmr::{RErr, RResult, RSlice};
7use abi_stable::std_types::{RBox, RHashMap, ROk, ROption, RString, RVec};
8use abi_stable::{sabi_trait, RTuple, StableAbi};
9use serde::de::DeserializeOwned;
10use smol_str::SmolStr;
11use std::collections::HashMap;
12use std::error::Error;
13use std::fmt::Display;
14use std::time::Duration;
15use tarantool::error::TarantoolErrorCode;
16use tarantool::error::{BoxError, IntoBoxError};
17
18/// Context of current instance. Produced by picodata.
19#[repr(C)]
20#[derive(StableAbi, Debug)]
21pub struct PicoContext {
22    is_master: bool,
23    pub plugin_name: FfiSafeStr,
24    pub service_name: FfiSafeStr,
25    pub plugin_version: FfiSafeStr,
26}
27
28impl PicoContext {
29    #[inline]
30    pub fn new(is_master: bool) -> PicoContext {
31        Self {
32            is_master,
33            plugin_name: "<unset>".into(),
34            service_name: "<unset>".into(),
35            plugin_version: "<unset>".into(),
36        }
37    }
38
39    /// # Safety
40    ///
41    /// Note: this is for internal use only. Plugin developers should never
42    /// be copying pico context.
43    #[inline]
44    pub unsafe fn clone(&self) -> Self {
45        Self {
46            is_master: self.is_master,
47            plugin_name: self.plugin_name,
48            service_name: self.service_name,
49            plugin_version: self.plugin_version,
50        }
51    }
52
53    /// Return true if the current instance is a replicaset leader.
54    #[inline]
55    pub fn is_master(&self) -> bool {
56        self.is_master
57    }
58
59    /// Return [`ServiceWorkerManager`] for current service.
60    #[deprecated = "use `register_job`, `register_tagged_job` or `cancel_background_jobs_by_tag` directly instead"]
61    pub fn worker_manager(&self) -> ServiceWorkerManager {
62        ServiceWorkerManager::new(self.make_service_id())
63    }
64
65    // TODO:
66    // pub fn register_job(&self) -> ServiceWorkerManager {
67    // pub fn register_tagged_job(&self) -> ServiceWorkerManager {
68    // pub fn cancel_job_by_tag(&self) -> ServiceWorkerManager {
69
70    #[inline(always)]
71    pub fn register_metrics_callback(&self, callback: impl Fn() -> String) -> Result<(), BoxError> {
72        crate::metrics::register_metrics_handler(self, callback)
73    }
74
75    /// Add a new job to the execution.
76    /// Job work life cycle will be tied to the service life cycle;
77    /// this means that job will be canceled just before service is stopped.
78    ///
79    /// # Arguments
80    ///
81    /// * `job`: callback that will be executed in separated fiber.
82    ///   Note that it is your responsibility to organize job graceful shutdown, see a
83    ///   [`background::CancellationToken`] for details.
84    ///
85    /// # Examples
86    ///
87    /// ```no_run
88    /// use std::time::Duration;
89    /// use picodata_plugin::background::CancellationToken;
90    ///
91    /// # use picodata_plugin::plugin::interface::PicoContext;
92    /// # fn on_start(context: PicoContext) {
93    ///
94    /// // this job will print "hello" every second,
95    /// // and print "bye" after being canceled
96    /// fn hello_printer(cancel: CancellationToken) {
97    ///     while cancel.wait_timeout(Duration::from_secs(1)).is_err() {
98    ///         println!("hello!");
99    ///     }
100    ///     println!("job cancelled, bye!")
101    /// }
102    /// context.register_job(hello_printer).unwrap();
103    ///
104    /// # }
105    /// ```
106    #[inline(always)]
107    pub fn register_job<F>(&self, job: F) -> Result<(), BoxError>
108    where
109        F: FnOnce(background::CancellationToken) + 'static,
110    {
111        background::register_job(&self.make_service_id(), job)
112    }
113
114    /// Same as [`Self::register_job`] but caller may provide a special tag.
115    /// This tag may be used for manual job cancellation using [`Self::cancel_tagged_jobs`].
116    ///
117    /// # Arguments
118    ///
119    /// * `job`: callback that will be executed in separated fiber
120    /// * `tag`: tag, that will be related to a job, single tag may be related to the multiple jobs
121    #[inline(always)]
122    pub fn register_tagged_job<F>(&self, job: F, tag: &str) -> Result<(), BoxError>
123    where
124        F: FnOnce(background::CancellationToken) + 'static,
125    {
126        background::register_tagged_job(&self.make_service_id(), job, tag)
127    }
128
129    /// Cancel all jobs related to the given `tag`.
130    /// This function return after all related jobs will be gracefully shutdown or
131    /// after `timeout` duration.
132    ///
133    /// Returns error with code [`TarantoolErrorCode::Timeout`] in case some
134    /// jobs didn't finish within `timeout`.
135    ///
136    /// May also theoretically return error with code [`ErrorCode::NoSuchService`]
137    /// in case the service doesn't exist anymore (highly unlikely).
138    ///
139    /// See also [`Self::register_tagged_job`].
140    #[inline(always)]
141    pub fn cancel_tagged_jobs(&self, tag: &str, timeout: Duration) -> Result<(), BoxError> {
142        let res = background::cancel_jobs_by_tag(&self.make_service_id(), tag, timeout)?;
143        if res.n_timeouts != 0 {
144            #[rustfmt::skip]
145            return Err(BoxError::new(TarantoolErrorCode::Timeout, format!("some background jobs didn't finish in time (expected: {}, timed out: {})", res.n_total, res.n_timeouts)));
146        }
147
148        Ok(())
149    }
150
151    /// In case when jobs were canceled by `picodata` use this function for determine
152    /// a shutdown timeout - time duration that `picodata` uses to ensure that all
153    /// jobs gracefully end.
154    ///
155    /// By default, 5-second timeout are used.
156    #[inline(always)]
157    pub fn set_jobs_shutdown_timeout(&self, timeout: Duration) {
158        crate::background::set_jobs_shutdown_timeout(
159            self.plugin_name(),
160            self.service_name(),
161            self.plugin_version(),
162            timeout,
163        )
164    }
165
166    #[inline(always)]
167    pub fn make_service_id(&self) -> ServiceId {
168        ServiceId::new(
169            self.plugin_name(),
170            self.service_name(),
171            self.plugin_version(),
172        )
173    }
174
175    #[inline]
176    pub fn plugin_name(&self) -> &str {
177        // SAFETY: safe because lifetime is managed by borrow checker
178        unsafe { self.plugin_name.as_str() }
179    }
180
181    #[inline]
182    pub fn service_name(&self) -> &str {
183        // SAFETY: safe because lifetime is managed by borrow checker
184        unsafe { self.service_name.as_str() }
185    }
186
187    #[inline]
188    pub fn plugin_version(&self) -> &str {
189        // SAFETY: safe because lifetime is managed by borrow checker
190        unsafe { self.plugin_version.as_str() }
191    }
192}
193
194/// Unique service identifier.
195#[derive(Debug, PartialEq, Eq, Hash, Clone)]
196pub struct ServiceId {
197    pub plugin: SmolStr,
198    pub service: SmolStr,
199    pub version: SmolStr,
200}
201
202impl std::fmt::Display for ServiceId {
203    #[inline(always)]
204    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
205        write!(f, "{}.{}:v{}", self.plugin, self.service, self.version,)
206    }
207}
208
209impl ServiceId {
210    #[inline(always)]
211    pub fn new(
212        plugin: impl Into<SmolStr>,
213        service: impl Into<SmolStr>,
214        version: impl Into<SmolStr>,
215    ) -> Self {
216        Self {
217            plugin: plugin.into(),
218            service: service.into(),
219            version: version.into(),
220        }
221    }
222
223    #[inline(always)]
224    pub fn plugin(&self) -> &str {
225        &self.plugin
226    }
227
228    #[inline(always)]
229    pub fn service(&self) -> &str {
230        &self.service
231    }
232
233    #[inline(always)]
234    pub fn version(&self) -> &str {
235        &self.version
236    }
237}
238
239// --------------------------- user interface ------------------------------------------------------
240
241/// Error type, return it from your callbacks.
242pub type ErrorBox = Box<dyn Error>;
243
244pub type CallbackResult<T> = Result<T, ErrorBox>;
245
246/// Service trait. Implement it in your code to create a service.
247pub trait Service {
248    /// Use this associated type to define configuration of your service.
249    type Config: DeserializeOwned;
250
251    /// Callback to handle service configuration change once instance receives it.
252    ///
253    /// # Idempotency
254    ///
255    /// **WARNING** This callback may be called several times in a row.
256    /// It is the responsibility of the plugin author to make this function idempotent.
257    ///
258    /// # Poison
259    ///
260    /// Return an error here to poison current instance.
261    /// This will not cancel reconfiguration process,
262    /// but will mark instance as unavailable for rpc messaging.
263    ///
264    /// Instance can be "healed"
265    /// if any of `on_leader_change` or `on_config_change`
266    /// callbacks in the future return `Ok`.
267    ///
268    /// # Arguments
269    ///
270    /// * `ctx`: instance context
271    /// * `new_config`: new configuration
272    /// * `old_config`: previous defined configuration
273    fn on_config_change(
274        &mut self,
275        ctx: &PicoContext,
276        new_config: Self::Config,
277        old_config: Self::Config,
278    ) -> CallbackResult<()> {
279        _ = ctx;
280        _ = new_config;
281        _ = old_config;
282        Ok(())
283    }
284
285    /// Called at service start on every instance.
286    ///
287    /// # Idempotency
288    ///
289    /// **WARNING** This callback may be called several times in a row (without
290    /// any calls to [`Self::on_stop`]). It is the responsibility of the plugin
291    /// author to make this function idempotent.
292    ///
293    /// An error returned here abort plugin load clusterwide thus forcing
294    /// `on_stop` callback execution on every instance.
295    ///
296    /// # Arguments
297    ///
298    /// * `context`: instance context
299    /// * `config`: initial configuration
300    fn on_start(&mut self, context: &PicoContext, config: Self::Config) -> CallbackResult<()> {
301        _ = context;
302        _ = config;
303        Ok(())
304    }
305
306    /// Called on instance shutdown, plugin removal or failure of the initial load.
307    /// Returned error will only be logged causing no effects on plugin lifecycle.
308    ///
309    /// # Idempotency
310    ///
311    /// **WARNING** This callback may be called several times in a row.
312    /// It is the responsibility of the plugin author to make this function idempotent.
313    ///
314    /// # Arguments
315    ///
316    /// * `context`: instance context
317    fn on_stop(&mut self, context: &PicoContext) -> CallbackResult<()> {
318        _ = context;
319        Ok(())
320    }
321
322    /// Called when replicaset leader is changed.
323    /// This callback will be called exactly on two instances - the old leader and the new one.
324    ///
325    /// # Idempotency
326    ///
327    /// **WARNING** This callback may be called several times in a row.
328    /// It is the responsibility of the plugin author to make this function idempotent.
329    ///
330    /// # Poison
331    ///
332    /// Return an error here to poison current instance.
333    /// This will not cancel reconfiguration process,
334    /// but will mark instance as unavailable for rpc messaging.
335    ///
336    /// Instance can be "healed"
337    /// if any of `on_leader_change` or `on_config_change`
338    /// callbacks in the future return `Ok`.
339    ///
340    /// # Arguments
341    ///
342    /// * `context`: instance context
343    fn on_leader_change(&mut self, context: &PicoContext) -> CallbackResult<()> {
344        _ = context;
345        Ok(())
346    }
347
348    /// `on_healthcheck` is a callback
349    /// that should be called to determine if the service is functioning properly
350    /// On an error instance will be poisoned
351    ///
352    /// # Unimplemented
353    ///
354    /// **WARNING** This feature is not yet implemented.
355    /// The callback is never called.
356    /// TODO.
357    fn on_health_check(&self, context: &PicoContext) -> CallbackResult<()> {
358        _ = context;
359        Ok(())
360    }
361}
362
363// ---------------------------- internal implementation ----------------------------------------------
364
365/// Safe trait for sending a service trait object between ABI boundary.
366/// Define interface like [`Service`] trait but using safe types from [`abi_stable`] crate.
367#[sabi_trait]
368pub trait ServiceStable {
369    fn on_health_check(&self, context: &PicoContext) -> RResult<(), ()>;
370    fn on_start(&mut self, context: &PicoContext, configuration: RSlice<u8>) -> RResult<(), ()>;
371    fn on_stop(&mut self, context: &PicoContext) -> RResult<(), ()>;
372    fn on_leader_change(&mut self, context: &PicoContext) -> RResult<(), ()>;
373    fn on_config_change(
374        &mut self,
375        ctx: &PicoContext,
376        new_config: RSlice<u8>,
377        old_config: RSlice<u8>,
378    ) -> RResult<(), ()>;
379}
380
381/// Implementation of [`ServiceStable`]
382pub struct ServiceProxy<C: DeserializeOwned> {
383    service: Box<dyn Service<Config = C>>,
384}
385
386impl<C: DeserializeOwned> ServiceProxy<C> {
387    pub fn from_service(service: Box<dyn Service<Config = C>>) -> Self {
388        Self { service }
389    }
390}
391
392/// Use this function for conversion between user error and picodata internal error.
393/// This conversion forces allocations because using user-error "as-is"
394/// may lead to use-after-free errors.
395/// UAF can happen if user error points into memory allocated by dynamic lib and lives
396/// longer than dynamic lib memory (that was unmapped by system).
397fn error_into_tt_error<T>(source: impl Display) -> RResult<T, ()> {
398    let tt_error = BoxError::new(ErrorCode::PluginError, source.to_string());
399    tt_error.set_last_error();
400    RErr(())
401}
402
403macro_rules! rtry {
404    ($expr: expr) => {
405        match $expr {
406            Ok(k) => k,
407            Err(e) => return error_into_tt_error(e),
408        }
409    };
410}
411
412impl<C: DeserializeOwned> ServiceStable for ServiceProxy<C> {
413    fn on_health_check(&self, context: &PicoContext) -> RResult<(), ()> {
414        match self.service.on_health_check(context) {
415            Ok(_) => ROk(()),
416            Err(e) => error_into_tt_error(e),
417        }
418    }
419
420    fn on_start(&mut self, context: &PicoContext, configuration: RSlice<u8>) -> RResult<(), ()> {
421        let configuration: C = rtry!(rmp_serde::from_slice(configuration.as_slice()));
422        match self.service.on_start(context, configuration) {
423            Ok(_) => ROk(()),
424            Err(e) => error_into_tt_error(e),
425        }
426    }
427
428    fn on_stop(&mut self, context: &PicoContext) -> RResult<(), ()> {
429        match self.service.on_stop(context) {
430            Ok(_) => ROk(()),
431            Err(e) => error_into_tt_error(e),
432        }
433    }
434
435    fn on_leader_change(&mut self, context: &PicoContext) -> RResult<(), ()> {
436        match self.service.on_leader_change(context) {
437            Ok(_) => ROk(()),
438            Err(e) => error_into_tt_error(e),
439        }
440    }
441
442    fn on_config_change(
443        &mut self,
444        ctx: &PicoContext,
445        new_config: RSlice<u8>,
446        old_config: RSlice<u8>,
447    ) -> RResult<(), ()> {
448        let new_config: C = rtry!(rmp_serde::from_slice(new_config.as_slice()));
449        let old_config: C = rtry!(rmp_serde::from_slice(old_config.as_slice()));
450
451        let res = self.service.on_config_change(ctx, new_config, old_config);
452        match res {
453            Ok(_) => ROk(()),
454            Err(e) => error_into_tt_error(e),
455        }
456    }
457}
458
459/// Final safe service trait object type. Can be used on both sides of ABI.
460pub type ServiceBox = ServiceStable_TO<'static, RBox<()>>;
461
462// ---------------------------- Registrar ----------------------------------------------
463
464pub type FnServiceRegistrar = extern "C" fn(registry: &mut ServiceRegistry);
465
466/// The reason for the existence of this trait is that [`abi_stable`] crate doesn't support
467/// closures.
468#[sabi_trait]
469trait Factory {
470    fn make(&self) -> ServiceBox;
471}
472
473type FactoryBox = Factory_TO<'static, RBox<()>>;
474
475/// The reason for the existence of this struct is that [`abi_stable`] crate doesn't support
476/// closures.
477struct FactoryImpl<S: Service + 'static> {
478    factory_fn: fn() -> S,
479}
480
481impl<S: Service + 'static> Factory for FactoryImpl<S> {
482    fn make(&self) -> ServiceBox {
483        let boxed = Box::new((self.factory_fn)());
484        ServiceBox::from_value(ServiceProxy::from_service(boxed), sabi_trait::TD_Opaque)
485    }
486}
487
488/// Service name and plugin version pair.
489type ServiceIdent = RTuple!(RString, RString);
490
491/// Config validator stable trait.
492///
493/// The reason for the existence of this trait is that [`abi_stable`] crate doesn't support
494/// closures.
495#[sabi_trait]
496pub trait Validator {
497    /// Validate plugin configuration.
498    ///
499    /// # Idempotency
500    ///
501    /// **WARNING** This callback may be called several times in a row.
502    /// It is the responsibility of the plugin author to make this function idempotent.
503    ///
504    fn validate(&self, config: RSlice<u8>) -> RResult<(), ()>;
505}
506
507pub type ValidatorBox = Validator_TO<'static, RBox<()>>;
508
509/// The reason for the existence of this struct is that [`abi_stable`] crate doesn't support
510/// closures.
511struct ValidatorImpl<CONFIG: DeserializeOwned + 'static> {
512    func: fn(config: CONFIG) -> CallbackResult<()>,
513}
514
515impl<C: DeserializeOwned> Validator for ValidatorImpl<C> {
516    fn validate(&self, config: RSlice<u8>) -> RResult<(), ()> {
517        let config: C = rtry!(rmp_serde::from_slice(config.as_slice()));
518        let res = (self.func)(config);
519        match res {
520            Ok(_) => ROk(()),
521            Err(e) => error_into_tt_error(e),
522        }
523    }
524}
525
526/// Registry for services. Used by picodata to create instances of services.
527#[repr(C)]
528#[derive(Default, StableAbi)]
529pub struct ServiceRegistry {
530    services: RHashMap<ServiceIdent, RVec<FactoryBox>>,
531    validators: RHashMap<ServiceIdent, ValidatorBox>,
532}
533
534impl ServiceRegistry {
535    /// Add new service to the registry.
536    ///
537    /// # Arguments
538    ///
539    /// * `name`: service name
540    /// * `plugin_version`: version of service's plugin
541    /// * `factory`: new service instance factory
542    pub fn add<S: Service + 'static>(
543        &mut self,
544        name: &str,
545        plugin_version: &str,
546        factory: fn() -> S,
547    ) {
548        let factory_inner = FactoryImpl {
549            factory_fn: factory,
550        };
551        let factory_inner =
552            FactoryBox::from_value(factory_inner, abi_stable::sabi_trait::TD_Opaque);
553
554        let ident = ServiceIdent::from((RString::from(name), RString::from(plugin_version)));
555
556        if self.validators.get(&ident).is_none() {
557            // default validator implementation,
558            // just check that configuration may be deserialized into `S::Config` type
559            let validator = ValidatorImpl {
560                func: |_: S::Config| Ok(()),
561            };
562            let validator_stable = ValidatorBox::from_value(validator, sabi_trait::TD_Opaque);
563            self.validators.insert(ident.clone(), validator_stable);
564        }
565
566        let entry = self.services.entry(ident).or_default();
567        entry.push(factory_inner);
568    }
569
570    /// Create service from service name and plugin version pair.
571    /// Return an error if there is more than one factory suitable for creating a service.
572    #[allow(clippy::result_unit_err)]
573    #[cfg(feature = "internal")]
574    pub fn make(&self, service_name: &str, version: &str) -> Result<Option<ServiceBox>, ()> {
575        let ident = ServiceIdent::from((RString::from(service_name), RString::from(version)));
576        let maybe_factories = self.services.get(&ident);
577
578        match maybe_factories {
579            None => Ok(None),
580            Some(factories) if factories.len() == 1 => {
581                Ok(factories.first().map(|factory| factory.make()))
582            }
583            Some(_) => Err(()),
584        }
585    }
586
587    /// Return true if registry contains needle service, false elsewhere.
588    /// Return an error if there is more than one factory suitable for creating a service.
589    #[allow(clippy::result_unit_err)]
590    #[cfg(feature = "internal")]
591    pub fn contains(&self, service_name: &str, version: &str) -> Result<bool, ()> {
592        let ident = ServiceIdent::from((RString::from(service_name), RString::from(version)));
593        match self.services.get(&ident) {
594            None => Ok(false),
595            Some(factories) if factories.len() == 1 => Ok(true),
596            Some(_) => Err(()),
597        }
598    }
599
600    /// Add validator for service configuration. Called before new configration is loaded.
601    /// Returning an error for validator will abort configuration change clusterwide.
602    ///
603    /// # Arguments
604    ///
605    /// * `service_name`: service name which configuration will be validated
606    /// * `plugin_version`: plugin version
607    /// * `validator`: validation function
608    pub fn add_config_validator<S: Service>(
609        &mut self,
610        service_name: &str,
611        plugin_version: &str,
612        validator: fn(S::Config) -> CallbackResult<()>,
613    ) where
614        S::Config: DeserializeOwned + 'static,
615    {
616        let ident =
617            ServiceIdent::from((RString::from(service_name), RString::from(plugin_version)));
618
619        let validator = ValidatorImpl { func: validator };
620        let validator_stable = ValidatorBox::from_value(validator, sabi_trait::TD_Opaque);
621        self.validators.insert(ident, validator_stable);
622    }
623
624    /// Remove config validator for service.
625    #[cfg(feature = "internal")]
626    pub fn remove_config_validator(
627        &mut self,
628        service_name: &str,
629        version: &str,
630    ) -> Option<ValidatorBox> {
631        let ident = ServiceIdent::from((RString::from(service_name), RString::from(version)));
632        self.validators.remove(&ident).into_option()
633    }
634
635    /// Return a registered list of (service name, plugin version) pairs.
636    #[cfg(feature = "internal")]
637    pub fn dump(&self) -> Vec<(String, String)> {
638        self.services
639            .keys()
640            .map(|key| {
641                let service = key.0.to_string();
642                let version = key.1.to_string();
643                (service, version)
644            })
645            .collect()
646    }
647}
648
649/// Migration context validator stable trait.
650///
651/// The reason for the existence of this trait is that [`abi_stable`] crate doesn't support
652/// closures.
653#[sabi_trait]
654pub trait MigrationContextValidator {
655    /// Validate the whole plugin migration context.
656    ///
657    /// # Idempotency
658    ///
659    /// **WARNING** This callback may be called several times in a row.
660    /// It is the responsibility of the plugin author to make this function idempotent.
661    fn validate(&self, context: RHashMap<RString, RString>) -> RResult<(), ()>;
662
663    /// Validate a single parameter value from the migration context.
664    ///
665    /// # Idempotency
666    ///
667    /// **WARNING** This callback may be called several times in a row.
668    /// It is the responsibility of the plugin author to make this function idempotent.
669    fn validate_parameter(&self, name: RString, value: RString) -> RResult<(), ()>;
670}
671
672pub type MigrationContextValidatorBox = MigrationContextValidator_TO<'static, RBox<()>>;
673
674/// The reason for the existence of this struct is that [`abi_stable`] crate doesn't support
675/// closures.
676#[derive(Default)]
677struct MigrationContextValidatorImpl {
678    #[allow(clippy::type_complexity)]
679    /// Whole migration context validation function. It's only argument is a
680    /// `HashMap`, that maps migration context parameter names to their values.
681    context_validator: Option<fn(context: HashMap<String, String>) -> CallbackResult<()>>,
682    /// Parameter validation function. It's first argument is the parameter name
683    /// that is currently being validated, the second argument is its value.
684    parameter_validator: Option<fn(name: String, value: String) -> CallbackResult<()>>,
685}
686
687impl MigrationContextValidator for MigrationContextValidatorImpl {
688    fn validate(&self, context: RHashMap<RString, RString>) -> RResult<(), ()> {
689        let Some(context_validator) = self.context_validator else {
690            // the context validator is not set, so don't do any validation
691            return ROk(());
692        };
693        let context = context
694            .into_iter()
695            .map(|tuple| (tuple.0.into(), tuple.1.into()))
696            .collect();
697        match (context_validator)(context) {
698            Ok(_) => ROk(()),
699            Err(e) => error_into_tt_error(e),
700        }
701    }
702
703    fn validate_parameter(&self, name: RString, value: RString) -> RResult<(), ()> {
704        let Some(parameter_validator) = self.parameter_validator else {
705            // the parameter validator is not set, so don't do any validation
706            return ROk(());
707        };
708        match (parameter_validator)(name.into(), value.into()) {
709            Ok(_) => ROk(()),
710            Err(e) => error_into_tt_error(e),
711        }
712    }
713}
714
715/// A struct that holds all entities used for migration validation (currently, only the
716/// migration context validator)
717#[repr(C)]
718#[derive(StableAbi, Default)]
719pub struct MigrationValidator {
720    context_validator: ROption<MigrationContextValidatorBox>,
721}
722
723impl MigrationValidator {
724    /// Set the whole migration context validation function. The only argument of `context_validator_fn`
725    /// is a `HashMap`, that maps migration context parameter names to their values.
726    ///
727    /// The validator function is called on `ALTER PLUGIN ... ENABLE`.
728    pub fn set_context_validator(
729        &mut self,
730        context_validator_fn: fn(context: HashMap<String, String>) -> CallbackResult<()>,
731    ) {
732        match &mut self.context_validator {
733            ROption::RNone => {
734                let validator_inner = MigrationContextValidatorImpl {
735                    context_validator: Some(context_validator_fn),
736                    parameter_validator: None,
737                };
738                let validator_inner = MigrationContextValidatorBox::from_value(
739                    validator_inner,
740                    abi_stable::sabi_trait::TD_CanDowncast,
741                );
742
743                self.context_validator = ROption::RSome(validator_inner);
744            }
745            ROption::RSome(mcv) => {
746                let mcv_impl = mcv
747                    .obj
748                    .downcast_as_mut::<MigrationContextValidatorImpl>()
749                    .expect(
750                    "downcasting should not fail, because it uses the same struct for creating the obj",
751                );
752
753                mcv_impl.context_validator = Some(context_validator_fn);
754            }
755        }
756    }
757
758    /// Set the migartion context parameter validation function. First argument of `parameter_validator_fn`
759    /// is the parameter name that is currently being validated, the second is its value.
760    ///
761    /// The validator function is called on `ALTER PLUGIN ... SET migration_context.<name>='<value>'`.
762    pub fn set_context_parameter_validator(
763        &mut self,
764        parameter_validator_fn: fn(name: String, value: String) -> CallbackResult<()>,
765    ) {
766        match &mut self.context_validator {
767            ROption::RNone => {
768                let validator_inner = MigrationContextValidatorImpl {
769                    context_validator: None,
770                    parameter_validator: Some(parameter_validator_fn),
771                };
772                let validator_inner = MigrationContextValidatorBox::from_value(
773                    validator_inner,
774                    abi_stable::sabi_trait::TD_CanDowncast,
775                );
776
777                self.context_validator = ROption::RSome(validator_inner);
778            }
779            ROption::RSome(mcv) => {
780                let mcv_impl = mcv
781                    .obj
782                    .downcast_as_mut::<MigrationContextValidatorImpl>()
783                    .expect(
784                    "downcasting should not fail, because it uses the same struct for creating the obj",
785                );
786
787                mcv_impl.parameter_validator = Some(parameter_validator_fn);
788            }
789        }
790    }
791
792    #[cfg(feature = "internal")]
793    pub fn migration_context_validator(&mut self) -> MigrationContextValidatorBox {
794        if let ROption::RSome(mcv) = self.context_validator.take() {
795            return mcv;
796        }
797
798        // context validator is not set, return a default implementation, that just returns Ok
799        let validator = MigrationContextValidatorImpl::default();
800        let validator_stable =
801            MigrationContextValidatorBox::from_value(validator, sabi_trait::TD_Opaque);
802        validator_stable
803    }
804}
805
806pub type FnMigrationValidator = extern "C" fn(validator: &mut MigrationValidator);