1use crate::background;
2use crate::background::ServiceWorkerManager;
3use crate::error_code::ErrorCode;
4use crate::util::FfiSafeStr;
5pub use abi_stable;
6use abi_stable::pmr::{RErr, RResult, RSlice};
7use abi_stable::std_types::{RBox, RHashMap, ROk, ROption, RString, RVec};
8use abi_stable::{sabi_trait, RTuple, StableAbi};
9use serde::de::DeserializeOwned;
10use smol_str::SmolStr;
11use std::collections::HashMap;
12use std::error::Error;
13use std::fmt::Display;
14use std::time::Duration;
15use tarantool::error::TarantoolErrorCode;
16use tarantool::error::{BoxError, IntoBoxError};
17
/// Context passed by reference to every [`Service`] callback and used as the
/// entry point for the helper APIs implemented on it (background jobs,
/// metrics callbacks).
///
/// The type is `#[repr(C)]` and derives `StableAbi` so that its layout can be
/// checked when it crosses the plugin ABI boundary.
#[repr(C)]
#[derive(StableAbi, Debug)]
pub struct PicoContext {
    /// Flag returned by [`PicoContext::is_master`].
    is_master: bool,
    /// Name of the plugin; [`PicoContext::new`] initialises it to `"<unset>"`.
    pub plugin_name: FfiSafeStr,
    /// Name of the service; [`PicoContext::new`] initialises it to `"<unset>"`.
    pub service_name: FfiSafeStr,
    /// Version of the plugin; [`PicoContext::new`] initialises it to `"<unset>"`.
    pub plugin_version: FfiSafeStr,
}
27
28impl PicoContext {
29 #[inline]
30 pub fn new(is_master: bool) -> PicoContext {
31 Self {
32 is_master,
33 plugin_name: "<unset>".into(),
34 service_name: "<unset>".into(),
35 plugin_version: "<unset>".into(),
36 }
37 }
38
39 #[inline]
44 pub unsafe fn clone(&self) -> Self {
45 Self {
46 is_master: self.is_master,
47 plugin_name: self.plugin_name,
48 service_name: self.service_name,
49 plugin_version: self.plugin_version,
50 }
51 }
52
53 #[inline]
55 pub fn is_master(&self) -> bool {
56 self.is_master
57 }
58
59 #[deprecated = "use `register_job`, `register_tagged_job` or `cancel_background_jobs_by_tag` directly instead"]
61 pub fn worker_manager(&self) -> ServiceWorkerManager {
62 ServiceWorkerManager::new(self.make_service_id())
63 }
64
65 #[inline(always)]
71 pub fn register_metrics_callback(&self, callback: impl Fn() -> String) -> Result<(), BoxError> {
72 crate::metrics::register_metrics_handler(self, callback)
73 }
74
75 #[inline(always)]
107 pub fn register_job<F>(&self, job: F) -> Result<(), BoxError>
108 where
109 F: FnOnce(background::CancellationToken) + 'static,
110 {
111 background::register_job(&self.make_service_id(), job)
112 }
113
114 #[inline(always)]
122 pub fn register_tagged_job<F>(&self, job: F, tag: &str) -> Result<(), BoxError>
123 where
124 F: FnOnce(background::CancellationToken) + 'static,
125 {
126 background::register_tagged_job(&self.make_service_id(), job, tag)
127 }
128
129 #[inline(always)]
141 pub fn cancel_tagged_jobs(&self, tag: &str, timeout: Duration) -> Result<(), BoxError> {
142 let res = background::cancel_jobs_by_tag(&self.make_service_id(), tag, timeout)?;
143 if res.n_timeouts != 0 {
144 #[rustfmt::skip]
145 return Err(BoxError::new(TarantoolErrorCode::Timeout, format!("some background jobs didn't finish in time (expected: {}, timed out: {})", res.n_total, res.n_timeouts)));
146 }
147
148 Ok(())
149 }
150
151 #[inline(always)]
157 pub fn set_jobs_shutdown_timeout(&self, timeout: Duration) {
158 crate::background::set_jobs_shutdown_timeout(
159 self.plugin_name(),
160 self.service_name(),
161 self.plugin_version(),
162 timeout,
163 )
164 }
165
166 #[inline(always)]
167 pub fn make_service_id(&self) -> ServiceId {
168 ServiceId::new(
169 self.plugin_name(),
170 self.service_name(),
171 self.plugin_version(),
172 )
173 }
174
175 #[inline]
176 pub fn plugin_name(&self) -> &str {
177 unsafe { self.plugin_name.as_str() }
179 }
180
181 #[inline]
182 pub fn service_name(&self) -> &str {
183 unsafe { self.service_name.as_str() }
185 }
186
187 #[inline]
188 pub fn plugin_version(&self) -> &str {
189 unsafe { self.plugin_version.as_str() }
191 }
192}
193
/// Identifier of a service: plugin name, service name and plugin version.
///
/// Displayed as `<plugin>.<service>:v<version>`.
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub struct ServiceId {
    /// Name of the plugin.
    pub plugin: SmolStr,
    /// Name of the service.
    pub service: SmolStr,
    /// Version string.
    pub version: SmolStr,
}
201
202impl std::fmt::Display for ServiceId {
203 #[inline(always)]
204 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
205 write!(f, "{}.{}:v{}", self.plugin, self.service, self.version,)
206 }
207}
208
209impl ServiceId {
210 #[inline(always)]
211 pub fn new(
212 plugin: impl Into<SmolStr>,
213 service: impl Into<SmolStr>,
214 version: impl Into<SmolStr>,
215 ) -> Self {
216 Self {
217 plugin: plugin.into(),
218 service: service.into(),
219 version: version.into(),
220 }
221 }
222
223 #[inline(always)]
224 pub fn plugin(&self) -> &str {
225 &self.plugin
226 }
227
228 #[inline(always)]
229 pub fn service(&self) -> &str {
230 &self.service
231 }
232
233 #[inline(always)]
234 pub fn version(&self) -> &str {
235 &self.version
236 }
237}
238
/// Boxed dynamic error returned by user-provided callbacks.
pub type ErrorBox = Box<dyn Error>;

