picodata_plugin/plugin/
interface.rs

1use crate::background;
2use crate::background::ServiceWorkerManager;
3use crate::error_code::ErrorCode;
4use crate::util::FfiSafeStr;
5pub use abi_stable;
6use abi_stable::pmr::{RErr, RResult, RSlice};
7use abi_stable::std_types::{RBox, RHashMap, ROk, RString, RVec};
8use abi_stable::{sabi_trait, RTuple, StableAbi};
9use serde::de::DeserializeOwned;
10use smol_str::SmolStr;
11use std::error::Error;
12use std::fmt::Display;
13use std::time::Duration;
14use tarantool::error::TarantoolErrorCode;
15use tarantool::error::{BoxError, IntoBoxError};
16
17/// Context of current instance. Produced by picodata.
18#[repr(C)]
19#[derive(StableAbi, Debug)]
20pub struct PicoContext {
21    is_master: bool,
22    pub plugin_name: FfiSafeStr,
23    pub service_name: FfiSafeStr,
24    pub plugin_version: FfiSafeStr,
25}
26
27impl PicoContext {
28    #[inline]
29    pub fn new(is_master: bool) -> PicoContext {
30        Self {
31            is_master,
32            plugin_name: "<unset>".into(),
33            service_name: "<unset>".into(),
34            plugin_version: "<unset>".into(),
35        }
36    }
37
38    /// # Safety
39    ///
40    /// Note: this is for internal use only. Plugin developers should never
41    /// be copying pico context.
42    #[inline]
43    pub unsafe fn clone(&self) -> Self {
44        Self {
45            is_master: self.is_master,
46            plugin_name: self.plugin_name,
47            service_name: self.service_name,
48            plugin_version: self.plugin_version,
49        }
50    }
51
52    /// Return true if the current instance is a replicaset leader.
53    #[inline]
54    pub fn is_master(&self) -> bool {
55        self.is_master
56    }
57
58    /// Return [`ServiceWorkerManager`] for current service.
59    #[deprecated = "use `register_job`, `register_tagged_job` or `cancel_background_jobs_by_tag` directly instead"]
60    pub fn worker_manager(&self) -> ServiceWorkerManager {
61        ServiceWorkerManager::new(self.make_service_id())
62    }
63
64    // TODO:
65    // pub fn register_job(&self) -> ServiceWorkerManager {
66    // pub fn register_tagged_job(&self) -> ServiceWorkerManager {
67    // pub fn cancel_job_by_tag(&self) -> ServiceWorkerManager {
68
69    #[inline(always)]
70    pub fn register_metrics_callback(&self, callback: impl Fn() -> String) -> Result<(), BoxError> {
71        crate::metrics::register_metrics_handler(self, callback)
72    }
73
74    /// Add a new job to the execution.
75    /// Job work life cycle will be tied to the service life cycle;
76    /// this means that job will be canceled just before service is stopped.
77    ///
78    /// # Arguments
79    ///
80    /// * `job`: callback that will be executed in separated fiber.
81    ///   Note that it is your responsibility to organize job graceful shutdown, see a
82    ///   [`background::CancellationToken`] for details.
83    ///
84    /// # Examples
85    ///
86    /// ```no_run
87    /// use std::time::Duration;
88    /// use picodata_plugin::background::CancellationToken;
89    ///
90    /// # use picodata_plugin::plugin::interface::PicoContext;
91    /// # fn on_start(context: PicoContext) {
92    ///
93    /// // this job will print "hello" every second,
94    /// // and print "bye" after being canceled
95    /// fn hello_printer(cancel: CancellationToken) {
96    ///     while cancel.wait_timeout(Duration::from_secs(1)).is_err() {
97    ///         println!("hello!");
98    ///     }
99    ///     println!("job cancelled, bye!")
100    /// }
101    /// context.register_job(hello_printer).unwrap();
102    ///
103    /// # }
104    /// ```
105    #[inline(always)]
106    pub fn register_job<F>(&self, job: F) -> Result<(), BoxError>
107    where
108        F: FnOnce(background::CancellationToken) + 'static,
109    {
110        background::register_job(&self.make_service_id(), job)
111    }
112
113    /// Same as [`Self::register_job`] but caller may provide a special tag.
114    /// This tag may be used for manual job cancellation using [`Self::cancel_tagged_jobs`].
115    ///
116    /// # Arguments
117    ///
118    /// * `job`: callback that will be executed in separated fiber
119    /// * `tag`: tag, that will be related to a job, single tag may be related to the multiple jobs
120    #[inline(always)]
121    pub fn register_tagged_job<F>(&self, job: F, tag: &str) -> Result<(), BoxError>
122    where
123        F: FnOnce(background::CancellationToken) + 'static,
124    {
125        background::register_tagged_job(&self.make_service_id(), job, tag)
126    }
127
128    /// Cancel all jobs related to the given `tag`.
129    /// This function return after all related jobs will be gracefully shutdown or
130    /// after `timeout` duration.
131    ///
132    /// Returns error with code [`TarantoolErrorCode::Timeout`] in case some
133    /// jobs didn't finish within `timeout`.
134    ///
135    /// May also theoretically return error with code [`ErrorCode::NoSuchService`]
136    /// in case the service doesn't exist anymore (highly unlikely).
137    ///
138    /// See also [`Self::register_tagged_job`].
139    #[inline(always)]
140    pub fn cancel_tagged_jobs(&self, tag: &str, timeout: Duration) -> Result<(), BoxError> {
141        let res = background::cancel_jobs_by_tag(&self.make_service_id(), tag, timeout)?;
142        if res.n_timeouts != 0 {
143            #[rustfmt::skip]
144            return Err(BoxError::new(TarantoolErrorCode::Timeout, format!("some background jobs didn't finish in time (expected: {}, timed out: {})", res.n_total, res.n_timeouts)));
145        }
146
147        Ok(())
148    }
149
150    /// In case when jobs were canceled by `picodata` use this function for determine
151    /// a shutdown timeout - time duration that `picodata` uses to ensure that all
152    /// jobs gracefully end.
153    ///
154    /// By default, 5-second timeout are used.
155    #[inline(always)]
156    pub fn set_jobs_shutdown_timeout(&self, timeout: Duration) {
157        crate::background::set_jobs_shutdown_timeout(
158            self.plugin_name(),
159            self.service_name(),
160            self.plugin_version(),
161            timeout,
162        )
163    }
164
165    #[inline(always)]
166    pub fn make_service_id(&self) -> ServiceId {
167        ServiceId::new(
168            self.plugin_name(),
169            self.service_name(),
170            self.plugin_version(),
171        )
172    }
173
174    #[inline]
175    pub fn plugin_name(&self) -> &str {
176        // SAFETY: safe because lifetime is managed by borrow checker
177        unsafe { self.plugin_name.as_str() }
178    }
179
180    #[inline]
181    pub fn service_name(&self) -> &str {
182        // SAFETY: safe because lifetime is managed by borrow checker
183        unsafe { self.service_name.as_str() }
184    }
185
186    #[inline]
187    pub fn plugin_version(&self) -> &str {
188        // SAFETY: safe because lifetime is managed by borrow checker
189        unsafe { self.plugin_version.as_str() }
190    }
191}
192
193/// Unique service identifier.
194#[derive(Debug, PartialEq, Eq, Hash, Clone)]
195pub struct ServiceId {
196    pub plugin: SmolStr,
197    pub service: SmolStr,
198    pub version: SmolStr,
199}
200
201impl std::fmt::Display for ServiceId {
202    #[inline(always)]
203    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
204        write!(f, "{}.{}:v{}", self.plugin, self.service, self.version,)
205    }
206}
207
208impl ServiceId {
209    #[inline(always)]
210    pub fn new(
211        plugin: impl Into<SmolStr>,
212        service: impl Into<SmolStr>,
213        version: impl Into<SmolStr>,
214    ) -> Self {
215        Self {
216            plugin: plugin.into(),
217            service: service.into(),
218            version: version.into(),
219        }
220    }
221
222    #[inline(always)]
223    pub fn plugin(&self) -> &str {
224        &self.plugin
225    }
226
227    #[inline(always)]
228    pub fn service(&self) -> &str {
229        &self.service
230    }
231
232    #[inline(always)]
233    pub fn version(&self) -> &str {
234        &self.version
235    }
236}
237
238// --------------------------- user interface ------------------------------------------------------
239
/// Boxed error type; return it from your callbacks.
pub type ErrorBox = Box<dyn Error>;

