1use crate::background;
2use crate::background::ServiceWorkerManager;
3use crate::error_code::ErrorCode;
4use crate::util::FfiSafeStr;
5pub use abi_stable;
6use abi_stable::pmr::{RErr, RResult, RSlice};
7use abi_stable::std_types::{RBox, RHashMap, ROk, RString, RVec};
8use abi_stable::{sabi_trait, RTuple, StableAbi};
9use serde::de::DeserializeOwned;
10use smol_str::SmolStr;
11use std::error::Error;
12use std::fmt::Display;
13use std::time::Duration;
14use tarantool::error::TarantoolErrorCode;
15use tarantool::error::{BoxError, IntoBoxError};
16
17#[repr(C)]
19#[derive(StableAbi, Debug)]
20pub struct PicoContext {
21 is_master: bool,
22 pub plugin_name: FfiSafeStr,
23 pub service_name: FfiSafeStr,
24 pub plugin_version: FfiSafeStr,
25}
26
27impl PicoContext {
28 #[inline]
29 pub fn new(is_master: bool) -> PicoContext {
30 Self {
31 is_master,
32 plugin_name: "<unset>".into(),
33 service_name: "<unset>".into(),
34 plugin_version: "<unset>".into(),
35 }
36 }
37
38 #[inline]
41 pub unsafe fn clone(&self) -> Self {
42 Self {
43 is_master: self.is_master,
44 plugin_name: self.plugin_name.clone(),
45 service_name: self.service_name.clone(),
46 plugin_version: self.plugin_version.clone(),
47 }
48 }
49
50 #[inline]
52 pub fn is_master(&self) -> bool {
53 self.is_master
54 }
55
56 #[deprecated = "use `register_job`, `register_tagged_job` or `cancel_background_jobs_by_tag` directly instead"]
58 pub fn worker_manager(&self) -> ServiceWorkerManager {
59 ServiceWorkerManager::new(self.make_service_id())
60 }
61
62 #[inline(always)]
68 pub fn register_metrics_callback(&self, callback: impl Fn() -> String) -> Result<(), BoxError> {
69 crate::metrics::register_metrics_handler(self, callback)
70 }
71
72 #[inline(always)]
104 pub fn register_job<F>(&self, job: F) -> Result<(), BoxError>
105 where
106 F: FnOnce(background::CancellationToken) + 'static,
107 {
108 background::register_job(&self.make_service_id(), job)
109 }
110
111 #[inline(always)]
119 pub fn register_tagged_job<F>(&self, job: F, tag: &str) -> Result<(), BoxError>
120 where
121 F: FnOnce(background::CancellationToken) + 'static,
122 {
123 background::register_tagged_job(&self.make_service_id(), job, tag)
124 }
125
126 #[inline(always)]
138 pub fn cancel_tagged_jobs(&self, tag: &str, timeout: Duration) -> Result<(), BoxError> {
139 let res = background::cancel_jobs_by_tag(&self.make_service_id(), tag, timeout)?;
140 if res.n_timeouts != 0 {
141 #[rustfmt::skip]
142 return Err(BoxError::new(TarantoolErrorCode::Timeout, format!("some background jobs didn't finish in time (expected: {}, timed out: {})", res.n_total, res.n_timeouts)));
143 }
144
145 Ok(())
146 }
147
148 #[inline(always)]
154 pub fn set_jobs_shutdown_timeout(&self, timeout: Duration) {
155 crate::background::set_jobs_shutdown_timeout(
156 self.plugin_name(),
157 self.service_name(),
158 self.plugin_version(),
159 timeout,
160 )
161 }
162
163 #[inline(always)]
164 pub fn make_service_id(&self) -> ServiceId {
165 ServiceId::new(
166 self.plugin_name(),
167 self.service_name(),
168 self.plugin_version(),
169 )
170 }
171
172 #[inline]
173 pub fn plugin_name(&self) -> &str {
174 unsafe { self.plugin_name.as_str() }
176 }
177
178 #[inline]
179 pub fn service_name(&self) -> &str {
180 unsafe { self.service_name.as_str() }
182 }
183
184 #[inline]
185 pub fn plugin_version(&self) -> &str {
186 unsafe { self.plugin_version.as_str() }
188 }
189}
190
191#[derive(Debug, PartialEq, Eq, Hash, Clone)]
193pub struct ServiceId {
194 pub plugin: SmolStr,
195 pub service: SmolStr,
196 pub version: SmolStr,
197}
198
199impl std::fmt::Display for ServiceId {
200 #[inline(always)]
201 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
202 write!(f, "{}.{}:v{}", self.plugin, self.service, self.version,)
203 }
204}
205
206impl ServiceId {
207 #[inline(always)]
208 pub fn new(
209 plugin: impl Into<SmolStr>,
210 service: impl Into<SmolStr>,
211 version: impl Into<SmolStr>,
212 ) -> Self {
213 Self {
214 plugin: plugin.into(),
215 service: service.into(),
216 version: version.into(),
217 }
218 }
219
220 #[inline(always)]
221 pub fn plugin(&self) -> &str {
222 &self.plugin
223 }
224
225 #[inline(always)]
226 pub fn service(&self) -> &str {
227 &self.service
228 }
229
230 #[inline(always)]
231 pub fn version(&self) -> &str {
232 &self.version
233 }
234}
235
236pub type ErrorBox = Box<dyn Error>;
240
241pub type CallbackResult<T> = Result<T, ErrorBox>;
242
243pub trait Service {
245 type Config: DeserializeOwned;
247
248 fn on_config_change(
271 &mut self,
272 ctx: &PicoContext,
273 new_config: Self::Config,
274 old_config: Self::Config,
275 ) -> CallbackResult<()> {
276 _ = ctx;
277 _ = new_config;
278 _ = old_config;
279 Ok(())
280 }
281
282 fn on_start(&mut self, context: &PicoContext, config: Self::Config) -> CallbackResult<()> {
298 _ = context;
299 _ = config;
300 Ok(())
301 }
302
303 fn on_stop(&mut self, context: &PicoContext) -> CallbackResult<()> {
315 _ = context;
316 Ok(())
317 }
318
319 fn on_leader_change(&mut self, context: &PicoContext) -> CallbackResult<()> {
341 _ = context;
342 Ok(())
343 }
344
345 fn on_health_check(&self, context: &PicoContext) -> CallbackResult<()> {
355 _ = context;
356 Ok(())
357 }
358}
359
360#[sabi_trait]
365pub trait ServiceStable {
366 fn on_health_check(&self, context: &PicoContext) -> RResult<(), ()>;
367 fn on_start(&mut self, context: &PicoContext, configuration: RSlice<u8>) -> RResult<(), ()>;
368 fn on_stop(&mut self, context: &PicoContext) -> RResult<(), ()>;
369 fn on_leader_change(&mut self, context: &PicoContext) -> RResult<(), ()>;
370 fn on_config_change(
371 &mut self,
372 ctx: &PicoContext,
373 new_config: RSlice<u8>,
374 old_config: RSlice<u8>,
375 ) -> RResult<(), ()>;
376}
377
378pub struct ServiceProxy<C: DeserializeOwned> {
380 service: Box<dyn Service<Config = C>>,
381}
382
383impl<C: DeserializeOwned> ServiceProxy<C> {
384 pub fn from_service(service: Box<dyn Service<Config = C>>) -> Self {
385 Self { service }
386 }
387}
388
389fn error_into_tt_error<T>(source: impl Display) -> RResult<T, ()> {
395 let tt_error = BoxError::new(ErrorCode::PluginError, source.to_string());
396 tt_error.set_last_error();
397 RErr(())
398}
399
400macro_rules! rtry {
401 ($expr: expr) => {
402 match $expr {
403 Ok(k) => k,
404 Err(e) => return error_into_tt_error(e),
405 }
406 };
407}
408
409impl<C: DeserializeOwned> ServiceStable for ServiceProxy<C> {
410 fn on_health_check(&self, context: &PicoContext) -> RResult<(), ()> {
411 match self.service.on_health_check(context) {
412 Ok(_) => ROk(()),
413 Err(e) => error_into_tt_error(e),
414 }
415 }
416
417 fn on_start(&mut self, context: &PicoContext, configuration: RSlice<u8>) -> RResult<(), ()> {
418 let configuration: C = rtry!(rmp_serde::from_slice(configuration.as_slice()));
419 match self.service.on_start(context, configuration) {
420 Ok(_) => ROk(()),
421 Err(e) => error_into_tt_error(e),
422 }
423 }
424
425 fn on_stop(&mut self, context: &PicoContext) -> RResult<(), ()> {
426 match self.service.on_stop(context) {
427 Ok(_) => ROk(()),
428 Err(e) => error_into_tt_error(e),
429 }
430 }
431
432 fn on_leader_change(&mut self, context: &PicoContext) -> RResult<(), ()> {
433 match self.service.on_leader_change(context) {
434 Ok(_) => ROk(()),
435 Err(e) => error_into_tt_error(e),
436 }
437 }
438
439 fn on_config_change(
440 &mut self,
441 ctx: &PicoContext,
442 new_config: RSlice<u8>,
443 old_config: RSlice<u8>,
444 ) -> RResult<(), ()> {
445 let new_config: C = rtry!(rmp_serde::from_slice(new_config.as_slice()));
446 let old_config: C = rtry!(rmp_serde::from_slice(old_config.as_slice()));
447
448 let res = self.service.on_config_change(ctx, new_config, old_config);
449 match res {
450 Ok(_) => ROk(()),
451 Err(e) => error_into_tt_error(e),
452 }
453 }
454}
455
456pub type ServiceBox = ServiceStable_TO<'static, RBox<()>>;
458
459pub type FnServiceRegistrar = extern "C" fn(registry: &mut ServiceRegistry);
462
463#[sabi_trait]
466trait Factory {
467 fn make(&self) -> ServiceBox;
468}
469
470type FactoryBox = Factory_TO<'static, RBox<()>>;
471
472struct FactoryImpl<S: Service + 'static> {
475 factory_fn: fn() -> S,
476}
477
478impl<S: Service + 'static> Factory for FactoryImpl<S> {
479 fn make(&self) -> ServiceBox {
480 let boxed = Box::new((self.factory_fn)());
481 ServiceBox::from_value(ServiceProxy::from_service(boxed), sabi_trait::TD_Opaque)
482 }
483}
484
485type ServiceIdent = RTuple!(RString, RString);
487
488#[sabi_trait]
492pub trait Validator {
493 fn validate(&self, config: RSlice<u8>) -> RResult<(), ()>;
501}
502
503pub type ValidatorBox = Validator_TO<'static, RBox<()>>;
504
505struct ValidatorImpl<CONFIG: DeserializeOwned + 'static> {
508 func: fn(config: CONFIG) -> CallbackResult<()>,
509}
510
511impl<C: DeserializeOwned> Validator for ValidatorImpl<C> {
512 fn validate(&self, config: RSlice<u8>) -> RResult<(), ()> {
513 let config: C = rtry!(rmp_serde::from_slice(config.as_slice()));
514 let res = (self.func)(config);
515 match res {
516 Ok(_) => ROk(()),
517 Err(e) => error_into_tt_error(e),
518 }
519 }
520}
521
522#[repr(C)]
524#[derive(Default, StableAbi)]
525pub struct ServiceRegistry {
526 services: RHashMap<ServiceIdent, RVec<FactoryBox>>,
527 validators: RHashMap<ServiceIdent, ValidatorBox>,
528}
529
530impl ServiceRegistry {
531 pub fn add<S: Service + 'static>(
539 &mut self,
540 name: &str,
541 plugin_version: &str,
542 factory: fn() -> S,
543 ) {
544 let factory_inner = FactoryImpl {
545 factory_fn: factory,
546 };
547 let factory_inner =
548 FactoryBox::from_value(factory_inner, abi_stable::sabi_trait::TD_Opaque);
549
550 let ident = ServiceIdent::from((RString::from(name), RString::from(plugin_version)));
551
552 if self.validators.get(&ident).is_none() {
553 let validator = ValidatorImpl {
556 func: |_: S::Config| Ok(()),
557 };
558 let validator_stable = ValidatorBox::from_value(validator, sabi_trait::TD_Opaque);
559 self.validators.insert(ident.clone(), validator_stable);
560 }
561
562 let entry = self.services.entry(ident).or_default();
563 entry.push(factory_inner);
564 }
565
566 pub fn make(&self, service_name: &str, version: &str) -> Result<Option<ServiceBox>, ()> {
569 let ident = ServiceIdent::from((RString::from(service_name), RString::from(version)));
570 let maybe_factories = self.services.get(&ident);
571
572 match maybe_factories {
573 None => Ok(None),
574 Some(factories) if factories.len() == 1 => {
575 Ok(factories.first().map(|factory| factory.make()))
576 }
577 Some(_) => Err(()),
578 }
579 }
580
581 pub fn contains(&self, service_name: &str, version: &str) -> Result<bool, ()> {
584 let ident = ServiceIdent::from((RString::from(service_name), RString::from(version)));
585 match self.services.get(&ident) {
586 None => Ok(false),
587 Some(factories) if factories.len() == 1 => Ok(true),
588 Some(_) => Err(()),
589 }
590 }
591
592 pub fn add_config_validator<S: Service>(
601 &mut self,
602 service_name: &str,
603 plugin_version: &str,
604 validator: fn(S::Config) -> CallbackResult<()>,
605 ) where
606 S::Config: DeserializeOwned + 'static,
607 {
608 let ident =
609 ServiceIdent::from((RString::from(service_name), RString::from(plugin_version)));
610
611 let validator = ValidatorImpl { func: validator };
612 let validator_stable = ValidatorBox::from_value(validator, sabi_trait::TD_Opaque);
613 self.validators.insert(ident, validator_stable);
614 }
615
616 pub fn remove_config_validator(
618 &mut self,
619 service_name: &str,
620 version: &str,
621 ) -> Option<ValidatorBox> {
622 let ident = ServiceIdent::from((RString::from(service_name), RString::from(version)));
623 self.validators.remove(&ident).into_option()
624 }
625
626 pub fn dump(&self) -> Vec<(String, String)> {
628 self.services
629 .keys()
630 .map(|key| {
631 let service = key.0.to_string();
632 let version = key.1.to_string();
633 (service, version)
634 })
635 .collect()
636 }
637}