Skip to main content

pulse_client/
streams.rs

1//! B-107 — Kafka-Streams-like declarative DSL that compiles to a Pulse pipeline.
2//!
3//! The DSL is **server-side execution, client-side declaration**: the operator
4//! chain is built in Rust, compiled to the JSON pipeline shape that the Pulse
5//! server's `StreamingOperatorValidator` accepts, and POSTed to
6//! `/api/pulse/pipelines`. Stream processing then runs on the Pulse engine
7//! (3.6 M evt/s native throughput), not in the client process.
8//!
9//! This is the opposite of Kafka Streams (which executes in the caller's JVM).
10//! The trade-off: you can't do microsecond client-side compute, but you get
11//! infinite-scale stateful streaming, durable replicated state queryable via
12//! B-106 IQ, and the same DSL works from any of the 5 Pulse SDKs.
13//!
14//! # Quick start
15//!
16//! ```no_run
17//! use pulse_client::{aggs, windows, PulseClient, StreamBuilder};
18//!
19//! # async fn run() -> Result<(), pulse_client::PulseError> {
20//! let client = PulseClient::builder()
21//!     .base_url("http://localhost:9090")
22//!     .token("ey...")
23//!     .build()?;
24//!
25//! let mut aggregations = std::collections::BTreeMap::new();
26//! aggregations.insert("avgTemp".to_string(), aggs::avg("temperature"));
27//!
28//! let builder = StreamBuilder::new("iot-temperature-aggregator")
29//!     .from_topic_with_engine("sensor-readings", "mqtt")
30//!     .key_by("deviceId")
31//!     .window_with_aggs(windows::tumbling("60s"), aggregations)
32//!     .filter("avgTemp > 75")
33//!     .to_topic_with_channel("sensor-minute-averages", "email");
34//!
35//! client.streams().deploy(&builder).await?;
36//! # Ok(())
37//! # }
38//! ```
39//!
40//! Supported operators (mirror the 11 validated by the server's
41//! `StreamingOperatorValidator`): `filter`, `map`, `flat_map`, `key_by`,
42//! `window`, `branch`, `enrich`, `enrich_async`, `cep`, `broadcast_join`,
43//! `cdc_join`.
44//!
45//! Conditions and field-expressions are passed as **strings** — closures /
46//! lambdas are NOT supported because they can't be serialised to JSON.
47
48use std::collections::BTreeMap;
49
50use reqwest::Method;
51use serde_json::{json, Map, Value};
52
53use crate::client::PulseClient;
54use crate::error::PulseError;
55
56// ---------------------------------------------------------------------------
57// Window specs
58// ---------------------------------------------------------------------------
59
/// A window specification, held in the string form that is sent to the server.
///
/// Prefer the [`windows`] helpers. Build one by hand only after checking the
/// raw string against `WindowEngine.parseSpec`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct WindowSpec {
    spec: String,
}

impl WindowSpec {
    /// Wraps a pre-validated raw spec string.
    ///
    /// # Panics
    ///
    /// Panics when `spec` is empty or contains only whitespace.
    pub fn new(spec: impl Into<String>) -> Self {
        let raw: String = spec.into();
        assert!(
            !raw.trim().is_empty(),
            "WindowSpec requires a non-empty spec string"
        );
        Self { spec: raw }
    }

    /// Borrows the raw spec string exactly as it will appear on the wire.
    pub fn spec(&self) -> &str {
        self.spec.as_str()
    }
}