/// Result type of service callbacks: `Ok(T)` or a boxed error ([`ErrorBox`]).
pub type CallbackResult<T> = Result<T, ErrorBox>;
244
245/// Service trait. Implement it in your code to create a service.
246pub trait Service {
247    /// Use this associated type to define configuration of your service.
248    type Config: DeserializeOwned;
249
250    /// Callback to handle service configuration change once instance receives it.
251    ///
252    /// # Idempotency
253    ///
254    /// **WARNING** This callback may be called several times in a row.
255    /// It is the responsibility of the plugin author to make this function idempotent.
256    ///
257    /// # Poison
258    ///
259    /// Return an error here to poison current instance.
260    /// This will not cancel reconfiguration process,
261    /// but will mark instance as unavailable for rpc messaging.
262    ///
263    /// Instance can be "healed"
264    /// if any of `on_leader_change` or `on_config_change`
265    /// callbacks in the future return `Ok`.
266    ///
267    /// # Arguments
268    ///
269    /// * `ctx`: instance context
270    /// * `new_config`: new configuration
271    /// * `old_config`: previous defined configuration
272    fn on_config_change(
273        &mut self,
274        ctx: &PicoContext,
275        new_config: Self::Config,
276        old_config: Self::Config,
277    ) -> CallbackResult<()> {
278        _ = ctx;
279        _ = new_config;
280        _ = old_config;
281        Ok(())
282    }
283
284    /// Called at service start on every instance.
285    ///
286    /// # Idempotency
287    ///
288    /// **WARNING** This callback may be called several times in a row (without
289    /// any calls to [`Self::on_stop`]). It is the responsibility of the plugin
290    /// author to make this function idempotent.
291    ///
292    /// An error returned here abort plugin load clusterwide thus forcing
293    /// `on_stop` callback execution on every instance.
294    ///
295    /// # Arguments
296    ///
297    /// * `context`: instance context
298    /// * `config`: initial configuration
299    fn on_start(&mut self, context: &PicoContext, config: Self::Config) -> CallbackResult<()> {
300        _ = context;
301        _ = config;
302        Ok(())
303    }
304
305    /// Called on instance shutdown, plugin removal or failure of the initial load.
306    /// Returned error will only be logged causing no effects on plugin lifecycle.
307    ///
308    /// # Idempotency
309    ///
310    /// **WARNING** This callback may be called several times in a row.
311    /// It is the responsibility of the plugin author to make this function idempotent.
312    ///
313    /// # Arguments
314    ///
315    /// * `context`: instance context
316    fn on_stop(&mut self, context: &PicoContext) -> CallbackResult<()> {
317        _ = context;
318        Ok(())
319    }
320
321    /// Called when replicaset leader is changed.
322    /// This callback will be called exactly on two instances - the old leader and the new one.
323    ///
324    /// # Idempotency
325    ///
326    /// **WARNING** This callback may be called several times in a row.
327    /// It is the responsibility of the plugin author to make this function idempotent.
328    ///
329    /// # Poison
330    ///
331    /// Return an error here to poison current instance.
332    /// This will not cancel reconfiguration process,
333    /// but will mark instance as unavailable for rpc messaging.
334    ///
335    /// Instance can be "healed"
336    /// if any of `on_leader_change` or `on_config_change`
337    /// callbacks in the future return `Ok`.
338    ///
339    /// # Arguments
340    ///
341    /// * `context`: instance context
342    fn on_leader_change(&mut self, context: &PicoContext) -> CallbackResult<()> {
343        _ = context;
344        Ok(())
345    }
346
347    /// `on_healthcheck` is a callback
348    /// that should be called to determine if the service is functioning properly
349    /// On an error instance will be poisoned
350    ///
351    /// # Unimplemented
352    ///
353    /// **WARNING** This feature is not yet implemented.
354    /// The callback is never called.
355    /// TODO.
356    fn on_health_check(&self, context: &PicoContext) -> CallbackResult<()> {
357        _ = context;
358        Ok(())
359    }
360}
361
362// ---------------------------- internal implementation ----------------------------------------------
363
364/// Safe trait for sending a service trait object between ABI boundary.
365/// Define interface like [`Service`] trait but using safe types from [`abi_stable`] crate.
366#[sabi_trait]
367pub trait ServiceStable {
368    fn on_health_check(&self, context: &PicoContext) -> RResult<(), ()>;
369    fn on_start(&mut self, context: &PicoContext, configuration: RSlice<u8>) -> RResult<(), ()>;
370    fn on_stop(&mut self, context: &PicoContext) -> RResult<(), ()>;
371    fn on_leader_change(&mut self, context: &PicoContext) -> RResult<(), ()>;
372    fn on_config_change(
373        &mut self,
374        ctx: &PicoContext,
375        new_config: RSlice<u8>,
376        old_config: RSlice<u8>,
377    ) -> RResult<(), ()>;
378}
379
380/// Implementation of [`ServiceStable`]
381pub struct ServiceProxy<C: DeserializeOwned> {
382    service: Box<dyn Service<Config = C>>,
383}
384
385impl<C: DeserializeOwned> ServiceProxy<C> {
386    pub fn from_service(service: Box<dyn Service<Config = C>>) -> Self {
387        Self { service }
388    }
389}
390
391/// Use this function for conversion between user error and picodata internal error.
392/// This conversion forces allocations because using user-error "as-is"
393/// may lead to use-after-free errors.
394/// UAF can happen if user error points into memory allocated by dynamic lib and lives
395/// longer than dynamic lib memory (that was unmapped by system).
396fn error_into_tt_error<T>(source: impl Display) -> RResult<T, ()> {
397    let tt_error = BoxError::new(ErrorCode::PluginError, source.to_string());
398    tt_error.set_last_error();
399    RErr(())
400}
401
/// Unwrap a `Result`: evaluates to the `Ok` value, or returns early from the
/// enclosing function with `error_into_tt_error(err)` on `Err`.
macro_rules! rtry {
    ($result:expr) => {
        match $result {
            Ok(value) => value,
            Err(err) => return error_into_tt_error(err),
        }
    };
}
410
411impl<C: DeserializeOwned> ServiceStable for ServiceProxy<C> {
412    fn on_health_check(&self, context: &PicoContext) -> RResult<(), ()> {
413        match self.service.on_health_check(context) {
414            Ok(_) => ROk(()),
415            Err(e) => error_into_tt_error(e),
416        }
417    }
418
419    fn on_start(&mut self, context: &PicoContext, configuration: RSlice<u8>) -> RResult<(), ()> {
420        let configuration: C = rtry!(rmp_serde::from_slice(configuration.as_slice()));
421        match self.service.on_start(context, configuration) {
422            Ok(_) => ROk(()),
423            Err(e) => error_into_tt_error(e),
424        }
425    }
426
427    fn on_stop(&mut self, context: &PicoContext) -> RResult<(), ()> {
428        match self.service.on_stop(context) {
429            Ok(_) => ROk(()),
430            Err(e) => error_into_tt_error(e),
431        }
432    }
433
434    fn on_leader_change(&mut self, context: &PicoContext) -> RResult<(), ()> {
435        match self.service.on_leader_change(context) {
436            Ok(_) => ROk(()),
437            Err(e) => error_into_tt_error(e),
438        }
439    }
440
441    fn on_config_change(
442        &mut self,
443        ctx: &PicoContext,
444        new_config: RSlice<u8>,
445        old_config: RSlice<u8>,
446    ) -> RResult<(), ()> {
447        let new_config: C = rtry!(rmp_serde::from_slice(new_config.as_slice()));
448        let old_config: C = rtry!(rmp_serde::from_slice(old_config.as_slice()));
449
450        let res = self.service.on_config_change(ctx, new_config, old_config);
451        match res {
452            Ok(_) => ROk(()),
453            Err(e) => error_into_tt_error(e),
454        }
455    }
456}
457
458/// Final safe service trait object type. Can be used on both sides of ABI.
459pub type ServiceBox = ServiceStable_TO<'static, RBox<()>>;
460
461// ---------------------------- Registrar ----------------------------------------------
462
463pub type FnServiceRegistrar = extern "C" fn(registry: &mut ServiceRegistry);
464
465/// The reason for the existence of this trait is that [`abi_stable`] crate doesn't support
466/// closures.
467#[sabi_trait]
468trait Factory {
469    fn make(&self) -> ServiceBox;
470}
471
472type FactoryBox = Factory_TO<'static, RBox<()>>;
473
474/// The reason for the existence of this struct is that [`abi_stable`] crate doesn't support
475/// closures.
476struct FactoryImpl<S: Service + 'static> {
477    factory_fn: fn() -> S,
478}
479
480impl<S: Service + 'static> Factory for FactoryImpl<S> {
481    fn make(&self) -> ServiceBox {
482        let boxed = Box::new((self.factory_fn)());
483        ServiceBox::from_value(ServiceProxy::from_service(boxed), sabi_trait::TD_Opaque)
484    }
485}
486
487/// Service name and plugin version pair.
488type ServiceIdent = RTuple!(RString, RString);
489
490/// Config validator stable trait.
491/// The reason for the existence of this trait is that [`abi_stable`] crate doesn't support
492/// closures.
493#[sabi_trait]
494pub trait Validator {
495    /// Validate plugin configuration.
496    ///
497    /// # Idempotency
498    ///
499    /// **WARNING** This callback may be called several times in a row.
500    /// It is the responsibility of the plugin author to make this function idempotent.
501    ///
502    fn validate(&self, config: RSlice<u8>) -> RResult<(), ()>;
503}
504
505pub type ValidatorBox = Validator_TO<'static, RBox<()>>;
506
507/// The reason for the existence of this struct is that [`abi_stable`] crate doesn't support
508/// closures.
509struct ValidatorImpl<CONFIG: DeserializeOwned + 'static> {
510    func: fn(config: CONFIG) -> CallbackResult<()>,
511}
512
513impl<C: DeserializeOwned> Validator for ValidatorImpl<C> {
514    fn validate(&self, config: RSlice<u8>) -> RResult<(), ()> {
515        let config: C = rtry!(rmp_serde::from_slice(config.as_slice()));
516        let res = (self.func)(config);
517        match res {
518            Ok(_) => ROk(()),
519            Err(e) => error_into_tt_error(e),
520        }
521    }
522}
523
524/// Registry for services. Used by picodata to create instances of services.
525#[repr(C)]
526#[derive(Default, StableAbi)]
527pub struct ServiceRegistry {
528    services: RHashMap<ServiceIdent, RVec<FactoryBox>>,
529    validators: RHashMap<ServiceIdent, ValidatorBox>,
530}
531
532impl ServiceRegistry {
533    /// Add new service to the registry.
534    ///
535    /// # Arguments
536    ///
537    /// * `name`: service name
538    /// * `plugin_version`: version of service's plugin
539    /// * `factory`: new service instance factory
540    pub fn add<S: Service + 'static>(
541        &mut self,
542        name: &str,
543        plugin_version: &str,
544        factory: fn() -> S,
545    ) {
546        let factory_inner = FactoryImpl {
547            factory_fn: factory,
548        };
549        let factory_inner =
550            FactoryBox::from_value(factory_inner, abi_stable::sabi_trait::TD_Opaque);
551
552        let ident = ServiceIdent::from((RString::from(name), RString::from(plugin_version)));
553
554        if self.validators.get(&ident).is_none() {
555            // default validator implementation,
556            // just check that configuration may be deserialized into `S::Config` type
557            let validator = ValidatorImpl {
558                func: |_: S::Config| Ok(()),
559            };
560            let validator_stable = ValidatorBox::from_value(validator, sabi_trait::TD_Opaque);
561            self.validators.insert(ident.clone(), validator_stable);
562        }
563
564        let entry = self.services.entry(ident).or_default();
565        entry.push(factory_inner);
566    }
567
568    /// Create service from service name and plugin version pair.
569    /// Return an error if there is more than one factory suitable for creating a service.
570    #[allow(clippy::result_unit_err)]
571    pub fn make(&self, service_name: &str, version: &str) -> Result<Option<ServiceBox>, ()> {
572        let ident = ServiceIdent::from((RString::from(service_name), RString::from(version)));
573        let maybe_factories = self.services.get(&ident);
574
575        match maybe_factories {
576            None => Ok(None),
577            Some(factories) if factories.len() == 1 => {
578                Ok(factories.first().map(|factory| factory.make()))
579            }
580            Some(_) => Err(()),
581        }
582    }
583
584    /// Return true if registry contains needle service, false elsewhere.
585    /// Return an error if there is more than one factory suitable for creating a service.
586    #[allow(clippy::result_unit_err)]
587    pub fn contains(&self, service_name: &str, version: &str) -> Result<bool, ()> {
588        let ident = ServiceIdent::from((RString::from(service_name), RString::from(version)));
589        match self.services.get(&ident) {
590            None => Ok(false),
591            Some(factories) if factories.len() == 1 => Ok(true),
592            Some(_) => Err(()),
593        }
594    }
595
596    /// Add validator for service configuration. Called before new configration is loaded.
597    /// Returning an error for validator will abort configuration change clusterwide.
598    ///
599    /// # Arguments
600    ///
601    /// * `service_name`: service name which configuration will be validated
602    /// * `plugin_version`: plugin version
603    /// * `validator`: validation function
604    pub fn add_config_validator<S: Service>(
605        &mut self,
606        service_name: &str,
607        plugin_version: &str,
608        validator: fn(S::Config) -> CallbackResult<()>,
609    ) where
610        S::Config: DeserializeOwned + 'static,
611    {
612        let ident =
613            ServiceIdent::from((RString::from(service_name), RString::from(plugin_version)));
614
615        let validator = ValidatorImpl { func: validator };
616        let validator_stable = ValidatorBox::from_value(validator, sabi_trait::TD_Opaque);
617        self.validators.insert(ident, validator_stable);
618    }
619
620    /// Remove config validator for service.
621    pub fn remove_config_validator(
622        &mut self,
623        service_name: &str,
624        version: &str,
625    ) -> Option<ValidatorBox> {
626        let ident = ServiceIdent::from((RString::from(service_name), RString::from(version)));
627        self.validators.remove(&ident).into_option()
628    }
629
630    /// Return a registered list of (service name, plugin version) pairs.
631    pub fn dump(&self) -> Vec<(String, String)> {
632        self.services
633            .keys()
634            .map(|key| {
635                let service = key.0.to_string();
636                let version = key.1.to_string();
637                (service, version)
638            })
639            .collect()
640    }
641}