Skip to main content

jellyflow_runtime/schema/registry/
mod.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use jellyflow_core::core::{CanvasPoint, NodeId, NodeKindKey, PortId};
5
6use super::migration::NodeKindMigrator;
7use super::types::{NodeInstantiation, NodeInstantiationError, NodeKindViewDescriptor, NodeSchema};
8
9mod plans;
10
11/// Registry for node kinds.
12#[derive(Default, Clone)]
13pub struct NodeRegistry {
14    by_kind: BTreeMap<NodeKindKey, NodeSchema>,
15    by_alias: BTreeMap<NodeKindKey, NodeKindKey>,
16    migrators: BTreeMap<NodeKindKey, Arc<dyn NodeKindMigrator>>,
17}
18
19impl std::fmt::Debug for NodeRegistry {
20    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
21        f.debug_struct("NodeRegistry")
22            .field("schema_count", &self.by_kind.len())
23            .field("alias_count", &self.by_alias.len())
24            .field("migrator_count", &self.migrators.len())
25            .finish()
26    }
27}
28
29impl NodeRegistry {
30    /// Creates an empty registry.
31    pub fn new() -> Self {
32        Self::default()
33    }
34
35    /// Registers a schema.
36    ///
37    /// Aliases are mapped to the schema's canonical kind.
38    pub fn register(&mut self, schema: NodeSchema) {
39        for alias in &schema.kind_aliases {
40            self.by_alias.insert(alias.clone(), schema.kind.clone());
41        }
42        self.by_kind.insert(schema.kind.clone(), schema);
43    }
44
45    /// Registers a per-kind data migrator.
46    ///
47    /// The migrator is stored as an in-memory hook (not serialized as part of the schema data).
48    pub fn register_migrator(
49        &mut self,
50        kind: NodeKindKey,
51        migrator: Arc<dyn NodeKindMigrator>,
52    ) -> &mut Self {
53        self.migrators.insert(kind, migrator);
54        self
55    }
56
57    /// Resolves an input kind to a canonical kind (via aliases).
58    pub fn resolve_kind<'a>(&'a self, kind: &'a NodeKindKey) -> &'a NodeKindKey {
59        self.by_alias.get(kind).unwrap_or(kind)
60    }
61
62    /// Looks up a schema by canonical kind key.
63    pub fn get(&self, kind: &NodeKindKey) -> Option<&NodeSchema> {
64        self.by_kind.get(kind)
65    }
66
67    /// Iterates all registered schemas in deterministic order (by kind key).
68    pub fn schemas(&self) -> impl Iterator<Item = &NodeSchema> {
69        self.by_kind.values()
70    }
71
72    /// Returns the adapter-facing descriptor for a node kind or alias.
73    pub fn view_descriptor(&self, kind: &NodeKindKey) -> Option<NodeKindViewDescriptor> {
74        let canonical = self.resolve_kind(kind);
75        self.get(canonical).map(NodeKindViewDescriptor::from_schema)
76    }
77
78    /// Returns adapter-facing node-kind descriptors in deterministic order.
79    pub fn view_descriptors(&self) -> Vec<NodeKindViewDescriptor> {
80        self.schemas()
81            .map(NodeKindViewDescriptor::from_schema)
82            .collect()
83    }
84
85    /// Instantiates a node kind or alias with freshly allocated node and port ids.
86    pub fn instantiate_node(
87        &self,
88        kind: &NodeKindKey,
89        pos: CanvasPoint,
90    ) -> Result<NodeInstantiation, NodeInstantiationError> {
91        let canonical = self.resolve_kind(kind);
92        self.get(canonical)
93            .map(|schema| schema.instantiate(pos))
94            .ok_or_else(|| NodeInstantiationError::MissingSchema(kind.clone()))
95    }
96
97    /// Instantiates a node kind or alias with caller-provided ids.
98    pub fn instantiate_node_with_ids(
99        &self,
100        kind: &NodeKindKey,
101        node_id: NodeId,
102        pos: CanvasPoint,
103        port_ids: impl IntoIterator<Item = PortId>,
104    ) -> Result<NodeInstantiation, NodeInstantiationError> {
105        let canonical = self.resolve_kind(kind);
106        self.get(canonical)
107            .ok_or_else(|| NodeInstantiationError::MissingSchema(kind.clone()))?
108            .instantiate_with_ids(node_id, pos, port_ids)
109    }
110}