/// Result type of user-provided callbacks; the error side is an [`ErrorBox`].
pub type CallbackResult<T> = Result<T, ErrorBox>;
245
246pub trait Service {
248 type Config: DeserializeOwned;
250
251 fn on_config_change(
274 &mut self,
275 ctx: &PicoContext,
276 new_config: Self::Config,
277 old_config: Self::Config,
278 ) -> CallbackResult<()> {
279 _ = ctx;
280 _ = new_config;
281 _ = old_config;
282 Ok(())
283 }
284
285 fn on_start(&mut self, context: &PicoContext, config: Self::Config) -> CallbackResult<()> {
301 _ = context;
302 _ = config;
303 Ok(())
304 }
305
306 fn on_stop(&mut self, context: &PicoContext) -> CallbackResult<()> {
318 _ = context;
319 Ok(())
320 }
321
322 fn on_leader_change(&mut self, context: &PicoContext) -> CallbackResult<()> {
344 _ = context;
345 Ok(())
346 }
347
348 fn on_health_check(&self, context: &PicoContext) -> CallbackResult<()> {
358 _ = context;
359 Ok(())
360 }
361}
362
/// ABI-stable counterpart of [`Service`], generated through `#[sabi_trait]`.
///
/// Configurations are passed as raw byte slices and errors carry no payload
/// (`()`). The [`ServiceProxy`] implementation decodes the bytes as
/// MessagePack and reports error details through the last tarantool error.
#[sabi_trait]
pub trait ServiceStable {
    /// ABI-stable form of [`Service::on_health_check`].
    fn on_health_check(&self, context: &PicoContext) -> RResult<(), ()>;
    /// ABI-stable form of [`Service::on_start`]; `configuration` is the
    /// encoded service configuration.
    fn on_start(&mut self, context: &PicoContext, configuration: RSlice<u8>) -> RResult<(), ()>;
    /// ABI-stable form of [`Service::on_stop`].
    fn on_stop(&mut self, context: &PicoContext) -> RResult<(), ()>;
    /// ABI-stable form of [`Service::on_leader_change`].
    fn on_leader_change(&mut self, context: &PicoContext) -> RResult<(), ()>;
    /// ABI-stable form of [`Service::on_config_change`]; both configurations
    /// are passed in encoded form.
    fn on_config_change(
        &mut self,
        ctx: &PicoContext,
        new_config: RSlice<u8>,
        old_config: RSlice<u8>,
    ) -> RResult<(), ()>;
}
380
/// Adapter that wraps a boxed [`Service`] and implements [`ServiceStable`]
/// on top of it.
pub struct ServiceProxy<C: DeserializeOwned> {
    /// The wrapped user service.
    service: Box<dyn Service<Config = C>>,
}
385
386impl<C: DeserializeOwned> ServiceProxy<C> {
387 pub fn from_service(service: Box<dyn Service<Config = C>>) -> Self {
388 Self { service }
389 }
390}
391
392fn error_into_tt_error<T>(source: impl Display) -> RResult<T, ()> {
398 let tt_error = BoxError::new(ErrorCode::PluginError, source.to_string());
399 tt_error.set_last_error();
400 RErr(())
401}
402
/// Unwraps a `Result`, or returns early from the enclosing function with
/// `error_into_tt_error(err)` when it is `Err`.
macro_rules! rtry {
    ($fallible:expr) => {
        match $fallible {
            Ok(value) => value,
            Err(err) => return error_into_tt_error(err),
        }
    };
}
411
412impl<C: DeserializeOwned> ServiceStable for ServiceProxy<C> {
413 fn on_health_check(&self, context: &PicoContext) -> RResult<(), ()> {
414 match self.service.on_health_check(context) {
415 Ok(_) => ROk(()),
416 Err(e) => error_into_tt_error(e),
417 }
418 }
419
420 fn on_start(&mut self, context: &PicoContext, configuration: RSlice<u8>) -> RResult<(), ()> {
421 let configuration: C = rtry!(rmp_serde::from_slice(configuration.as_slice()));
422 match self.service.on_start(context, configuration) {
423 Ok(_) => ROk(()),
424 Err(e) => error_into_tt_error(e),
425 }
426 }
427
428 fn on_stop(&mut self, context: &PicoContext) -> RResult<(), ()> {
429 match self.service.on_stop(context) {
430 Ok(_) => ROk(()),
431 Err(e) => error_into_tt_error(e),
432 }
433 }
434
435 fn on_leader_change(&mut self, context: &PicoContext) -> RResult<(), ()> {
436 match self.service.on_leader_change(context) {
437 Ok(_) => ROk(()),
438 Err(e) => error_into_tt_error(e),
439 }
440 }
441
442 fn on_config_change(
443 &mut self,
444 ctx: &PicoContext,
445 new_config: RSlice<u8>,
446 old_config: RSlice<u8>,
447 ) -> RResult<(), ()> {
448 let new_config: C = rtry!(rmp_serde::from_slice(new_config.as_slice()));
449 let old_config: C = rtry!(rmp_serde::from_slice(old_config.as_slice()));
450
451 let res = self.service.on_config_change(ctx, new_config, old_config);
452 match res {
453 Ok(_) => ROk(()),
454 Err(e) => error_into_tt_error(e),
455 }
456 }
457}
458
/// Type-erased, ABI-stable trait object holding a [`ServiceStable`] implementation.
pub type ServiceBox = ServiceStable_TO<'static, RBox<()>>;

