1use crate::background;
2use crate::background::ServiceWorkerManager;
3use crate::error_code::ErrorCode;
4use crate::util::FfiSafeStr;
5pub use abi_stable;
6use abi_stable::pmr::{RErr, RResult, RSlice};
7use abi_stable::std_types::{RBox, RHashMap, ROk, RString, RVec};
8use abi_stable::{sabi_trait, RTuple, StableAbi};
9use serde::de::DeserializeOwned;
10use smol_str::SmolStr;
11use std::error::Error;
12use std::fmt::Display;
13use std::time::Duration;
14use tarantool::error::TarantoolErrorCode;
15use tarantool::error::{BoxError, IntoBoxError};
16
17#[repr(C)]
19#[derive(StableAbi, Debug)]
20pub struct PicoContext {
21 is_master: bool,
22 pub plugin_name: FfiSafeStr,
23 pub service_name: FfiSafeStr,
24 pub plugin_version: FfiSafeStr,
25}
26
27impl PicoContext {
28 #[inline]
29 pub fn new(is_master: bool) -> PicoContext {
30 Self {
31 is_master,
32 plugin_name: "<unset>".into(),
33 service_name: "<unset>".into(),
34 plugin_version: "<unset>".into(),
35 }
36 }
37
38 #[inline]
43 pub unsafe fn clone(&self) -> Self {
44 Self {
45 is_master: self.is_master,
46 plugin_name: self.plugin_name,
47 service_name: self.service_name,
48 plugin_version: self.plugin_version,
49 }
50 }
51
52 #[inline]
54 pub fn is_master(&self) -> bool {
55 self.is_master
56 }
57
58 #[deprecated = "use `register_job`, `register_tagged_job` or `cancel_background_jobs_by_tag` directly instead"]
60 pub fn worker_manager(&self) -> ServiceWorkerManager {
61 ServiceWorkerManager::new(self.make_service_id())
62 }
63
64 #[inline(always)]
70 pub fn register_metrics_callback(&self, callback: impl Fn() -> String) -> Result<(), BoxError> {
71 crate::metrics::register_metrics_handler(self, callback)
72 }
73
74 #[inline(always)]
106 pub fn register_job<F>(&self, job: F) -> Result<(), BoxError>
107 where
108 F: FnOnce(background::CancellationToken) + 'static,
109 {
110 background::register_job(&self.make_service_id(), job)
111 }
112
113 #[inline(always)]
121 pub fn register_tagged_job<F>(&self, job: F, tag: &str) -> Result<(), BoxError>
122 where
123 F: FnOnce(background::CancellationToken) + 'static,
124 {
125 background::register_tagged_job(&self.make_service_id(), job, tag)
126 }
127
128 #[inline(always)]
140 pub fn cancel_tagged_jobs(&self, tag: &str, timeout: Duration) -> Result<(), BoxError> {
141 let res = background::cancel_jobs_by_tag(&self.make_service_id(), tag, timeout)?;
142 if res.n_timeouts != 0 {
143 #[rustfmt::skip]
144 return Err(BoxError::new(TarantoolErrorCode::Timeout, format!("some background jobs didn't finish in time (expected: {}, timed out: {})", res.n_total, res.n_timeouts)));
145 }
146
147 Ok(())
148 }
149
150 #[inline(always)]
156 pub fn set_jobs_shutdown_timeout(&self, timeout: Duration) {
157 crate::background::set_jobs_shutdown_timeout(
158 self.plugin_name(),
159 self.service_name(),
160 self.plugin_version(),
161 timeout,
162 )
163 }
164
165 #[inline(always)]
166 pub fn make_service_id(&self) -> ServiceId {
167 ServiceId::new(
168 self.plugin_name(),
169 self.service_name(),
170 self.plugin_version(),
171 )
172 }
173
174 #[inline]
175 pub fn plugin_name(&self) -> &str {
176 unsafe { self.plugin_name.as_str() }
178 }
179
180 #[inline]
181 pub fn service_name(&self) -> &str {
182 unsafe { self.service_name.as_str() }
184 }
185
186 #[inline]
187 pub fn plugin_version(&self) -> &str {
188 unsafe { self.plugin_version.as_str() }
190 }
191}
192
193#[derive(Debug, PartialEq, Eq, Hash, Clone)]
195pub struct ServiceId {
196 pub plugin: SmolStr,
197 pub service: SmolStr,
198 pub version: SmolStr,
199}
200
201impl std::fmt::Display for ServiceId {
202 #[inline(always)]
203 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
204 write!(f, "{}.{}:v{}", self.plugin, self.service, self.version,)
205 }
206}
207
208impl ServiceId {
209 #[inline(always)]
210 pub fn new(
211 plugin: impl Into<SmolStr>,
212 service: impl Into<SmolStr>,
213 version: impl Into<SmolStr>,
214 ) -> Self {
215 Self {
216 plugin: plugin.into(),
217 service: service.into(),
218 version: version.into(),
219 }
220 }
221
222 #[inline(always)]
223 pub fn plugin(&self) -> &str {
224 &self.plugin
225 }
226
227 #[inline(always)]
228 pub fn service(&self) -> &str {
229 &self.service
230 }
231
232 #[inline(always)]
233 pub fn version(&self) -> &str {
234 &self.version
235 }
236}
237
/// Boxed, type-erased error returned from service callbacks.
pub type ErrorBox = Box<dyn Error>;

