picodata_plugin/plugin/
interface.rs

1use crate::background;
2use crate::background::ServiceWorkerManager;
3use crate::error_code::ErrorCode;
4use crate::util::FfiSafeStr;
5pub use abi_stable;
6use abi_stable::pmr::{RErr, RResult, RSlice};
7use abi_stable::std_types::{RBox, RHashMap, ROk, RString, RVec};
8use abi_stable::{sabi_trait, RTuple, StableAbi};
9use serde::de::DeserializeOwned;
10use smol_str::SmolStr;
11use std::error::Error;
12use std::fmt::Display;
13use std::time::Duration;
14use tarantool::error::TarantoolErrorCode;
15use tarantool::error::{BoxError, IntoBoxError};
16
17/// Context of current instance. Produced by picodata.
18#[repr(C)]
19#[derive(StableAbi, Debug)]
20pub struct PicoContext {
21    is_master: bool,
22    pub plugin_name: FfiSafeStr,
23    pub service_name: FfiSafeStr,
24    pub plugin_version: FfiSafeStr,
25}
26
27impl PicoContext {
28    #[inline]
29    pub fn new(is_master: bool) -> PicoContext {
30        Self {
31            is_master,
32            plugin_name: "<unset>".into(),
33            service_name: "<unset>".into(),
34            plugin_version: "<unset>".into(),
35        }
36    }
37
38    /// Note: this is for internal use only. Plugin developers should never
39    /// be copying pico context.
40    #[inline]
41    pub unsafe fn clone(&self) -> Self {
42        Self {
43            is_master: self.is_master,
44            plugin_name: self.plugin_name.clone(),
45            service_name: self.service_name.clone(),
46            plugin_version: self.plugin_version.clone(),
47        }
48    }
49
50    /// Return true if the current instance is a replicaset leader.
51    #[inline]
52    pub fn is_master(&self) -> bool {
53        self.is_master
54    }
55
56    /// Return [`ServiceWorkerManager`] for current service.
57    #[deprecated = "use `register_job`, `register_tagged_job` or `cancel_background_jobs_by_tag` directly instead"]
58    pub fn worker_manager(&self) -> ServiceWorkerManager {
59        ServiceWorkerManager::new(self.make_service_id())
60    }
61
62    // TODO:
63    // pub fn register_job(&self) -> ServiceWorkerManager {
64    // pub fn register_tagged_job(&self) -> ServiceWorkerManager {
65    // pub fn cancel_job_by_tag(&self) -> ServiceWorkerManager {
66
67    #[inline(always)]
68    pub fn register_metrics_callback(&self, callback: impl Fn() -> String) -> Result<(), BoxError> {
69        crate::metrics::register_metrics_handler(self, callback)
70    }
71
72    /// Add a new job to the execution.
73    /// Job work life cycle will be tied to the service life cycle;
74    /// this means that job will be canceled just before service is stopped.
75    ///
76    /// # Arguments
77    ///
78    /// * `job`: callback that will be executed in separated fiber.
79    /// Note that it is your responsibility to organize job graceful shutdown, see a
80    /// [`background::CancellationToken`] for details.
81    ///
82    /// # Examples
83    ///
84    /// ```no_run
85    /// use std::time::Duration;
86    /// use picodata_plugin::background::CancellationToken;
87    ///
88    /// # use picodata_plugin::plugin::interface::PicoContext;
89    /// # fn on_start(context: PicoContext) {
90    ///
91    /// // this job will print "hello" every second,
92    /// // and print "bye" after being canceled
93    /// fn hello_printer(cancel: CancellationToken) {
94    ///     while cancel.wait_timeout(Duration::from_secs(1)).is_err() {
95    ///         println!("hello!");
96    ///     }
97    ///     println!("job cancelled, bye!")
98    /// }
99    /// context.register_job(hello_printer).unwrap();
100    ///
101    /// # }
102    /// ```
103    #[inline(always)]
104    pub fn register_job<F>(&self, job: F) -> Result<(), BoxError>
105    where
106        F: FnOnce(background::CancellationToken) + 'static,
107    {
108        background::register_job(&self.make_service_id(), job)
109    }
110
111    /// Same as [`Self::register_job`] but caller may provide a special tag.
112    /// This tag may be used for manual job cancellation using [`Self::cancel_tagged_jobs`].
113    ///
114    /// # Arguments
115    ///
116    /// * `job`: callback that will be executed in separated fiber
117    /// * `tag`: tag, that will be related to a job, single tag may be related to the multiple jobs
118    #[inline(always)]
119    pub fn register_tagged_job<F>(&self, job: F, tag: &str) -> Result<(), BoxError>
120    where
121        F: FnOnce(background::CancellationToken) + 'static,
122    {
123        background::register_tagged_job(&self.make_service_id(), job, tag)
124    }
125
126    /// Cancel all jobs related to the given `tag`.
127    /// This function return after all related jobs will be gracefully shutdown or
128    /// after `timeout` duration.
129    ///
130    /// Returns error with code [`TarantoolErrorCode::Timeout`] in case some
131    /// jobs didn't finish within `timeout`.
132    ///
133    /// May also theoretically return error with code [`ErrorCode::NoSuchService`]
134    /// in case the service doesn't exist anymore (highly unlikely).
135    ///
136    /// See also [`Self::register_tagged_job`].
137    #[inline(always)]
138    pub fn cancel_tagged_jobs(&self, tag: &str, timeout: Duration) -> Result<(), BoxError> {
139        let res = background::cancel_jobs_by_tag(&self.make_service_id(), tag, timeout)?;
140        if res.n_timeouts != 0 {
141            #[rustfmt::skip]
142            return Err(BoxError::new(TarantoolErrorCode::Timeout, format!("some background jobs didn't finish in time (expected: {}, timed out: {})", res.n_total, res.n_timeouts)));
143        }
144
145        Ok(())
146    }
147
148    /// In case when jobs were canceled by `picodata` use this function for determine
149    /// a shutdown timeout - time duration that `picodata` uses to ensure that all
150    /// jobs gracefully end.
151    ///
152    /// By default, 5-second timeout are used.
153    #[inline(always)]
154    pub fn set_jobs_shutdown_timeout(&self, timeout: Duration) {
155        crate::background::set_jobs_shutdown_timeout(
156            self.plugin_name(),
157            self.service_name(),
158            self.plugin_version(),
159            timeout,
160        )
161    }
162
163    #[inline(always)]
164    pub fn make_service_id(&self) -> ServiceId {
165        ServiceId::new(
166            self.plugin_name(),
167            self.service_name(),
168            self.plugin_version(),
169        )
170    }
171
172    #[inline]
173    pub fn plugin_name(&self) -> &str {
174        // SAFETY: safe because lifetime is managed by borrow checker
175        unsafe { self.plugin_name.as_str() }
176    }
177
178    #[inline]
179    pub fn service_name(&self) -> &str {
180        // SAFETY: safe because lifetime is managed by borrow checker
181        unsafe { self.service_name.as_str() }
182    }
183
184    #[inline]
185    pub fn plugin_version(&self) -> &str {
186        // SAFETY: safe because lifetime is managed by borrow checker
187        unsafe { self.plugin_version.as_str() }
188    }
189}
190
191/// Unique service identifier.
192#[derive(Debug, PartialEq, Eq, Hash, Clone)]
193pub struct ServiceId {
194    pub plugin: SmolStr,
195    pub service: SmolStr,
196    pub version: SmolStr,
197}
198
199impl std::fmt::Display for ServiceId {
200    #[inline(always)]
201    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
202        write!(f, "{}.{}:v{}", self.plugin, self.service, self.version,)
203    }
204}
205
206impl ServiceId {
207    #[inline(always)]
208    pub fn new(
209        plugin: impl Into<SmolStr>,
210        service: impl Into<SmolStr>,
211        version: impl Into<SmolStr>,
212    ) -> Self {
213        Self {
214            plugin: plugin.into(),
215            service: service.into(),
216            version: version.into(),
217        }
218    }
219
220    #[inline(always)]
221    pub fn plugin(&self) -> &str {
222        &self.plugin
223    }
224
225    #[inline(always)]
226    pub fn service(&self) -> &str {
227        &self.service
228    }
229
230    #[inline(always)]
231    pub fn version(&self) -> &str {
232        &self.version
233    }
234}
235
236// --------------------------- user interface ------------------------------------------------------
237
/// Boxed error returned from user callbacks; any [`Error`] implementation fits.
pub type ErrorBox = Box<dyn Error>;