impl std::fmt::Display for WindowSpec {
    /// Writes the raw spec string verbatim (width/fill flags are not applied).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.spec())
    }
}
90
91/// Window-spec factory namespace.
92///
93/// Each function returns a [`WindowSpec`] compiled to the exact form the
94/// server's `WindowEngine.parseSpec` accepts.
95pub mod windows {
96    use super::WindowSpec;
97
98    /// Non-overlapping fixed windows: `tumbling("60s")`.
99    pub fn tumbling(size: &str) -> WindowSpec {
100        require_nonblank("size", size);
101        WindowSpec::new(format!("tumbling({size})"))
102    }
103
104    /// Overlapping windows: `sliding("10m", "1m")` = size, slide.
105    pub fn sliding(size: &str, slide: &str) -> WindowSpec {
106        require_nonblank("size", size);
107        require_nonblank("slide", slide);
108        WindowSpec::new(format!("sliding({size},{slide})"))
109    }
110
111    /// Inactivity-bounded windows: `session("30s")`.
112    pub fn session(timeout: &str) -> WindowSpec {
113        require_nonblank("timeout", timeout);
114        WindowSpec::new(format!("session({timeout})"))
115    }
116
117    /// Single unbounded window. Use for global aggregates.
118    pub fn global() -> WindowSpec {
119        WindowSpec::new("global")
120    }
121
122    /// Event-count tumbling: closes after `n` events. `count(100)`.
123    pub fn count(n: u64) -> WindowSpec {
124        if n == 0 {
125            panic!("count window size must be positive, got 0");
126        }
127        WindowSpec::new(format!("count({n})"))
128    }
129
130    /// Event-count sliding: `count_sliding(100, 10)` = window, slide.
131    pub fn count_sliding(size: u64, slide: u64) -> WindowSpec {
132        if size == 0 || slide == 0 {
133            panic!("count_sliding requires positive size and slide, got {size}, {slide}");
134        }
135        WindowSpec::new(format!("count_sliding({size},{slide})"))
136    }
137
138    fn require_nonblank(name: &str, value: &str) {
139        if value.trim().is_empty() {
140            panic!("{name} must be a non-empty string");
141        }
142    }
143}
144
145// ---------------------------------------------------------------------------
146// Aggregators
147// ---------------------------------------------------------------------------
148
/// Factory functions for aggregator strings.
///
/// Each function returns the `name(field)` template that the server's
/// `Aggregators.parse` accepts inside `window.aggregations`
/// (e.g. `"avg(temperature)"`). Every function except [`count`] panics when
/// `field` is blank.
pub mod aggs {
    /// Event count; takes no field. Returns `"count()"`.
    pub fn count() -> String {
        String::from("count()")
    }

    /// Sum of a numeric field: `aggs::sum("amount")` returns `"sum(amount)"`.
    pub fn sum(field: &str) -> String {
        call("sum", field)
    }

    /// Average of a numeric field.
    pub fn avg(field: &str) -> String {
        call("avg", field)
    }

    /// Minimum value of a numeric field.
    pub fn min(field: &str) -> String {
        call("min", field)
    }

    /// Maximum value of a numeric field.
    pub fn max(field: &str) -> String {
        call("max", field)
    }

    /// Collects every value of `field` into a list.
    pub fn collect_list(field: &str) -> String {
        call("collect_list", field)
    }

    /// Cardinality of the distinct values of `field`.
    pub fn distinct_count(field: &str) -> String {
        call("distinct_count", field)
    }

    /// Validates `field`, then renders `function(field)`.
    fn call(function: &str, field: &str) -> String {
        require_nonblank("field", field);
        [function, "(", field, ")"].concat()
    }

    /// Panics when `value` has no non-whitespace character.
    fn require_nonblank(name: &str, value: &str) {
        let blank = value.chars().all(char::is_whitespace);
        if blank {
            panic!("{name} must be a non-empty string");
        }
    }
}
201
202// ---------------------------------------------------------------------------
203// Option carriers
204// ---------------------------------------------------------------------------
205
/// Options for [`StreamBuilder::map`].
///
/// At least one field must be `Some`; `StreamBuilder::map` panics when both
/// are `None`. A field left as `None` is omitted from the compiled operator.
#[derive(Debug, Clone, Default)]
pub struct MapOptions {
    /// Output-field-name → source-expression-string mapping. Emitted as the
    /// operator's `fields` object.
    pub fields: Option<BTreeMap<String, String>>,
    /// Tag the output event with a `type` field. Emitted as the operator's
    /// `targetType` string.
    pub target_type: Option<String>,
}
214
/// Options for [`StreamBuilder::window`].
///
/// Every field is optional; a `None` field is omitted from the compiled
/// `window` operator. `Default::default()` yields a window with no options.
#[derive(Debug, Clone, Default)]
pub struct WindowOptions {
    /// Map of output-field → aggregator-string (use [`aggs`] for the right-hand side).
    /// Emitted as the operator's `aggregations` object.
    pub aggregations: Option<BTreeMap<String, String>>,
    /// Override for where window results go. Emitted as `outputTopic`.
    pub output_topic: Option<String>,
    /// Server-side trigger config (passed through opaquely). Emitted as
    /// `trigger` without any client-side validation.
    pub trigger: Option<Value>,
}
225
/// One routing rule for [`StreamBuilder::branch`]: a condition and the topic
/// that matching events go to.
///
/// Construction performs no validation; `StreamBuilder::branch` panics if
/// either field is blank.
#[derive(Debug, Clone)]
pub struct BranchSpec {
    /// Condition expression string for this branch.
    pub condition: String,
    /// Topic that receives events for this branch.
    pub topic: String,
}