/// Signature of an `extern "C"` function that receives a mutable
/// [`ServiceRegistry`] to register services in.
pub type FnServiceRegistrar = extern "C" fn(registry: &mut ServiceRegistry);
465
/// ABI-stable factory of services, generated through `#[sabi_trait]`.
#[sabi_trait]
trait Factory {
    /// Creates a new service instance wrapped into a [`ServiceBox`].
    fn make(&self) -> ServiceBox;
}
472
/// Type-erased, ABI-stable trait object holding a [`Factory`] implementation.
type FactoryBox = Factory_TO<'static, RBox<()>>;
474
/// [`Factory`] implementation backed by a plain function pointer.
struct FactoryImpl<S: Service + 'static> {
    /// Function that constructs a fresh service instance.
    factory_fn: fn() -> S,
}
480
481impl<S: Service + 'static> Factory for FactoryImpl<S> {
482 fn make(&self) -> ServiceBox {
483 let boxed = Box::new((self.factory_fn)());
484 ServiceBox::from_value(ServiceProxy::from_service(boxed), sabi_trait::TD_Opaque)
485 }
486}
487
/// Key of the registry maps: a `(service name, plugin version)` pair.
type ServiceIdent = RTuple!(RString, RString);
490
/// ABI-stable validator of a service configuration, generated through
/// `#[sabi_trait]`.
#[sabi_trait]
pub trait Validator {
    /// Validates the encoded configuration `config`.
    ///
    /// Returns `ROk(())` when the configuration is accepted and `RErr(())`
    /// otherwise; the error itself carries no payload.
    fn validate(&self, config: RSlice<u8>) -> RResult<(), ()>;
}
506
/// Type-erased, ABI-stable trait object holding a [`Validator`] implementation.
pub type ValidatorBox = Validator_TO<'static, RBox<()>>;
508
/// [`Validator`] implementation backed by a plain function pointer that
/// receives the already decoded configuration.
struct ValidatorImpl<CONFIG: DeserializeOwned + 'static> {
    /// User-provided validation function.
    func: fn(config: CONFIG) -> CallbackResult<()>,
}
514
515impl<C: DeserializeOwned> Validator for ValidatorImpl<C> {
516 fn validate(&self, config: RSlice<u8>) -> RResult<(), ()> {
517 let config: C = rtry!(rmp_serde::from_slice(config.as_slice()));
518 let res = (self.func)(config);
519 match res {
520 Ok(_) => ROk(()),
521 Err(e) => error_into_tt_error(e),
522 }
523 }
524}
525
/// Registry of service factories and configuration validators, keyed by
/// `(service name, plugin version)`.
#[repr(C)]
#[derive(Default, StableAbi)]
pub struct ServiceRegistry {
    /// Factories registered for each `(service name, plugin version)` pair.
    services: RHashMap<ServiceIdent, RVec<FactoryBox>>,
    /// Configuration validator for each `(service name, plugin version)` pair.
    validators: RHashMap<ServiceIdent, ValidatorBox>,
}
533
534impl ServiceRegistry {
535 pub fn add<S: Service + 'static>(
543 &mut self,
544 name: &str,
545 plugin_version: &str,
546 factory: fn() -> S,
547 ) {
548 let factory_inner = FactoryImpl {
549 factory_fn: factory,
550 };
551 let factory_inner =
552 FactoryBox::from_value(factory_inner, abi_stable::sabi_trait::TD_Opaque);
553
554 let ident = ServiceIdent::from((RString::from(name), RString::from(plugin_version)));
555
556 if self.validators.get(&ident).is_none() {
557 let validator = ValidatorImpl {
560 func: |_: S::Config| Ok(()),
561 };
562 let validator_stable = ValidatorBox::from_value(validator, sabi_trait::TD_Opaque);
563 self.validators.insert(ident.clone(), validator_stable);
564 }
565
566 let entry = self.services.entry(ident).or_default();
567 entry.push(factory_inner);
568 }
569
570 #[allow(clippy::result_unit_err)]
573 #[cfg(feature = "internal")]
574 pub fn make(&self, service_name: &str, version: &str) -> Result<Option<ServiceBox>, ()> {
575 let ident = ServiceIdent::from((RString::from(service_name), RString::from(version)));
576 let maybe_factories = self.services.get(&ident);
577
578 match maybe_factories {
579 None => Ok(None),
580 Some(factories) if factories.len() == 1 => {
581 Ok(factories.first().map(|factory| factory.make()))
582 }
583 Some(_) => Err(()),
584 }
585 }
586
587 #[allow(clippy::result_unit_err)]
590 #[cfg(feature = "internal")]
591 pub fn contains(&self, service_name: &str, version: &str) -> Result<bool, ()> {
592 let ident = ServiceIdent::from((RString::from(service_name), RString::from(version)));
593 match self.services.get(&ident) {
594 None => Ok(false),
595 Some(factories) if factories.len() == 1 => Ok(true),
596 Some(_) => Err(()),
597 }
598 }
599
600 pub fn add_config_validator<S: Service>(
609 &mut self,
610 service_name: &str,
611 plugin_version: &str,
612 validator: fn(S::Config) -> CallbackResult<()>,
613 ) where
614 S::Config: DeserializeOwned + 'static,
615 {
616 let ident =
617 ServiceIdent::from((RString::from(service_name), RString::from(plugin_version)));
618
619 let validator = ValidatorImpl { func: validator };
620 let validator_stable = ValidatorBox::from_value(validator, sabi_trait::TD_Opaque);
621 self.validators.insert(ident, validator_stable);
622 }
623
624 #[cfg(feature = "internal")]
626 pub fn remove_config_validator(
627 &mut self,
628 service_name: &str,
629 version: &str,
630 ) -> Option<ValidatorBox> {
631 let ident = ServiceIdent::from((RString::from(service_name), RString::from(version)));
632 self.validators.remove(&ident).into_option()
633 }
634
635 #[cfg(feature = "internal")]
637 pub fn dump(&self) -> Vec<(String, String)> {
638 self.services
639 .keys()
640 .map(|key| {
641 let service = key.0.to_string();
642 let version = key.1.to_string();
643 (service, version)
644 })
645 .collect()
646 }
647}
648
/// ABI-stable validator of a migration context, generated through
/// `#[sabi_trait]`.
#[sabi_trait]
pub trait MigrationContextValidator {
    /// Validates the whole migration context given as a name-to-value map.
    ///
    /// Returns `RErr(())` when the context is rejected; the error itself
    /// carries no payload.
    fn validate(&self, context: RHashMap<RString, RString>) -> RResult<(), ()>;

