1use std::collections::HashMap;
28use std::fmt;
29
30use crate::action::{
31 AckAction, ActionPrimitive, ActionRegistry, ActionValidationError, ActionValueType,
32 AnnotateAction, ContextSetBoolAction, ContextSetNumberAction, ContextSetSeriesAction,
33 ContextSetStringAction,
34};
35use crate::cluster::{
36 Cardinality, InputMetadata, OutputMetadata, ParameterMetadata, ParameterType, ParameterValue,
37 PrimitiveCatalog, PrimitiveKind, PrimitiveMetadata, PrimitiveVersionIndex, ValueType, Version,
38};
39use crate::common;
40use crate::common::ValidationError;
41use crate::compute::implementations::{
42 Abs, Add, And, Append, ConstBool, ConstNumber, Divide, Eq, Gt, Gte, Len, Lt, Lte, Max, Mean,
43 Min, Multiply, Negate, Neq, Not, Or, SafeDivide, Select, SelectBool, Subtract, Sum, Window,
44};
45use crate::compute::{
46 ComputePrimitive, ComputePrimitiveManifest, PrimitiveRegistry as ComputeRegistry,
47};
48use crate::source::{
49 BooleanSource, ContextBoolSource, ContextNumberSource, ContextSeriesSource,
50 ContextStringSource, NumberSource, SourcePrimitive, SourceRegistry, SourceValidationError,
51 StringSource,
52};
53use crate::trigger::{
54 EmitIfEventAndTrue, EmitIfTrue, TriggerPrimitive, TriggerRegistry, TriggerValidationError,
55 TriggerValueType,
56};
57
58#[derive(Debug)]
59#[non_exhaustive]
60pub enum CoreRegistrationError {
61 Source(SourceValidationError),
62 Compute(ValidationError),
63 Trigger(TriggerValidationError),
64 Action(ActionValidationError),
65}
66
67impl fmt::Display for CoreRegistrationError {
68 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
69 match self {
70 Self::Source(err) => write!(f, "source registration failed: {err}"),
71 Self::Compute(err) => write!(f, "compute registration failed: {err}"),
72 Self::Trigger(err) => write!(f, "trigger registration failed: {err}"),
73 Self::Action(err) => write!(f, "action registration failed: {err}"),
74 }
75 }
76}
77
78impl std::error::Error for CoreRegistrationError {
79 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
80 match self {
81 Self::Source(err) => Some(err),
82 Self::Compute(err) => Some(err),
83 Self::Trigger(err) => Some(err),
84 Self::Action(err) => Some(err),
85 }
86 }
87}
88
89pub struct CoreRegistries {
90 pub sources: SourceRegistry,
91 pub computes: ComputeRegistry,
92 pub triggers: TriggerRegistry,
93 pub actions: ActionRegistry,
94}
95
96impl CoreRegistries {
97 pub fn new(
98 sources: SourceRegistry,
99 computes: ComputeRegistry,
100 triggers: TriggerRegistry,
101 actions: ActionRegistry,
102 ) -> Self {
103 Self {
104 sources,
105 computes,
106 triggers,
107 actions,
108 }
109 }
110}
111
112fn core_source_primitives() -> Vec<Box<dyn SourcePrimitive>> {
113 vec![
114 Box::new(NumberSource::new()),
115 Box::new(ContextNumberSource::new()),
116 Box::new(ContextSeriesSource::new()),
117 Box::new(ContextBoolSource::new()),
118 Box::new(ContextStringSource::new()),
119 Box::new(BooleanSource::new()),
120 Box::new(StringSource::new()),
121 ]
122}
123
124fn core_compute_primitives() -> Vec<Box<dyn ComputePrimitive>> {
125 vec![
126 Box::new(ConstNumber::new()),
127 Box::new(ConstBool::new()),
128 Box::new(Abs::new()),
129 Box::new(Add::new()),
130 Box::new(Subtract::new()),
131 Box::new(Multiply::new()),
132 Box::new(Divide::new()),
133 Box::new(SafeDivide::new()),
134 Box::new(Negate::new()),
135 Box::new(Gt::new()),
136 Box::new(Gte::new()),
137 Box::new(Lt::new()),
138 Box::new(Lte::new()),
139 Box::new(Min::new()),
140 Box::new(Max::new()),
141 Box::new(Eq::new()),
142 Box::new(Neq::new()),
143 Box::new(And::new()),
144 Box::new(Or::new()),
145 Box::new(Not::new()),
146 Box::new(Select::new()),
147 Box::new(SelectBool::new()),
148 Box::new(Append::new()),
149 Box::new(Window::new()),
150 Box::new(Mean::new()),
151 Box::new(Sum::new()),
152 Box::new(Len::new()),
153 ]
154}
155
156fn core_trigger_primitives() -> Vec<Box<dyn TriggerPrimitive>> {
157 vec![
158 Box::new(EmitIfTrue::new()),
159 Box::new(EmitIfEventAndTrue::new()),
160 ]
161}
162
163fn core_action_primitives() -> Vec<Box<dyn ActionPrimitive>> {
164 vec![
165 Box::new(AckAction::new()),
166 Box::new(AnnotateAction::new()),
167 Box::new(ContextSetNumberAction::new()),
168 Box::new(ContextSetSeriesAction::new()),
169 Box::new(ContextSetBoolAction::new()),
170 Box::new(ContextSetStringAction::new()),
171 ]
172}
173
174struct PrimitiveInventory {
175 sources: Vec<Box<dyn SourcePrimitive>>,
176 computes: Vec<Box<dyn ComputePrimitive>>,
177 triggers: Vec<Box<dyn TriggerPrimitive>>,
178 actions: Vec<Box<dyn ActionPrimitive>>,
179}
180
181impl PrimitiveInventory {
182 fn with_core() -> Self {
183 Self {
184 sources: core_source_primitives(),
185 computes: core_compute_primitives(),
186 triggers: core_trigger_primitives(),
187 actions: core_action_primitives(),
188 }
189 }
190}
191
192pub struct CatalogBuilder {
193 inventory: PrimitiveInventory,
194}
195
196impl CatalogBuilder {
197 pub fn new() -> Self {
198 Self {
199 inventory: PrimitiveInventory::with_core(),
200 }
201 }
202
203 pub fn add_source(&mut self, primitive: Box<dyn SourcePrimitive>) -> &mut Self {
204 self.inventory.sources.push(primitive);
205 self
206 }
207
208 pub fn add_compute(&mut self, primitive: Box<dyn ComputePrimitive>) -> &mut Self {
209 self.inventory.computes.push(primitive);
210 self
211 }
212
213 pub fn add_trigger(&mut self, primitive: Box<dyn TriggerPrimitive>) -> &mut Self {
214 self.inventory.triggers.push(primitive);
215 self
216 }
217
218 pub fn add_action(&mut self, primitive: Box<dyn ActionPrimitive>) -> &mut Self {
219 self.inventory.actions.push(primitive);
220 self
221 }
222
223 pub fn build(self) -> Result<(CoreRegistries, CorePrimitiveCatalog), CoreRegistrationError> {
224 build_from_inventory(self.inventory)
225 }
226}
227
228impl Default for CatalogBuilder {
229 fn default() -> Self {
230 Self::new()
231 }
232}
233
234pub fn build_core() -> Result<(CoreRegistries, CorePrimitiveCatalog), CoreRegistrationError> {
235 CatalogBuilder::new().build()
236}
237
238fn build_from_inventory(
239 inventory: PrimitiveInventory,
240) -> Result<(CoreRegistries, CorePrimitiveCatalog), CoreRegistrationError> {
241 let PrimitiveInventory {
242 sources: source_primitives,
243 computes: compute_primitives,
244 triggers: trigger_primitives,
245 actions: action_primitives,
246 } = inventory;
247
248 let mut sources = SourceRegistry::new();
249 let mut computes = ComputeRegistry::new();
250 let mut triggers = TriggerRegistry::new();
251 let mut actions = ActionRegistry::new();
252 let mut catalog = CorePrimitiveCatalog::new();
253
254 for primitive in source_primitives {
255 let manifest = primitive.manifest().clone();
256 sources
257 .register(primitive)
258 .map_err(CoreRegistrationError::Source)?;
259 catalog.register_source(manifest);
260 }
261
262 for primitive in compute_primitives {
263 let manifest = primitive.manifest().clone();
264 computes
265 .register(primitive)
266 .map_err(CoreRegistrationError::Compute)?;
267 catalog
268 .register_compute(manifest)
269 .map_err(CoreRegistrationError::Compute)?;
270 }
271
272 for primitive in trigger_primitives {
273 let manifest = primitive.manifest().clone();
274 triggers
275 .register(primitive)
276 .map_err(CoreRegistrationError::Trigger)?;
277 catalog.register_trigger(manifest);
278 }
279
280 for primitive in action_primitives {
281 let manifest = primitive.manifest().clone();
282 actions
283 .register(primitive)
284 .map_err(CoreRegistrationError::Action)?;
285 catalog.register_action(manifest);
286 }
287
288 let registries = CoreRegistries::new(sources, computes, triggers, actions);
289 debug_assert_registry_catalog_key_parity(®istries, &catalog);
290
291 Ok((registries, catalog))
292}
293
294pub fn core_registries() -> Result<CoreRegistries, CoreRegistrationError> {
295 let (registries, _catalog) = build_core()?;
296 Ok(registries)
297}
298
299pub struct CorePrimitiveCatalog {
300 metadata: HashMap<(String, Version), PrimitiveMetadata>,
301}
302
303impl CorePrimitiveCatalog {
304 pub(crate) fn new() -> Self {
305 Self {
306 metadata: HashMap::new(),
307 }
308 }
309
310 pub(crate) fn register_compute(
311 &mut self,
312 manifest: ComputePrimitiveManifest,
313 ) -> Result<(), ValidationError> {
314 let inputs = manifest
315 .inputs
316 .into_iter()
317 .map(|i| InputMetadata {
318 name: i.name,
319 value_type: map_common_value_type(i.value_type),
320 required: i.required,
321 })
322 .collect();
323
324 let outputs = manifest
325 .outputs
326 .into_iter()
327 .map(|o| {
328 (
329 o.name,
330 OutputMetadata {
331 value_type: map_common_value_type(o.value_type),
332 cardinality: Cardinality::Single,
333 },
334 )
335 })
336 .collect();
337
338 let mut parameters = Vec::new();
341 for p in manifest.parameters {
342 let param_value_type = p.value_type.clone();
343 let ty = map_compute_param_type(p.value_type).ok_or_else(|| {
344 ValidationError::UnsupportedParameterType {
345 primitive: manifest.id.clone(),
346 version: manifest.version.clone(),
347 parameter: p.name.clone(),
348 got: param_value_type.clone(),
349 }
350 })?;
351 let default = match p.default {
352 Some(v) => Some(map_compute_param_value(v).map_err(|_| {
353 ValidationError::UnsupportedParameterType {
354 primitive: manifest.id.clone(),
355 version: manifest.version.clone(),
356 parameter: p.name.clone(),
357 got: param_value_type,
358 }
359 })?),
360 None => None,
361 };
362 parameters.push(ParameterMetadata {
363 name: p.name,
364 ty,
365 default,
366 required: p.required,
367 });
368 }
369
370 self.metadata.insert(
371 (manifest.id.clone(), manifest.version.clone()),
372 PrimitiveMetadata {
373 kind: PrimitiveKind::Compute,
374 inputs,
375 outputs,
376 parameters,
377 },
378 );
379 Ok(())
380 }
381
382 pub(crate) fn register_trigger(&mut self, manifest: crate::trigger::TriggerPrimitiveManifest) {
383 let inputs = manifest
384 .inputs
385 .into_iter()
386 .map(|i| InputMetadata {
387 name: i.name,
388 value_type: map_trigger_value_type(i.value_type),
389 required: i.required,
390 })
391 .collect();
392
393 let outputs = manifest
394 .outputs
395 .into_iter()
396 .map(|o| {
397 (
398 o.name,
399 OutputMetadata {
400 value_type: map_trigger_value_type(o.value_type),
401 cardinality: Cardinality::Single,
402 },
403 )
404 })
405 .collect();
406
407 let parameters = manifest
409 .parameters
410 .into_iter()
411 .map(|p| ParameterMetadata {
412 name: p.name,
413 ty: map_trigger_param_type(p.value_type),
414 default: p.default.map(map_trigger_param_value),
415 required: p.required,
416 })
417 .collect();
418
419 self.metadata.insert(
420 (manifest.id.clone(), manifest.version.clone()),
421 PrimitiveMetadata {
422 kind: PrimitiveKind::Trigger,
423 inputs,
424 outputs,
425 parameters,
426 },
427 );
428 }
429
430 pub(crate) fn register_source(&mut self, manifest: crate::source::SourcePrimitiveManifest) {
431 let inputs = vec![];
432 let outputs = manifest
433 .outputs
434 .into_iter()
435 .map(|o| {
436 (
437 o.name,
438 OutputMetadata {
439 value_type: map_common_value_type(o.value_type),
440 cardinality: Cardinality::Single,
441 },
442 )
443 })
444 .collect();
445
446 let parameters = manifest
448 .parameters
449 .into_iter()
450 .map(|p| {
451 let required = p.default.is_none();
452 ParameterMetadata {
453 name: p.name,
454 ty: map_source_param_type(p.value_type),
455 default: p.default.map(map_source_param_value),
456 required,
457 }
458 })
459 .collect();
460
461 self.metadata.insert(
462 (manifest.id.clone(), manifest.version.clone()),
463 PrimitiveMetadata {
464 kind: PrimitiveKind::Source,
465 inputs,
466 outputs,
467 parameters,
468 },
469 );
470 }
471
472 pub(crate) fn register_action(&mut self, manifest: crate::action::ActionPrimitiveManifest) {
473 let inputs = manifest
474 .inputs
475 .into_iter()
476 .map(|i| InputMetadata {
477 name: i.name,
478 value_type: map_action_value_type(i.value_type),
479 required: i.required,
480 })
481 .collect();
482
483 let outputs = manifest
484 .outputs
485 .into_iter()
486 .map(|o| {
487 (
488 o.name,
489 OutputMetadata {
490 value_type: map_action_value_type(o.value_type),
491 cardinality: Cardinality::Single,
492 },
493 )
494 })
495 .collect();
496
497 let parameters = manifest
499 .parameters
500 .into_iter()
501 .map(|p| ParameterMetadata {
502 name: p.name,
503 ty: map_action_param_type(p.value_type),
504 default: p.default.map(map_action_param_value),
505 required: p.required,
506 })
507 .collect();
508
509 self.metadata.insert(
510 (manifest.id.clone(), manifest.version.clone()),
511 PrimitiveMetadata {
512 kind: PrimitiveKind::Action,
513 inputs,
514 outputs,
515 parameters,
516 },
517 );
518 }
519
520 pub(crate) fn keys_for_kind(&self, kind: PrimitiveKind) -> Vec<(String, String)> {
521 let mut keys: Vec<(String, String)> = self
522 .metadata
523 .iter()
524 .filter_map(|((id, version), meta)| {
525 if meta.kind == kind {
526 Some((id.clone(), version.to_string()))
527 } else {
528 None
529 }
530 })
531 .collect();
532 keys.sort();
533 keys
534 }
535}
536
537impl PrimitiveCatalog for CorePrimitiveCatalog {
538 fn get(&self, id: &str, version: &Version) -> Option<PrimitiveMetadata> {
539 self.metadata
540 .get(&(id.to_string(), version.clone()))
541 .cloned()
542 }
543}
544
545impl PrimitiveVersionIndex for CorePrimitiveCatalog {
546 fn available_versions(&self, id: &str) -> Vec<Version> {
547 let mut versions = self
548 .metadata
549 .keys()
550 .filter_map(|(candidate_id, version)| {
551 if candidate_id == id {
552 Some(version.clone())
553 } else {
554 None
555 }
556 })
557 .collect::<Vec<_>>();
558 versions.sort();
559 versions
560 }
561}
562
563fn debug_assert_registry_catalog_key_parity(
564 registries: &CoreRegistries,
565 catalog: &CorePrimitiveCatalog,
566) {
567 debug_assert_eq!(
568 registries.sources.keys(),
569 catalog.keys_for_kind(PrimitiveKind::Source),
570 "registry/catalog key drift for sources"
571 );
572 debug_assert_eq!(
573 registries.computes.keys(),
574 catalog.keys_for_kind(PrimitiveKind::Compute),
575 "registry/catalog key drift for computes"
576 );
577 debug_assert_eq!(
578 registries.triggers.keys(),
579 catalog.keys_for_kind(PrimitiveKind::Trigger),
580 "registry/catalog key drift for triggers"
581 );
582 debug_assert_eq!(
583 registries.actions.keys(),
584 catalog.keys_for_kind(PrimitiveKind::Action),
585 "registry/catalog key drift for actions"
586 );
587}
588
589pub fn build_core_catalog() -> CorePrimitiveCatalog {
590 let (_registries, catalog) = build_core().expect("core registries/catalog should build");
591 catalog
592}
593
594fn map_common_value_type(value_type: common::ValueType) -> ValueType {
595 match value_type {
596 common::ValueType::Number => ValueType::Number,
597 common::ValueType::Series => ValueType::Series,
598 common::ValueType::Bool => ValueType::Bool,
599 common::ValueType::String => ValueType::String,
600 }
601}
602
603fn map_trigger_value_type(value_type: TriggerValueType) -> ValueType {
604 match value_type {
605 TriggerValueType::Number => ValueType::Number,
606 TriggerValueType::Series => ValueType::Series,
607 TriggerValueType::Bool => ValueType::Bool,
608 TriggerValueType::Event => ValueType::Event,
609 }
610}
611
612fn map_action_value_type(value_type: ActionValueType) -> ValueType {
613 match value_type {
614 ActionValueType::Event => ValueType::Event,
615 ActionValueType::Number => ValueType::Number,
616 ActionValueType::Series => ValueType::Series,
617 ActionValueType::Bool => ValueType::Bool,
618 ActionValueType::String => ValueType::String,
619 }
620}
621
622fn map_source_param_type(ty: crate::source::ParameterType) -> ParameterType {
625 match ty {
626 crate::source::ParameterType::Int => ParameterType::Int,
627 crate::source::ParameterType::Number => ParameterType::Number,
628 crate::source::ParameterType::Bool => ParameterType::Bool,
629 crate::source::ParameterType::String => ParameterType::String,
630 crate::source::ParameterType::Enum => ParameterType::Enum,
631 }
632}
633
634fn map_source_param_value(val: crate::source::ParameterValue) -> ParameterValue {
635 match val {
636 crate::source::ParameterValue::Int(i) => ParameterValue::Int(i),
637 crate::source::ParameterValue::Number(n) => ParameterValue::Number(n),
638 crate::source::ParameterValue::Bool(b) => ParameterValue::Bool(b),
639 crate::source::ParameterValue::String(s) => ParameterValue::String(s),
640 crate::source::ParameterValue::Enum(e) => ParameterValue::Enum(e),
641 }
642}
643
644fn map_compute_param_type(ty: common::ValueType) -> Option<ParameterType> {
646 match ty {
647 common::ValueType::Number => Some(ParameterType::Number),
648 common::ValueType::Series => None, common::ValueType::Bool => Some(ParameterType::Bool),
650 common::ValueType::String => None, }
652}
653
654fn map_compute_param_value(val: common::Value) -> Result<ParameterValue, &'static str> {
655 match val {
656 common::Value::Number(n) => Ok(ParameterValue::Number(n)),
657 common::Value::Series(_) => {
660 Err("X.10: Series parameter type should be rejected at registration")
661 }
662 common::Value::Bool(b) => Ok(ParameterValue::Bool(b)),
663 common::Value::String(s) => Ok(ParameterValue::String(s)),
666 }
667}
668
669fn map_trigger_param_type(ty: crate::trigger::ParameterType) -> ParameterType {
670 match ty {
671 crate::trigger::ParameterType::Int => ParameterType::Int,
672 crate::trigger::ParameterType::Number => ParameterType::Number,
673 crate::trigger::ParameterType::Bool => ParameterType::Bool,
674 crate::trigger::ParameterType::String => ParameterType::String,
675 crate::trigger::ParameterType::Enum => ParameterType::Enum,
676 }
677}
678
679fn map_trigger_param_value(val: crate::trigger::ParameterValue) -> ParameterValue {
680 match val {
681 crate::trigger::ParameterValue::Int(i) => ParameterValue::Int(i),
682 crate::trigger::ParameterValue::Number(n) => ParameterValue::Number(n),
683 crate::trigger::ParameterValue::Bool(b) => ParameterValue::Bool(b),
684 crate::trigger::ParameterValue::String(s) => ParameterValue::String(s),
685 crate::trigger::ParameterValue::Enum(e) => ParameterValue::Enum(e),
686 }
687}
688
689fn map_action_param_type(ty: crate::action::ParameterType) -> ParameterType {
690 match ty {
691 crate::action::ParameterType::Int => ParameterType::Int,
692 crate::action::ParameterType::Number => ParameterType::Number,
693 crate::action::ParameterType::Bool => ParameterType::Bool,
694 crate::action::ParameterType::String => ParameterType::String,
695 crate::action::ParameterType::Enum => ParameterType::Enum,
696 }
697}
698
699fn map_action_param_value(val: crate::action::ParameterValue) -> ParameterValue {
700 match val {
701 crate::action::ParameterValue::Int(i) => ParameterValue::Int(i),
702 crate::action::ParameterValue::Number(n) => ParameterValue::Number(n),
703 crate::action::ParameterValue::Bool(b) => ParameterValue::Bool(b),
704 crate::action::ParameterValue::String(s) => ParameterValue::String(s),
705 crate::action::ParameterValue::Enum(e) => ParameterValue::Enum(e),
706 }
707}
708
709#[cfg(test)]
710mod tests;