impl BranchSpec {
    /// Builds a branch from anything convertible into owned strings.
    pub fn new(condition: impl Into<String>, topic: impl Into<String>) -> Self {
        let (condition, topic) = (condition.into(), topic.into());
        Self { condition, topic }
    }
}
241
/// Options for [`StreamBuilder::enrich_async`].
///
/// `url` is required: the derived `Default` leaves it empty, which
/// `enrich_async` rejects with a panic. Every `Option` field left as `None`
/// is omitted from the compiled operator.
#[derive(Debug, Clone, Default)]
pub struct EnrichAsyncOptions {
    /// Target URL; must be non-blank. Emitted as `url`.
    pub url: String,
    /// Emitted as `parallelism`.
    pub parallelism: Option<u32>,
    /// Emitted as `queueSize`.
    pub queue_size: Option<u32>,
    /// Emitted as `timeoutMs`.
    pub timeout_ms: Option<u32>,
    /// Emitted as `maxRetries`.
    pub max_retries: Option<u32>,
    /// Emitted as `retryBackoffMs`.
    pub retry_backoff_ms: Option<u32>,
    /// Must be `"PRESERVE_INPUT"` or `"UNORDERED"`; any other value makes
    /// `enrich_async` panic. Emitted as `ordering`.
    pub ordering: Option<String>,
    /// Must be `"EMIT_ERROR"`, `"DROP"`, or `"PASS_THROUGH"`; any other value
    /// makes `enrich_async` panic. Emitted as `onFailure`.
    pub on_failure: Option<String>,
}
256
/// Options for [`StreamBuilder::cep`].
///
/// Both fields are optional and are omitted from the compiled operator when
/// `None`. Neither value is validated client-side.
#[derive(Debug, Clone, Default)]
pub struct CepOptions {
    /// Emitted as the operator's `within` string.
    pub within: Option<String>,
    /// Emitted as the operator's `name` string.
    pub name: Option<String>,
}
263
/// Options for [`StreamBuilder::broadcast_join`].
///
/// `join_key_field` is required: the derived `Default` leaves it empty, which
/// `broadcast_join` rejects with a panic. Every `Option` field left as `None`
/// is omitted from the compiled operator.
#[derive(Debug, Clone, Default)]
pub struct BroadcastJoinOptions {
    /// Must be non-blank. Emitted as `joinKeyField`.
    pub join_key_field: String,
    /// Emitted as `streamingTopic`.
    pub streaming_topic: Option<String>,
    /// Emitted as `name`.
    pub name: Option<String>,
    /// Emitted as `maxBytes`; not range-checked client-side.
    pub max_bytes: Option<i64>,
    /// Must be `"cdc"`, `"periodic"`, or `"explicit"`; any other value makes
    /// `broadcast_join` panic. Emitted as `refreshMode`.
    pub refresh_mode: Option<String>,
    /// Emitted as `intervalMillis`.
    pub interval_millis: Option<u32>,
}
275
/// Options for [`StreamBuilder::cdc_join`].
///
/// `source` is required: the derived `Default` leaves it empty, which
/// `cdc_join` rejects with a panic. Every `Option` field left as `None` is
/// omitted from the compiled operator.
#[derive(Debug, Clone, Default)]
pub struct CdcJoinOptions {
    /// Must be non-blank. Emitted as `source`.
    pub source: String,
    /// Emitted as `joinKey`.
    pub join_key: Option<String>,
    /// Emitted as `table`.
    pub table: Option<String>,
    /// Emitted as `stateBackend`; not validated client-side.
    pub state_backend: Option<String>,
}
284
285// ---------------------------------------------------------------------------
286// StreamBuilder
287// ---------------------------------------------------------------------------
288
/// Fluent builder for a Pulse streaming pipeline.
///
/// Chain operator methods, then call [`build`](Self::build) (or pass to
/// [`StreamsResource::deploy`]).
///
/// All chaining methods take `self` by value and return `Self`, so calls
/// chain naturally. Methods that validate their inputs panic on obviously-bad
/// arguments (blank required fields, empty collections, unknown enum
/// values) so bugs are caught at call site, not after a 400 round-trip.
/// Structural problems (missing name, missing source, no operators) are
/// reported by `build` as [`PulseError::InvalidConfig`] instead.
#[derive(Debug, Clone, Default)]
pub struct StreamBuilder {
    // Pipeline metadata.
    name: Option<String>,
    description: Option<String>,
    // Label of the agent node; the pipeline name is used when unset.
    agent_label: Option<String>,
    // Source node: topic, engine, extra config entries, display label.
    input_topic: Option<String>,
    source_engine: Option<String>,
    source_config: Map<String, Value>,
    source_label: Option<String>,
    // Sink: a sink node is compiled only when both `output_topic` and
    // `sink_channel` are set.
    output_topic: Option<String>,
    sink_channel: Option<String>,
    sink_config: Map<String, Value>,
    sink_label: Option<String>,
    // Operator chain, in the order the operator methods were called.
    operators: Vec<Map<String, Value>>,
}
313
314impl StreamBuilder {
315    /// Builder with the given pipeline name preset.
316    pub fn new(name: impl Into<String>) -> Self {
317        let name = name.into();
318        require_nonblank("name", &name);
319        Self {
320            name: Some(name),
321            ..Self::default()
322        }
323    }
324
325    /// Builder with no preset name. Use [`named`](Self::named) or pass the
326    /// name to [`build_with_name`](Self::build_with_name).
327    pub fn anonymous() -> Self {
328        Self::default()
329    }
330
331    // ------------------------------------------------------------------
332    // Source
333    // ------------------------------------------------------------------
334
335    /// Sets the input topic. Source engine defaults to `"kafka"`.
336    pub fn from_topic(mut self, topic: impl Into<String>) -> Self {
337        let topic = topic.into();
338        require_nonblank("topic", &topic);
339        self.input_topic = Some(topic);
340        self.source_engine = Some("kafka".into());
341        self
342    }
343
344    /// Sets the input topic + source engine.
345    pub fn from_topic_with_engine(
346        mut self,
347        topic: impl Into<String>,
348        engine: impl Into<String>,
349    ) -> Self {
350        let topic = topic.into();
351        let engine = engine.into();
352        require_nonblank("topic", &topic);
353        require_nonblank("engine", &engine);
354        self.input_topic = Some(topic);
355        self.source_engine = Some(engine);
356        self
357    }
358
359    /// Merges extra config into the source node's `config` map.
360    pub fn with_source_config(mut self, key: impl Into<String>, value: Value) -> Self {
361        self.source_config.insert(key.into(), value);
362        self
363    }
364
365    /// Sets the display label for the source node.
366    pub fn with_source_label(mut self, label: impl Into<String>) -> Self {
367        self.source_label = Some(label.into());
368        self
369    }
370
371    // ------------------------------------------------------------------
372    // Operators
373    // ------------------------------------------------------------------
374
375    /// Filter operator. `condition` is a CEL-like expression string.
376    pub fn filter(mut self, condition: impl Into<String>) -> Self {
377        let condition = condition.into();
378        require_nonblank("condition", &condition);
379        let mut op = Map::new();
380        op.insert("type".into(), Value::String("filter".into()));
381        op.insert("condition".into(), Value::String(condition));
382        self.operators.push(op);
383        self
384    }
385
386    /// Map operator. At least one of `options.fields` / `options.target_type` is required.
387    pub fn map(mut self, options: MapOptions) -> Self {
388        if options.fields.is_none() && options.target_type.is_none() {
389            panic!("map operator does nothing — provide `fields` or `target_type`");
390        }
391        let mut op = Map::new();
392        op.insert("type".into(), Value::String("map".into()));
393        if let Some(fields) = options.fields {
394            let mut m = Map::new();
395            for (k, v) in fields {
396                m.insert(k, Value::String(v));
397            }
398            op.insert("fields".into(), Value::Object(m));
399        }
400        if let Some(t) = options.target_type {
401            op.insert("targetType".into(), Value::String(t));
402        }
403        self.operators.push(op);
404        self
405    }
406
407    /// Flat-map: explode an array-valued field into one event per element.
408    pub fn flat_map(mut self, split_field: impl Into<String>) -> Self {
409        let split_field = split_field.into();
410        require_nonblank("split_field", &split_field);
411        let mut op = Map::new();
412        op.insert("type".into(), Value::String("flatMap".into()));
413        op.insert("splitField".into(), Value::String(split_field));
414        self.operators.push(op);
415        self
416    }
417
418    /// Group the stream by a top-level field value. Required before stateful ops.
419    pub fn key_by(mut self, field: impl Into<String>) -> Self {
420        let field = field.into();
421        require_nonblank("field", &field);
422        let mut op = Map::new();
423        op.insert("type".into(), Value::String("keyBy".into()));
424        op.insert("field".into(), Value::String(field));
425        self.operators.push(op);
426        self
427    }
428
429    /// Window operator with no extra options.
430    pub fn window(self, spec: WindowSpec) -> Self {
431        self.window_full(spec, WindowOptions::default())
432    }
433
434    /// Window operator with aggregations only.
435    pub fn window_with_aggs(
436        self,
437        spec: WindowSpec,
438        aggregations: BTreeMap<String, String>,
439    ) -> Self {
440        self.window_full(
441            spec,
442            WindowOptions {
443                aggregations: Some(aggregations),
444                ..Default::default()
445            },
446        )
447    }
448
449    /// Window operator with the full option set.
450    pub fn window_full(mut self, spec: WindowSpec, options: WindowOptions) -> Self {
451        let mut op = Map::new();
452        op.insert("type".into(), Value::String("window".into()));
453        op.insert("spec".into(), Value::String(spec.spec.clone()));
454        if let Some(aggs_map) = options.aggregations {
455            let mut m = Map::new();
456            for (k, v) in aggs_map {
457                m.insert(k, Value::String(v));
458            }
459            op.insert("aggregations".into(), Value::Object(m));
460        }
461        if let Some(out) = options.output_topic {
462            op.insert("outputTopic".into(), Value::String(out));
463        }
464        if let Some(trig) = options.trigger {
465            op.insert("trigger".into(), trig);
466        }
467        self.operators.push(op);
468        self
469    }
470
471    /// Window operator with a raw spec string. Useful when you've already
472    /// validated the spec against `WindowEngine.parseSpec`.
473    pub fn window_from_str(mut self, spec: &str, options: WindowOptions) -> Self {
474        require_nonblank("spec", spec);
475        self = self.window_full(WindowSpec::new(spec), options);
476        self
477    }
478
479    /// Branch operator: route events to different topics by condition.
480    pub fn branch(mut self, branches: Vec<BranchSpec>) -> Self {
481        if branches.is_empty() {
482            panic!("branch operator requires at least one branch");
483        }
484        let mut normalised = Vec::with_capacity(branches.len());
485        for (i, b) in branches.iter().enumerate() {
486            if b.condition.trim().is_empty() {
487                panic!("branch[{i}] requires a non-empty `condition`");
488            }
489            if b.topic.trim().is_empty() {
490                panic!("branch[{i}] requires a non-empty `topic`");
491            }
492            normalised.push(json!({
493                "condition": b.condition,
494                "topic": b.topic,
495            }));
496        }
497        let mut op = Map::new();
498        op.insert("type".into(), Value::String("branch".into()));
499        op.insert("branches".into(), Value::Array(normalised));
500        self.operators.push(op);
501        self
502    }
503
504    /// Synchronous enrichment: join the stream against a state-store topic.
505    pub fn enrich(mut self, lookup_topic: impl Into<String>, key_field: impl Into<String>) -> Self {
506        let lookup_topic = lookup_topic.into();
507        let key_field = key_field.into();
508        require_nonblank("lookup_topic", &lookup_topic);
509        require_nonblank("key_field", &key_field);
510        let mut op = Map::new();
511        op.insert("type".into(), Value::String("enrich".into()));
512        op.insert("lookupTopic".into(), Value::String(lookup_topic));
513        op.insert("keyField".into(), Value::String(key_field));
514        self.operators.push(op);
515        self
516    }
517
518    /// Asynchronous HTTP enrichment. `url` supports `{field}` placeholders.
519    pub fn enrich_async(mut self, options: EnrichAsyncOptions) -> Self {
520        require_nonblank("url", &options.url);
521        if let Some(ref o) = options.ordering {
522            if o != "PRESERVE_INPUT" && o != "UNORDERED" {
523                panic!("ordering must be PRESERVE_INPUT or UNORDERED, got {o:?}");
524            }
525        }
526        if let Some(ref f) = options.on_failure {
527            if f != "EMIT_ERROR" && f != "DROP" && f != "PASS_THROUGH" {
528                panic!("on_failure must be EMIT_ERROR, DROP, or PASS_THROUGH, got {f:?}");
529            }
530        }
531        let mut op = Map::new();
532        op.insert("type".into(), Value::String("enrichAsync".into()));
533        op.insert("url".into(), Value::String(options.url));
534        if let Some(v) = options.parallelism {
535            op.insert("parallelism".into(), Value::Number(v.into()));
536        }
537        if let Some(v) = options.queue_size {
538            op.insert("queueSize".into(), Value::Number(v.into()));
539        }
540        if let Some(v) = options.timeout_ms {
541            op.insert("timeoutMs".into(), Value::Number(v.into()));
542        }
543        if let Some(v) = options.max_retries {
544            op.insert("maxRetries".into(), Value::Number(v.into()));
545        }
546        if let Some(v) = options.retry_backoff_ms {
547            op.insert("retryBackoffMs".into(), Value::Number(v.into()));
548        }
549        if let Some(o) = options.ordering {
550            op.insert("ordering".into(), Value::String(o));
551        }
552        if let Some(f) = options.on_failure {
553            op.insert("onFailure".into(), Value::String(f));
554        }
555        self.operators.push(op);
556        self
557    }
558
559    /// Complex Event Processing: match a sequence of conditions.
560    pub fn cep(mut self, sequence: Vec<Value>, options: CepOptions) -> Self {
561        if sequence.is_empty() {
562            panic!("cep operator requires a non-empty sequence");
563        }
564        let mut op = Map::new();
565        op.insert("type".into(), Value::String("cep".into()));
566        op.insert("sequence".into(), Value::Array(sequence));
567        if let Some(w) = options.within {
568            op.insert("within".into(), Value::String(w));
569        }
570        if let Some(n) = options.name {
571            op.insert("name".into(), Value::String(n));
572        }
573        self.operators.push(op);
574        self
575    }
576
577    /// Broadcast join: enrich the stream against a fully-replicated table.
578    pub fn broadcast_join(mut self, options: BroadcastJoinOptions) -> Self {
579        require_nonblank("join_key_field", &options.join_key_field);
580        if let Some(ref m) = options.refresh_mode {
581            if m != "cdc" && m != "periodic" && m != "explicit" {
582                panic!("refresh_mode must be cdc, periodic, or explicit, got {m:?}");
583            }
584        }
585        let mut op = Map::new();
586        op.insert("type".into(), Value::String("broadcastJoin".into()));
587        op.insert("joinKeyField".into(), Value::String(options.join_key_field));
588        if let Some(t) = options.streaming_topic {
589            op.insert("streamingTopic".into(), Value::String(t));
590        }
591        if let Some(n) = options.name {
592            op.insert("name".into(), Value::String(n));
593        }
594        if let Some(b) = options.max_bytes {
595            op.insert("maxBytes".into(), Value::Number(b.into()));
596        }
597        if let Some(m) = options.refresh_mode {
598            op.insert("refreshMode".into(), Value::String(m));
599        }
600        if let Some(i) = options.interval_millis {
601            op.insert("intervalMillis".into(), Value::Number(i.into()));
602        }
603        self.operators.push(op);
604        self
605    }
606
607    /// CDC join: stream-table join against a CDC-fed state table.
608    pub fn cdc_join(mut self, options: CdcJoinOptions) -> Self {
609        require_nonblank("source", &options.source);
610        let mut op = Map::new();
611        op.insert("type".into(), Value::String("cdcJoin".into()));
612        op.insert("source".into(), Value::String(options.source));
613        if let Some(k) = options.join_key {
614            op.insert("joinKey".into(), Value::String(k));
615        }
616        if let Some(t) = options.table {
617            op.insert("table".into(), Value::String(t));
618        }
619        if let Some(b) = options.state_backend {
620            op.insert("stateBackend".into(), Value::String(b));
621        }
622        self.operators.push(op);
623        self
624    }
625
626    // ------------------------------------------------------------------
627    // Sink
628    // ------------------------------------------------------------------
629
630    /// Sets the output topic only. No sink node is emitted.
631    pub fn to_topic(mut self, topic: impl Into<String>) -> Self {
632        let topic = topic.into();
633        require_nonblank("topic", &topic);
634        self.output_topic = Some(topic);
635        self.sink_channel = None;
636        self
637    }
638
639    /// Sets the output topic + sink channel (emits a sink node).
640    pub fn to_topic_with_channel(
641        mut self,
642        topic: impl Into<String>,
643        channel: impl Into<String>,
644    ) -> Self {
645        let topic = topic.into();
646        let channel = channel.into();
647        require_nonblank("topic", &topic);
648        require_nonblank("channel", &channel);
649        self.output_topic = Some(topic);
650        self.sink_channel = Some(channel);
651        self
652    }
653
654    /// Merges extra config into the sink node's `config` map.
655    pub fn with_sink_config(mut self, key: impl Into<String>, value: Value) -> Self {
656        self.sink_config.insert(key.into(), value);
657        self
658    }
659
660    /// Sets the display label for the sink node.
661    pub fn with_sink_label(mut self, label: impl Into<String>) -> Self {
662        self.sink_label = Some(label.into());
663        self
664    }
665
666    /// Terminate the stream in the agent's state store (queryable via B-106 IQ).
667    pub fn to_state(mut self) -> Self {
668        self.output_topic = None;
669        self.sink_channel = None;
670        self.sink_config = Map::new();
671        self.sink_label = None;
672        self
673    }
674
675    // ------------------------------------------------------------------
676    // Metadata
677    // ------------------------------------------------------------------
678
679    /// Sets / overrides the pipeline name.
680    pub fn named(mut self, name: impl Into<String>) -> Self {
681        let name = name.into();
682        require_nonblank("name", &name);
683        self.name = Some(name);
684        self
685    }
686
687    /// Sets the pipeline description.
688    pub fn described_as(mut self, description: impl Into<String>) -> Self {
689        self.description = Some(description.into());
690        self
691    }
692
693    /// Sets the display label for the streaming agent node.
694    pub fn with_agent_label(mut self, label: impl Into<String>) -> Self {
695        let label = label.into();
696        require_nonblank("label", &label);
697        self.agent_label = Some(label);
698        self
699    }
700
701    // ------------------------------------------------------------------
702    // Compilation
703    // ------------------------------------------------------------------
704
705    /// Returns a read-only view of the recorded operator chain.
706    pub fn operators(&self) -> &[Map<String, Value>] {
707        &self.operators
708    }
709
710    /// Compile the chain into a Pulse pipeline dict ready for POST.
711    pub fn build(&self) -> Result<Value, PulseError> {
712        self.build_inner(None)
713    }
714
715    /// Same as [`build`](Self::build) but overrides the pipeline name.
716    pub fn build_with_name(&self, name: &str) -> Result<Value, PulseError> {
717        require_nonblank("name", name);
718        self.build_inner(Some(name.to_string()))
719    }
720
721    fn build_inner(&self, override_name: Option<String>) -> Result<Value, PulseError> {
722        let pipeline_name = override_name.or_else(|| self.name.clone()).ok_or_else(|| {
723            PulseError::InvalidConfig(
724                "pipeline name required — pass to StreamBuilder::new or build_with_name".into(),
725            )
726        })?;
727        let input_topic = self.input_topic.as_ref().ok_or_else(|| {
728            PulseError::InvalidConfig("no source — call .from_topic(...) before build()".into())
729        })?;
730        if self.operators.is_empty() {
731            return Err(PulseError::InvalidConfig(
732                "no operators — chain at least one of .filter/.map/.key_by/... before build()"
733                    .into(),
734            ));
735        }
736
737        let source_engine = self.source_engine.as_deref().unwrap_or("kafka");
738
739        let mut nodes: Vec<Value> = Vec::with_capacity(3);
740
741        // Source node
742        let mut src_config = Map::new();
743        src_config.insert("engine".into(), Value::String(source_engine.to_string()));
744        src_config.insert("inputTopic".into(), Value::String(input_topic.clone()));
745        for (k, v) in &self.source_config {
746            src_config.insert(k.clone(), v.clone());
747        }
748        let src_label = self
749            .source_label
750            .clone()
751            .unwrap_or_else(|| format!("{source_engine} source"));
752        nodes.push(json!({
753            "type": "source",
754            "label": src_label,
755            "config": Value::Object(src_config),
756        }));
757
758        // Agent node
759        let mut agent_config = Map::new();
760        agent_config.insert("engine".into(), Value::String("streaming".into()));
761        agent_config.insert("inputTopic".into(), Value::String(input_topic.clone()));
762        let ops_value: Vec<Value> = self
763            .operators
764            .iter()
765            .map(|op| Value::Object(op.clone()))
766            .collect();
767        agent_config.insert("operators".into(), Value::Array(ops_value));
768        if let Some(ref out) = self.output_topic {
769            agent_config.insert("outputTopic".into(), Value::String(out.clone()));
770        }
771        let agent_label = self
772            .agent_label
773            .clone()
774            .unwrap_or_else(|| pipeline_name.clone());
775        nodes.push(json!({
776            "type": "agent",
777            "label": agent_label,
778            "config": Value::Object(agent_config),
779        }));
780
781        // Sink node — only when both output_topic AND sink_channel are set
782        if let (Some(out), Some(ch)) = (self.output_topic.as_ref(), self.sink_channel.as_ref()) {
783            let mut sink_conf = Map::new();
784            sink_conf.insert("channel".into(), Value::String(ch.clone()));
785            sink_conf.insert("inputTopic".into(), Value::String(out.clone()));
786            for (k, v) in &self.sink_config {
787                sink_conf.insert(k.clone(), v.clone());
788            }
789            let sink_label = self
790                .sink_label
791                .clone()
792                .unwrap_or_else(|| format!("{ch} sink"));
793            nodes.push(json!({
794                "type": "sink",
795                "label": sink_label,
796                "config": Value::Object(sink_conf),
797            }));
798        }
799
800        let mut pipeline = Map::new();
801        pipeline.insert("name".into(), Value::String(pipeline_name));
802        pipeline.insert("nodes".into(), Value::Array(nodes));
803        if let Some(ref desc) = self.description {
804            pipeline.insert("description".into(), Value::String(desc.clone()));
805        }
806        Ok(Value::Object(pipeline))
807    }
808}
809
810// ---------------------------------------------------------------------------
811// StreamsResource — the client.streams() accessor
812// ---------------------------------------------------------------------------
813
/// `client.streams()` — compile + deploy [`StreamBuilder`] pipelines.
///
/// Sugar over `client.pipelines().create()` — the compile happens client-side,
/// the deploy is the same POST.
///
/// Holds only a borrow of the [`PulseClient`], so a value of this type cannot
/// outlive the client it came from.
pub struct StreamsResource<'c> {
    // Client used to send the deploy requests.
    pub(crate) client: &'c PulseClient,
}
821
822impl<'c> StreamsResource<'c> {
823    /// Compile the builder to a pipeline dict WITHOUT deploying.
824    pub fn compile(&self, builder: &StreamBuilder) -> Result<Value, PulseError> {
825        builder.build()
826    }
827
828    /// Compile with a name override WITHOUT deploying.
829    pub fn compile_with_name(
830        &self,
831        builder: &StreamBuilder,
832        name: &str,
833    ) -> Result<Value, PulseError> {
834        builder.build_with_name(name)
835    }
836
837    /// Compile + POST to `/api/pulse/pipelines`. Returns the server response.
838    pub async fn deploy(&self, builder: &StreamBuilder) -> Result<Value, PulseError> {
839        let definition = builder.build()?;
840        self.client
841            .request(
842                Method::POST,
843                "/api/pulse/pipelines",
844                Some(&definition),
845                true,
846            )
847            .await
848    }
849
850    /// Compile with a name override + POST to `/api/pulse/pipelines`.
851    pub async fn deploy_with_name(
852        &self,
853        builder: &StreamBuilder,
854        name: &str,
855    ) -> Result<Value, PulseError> {
856        let definition = builder.build_with_name(name)?;
857        self.client
858            .request(
859                Method::POST,
860                "/api/pulse/pipelines",
861                Some(&definition),
862                true,
863            )
864            .await
865    }
866}
867
868impl std::fmt::Debug for StreamsResource<'_> {
869    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
870        f.debug_struct("StreamsResource").finish()
871    }
872}
873
874// ---------------------------------------------------------------------------
875// Internal helpers
876// ---------------------------------------------------------------------------
877
/// Panics unless `value` contains at least one non-whitespace character.
///
/// `name` identifies the offending argument in the panic message.
fn require_nonblank(name: &str, value: &str) {
    let is_blank = value.chars().all(char::is_whitespace);
    if is_blank {
        panic!("{name} must be a non-empty string");
    }
}