    /// Validates a single migration context parameter given by its `name`
    /// and `value`.
    ///
    /// Returns `RErr(())` when the parameter is rejected; the error itself
    /// carries no payload.
    fn validate_parameter(&self, name: RString, value: RString) -> RResult<(), ()>;
}
671
/// Type-erased, ABI-stable trait object holding a
/// [`MigrationContextValidator`] implementation.
pub type MigrationContextValidatorBox = MigrationContextValidator_TO<'static, RBox<()>>;
673
/// [`MigrationContextValidator`] implementation backed by optional function
/// pointers; a missing function means "accept everything".
#[derive(Default)]
struct MigrationContextValidatorImpl {
    /// Validator of the whole context; `None` accepts any context.
    #[allow(clippy::type_complexity)]
    context_validator: Option<fn(context: HashMap<String, String>) -> CallbackResult<()>>,
    /// Validator of a single parameter; `None` accepts any parameter.
    parameter_validator: Option<fn(name: String, value: String) -> CallbackResult<()>>,
}
686
687impl MigrationContextValidator for MigrationContextValidatorImpl {
688 fn validate(&self, context: RHashMap<RString, RString>) -> RResult<(), ()> {
689 let Some(context_validator) = self.context_validator else {
690 return ROk(());
692 };
693 let context = context
694 .into_iter()
695 .map(|tuple| (tuple.0.into(), tuple.1.into()))
696 .collect();
697 match (context_validator)(context) {
698 Ok(_) => ROk(()),
699 Err(e) => error_into_tt_error(e),
700 }
701 }
702
703 fn validate_parameter(&self, name: RString, value: RString) -> RResult<(), ()> {
704 let Some(parameter_validator) = self.parameter_validator else {
705 return ROk(());
707 };
708 match (parameter_validator)(name.into(), value.into()) {
709 Ok(_) => ROk(()),
710 Err(e) => error_into_tt_error(e),
711 }
712 }
713}
714
/// ABI-stable holder of an optional migration context validator.
#[repr(C)]
#[derive(StableAbi, Default)]
pub struct MigrationValidator {
    /// Validator installed through the setters; `RNone` until one of them is
    /// called.
    context_validator: ROption<MigrationContextValidatorBox>,
}
722
723impl MigrationValidator {
724 pub fn set_context_validator(
729 &mut self,
730 context_validator_fn: fn(context: HashMap<String, String>) -> CallbackResult<()>,
731 ) {
732 match &mut self.context_validator {
733 ROption::RNone => {
734 let validator_inner = MigrationContextValidatorImpl {
735 context_validator: Some(context_validator_fn),
736 parameter_validator: None,
737 };
738 let validator_inner = MigrationContextValidatorBox::from_value(
739 validator_inner,
740 abi_stable::sabi_trait::TD_CanDowncast,
741 );
742
743 self.context_validator = ROption::RSome(validator_inner);
744 }
745 ROption::RSome(mcv) => {
746 let mcv_impl = mcv
747 .obj
748 .downcast_as_mut::<MigrationContextValidatorImpl>()
749 .expect(
750 "downcasting should not fail, because it uses the same struct for creating the obj",
751 );
752
753 mcv_impl.context_validator = Some(context_validator_fn);
754 }
755 }
756 }
757
758 pub fn set_context_parameter_validator(
763 &mut self,
764 parameter_validator_fn: fn(name: String, value: String) -> CallbackResult<()>,
765 ) {
766 match &mut self.context_validator {
767 ROption::RNone => {
768 let validator_inner = MigrationContextValidatorImpl {
769 context_validator: None,
770 parameter_validator: Some(parameter_validator_fn),
771 };
772 let validator_inner = MigrationContextValidatorBox::from_value(
773 validator_inner,
774 abi_stable::sabi_trait::TD_CanDowncast,
775 );
776
777 self.context_validator = ROption::RSome(validator_inner);
778 }
779 ROption::RSome(mcv) => {
780 let mcv_impl = mcv
781 .obj
782 .downcast_as_mut::<MigrationContextValidatorImpl>()
783 .expect(
784 "downcasting should not fail, because it uses the same struct for creating the obj",
785 );
786
787 mcv_impl.parameter_validator = Some(parameter_validator_fn);
788 }
789 }
790 }
791
792 #[cfg(feature = "internal")]
793 pub fn migration_context_validator(&mut self) -> MigrationContextValidatorBox {
794 if let ROption::RSome(mcv) = self.context_validator.take() {
795 return mcv;
796 }
797
798 let validator = MigrationContextValidatorImpl::default();
800 let validator_stable =
801 MigrationContextValidatorBox::from_value(validator, sabi_trait::TD_Opaque);
802 validator_stable
803 }
804}
805
/// Signature of an `extern "C"` function that receives a mutable
/// [`MigrationValidator`] to configure.
pub type FnMigrationValidator = extern "C" fn(validator: &mut MigrationValidator);