/// Result type of service callbacks: `T` on success, an [`ErrorBox`] on failure.
pub type CallbackResult<T> = Result<T, ErrorBox>;
244
245pub trait Service {
247 type Config: DeserializeOwned;
249
250 fn on_config_change(
273 &mut self,
274 ctx: &PicoContext,
275 new_config: Self::Config,
276 old_config: Self::Config,
277 ) -> CallbackResult<()> {
278 _ = ctx;
279 _ = new_config;
280 _ = old_config;
281 Ok(())
282 }
283
284 fn on_start(&mut self, context: &PicoContext, config: Self::Config) -> CallbackResult<()> {
300 _ = context;
301 _ = config;
302 Ok(())
303 }
304
305 fn on_stop(&mut self, context: &PicoContext) -> CallbackResult<()> {
317 _ = context;
318 Ok(())
319 }
320
321 fn on_leader_change(&mut self, context: &PicoContext) -> CallbackResult<()> {
343 _ = context;
344 Ok(())
345 }
346
347 fn on_health_check(&self, context: &PicoContext) -> CallbackResult<()> {
357 _ = context;
358 Ok(())
359 }
360}
361
362#[sabi_trait]
367pub trait ServiceStable {
368 fn on_health_check(&self, context: &PicoContext) -> RResult<(), ()>;
369 fn on_start(&mut self, context: &PicoContext, configuration: RSlice<u8>) -> RResult<(), ()>;
370 fn on_stop(&mut self, context: &PicoContext) -> RResult<(), ()>;
371 fn on_leader_change(&mut self, context: &PicoContext) -> RResult<(), ()>;
372 fn on_config_change(
373 &mut self,
374 ctx: &PicoContext,
375 new_config: RSlice<u8>,
376 old_config: RSlice<u8>,
377 ) -> RResult<(), ()>;
378}
379
380pub struct ServiceProxy<C: DeserializeOwned> {
382 service: Box<dyn Service<Config = C>>,
383}
384
385impl<C: DeserializeOwned> ServiceProxy<C> {
386 pub fn from_service(service: Box<dyn Service<Config = C>>) -> Self {
387 Self { service }
388 }
389}
390
391fn error_into_tt_error<T>(source: impl Display) -> RResult<T, ()> {
397 let tt_error = BoxError::new(ErrorCode::PluginError, source.to_string());
398 tt_error.set_last_error();
399 RErr(())
400}
401
/// Unwraps a `Result`, or returns early from the enclosing function with the
/// value produced by `error_into_tt_error` for the error.
macro_rules! rtry {
    ($fallible:expr) => {
        match $fallible {
            Ok(value) => value,
            Err(cause) => return error_into_tt_error(cause),
        }
    };
}
410
411impl<C: DeserializeOwned> ServiceStable for ServiceProxy<C> {
412 fn on_health_check(&self, context: &PicoContext) -> RResult<(), ()> {
413 match self.service.on_health_check(context) {
414 Ok(_) => ROk(()),
415 Err(e) => error_into_tt_error(e),
416 }
417 }
418
419 fn on_start(&mut self, context: &PicoContext, configuration: RSlice<u8>) -> RResult<(), ()> {
420 let configuration: C = rtry!(rmp_serde::from_slice(configuration.as_slice()));
421 match self.service.on_start(context, configuration) {
422 Ok(_) => ROk(()),
423 Err(e) => error_into_tt_error(e),
424 }
425 }
426
427 fn on_stop(&mut self, context: &PicoContext) -> RResult<(), ()> {
428 match self.service.on_stop(context) {
429 Ok(_) => ROk(()),
430 Err(e) => error_into_tt_error(e),
431 }
432 }
433
434 fn on_leader_change(&mut self, context: &PicoContext) -> RResult<(), ()> {
435 match self.service.on_leader_change(context) {
436 Ok(_) => ROk(()),
437 Err(e) => error_into_tt_error(e),
438 }
439 }
440
441 fn on_config_change(
442 &mut self,
443 ctx: &PicoContext,
444 new_config: RSlice<u8>,
445 old_config: RSlice<u8>,
446 ) -> RResult<(), ()> {
447 let new_config: C = rtry!(rmp_serde::from_slice(new_config.as_slice()));
448 let old_config: C = rtry!(rmp_serde::from_slice(old_config.as_slice()));
449
450 let res = self.service.on_config_change(ctx, new_config, old_config);
451 match res {
452 Ok(_) => ROk(()),
453 Err(e) => error_into_tt_error(e),
454 }
455 }
456}
457
458pub type ServiceBox = ServiceStable_TO<'static, RBox<()>>;
460
461pub type FnServiceRegistrar = extern "C" fn(registry: &mut ServiceRegistry);
464
465#[sabi_trait]
468trait Factory {
469 fn make(&self) -> ServiceBox;
470}
471
472type FactoryBox = Factory_TO<'static, RBox<()>>;
473
474struct FactoryImpl<S: Service + 'static> {
477 factory_fn: fn() -> S,
478}
479
480impl<S: Service + 'static> Factory for FactoryImpl<S> {
481 fn make(&self) -> ServiceBox {
482 let boxed = Box::new((self.factory_fn)());
483 ServiceBox::from_value(ServiceProxy::from_service(boxed), sabi_trait::TD_Opaque)
484 }
485}
486
487type ServiceIdent = RTuple!(RString, RString);
489
490#[sabi_trait]
494pub trait Validator {
495 fn validate(&self, config: RSlice<u8>) -> RResult<(), ()>;
503}
504
505pub type ValidatorBox = Validator_TO<'static, RBox<()>>;
506
507struct ValidatorImpl<CONFIG: DeserializeOwned + 'static> {
510 func: fn(config: CONFIG) -> CallbackResult<()>,
511}
512
513impl<C: DeserializeOwned> Validator for ValidatorImpl<C> {
514 fn validate(&self, config: RSlice<u8>) -> RResult<(), ()> {
515 let config: C = rtry!(rmp_serde::from_slice(config.as_slice()));
516 let res = (self.func)(config);
517 match res {
518 Ok(_) => ROk(()),
519 Err(e) => error_into_tt_error(e),
520 }
521 }
522}
523
524#[repr(C)]
526#[derive(Default, StableAbi)]
527pub struct ServiceRegistry {
528 services: RHashMap<ServiceIdent, RVec<FactoryBox>>,
529 validators: RHashMap<ServiceIdent, ValidatorBox>,
530}
531
532impl ServiceRegistry {
533 pub fn add<S: Service + 'static>(
541 &mut self,
542 name: &str,
543 plugin_version: &str,
544 factory: fn() -> S,
545 ) {
546 let factory_inner = FactoryImpl {
547 factory_fn: factory,
548 };
549 let factory_inner =
550 FactoryBox::from_value(factory_inner, abi_stable::sabi_trait::TD_Opaque);
551
552 let ident = ServiceIdent::from((RString::from(name), RString::from(plugin_version)));
553
554 if self.validators.get(&ident).is_none() {
555 let validator = ValidatorImpl {
558 func: |_: S::Config| Ok(()),
559 };
560 let validator_stable = ValidatorBox::from_value(validator, sabi_trait::TD_Opaque);
561 self.validators.insert(ident.clone(), validator_stable);
562 }
563
564 let entry = self.services.entry(ident).or_default();
565 entry.push(factory_inner);
566 }
567
568 #[allow(clippy::result_unit_err)]
571 pub fn make(&self, service_name: &str, version: &str) -> Result<Option<ServiceBox>, ()> {
572 let ident = ServiceIdent::from((RString::from(service_name), RString::from(version)));
573 let maybe_factories = self.services.get(&ident);
574
575 match maybe_factories {
576 None => Ok(None),
577 Some(factories) if factories.len() == 1 => {
578 Ok(factories.first().map(|factory| factory.make()))
579 }
580 Some(_) => Err(()),
581 }
582 }
583
584 #[allow(clippy::result_unit_err)]
587 pub fn contains(&self, service_name: &str, version: &str) -> Result<bool, ()> {
588 let ident = ServiceIdent::from((RString::from(service_name), RString::from(version)));
589 match self.services.get(&ident) {
590 None => Ok(false),
591 Some(factories) if factories.len() == 1 => Ok(true),
592 Some(_) => Err(()),
593 }
594 }
595
596 pub fn add_config_validator<S: Service>(
605 &mut self,
606 service_name: &str,
607 plugin_version: &str,
608 validator: fn(S::Config) -> CallbackResult<()>,
609 ) where
610 S::Config: DeserializeOwned + 'static,
611 {
612 let ident =
613 ServiceIdent::from((RString::from(service_name), RString::from(plugin_version)));
614
615 let validator = ValidatorImpl { func: validator };
616 let validator_stable = ValidatorBox::from_value(validator, sabi_trait::TD_Opaque);
617 self.validators.insert(ident, validator_stable);
618 }
619
620 pub fn remove_config_validator(
622 &mut self,
623 service_name: &str,
624 version: &str,
625 ) -> Option<ValidatorBox> {
626 let ident = ServiceIdent::from((RString::from(service_name), RString::from(version)));
627 self.validators.remove(&ident).into_option()
628 }
629
630 pub fn dump(&self) -> Vec<(String, String)> {
632 self.services
633 .keys()
634 .map(|key| {
635 let service = key.0.to_string();
636 let version = key.1.to_string();
637 (service, version)
638 })
639 .collect()
640 }
641}