/// Result type of user callbacks, with [`ErrorBox`] as the error.
pub type CallbackResult<T> = Result<T, ErrorBox>;
242
243/// Service trait. Implement it in your code to create a service.
244pub trait Service {
245    /// Use this associated type to define configuration of your service.
246    type Config: DeserializeOwned;
247
248    /// Callback to handle service configuration change once instance receives it.
249    ///
250    /// # Idempotency
251    ///
252    /// **WARNING** This callback may be called several times in a row.
253    /// It is the responsibility of the plugin author to make this function idempotent.
254    ///
255    /// # Poison
256    ///
257    /// Return an error here to poison current instance.
258    /// This will not cancel reconfiguration process,
259    /// but will mark instance as unavailable for rpc messaging.
260    ///
261    /// Instance can be "healed"
262    /// if any of `on_leader_change` or `on_config_change`
263    /// callbacks in the future return `Ok`.
264    ///
265    /// # Arguments
266    ///
267    /// * `ctx`: instance context
268    /// * `new_config`: new configuration
269    /// * `old_config`: previous defined configuration
270    fn on_config_change(
271        &mut self,
272        ctx: &PicoContext,
273        new_config: Self::Config,
274        old_config: Self::Config,
275    ) -> CallbackResult<()> {
276        _ = ctx;
277        _ = new_config;
278        _ = old_config;
279        Ok(())
280    }
281
282    /// Called at service start on every instance.
283    ///
284    /// # Idempotency
285    ///
286    /// **WARNING** This callback may be called several times in a row (without
287    /// any calls to [`Self::on_stop`]). It is the responsibility of the plugin
288    /// author to make this function idempotent.
289    ///
290    /// An error returned here abort plugin load clusterwide thus forcing
291    /// `on_stop` callback execution on every instance.
292    ///
293    /// # Arguments
294    ///
295    /// * `context`: instance context
296    /// * `config`: initial configuration
297    fn on_start(&mut self, context: &PicoContext, config: Self::Config) -> CallbackResult<()> {
298        _ = context;
299        _ = config;
300        Ok(())
301    }
302
303    /// Called on instance shutdown, plugin removal or failure of the initial load.
304    /// Returned error will only be logged causing no effects on plugin lifecycle.
305    ///
306    /// # Idempotency
307    ///
308    /// **WARNING** This callback may be called several times in a row.
309    /// It is the responsibility of the plugin author to make this function idempotent.
310    ///
311    /// # Arguments
312    ///
313    /// * `context`: instance context
314    fn on_stop(&mut self, context: &PicoContext) -> CallbackResult<()> {
315        _ = context;
316        Ok(())
317    }
318
319    /// Called when replicaset leader is changed.
320    /// This callback will be called exactly on two instances - the old leader and the new one.
321    ///
322    /// # Idempotency
323    ///
324    /// **WARNING** This callback may be called several times in a row.
325    /// It is the responsibility of the plugin author to make this function idempotent.
326    ///
327    /// # Poison
328    ///
329    /// Return an error here to poison current instance.
330    /// This will not cancel reconfiguration process,
331    /// but will mark instance as unavailable for rpc messaging.
332    ///
333    /// Instance can be "healed"
334    /// if any of `on_leader_change` or `on_config_change`
335    /// callbacks in the future return `Ok`.
336    ///
337    /// # Arguments
338    ///
339    /// * `context`: instance context
340    fn on_leader_change(&mut self, context: &PicoContext) -> CallbackResult<()> {
341        _ = context;
342        Ok(())
343    }
344
345    /// `on_healthcheck` is a callback
346    /// that should be called to determine if the service is functioning properly
347    /// On an error instance will be poisoned
348    ///
349    /// # Unimplemented
350    ///
351    /// **WARNING** This feature is not yet implemented.
352    /// The callback is never called.
353    /// TODO.
354    fn on_health_check(&self, context: &PicoContext) -> CallbackResult<()> {
355        _ = context;
356        Ok(())
357    }
358}
359
360// ---------------------------- internal implementation ----------------------------------------------
361
362/// Safe trait for sending a service trait object between ABI boundary.
363/// Define interface like [`Service`] trait but using safe types from [`abi_stable`] crate.
364#[sabi_trait]
365pub trait ServiceStable {
366    fn on_health_check(&self, context: &PicoContext) -> RResult<(), ()>;
367    fn on_start(&mut self, context: &PicoContext, configuration: RSlice<u8>) -> RResult<(), ()>;
368    fn on_stop(&mut self, context: &PicoContext) -> RResult<(), ()>;
369    fn on_leader_change(&mut self, context: &PicoContext) -> RResult<(), ()>;
370    fn on_config_change(
371        &mut self,
372        ctx: &PicoContext,
373        new_config: RSlice<u8>,
374        old_config: RSlice<u8>,
375    ) -> RResult<(), ()>;
376}
377
378/// Implementation of [`ServiceStable`]
379pub struct ServiceProxy<C: DeserializeOwned> {
380    service: Box<dyn Service<Config = C>>,
381}
382
383impl<C: DeserializeOwned> ServiceProxy<C> {
384    pub fn from_service(service: Box<dyn Service<Config = C>>) -> Self {
385        Self { service }
386    }
387}
388
389/// Use this function for conversion between user error and picodata internal error.
390/// This conversion forces allocations because using user-error "as-is"
391/// may lead to use-after-free errors.
392/// UAF can happen if user error points into memory allocated by dynamic lib and lives
393/// longer than dynamic lib memory (that was unmapped by system).
394fn error_into_tt_error<T>(source: impl Display) -> RResult<T, ()> {
395    let tt_error = BoxError::new(ErrorCode::PluginError, source.to_string());
396    tt_error.set_last_error();
397    RErr(())
398}
399
/// Unwrap the `Ok` value of a `Result` expression; on `Err`, return early from
/// the enclosing function with the outcome of `error_into_tt_error`.
macro_rules! rtry {
    ($fallible:expr) => {
        match $fallible {
            Err(cause) => return error_into_tt_error(cause),
            Ok(value) => value,
        }
    };
}
408
409impl<C: DeserializeOwned> ServiceStable for ServiceProxy<C> {
410    fn on_health_check(&self, context: &PicoContext) -> RResult<(), ()> {
411        match self.service.on_health_check(context) {
412            Ok(_) => ROk(()),
413            Err(e) => error_into_tt_error(e),
414        }
415    }
416
417    fn on_start(&mut self, context: &PicoContext, configuration: RSlice<u8>) -> RResult<(), ()> {
418        let configuration: C = rtry!(rmp_serde::from_slice(configuration.as_slice()));
419        match self.service.on_start(context, configuration) {
420            Ok(_) => ROk(()),
421            Err(e) => error_into_tt_error(e),
422        }
423    }
424
425    fn on_stop(&mut self, context: &PicoContext) -> RResult<(), ()> {
426        match self.service.on_stop(context) {
427            Ok(_) => ROk(()),
428            Err(e) => error_into_tt_error(e),
429        }
430    }
431
432    fn on_leader_change(&mut self, context: &PicoContext) -> RResult<(), ()> {
433        match self.service.on_leader_change(context) {
434            Ok(_) => ROk(()),
435            Err(e) => error_into_tt_error(e),
436        }
437    }
438
439    fn on_config_change(
440        &mut self,
441        ctx: &PicoContext,
442        new_config: RSlice<u8>,
443        old_config: RSlice<u8>,
444    ) -> RResult<(), ()> {
445        let new_config: C = rtry!(rmp_serde::from_slice(new_config.as_slice()));
446        let old_config: C = rtry!(rmp_serde::from_slice(old_config.as_slice()));
447
448        let res = self.service.on_config_change(ctx, new_config, old_config);
449        match res {
450            Ok(_) => ROk(()),
451            Err(e) => error_into_tt_error(e),
452        }
453    }
454}
455
456/// Final safe service trait object type. Can be used on both sides of ABI.
457pub type ServiceBox = ServiceStable_TO<'static, RBox<()>>;
458
459// ---------------------------- Registrar ----------------------------------------------
460
461pub type FnServiceRegistrar = extern "C" fn(registry: &mut ServiceRegistry);
462
463/// The reason for the existence of this trait is that [`abi_stable`] crate doesn't support
464/// closures.
465#[sabi_trait]
466trait Factory {
467    fn make(&self) -> ServiceBox;
468}
469
470type FactoryBox = Factory_TO<'static, RBox<()>>;
471
472/// The reason for the existence of this struct is that [`abi_stable`] crate doesn't support
473/// closures.
474struct FactoryImpl<S: Service + 'static> {
475    factory_fn: fn() -> S,
476}
477
478impl<S: Service + 'static> Factory for FactoryImpl<S> {
479    fn make(&self) -> ServiceBox {
480        let boxed = Box::new((self.factory_fn)());
481        ServiceBox::from_value(ServiceProxy::from_service(boxed), sabi_trait::TD_Opaque)
482    }
483}
484
485/// Service name and plugin version pair.
486type ServiceIdent = RTuple!(RString, RString);
487
488/// Config validator stable trait.
489/// The reason for the existence of this trait is that [`abi_stable`] crate doesn't support
490/// closures.
491#[sabi_trait]
492pub trait Validator {
493    /// Validate plugin configuration.
494    ///
495    /// # Idempotency
496    ///
497    /// **WARNING** This callback may be called several times in a row.
498    /// It is the responsibility of the plugin author to make this function idempotent.
499    ///
500    fn validate(&self, config: RSlice<u8>) -> RResult<(), ()>;
501}
502
503pub type ValidatorBox = Validator_TO<'static, RBox<()>>;
504
505/// The reason for the existence of this struct is that [`abi_stable`] crate doesn't support
506/// closures.
507struct ValidatorImpl<CONFIG: DeserializeOwned + 'static> {
508    func: fn(config: CONFIG) -> CallbackResult<()>,
509}
510
511impl<C: DeserializeOwned> Validator for ValidatorImpl<C> {
512    fn validate(&self, config: RSlice<u8>) -> RResult<(), ()> {
513        let config: C = rtry!(rmp_serde::from_slice(config.as_slice()));
514        let res = (self.func)(config);
515        match res {
516            Ok(_) => ROk(()),
517            Err(e) => error_into_tt_error(e),
518        }
519    }
520}
521
522/// Registry for services. Used by picodata to create instances of services.
523#[repr(C)]
524#[derive(Default, StableAbi)]
525pub struct ServiceRegistry {
526    services: RHashMap<ServiceIdent, RVec<FactoryBox>>,
527    validators: RHashMap<ServiceIdent, ValidatorBox>,
528}
529
530impl ServiceRegistry {
531    /// Add new service to the registry.
532    ///
533    /// # Arguments
534    ///
535    /// * `name`: service name
536    /// * `plugin_version`: version of service's plugin
537    /// * `factory`: new service instance factory
538    pub fn add<S: Service + 'static>(
539        &mut self,
540        name: &str,
541        plugin_version: &str,
542        factory: fn() -> S,
543    ) {
544        let factory_inner = FactoryImpl {
545            factory_fn: factory,
546        };
547        let factory_inner =
548            FactoryBox::from_value(factory_inner, abi_stable::sabi_trait::TD_Opaque);
549
550        let ident = ServiceIdent::from((RString::from(name), RString::from(plugin_version)));
551
552        if self.validators.get(&ident).is_none() {
553            // default validator implementation,
554            // just check that configuration may be deserialized into `S::Config` type
555            let validator = ValidatorImpl {
556                func: |_: S::Config| Ok(()),
557            };
558            let validator_stable = ValidatorBox::from_value(validator, sabi_trait::TD_Opaque);
559            self.validators.insert(ident.clone(), validator_stable);
560        }
561
562        let entry = self.services.entry(ident).or_default();
563        entry.push(factory_inner);
564    }
565
566    /// Create service from service name and plugin version pair.
567    /// Return an error if there is more than one factory suitable for creating a service.
568    pub fn make(&self, service_name: &str, version: &str) -> Result<Option<ServiceBox>, ()> {
569        let ident = ServiceIdent::from((RString::from(service_name), RString::from(version)));
570        let maybe_factories = self.services.get(&ident);
571
572        match maybe_factories {
573            None => Ok(None),
574            Some(factories) if factories.len() == 1 => {
575                Ok(factories.first().map(|factory| factory.make()))
576            }
577            Some(_) => Err(()),
578        }
579    }
580
581    /// Return true if registry contains needle service, false elsewhere.
582    /// Return an error if there is more than one factory suitable for creating a service.
583    pub fn contains(&self, service_name: &str, version: &str) -> Result<bool, ()> {
584        let ident = ServiceIdent::from((RString::from(service_name), RString::from(version)));
585        match self.services.get(&ident) {
586            None => Ok(false),
587            Some(factories) if factories.len() == 1 => Ok(true),
588            Some(_) => Err(()),
589        }
590    }
591
592    /// Add validator for service configuration. Called before new configration is loaded.
593    /// Returning an error for validator will abort configuration change clusterwide.
594    ///
595    /// # Arguments
596    ///
597    /// * `service_name`: service name which configuration will be validated
598    /// * `plugin_version`: plugin version
599    /// * `validator`: validation function
600    pub fn add_config_validator<S: Service>(
601        &mut self,
602        service_name: &str,
603        plugin_version: &str,
604        validator: fn(S::Config) -> CallbackResult<()>,
605    ) where
606        S::Config: DeserializeOwned + 'static,
607    {
608        let ident =
609            ServiceIdent::from((RString::from(service_name), RString::from(plugin_version)));
610
611        let validator = ValidatorImpl { func: validator };
612        let validator_stable = ValidatorBox::from_value(validator, sabi_trait::TD_Opaque);
613        self.validators.insert(ident, validator_stable);
614    }
615
616    /// Remove config validator for service.
617    pub fn remove_config_validator(
618        &mut self,
619        service_name: &str,
620        version: &str,
621    ) -> Option<ValidatorBox> {
622        let ident = ServiceIdent::from((RString::from(service_name), RString::from(version)));
623        self.validators.remove(&ident).into_option()
624    }
625
626    /// Return a registered list of (service name, plugin version) pairs.
627    pub fn dump(&self) -> Vec<(String, String)> {
628        self.services
629            .keys()
630            .map(|key| {
631                let service = key.0.to_string();
632                let version = key.1.to_string();
633                (service, version)
634            })
635            .collect()
636    }
637}