pub struct SubgraphTransformer {
pub subgraph_name: String,
pub ns: Vec<String>,
pub filter: Option<Arc<dyn Fn(&Value) -> bool + Send + Sync>>,
pub include_internal: bool,
}Expand description
Subgraph stream event transformer with namespace and filter
Transforms stream events from subgraph execution by adding namespace prefixes and filtering events based on configuration.
Fields§
§subgraph_name: StringSubgraph name for namespace prefix
ns: Vec<String>Current namespace stack
filter: Option<Arc<dyn Fn(&Value) -> bool + Send + Sync>>Optional filter for event types
The closure receives the event as a serde_json::Value and returns
true if the event should be included, false otherwise.
include_internal: boolWhether to include internal events
Implementations§
Source§impl SubgraphTransformer
impl SubgraphTransformer
Sourcepub const fn new(subgraph_name: String) -> Self
pub const fn new(subgraph_name: String) -> Self
Create a new subgraph transformer
§Arguments
subgraph_name- Name of the subgraph for namespace prefix
Sourcepub fn with_filter(
self,
filter: impl Fn(&Value) -> bool + Send + Sync + 'static,
) -> Self
pub fn with_filter( self, filter: impl Fn(&Value) -> bool + Send + Sync + 'static, ) -> Self
Set event filter as a closure
§Arguments
filter- Closure that receives the event as JSON and returns true if the event should be included
Sourcepub fn with_filter_types(self, types: Vec<String>) -> Self
pub fn with_filter_types(self, types: Vec<String>) -> Self
Set event filter by event type names (backward compatibility)
§Arguments
types- List of event types to include (empty means all events)
Sourcepub const fn with_internal(self, include: bool) -> Self
pub const fn with_internal(self, include: bool) -> Self
Sourcepub fn transform<S: State>(
&self,
event: &StreamEvent<S>,
) -> Option<StreamEvent<S>>
pub fn transform<S: State>( &self, event: &StreamEvent<S>, ) -> Option<StreamEvent<S>>
Transform a stream event by adding namespace prefixes.
Applies two transformations to subgraph stream events:
- Filter: if a filter closure is configured, events whose type
does not match are discarded (
Noneis returned). - Namespace: the
nodefield of matching events is prefixed with the subgraph namespace (ns/subgraph_name), and thensvector is extended with the subgraph name to reflect the nesting path.
§Arguments
event- The stream event to transform.
§Returns
Some(transformed_event) if the event passes the filter, None
otherwise. Variants that carry no node or ns fields are returned
unchanged (aside from cloning).
Sourcepub fn add_namespace(&mut self, segment: String)
pub fn add_namespace(&mut self, segment: String)
Sourcepub fn child_transformer(&self, child_name: &str) -> Self
pub fn child_transformer(&self, child_name: &str) -> Self
Create a child transformer for a nested subgraph.
The child transformer inherits the current namespace and appends the
current subgraph_name as an additional segment, then sets the child
name. This enables correct namespace propagation for sub-subgraphs.
§Arguments
child_name- Name of the nested (child) subgraph
§Examples
use juncture_core::SubgraphTransformer;
let parent = SubgraphTransformer::new("parent".to_string());
let child = parent.child_transformer("child");
// The child transformer's build_ns would produce:
// ns_prefix = "parent/child"
// full_ns = ["parent", "child"]Sourcepub fn to_emitter<S: State>(
&self,
tx: Sender<StreamEvent<S>>,
mode: StreamMode,
) -> EventEmitter<S>
pub fn to_emitter<S: State>( &self, tx: Sender<StreamEvent<S>>, mode: StreamMode, ) -> EventEmitter<S>
Create an EventEmitter with this
transformer’s namespace chain applied.
Each namespace segment in self.ns and self.subgraph_name is applied
via [EventEmitter::with_subgraph_ns] so that events emitted through
the returned emitter carry the full subgraph nesting path.
§Arguments
tx- The underlying sender for stream eventsmode- The stream mode controlling what events are emitted
§Examples
use juncture_core::{EventEmitter, SubgraphTransformer, StreamMode};
use juncture_core::state::FieldVersions;
use tokio::sync::mpsc;
#[derive(Clone, Debug)]
struct MyState;
impl juncture_core::State for MyState {
type Update = MyUpdate;
type FieldVersions = FieldVersions;
fn apply(&mut self, _u: MyUpdate) -> juncture_core::FieldsChanged {
juncture_core::FieldsChanged(0)
}
fn reset_ephemeral(&mut self) {}
}
#[derive(Clone, Debug, Default)]
struct MyUpdate;
let (tx, _rx) = mpsc::channel(16);
let transformer = SubgraphTransformer::new("sub".to_string());
let emitter: EventEmitter<MyState> = transformer.to_emitter(tx, StreamMode::Values);
assert_eq!(emitter.ns(), &["sub"]);Trait Implementations§
Source§impl Clone for SubgraphTransformer
impl Clone for SubgraphTransformer
Source§fn clone(&self) -> SubgraphTransformer
fn clone(&self) -> SubgraphTransformer
1.0.0 (const: unstable) · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
source. Read more