Skip to main content

pulse_client/
streams.rs

1//! B-107 — Kafka-Streams-like declarative DSL that compiles to a Pulse pipeline.
2//!
3//! The DSL is **server-side execution, client-side declaration**: the operator
4//! chain is built in Rust, compiled to the JSON pipeline shape that the Pulse
5//! server's `StreamingOperatorValidator` accepts, and POSTed to
6//! `/api/pulse/pipelines`. Stream processing then runs on the Pulse engine
7//! (3.6 M evt/s native throughput), not in the client process.
8//!
9//! This is the opposite of Kafka Streams (which executes in the caller's JVM).
10//! The trade-off: you can't do microsecond client-side compute, but you get
11//! infinite-scale stateful streaming, durable replicated state queryable via
12//! B-106 IQ, and the same DSL works from any of the 5 Pulse SDKs.
13//!
14//! # Quick start
15//!
16//! ```no_run
17//! use pulse_client::{aggs, windows, PulseClient, StreamBuilder};
18//!
19//! # async fn run() -> Result<(), pulse_client::PulseError> {
20//! let client = PulseClient::builder()
21//!     .base_url("http://localhost:9090")
22//!     .token("ey...")
23//!     .build()?;
24//!
25//! let mut aggregations = std::collections::BTreeMap::new();
26//! aggregations.insert("avgTemp".to_string(), aggs::avg("temperature"));
27//!
28//! let builder = StreamBuilder::new("iot-temperature-aggregator")
29//!     .from_topic_with_engine("sensor-readings", "mqtt")
30//!     .key_by("deviceId")
31//!     .window_with_aggs(windows::tumbling("60s"), aggregations)
32//!     .filter("avgTemp > 75")
33//!     .to_topic_with_channel("sensor-minute-averages", "email");
34//!
35//! client.streams().deploy(&builder).await?;
36//! # Ok(())
37//! # }
38//! ```
39//!
40//! Supported operators (mirror the 11 validated by the server's
41//! `StreamingOperatorValidator`): `filter`, `map`, `flat_map`, `key_by`,
42//! `window`, `branch`, `enrich`, `enrich_async`, `cep`, `broadcast_join`,
43//! `cdc_join`.
44//!
45//! Conditions and field-expressions are passed as **strings** — closures /
46//! lambdas are NOT supported because they can't be serialised to JSON.
47
48use std::collections::BTreeMap;
49
50use reqwest::Method;
51use serde_json::{json, Map, Value};
52
53use crate::client::PulseClient;
54use crate::error::PulseError;
55
56// ---------------------------------------------------------------------------
57// Window specs
58// ---------------------------------------------------------------------------
59
/// A window specification. Compiled to the string form the server expects.
///
/// Construct via the [`windows`] helpers — never instantiate directly unless
/// you've validated the raw string against `WindowEngine.parseSpec`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct WindowSpec {
    /// Raw spec string, stored verbatim (no trimming or normalisation).
    spec: String,
}

impl WindowSpec {
    /// Wraps a pre-validated raw spec string.
    ///
    /// Only blankness is checked here; the string is otherwise kept as given.
    ///
    /// # Panics
    ///
    /// Panics if `spec` is empty or consists solely of whitespace.
    pub fn new(spec: impl Into<String>) -> Self {
        let spec = spec.into();
        if spec.trim().is_empty() {
            panic!("WindowSpec requires a non-empty spec string");
        }
        Self { spec }
    }

    /// The raw spec string as it will appear on the wire.
    pub fn spec(&self) -> &str {
        &self.spec
    }
}

impl std::fmt::Display for WindowSpec {
    /// Writes the raw spec string.
    ///
    /// Uses [`Formatter::pad`](std::fmt::Formatter::pad) rather than
    /// `write_str` so that width / fill / alignment flags such as `{:>12}`
    /// are honoured; plain `{}` output is unchanged.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.pad(&self.spec)
    }
}
90
91/// Window-spec factory namespace.
92///
93/// Each function returns a [`WindowSpec`] compiled to the exact form the
94/// server's `WindowEngine.parseSpec` accepts.
95pub mod windows {
96    use super::WindowSpec;
97
98    /// Non-overlapping fixed windows: `tumbling("60s")`.
99    pub fn tumbling(size: &str) -> WindowSpec {
100        require_nonblank("size", size);
101        WindowSpec::new(format!("tumbling({size})"))
102    }
103
104    /// Overlapping windows: `sliding("10m", "1m")` = size, slide.
105    pub fn sliding(size: &str, slide: &str) -> WindowSpec {
106        require_nonblank("size", size);
107        require_nonblank("slide", slide);
108        WindowSpec::new(format!("sliding({size},{slide})"))
109    }
110
111    /// Inactivity-bounded windows: `session("30s")`.
112    pub fn session(timeout: &str) -> WindowSpec {
113        require_nonblank("timeout", timeout);
114        WindowSpec::new(format!("session({timeout})"))
115    }
116
117    /// Single unbounded window. Use for global aggregates.
118    pub fn global() -> WindowSpec {
119        WindowSpec::new("global")
120    }
121
122    /// Event-count tumbling: closes after `n` events. `count(100)`.
123    pub fn count(n: u64) -> WindowSpec {
124        if n == 0 {
125            panic!("count window size must be positive, got 0");
126        }
127        WindowSpec::new(format!("count({n})"))
128    }
129
130    /// Event-count sliding: `count_sliding(100, 10)` = window, slide.
131    pub fn count_sliding(size: u64, slide: u64) -> WindowSpec {
132        if size == 0 || slide == 0 {
133            panic!("count_sliding requires positive size and slide, got {size}, {slide}");
134        }
135        WindowSpec::new(format!("count_sliding({size},{slide})"))
136    }
137
138    fn require_nonblank(name: &str, value: &str) {
139        if value.trim().is_empty() {
140            panic!("{name} must be a non-empty string");
141        }
142    }
143}
144
145// ---------------------------------------------------------------------------
146// Aggregators
147// ---------------------------------------------------------------------------
148
/// Aggregator factory namespace.
///
/// Each function returns the string template the server's `Aggregators.parse`
/// accepts inside `window.aggregations` (e.g. `"avg(temperature)"`). Every
/// helper that takes a `field` panics when that field is blank.
pub mod aggs {
    /// Event count — no field required. Always `count()`.
    pub fn count() -> String {
        String::from("count()")
    }

    /// Sum of a numeric field: `aggs::sum("amount")` → `sum(amount)`.
    pub fn sum(field: &str) -> String {
        render("sum", field)
    }

    /// Average of a numeric field: `avg(<field>)`.
    pub fn avg(field: &str) -> String {
        render("avg", field)
    }

    /// Minimum value of a numeric field: `min(<field>)`.
    pub fn min(field: &str) -> String {
        render("min", field)
    }

    /// Maximum value of a numeric field: `max(<field>)`.
    pub fn max(field: &str) -> String {
        render("max", field)
    }

    /// Collect every value of `field` into a list: `collect_list(<field>)`.
    pub fn collect_list(field: &str) -> String {
        render("collect_list", field)
    }

    /// Cardinality of distinct values of `field`: `distinct_count(<field>)`.
    pub fn distinct_count(field: &str) -> String {
        render("distinct_count", field)
    }

    /// Validates `field` and renders `function(field)`; `field` is inserted
    /// verbatim.
    fn render(function: &str, field: &str) -> String {
        reject_blank("field", field);
        [function, "(", field, ")"].concat()
    }

    /// Panics, naming the offending argument, when `value` is empty or
    /// whitespace-only.
    fn reject_blank(name: &str, value: &str) {
        if value.chars().all(char::is_whitespace) {
            panic!("{name} must be a non-empty string");
        }
    }
}
201
202// ---------------------------------------------------------------------------
203// Option carriers
204// ---------------------------------------------------------------------------
205
/// Options for [`StreamBuilder::map`].
///
/// At least one of the two fields must be `Some`: `StreamBuilder::map`
/// panics when both are `None`.
#[derive(Debug, Clone, Default)]
pub struct MapOptions {
    /// Output-field-name → source-expression-string mapping. Serialised as
    /// the operator's `fields` object.
    pub fields: Option<BTreeMap<String, String>>,
    /// Tag the output event with a `type` field. Serialised as `targetType`.
    pub target_type: Option<String>,
}
214
/// Options for [`StreamBuilder::window`].
///
/// Every field is optional; a `None` field is simply left out of the
/// serialised window operator.
#[derive(Debug, Clone, Default)]
pub struct WindowOptions {
    /// Map of output-field → aggregator-string (use [`aggs`] for the right-hand side).
    /// Serialised as `aggregations`.
    pub aggregations: Option<BTreeMap<String, String>>,
    /// Override for where window results go. Serialised as `outputTopic`.
    pub output_topic: Option<String>,
    /// Server-side trigger config (passed through opaquely). Serialised as
    /// `trigger` without any client-side inspection.
    pub trigger: Option<Value>,
}
225
/// One branch of [`StreamBuilder::branch`]: events matching `condition` are
/// routed to `topic`.
#[derive(Debug, Clone)]
pub struct BranchSpec {
    /// Routing condition, as an expression string.
    pub condition: String,
    /// Destination topic for events that satisfy `condition`.
    pub topic: String,
}

impl BranchSpec {
    /// Builds a branch from anything convertible into owned strings.
    ///
    /// No validation happens here; blank values are rejected later by
    /// `StreamBuilder::branch`.
    pub fn new(condition: impl Into<String>, topic: impl Into<String>) -> Self {
        let (condition, topic) = (condition.into(), topic.into());
        BranchSpec { condition, topic }
    }
}
241
/// Options for [`StreamBuilder::enrich_async`].
///
/// `url` is required; every `Option` field is omitted from the serialised
/// operator when `None`.
#[derive(Debug, Clone, Default)]
pub struct EnrichAsyncOptions {
    /// Enrichment endpoint; must be non-blank. Supports `{field}` placeholders.
    pub url: String,
    /// Serialised as `parallelism`.
    pub parallelism: Option<u32>,
    /// Serialised as `queueSize`.
    pub queue_size: Option<u32>,
    /// Serialised as `timeoutMs`.
    pub timeout_ms: Option<u32>,
    /// Serialised as `maxRetries`.
    pub max_retries: Option<u32>,
    /// Serialised as `retryBackoffMs`.
    pub retry_backoff_ms: Option<u32>,
    /// Must be `"PRESERVE_INPUT"` or `"UNORDERED"`.
    pub ordering: Option<String>,
    /// Must be `"EMIT_ERROR"`, `"DROP"`, or `"PASS_THROUGH"`.
    pub on_failure: Option<String>,
}
256
/// Options for [`StreamBuilder::cep`].
///
/// Both fields are optional and are forwarded unvalidated.
#[derive(Debug, Clone, Default)]
pub struct CepOptions {
    /// Serialised as the operator's `within` string when set.
    pub within: Option<String>,
    /// Serialised as the operator's `name` string when set.
    pub name: Option<String>,
}
263
/// Options for [`StreamBuilder::broadcast_join`].
///
/// `join_key_field` is required; every `Option` field is omitted from the
/// serialised operator when `None`.
#[derive(Debug, Clone, Default)]
pub struct BroadcastJoinOptions {
    /// Must be non-blank. Serialised as `joinKeyField`.
    pub join_key_field: String,
    /// Serialised as `streamingTopic`.
    pub streaming_topic: Option<String>,
    /// Serialised as `name`.
    pub name: Option<String>,
    /// Serialised as `maxBytes`; the value is not range-checked client-side.
    pub max_bytes: Option<i64>,
    /// Must be `"cdc"`, `"periodic"`, or `"explicit"`.
    pub refresh_mode: Option<String>,
    /// Serialised as `intervalMillis`.
    pub interval_millis: Option<u32>,
}
275
/// Options for [`StreamBuilder::cdc_join`].
///
/// `source` is required; the optional fields are forwarded unvalidated and
/// omitted when `None`.
#[derive(Debug, Clone, Default)]
pub struct CdcJoinOptions {
    /// Must be non-blank. Serialised as `source`.
    pub source: String,
    /// Serialised as `joinKey`.
    pub join_key: Option<String>,
    /// Serialised as `table`.
    pub table: Option<String>,
    /// Serialised as `stateBackend`.
    pub state_backend: Option<String>,
}
284
/// B-109 — options for [`StreamBuilder::map_llm`]. `output_field` is required.
///
/// Every `Option` field is omitted from the serialised operator when `None`.
#[derive(Debug, Clone, Default)]
pub struct MapLlmOptions {
    /// Event field the completion is written to; must be non-blank.
    /// Serialised as `outputField`.
    pub output_field: String,
    /// Serialised as `model`.
    pub model: Option<String>,
    /// Serialised as `temperature`.
    pub temperature: Option<f64>,
    /// Serialised as `maxTokens`.
    pub max_tokens: Option<u32>,
    /// Serialised as `parallelism`.
    pub parallelism: Option<u32>,
    /// Must be `"PRESERVE_INPUT"` or `"UNORDERED"`.
    pub ordering: Option<String>,
    /// Must be `"EMIT_ERROR"`, `"DROP"`, or `"PASS_THROUGH"`.
    pub on_failure: Option<String>,
    /// Serialised as `maxCallsPerSec`.
    pub max_calls_per_sec: Option<u32>,
}
299
/// B-109 — options for [`StreamBuilder::extract`]. `instruction` + `schema` required.
///
/// Every `Option` field is omitted from the serialised operator when `None`.
#[derive(Debug, Clone, Default)]
pub struct ExtractOptions {
    /// Must be non-blank. Serialised as `instruction`.
    pub instruction: String,
    /// Must be non-empty. Serialised as the `schema` object, with each value
    /// emitted as a JSON string.
    pub schema: BTreeMap<String, String>,
    /// Serialised as `model`.
    pub model: Option<String>,
    /// Serialised as `temperature`.
    pub temperature: Option<f64>,
    /// Serialised as `maxTokens`.
    pub max_tokens: Option<u32>,
    /// Serialised as `onFailure`; checked by `check_failure` inside
    /// `StreamBuilder::extract` (presumably the same three values accepted
    /// elsewhere — confirm against that helper).
    pub on_failure: Option<String>,
}
310
/// B-109 Phase 2 — options for [`StreamBuilder::mcp_call`].
///
/// All fields are optional and omitted from the serialised operator when `None`.
#[derive(Debug, Clone, Default)]
pub struct McpCallOptions {
    /// Tool arguments, serialised as the `args` object. String values support
    /// `{field}` substitution.
    pub args: Option<BTreeMap<String, Value>>,
    /// Event field the tool output is written to; omit for a fire-and-forget
    /// effect. Serialised as `outputField`.
    pub output_field: Option<String>,
    /// Serialised as `parallelism`.
    pub parallelism: Option<u32>,
    /// Must be `"PRESERVE_INPUT"` or `"UNORDERED"`; `StreamBuilder::mcp_call`
    /// panics on any other value.
    pub ordering: Option<String>,
    /// Serialised as `onFailure`; checked by `check_failure` inside
    /// `StreamBuilder::mcp_call` (presumably the same three values accepted
    /// elsewhere — confirm against that helper).
    pub on_failure: Option<String>,
}
320
/// B-112 — options for [`StreamBuilder::ml_predict`]. `model`, `input_fields`
/// and `output_field` are required.
///
/// Every `Option` field is omitted from the serialised operator when `None`.
#[derive(Debug, Clone, Default)]
pub struct MlPredictOptions {
    /// Registered model name (upload first via `client.models().upload(...)`).
    /// Must be non-blank.
    pub model: String,
    /// Feature names pulled from the event, in the model's input order.
    /// Dotted paths (`customer.tier`) resolve through nested objects.
    /// Must be non-empty and contain no blank entries.
    pub input_fields: Vec<String>,
    /// Event field the prediction object is written to. Must be non-blank.
    pub output_field: String,
    /// Serialised as `parallelism`.
    pub parallelism: Option<u32>,
    /// Must be `"PRESERVE_INPUT"` or `"UNORDERED"`.
    pub ordering: Option<String>,
    /// Must be `"EMIT_ERROR"`, `"DROP"`, or `"PASS_THROUGH"`.
    pub on_failure: Option<String>,
}
338
339// ---------------------------------------------------------------------------
340// StreamBuilder
341// ---------------------------------------------------------------------------
342
343/// Fluent builder for a Pulse streaming pipeline.
344///
345/// Chain operator methods, then call [`build`](Self::build) (or pass to
346/// [`StreamsResource::deploy`]).
347///
348/// All operator methods take `&mut self` and return `Self` so calls chain
349/// naturally. Methods that validate their inputs panic on obviously-bad
350/// arguments (blank required fields, non-positive counts, unknown enum
351/// values) so bugs are caught at call site, not after a 400 round-trip.
352#[derive(Debug, Clone, Default)]
353pub struct StreamBuilder {
354    name: Option<String>,
355    description: Option<String>,
356    agent_label: Option<String>,
357    input_topic: Option<String>,
358    source_engine: Option<String>,
359    source_config: Map<String, Value>,
360    source_label: Option<String>,
361    output_topic: Option<String>,
362    sink_channel: Option<String>,
363    sink_config: Map<String, Value>,
364    sink_label: Option<String>,
365    operators: Vec<Map<String, Value>>,
366}
367
368impl StreamBuilder {
369    /// Builder with the given pipeline name preset.
370    pub fn new(name: impl Into<String>) -> Self {
371        let name = name.into();
372        require_nonblank("name", &name);
373        Self {
374            name: Some(name),
375            ..Self::default()
376        }
377    }
378
379    /// Builder with no preset name. Use [`named`](Self::named) or pass the
380    /// name to [`build_with_name`](Self::build_with_name).
381    pub fn anonymous() -> Self {
382        Self::default()
383    }
384
385    // ------------------------------------------------------------------
386    // Source
387    // ------------------------------------------------------------------
388
389    /// Sets the input topic. Source engine defaults to `"kafka"`.
390    pub fn from_topic(mut self, topic: impl Into<String>) -> Self {
391        let topic = topic.into();
392        require_nonblank("topic", &topic);
393        self.input_topic = Some(topic);
394        self.source_engine = Some("kafka".into());
395        self
396    }
397
398    /// Sets the input topic + source engine.
399    pub fn from_topic_with_engine(
400        mut self,
401        topic: impl Into<String>,
402        engine: impl Into<String>,
403    ) -> Self {
404        let topic = topic.into();
405        let engine = engine.into();
406        require_nonblank("topic", &topic);
407        require_nonblank("engine", &engine);
408        self.input_topic = Some(topic);
409        self.source_engine = Some(engine);
410        self
411    }
412
413    /// Merges extra config into the source node's `config` map.
414    pub fn with_source_config(mut self, key: impl Into<String>, value: Value) -> Self {
415        self.source_config.insert(key.into(), value);
416        self
417    }
418
419    /// Sets the display label for the source node.
420    pub fn with_source_label(mut self, label: impl Into<String>) -> Self {
421        self.source_label = Some(label.into());
422        self
423    }
424
425    // ------------------------------------------------------------------
426    // Operators
427    // ------------------------------------------------------------------
428
429    /// Filter operator. `condition` is a CEL-like expression string.
430    pub fn filter(mut self, condition: impl Into<String>) -> Self {
431        let condition = condition.into();
432        require_nonblank("condition", &condition);
433        let mut op = Map::new();
434        op.insert("type".into(), Value::String("filter".into()));
435        op.insert("condition".into(), Value::String(condition));
436        self.operators.push(op);
437        self
438    }
439
440    /// Map operator. At least one of `options.fields` / `options.target_type` is required.
441    pub fn map(mut self, options: MapOptions) -> Self {
442        if options.fields.is_none() && options.target_type.is_none() {
443            panic!("map operator does nothing — provide `fields` or `target_type`");
444        }
445        let mut op = Map::new();
446        op.insert("type".into(), Value::String("map".into()));
447        if let Some(fields) = options.fields {
448            let mut m = Map::new();
449            for (k, v) in fields {
450                m.insert(k, Value::String(v));
451            }
452            op.insert("fields".into(), Value::Object(m));
453        }
454        if let Some(t) = options.target_type {
455            op.insert("targetType".into(), Value::String(t));
456        }
457        self.operators.push(op);
458        self
459    }
460
461    /// Flat-map: explode an array-valued field into one event per element.
462    pub fn flat_map(mut self, split_field: impl Into<String>) -> Self {
463        let split_field = split_field.into();
464        require_nonblank("split_field", &split_field);
465        let mut op = Map::new();
466        op.insert("type".into(), Value::String("flatMap".into()));
467        op.insert("splitField".into(), Value::String(split_field));
468        self.operators.push(op);
469        self
470    }
471
472    /// Group the stream by a top-level field value. Required before stateful ops.
473    pub fn key_by(mut self, field: impl Into<String>) -> Self {
474        let field = field.into();
475        require_nonblank("field", &field);
476        let mut op = Map::new();
477        op.insert("type".into(), Value::String("keyBy".into()));
478        op.insert("field".into(), Value::String(field));
479        self.operators.push(op);
480        self
481    }
482
483    /// Window operator with no extra options.
484    pub fn window(self, spec: WindowSpec) -> Self {
485        self.window_full(spec, WindowOptions::default())
486    }
487
488    /// Window operator with aggregations only.
489    pub fn window_with_aggs(
490        self,
491        spec: WindowSpec,
492        aggregations: BTreeMap<String, String>,
493    ) -> Self {
494        self.window_full(
495            spec,
496            WindowOptions {
497                aggregations: Some(aggregations),
498                ..Default::default()
499            },
500        )
501    }
502
503    /// Window operator with the full option set.
504    pub fn window_full(mut self, spec: WindowSpec, options: WindowOptions) -> Self {
505        let mut op = Map::new();
506        op.insert("type".into(), Value::String("window".into()));
507        op.insert("spec".into(), Value::String(spec.spec.clone()));
508        if let Some(aggs_map) = options.aggregations {
509            let mut m = Map::new();
510            for (k, v) in aggs_map {
511                m.insert(k, Value::String(v));
512            }
513            op.insert("aggregations".into(), Value::Object(m));
514        }
515        if let Some(out) = options.output_topic {
516            op.insert("outputTopic".into(), Value::String(out));
517        }
518        if let Some(trig) = options.trigger {
519            op.insert("trigger".into(), trig);
520        }
521        self.operators.push(op);
522        self
523    }
524
525    /// Window operator with a raw spec string. Useful when you've already
526    /// validated the spec against `WindowEngine.parseSpec`.
527    pub fn window_from_str(mut self, spec: &str, options: WindowOptions) -> Self {
528        require_nonblank("spec", spec);
529        self = self.window_full(WindowSpec::new(spec), options);
530        self
531    }
532
533    /// Branch operator: route events to different topics by condition.
534    pub fn branch(mut self, branches: Vec<BranchSpec>) -> Self {
535        if branches.is_empty() {
536            panic!("branch operator requires at least one branch");
537        }
538        let mut normalised = Vec::with_capacity(branches.len());
539        for (i, b) in branches.iter().enumerate() {
540            if b.condition.trim().is_empty() {
541                panic!("branch[{i}] requires a non-empty `condition`");
542            }
543            if b.topic.trim().is_empty() {
544                panic!("branch[{i}] requires a non-empty `topic`");
545            }
546            normalised.push(json!({
547                "condition": b.condition,
548                "topic": b.topic,
549            }));
550        }
551        let mut op = Map::new();
552        op.insert("type".into(), Value::String("branch".into()));
553        op.insert("branches".into(), Value::Array(normalised));
554        self.operators.push(op);
555        self
556    }
557
558    /// Synchronous enrichment: join the stream against a state-store topic.
559    pub fn enrich(mut self, lookup_topic: impl Into<String>, key_field: impl Into<String>) -> Self {
560        let lookup_topic = lookup_topic.into();
561        let key_field = key_field.into();
562        require_nonblank("lookup_topic", &lookup_topic);
563        require_nonblank("key_field", &key_field);
564        let mut op = Map::new();
565        op.insert("type".into(), Value::String("enrich".into()));
566        op.insert("lookupTopic".into(), Value::String(lookup_topic));
567        op.insert("keyField".into(), Value::String(key_field));
568        self.operators.push(op);
569        self
570    }
571
572    /// Asynchronous HTTP enrichment. `url` supports `{field}` placeholders.
573    pub fn enrich_async(mut self, options: EnrichAsyncOptions) -> Self {
574        require_nonblank("url", &options.url);
575        if let Some(ref o) = options.ordering {
576            if o != "PRESERVE_INPUT" && o != "UNORDERED" {
577                panic!("ordering must be PRESERVE_INPUT or UNORDERED, got {o:?}");
578            }
579        }
580        if let Some(ref f) = options.on_failure {
581            if f != "EMIT_ERROR" && f != "DROP" && f != "PASS_THROUGH" {
582                panic!("on_failure must be EMIT_ERROR, DROP, or PASS_THROUGH, got {f:?}");
583            }
584        }
585        let mut op = Map::new();
586        op.insert("type".into(), Value::String("enrichAsync".into()));
587        op.insert("url".into(), Value::String(options.url));
588        if let Some(v) = options.parallelism {
589            op.insert("parallelism".into(), Value::Number(v.into()));
590        }
591        if let Some(v) = options.queue_size {
592            op.insert("queueSize".into(), Value::Number(v.into()));
593        }
594        if let Some(v) = options.timeout_ms {
595            op.insert("timeoutMs".into(), Value::Number(v.into()));
596        }
597        if let Some(v) = options.max_retries {
598            op.insert("maxRetries".into(), Value::Number(v.into()));
599        }
600        if let Some(v) = options.retry_backoff_ms {
601            op.insert("retryBackoffMs".into(), Value::Number(v.into()));
602        }
603        if let Some(o) = options.ordering {
604            op.insert("ordering".into(), Value::String(o));
605        }
606        if let Some(f) = options.on_failure {
607            op.insert("onFailure".into(), Value::String(f));
608        }
609        self.operators.push(op);
610        self
611    }
612
613    /// Complex Event Processing: match a sequence of conditions.
614    pub fn cep(mut self, sequence: Vec<Value>, options: CepOptions) -> Self {
615        if sequence.is_empty() {
616            panic!("cep operator requires a non-empty sequence");
617        }
618        let mut op = Map::new();
619        op.insert("type".into(), Value::String("cep".into()));
620        op.insert("sequence".into(), Value::Array(sequence));
621        if let Some(w) = options.within {
622            op.insert("within".into(), Value::String(w));
623        }
624        if let Some(n) = options.name {
625            op.insert("name".into(), Value::String(n));
626        }
627        self.operators.push(op);
628        self
629    }
630
631    /// B-109 — enrich each event with an LLM completion. `prompt` supports
632    /// `{field}` placeholders (and `{__payload__}`) substituted from the event
633    /// server-side; the completion lands on the event under `output_field`.
634    pub fn map_llm(mut self, prompt: impl Into<String>, options: MapLlmOptions) -> Self {
635        let prompt = prompt.into();
636        require_nonblank("prompt", &prompt);
637        require_nonblank("output_field", &options.output_field);
638        if let Some(ref o) = options.ordering {
639            if o != "PRESERVE_INPUT" && o != "UNORDERED" {
640                panic!("ordering must be PRESERVE_INPUT or UNORDERED, got {o:?}");
641            }
642        }
643        check_failure(&options.on_failure);
644        let mut op = Map::new();
645        op.insert("type".into(), Value::String("mapLlm".into()));
646        op.insert("prompt".into(), Value::String(prompt));
647        op.insert("outputField".into(), Value::String(options.output_field));
648        if let Some(m) = options.model {
649            op.insert("model".into(), Value::String(m));
650        }
651        if let Some(t) = options.temperature {
652            op.insert("temperature".into(), json!(t));
653        }
654        if let Some(n) = options.max_tokens {
655            op.insert("maxTokens".into(), Value::Number(n.into()));
656        }
657        if let Some(n) = options.parallelism {
658            op.insert("parallelism".into(), Value::Number(n.into()));
659        }
660        if let Some(o) = options.ordering {
661            op.insert("ordering".into(), Value::String(o));
662        }
663        if let Some(f) = options.on_failure {
664            op.insert("onFailure".into(), Value::String(f));
665        }
666        if let Some(n) = options.max_calls_per_sec {
667            op.insert("maxCallsPerSec".into(), Value::Number(n.into()));
668        }
669        self.operators.push(op);
670        self
671    }
672
673    /// B-109 — LLM → typed structured fields merged into the event. The LLM is
674    /// asked for a JSON object keyed by `options.schema`'s fields; missing /
675    /// malformed fields become null server-side.
676    pub fn extract(mut self, options: ExtractOptions) -> Self {
677        require_nonblank("instruction", &options.instruction);
678        if options.schema.is_empty() {
679            panic!("extract operator requires a non-empty schema");
680        }
681        check_failure(&options.on_failure);
682        let mut schema = Map::new();
683        for (k, v) in options.schema {
684            schema.insert(k, Value::String(v));
685        }
686        let mut op = Map::new();
687        op.insert("type".into(), Value::String("extract".into()));
688        op.insert("instruction".into(), Value::String(options.instruction));
689        op.insert("schema".into(), Value::Object(schema));
690        if let Some(m) = options.model {
691            op.insert("model".into(), Value::String(m));
692        }
693        if let Some(t) = options.temperature {
694            op.insert("temperature".into(), json!(t));
695        }
696        if let Some(n) = options.max_tokens {
697            op.insert("maxTokens".into(), Value::Number(n.into()));
698        }
699        if let Some(f) = options.on_failure {
700            op.insert("onFailure".into(), Value::String(f));
701        }
702        self.operators.push(op);
703        self
704    }
705
706    /// B-109 Phase 2 — invoke an MCP tool per event. `options.args` string
707    /// values support `{field}` substitution. On success the tool output is
708    /// written to `options.output_field` (omit for a fire-and-forget effect).
709    pub fn mcp_call(mut self, tool: impl Into<String>, options: McpCallOptions) -> Self {
710        let tool = tool.into();
711        require_nonblank("tool", &tool);
712        if let Some(ref o) = options.ordering {
713            if o != "PRESERVE_INPUT" && o != "UNORDERED" {
714                panic!("ordering must be PRESERVE_INPUT or UNORDERED, got {o:?}");
715            }
716        }
717        check_failure(&options.on_failure);
718        let mut op = Map::new();
719        op.insert("type".into(), Value::String("mcpCall".into()));
720        op.insert("tool".into(), Value::String(tool));
721        if let Some(args) = options.args {
722            let mut m = Map::new();
723            for (k, v) in args {
724                m.insert(k, v);
725            }
726            op.insert("args".into(), Value::Object(m));
727        }
728        if let Some(f) = options.output_field {
729            op.insert("outputField".into(), Value::String(f));
730        }
731        if let Some(n) = options.parallelism {
732            op.insert("parallelism".into(), Value::Number(n.into()));
733        }
734        if let Some(o) = options.ordering {
735            op.insert("ordering".into(), Value::String(o));
736        }
737        if let Some(f) = options.on_failure {
738            op.insert("onFailure".into(), Value::String(f));
739        }
740        self.operators.push(op);
741        self
742    }
743
744    /// B-112 — score each event with an embedded ML model. The uploaded ONNX
745    /// model runs in-process on the Pulse engine (no model-server hop): the
746    /// named `options.input_fields` are pulled from the event payload, fed to
747    /// the model, and the model's output is written as a nested object under
748    /// `options.output_field` (so downstream operators can branch on it, e.g.
749    /// `.filter("prediction.fraud_score > 0.8")`).
750    ///
751    /// Upload the model first with [`ModelsResource::upload`](crate::ModelsResource::upload).
752    pub fn ml_predict(mut self, options: MlPredictOptions) -> Self {
753        require_nonblank("model", &options.model);
754        require_nonblank("output_field", &options.output_field);
755        if options.input_fields.is_empty()
756            || options.input_fields.iter().any(|f| f.trim().is_empty())
757        {
758            panic!("input_fields must be a non-empty list of non-blank strings");
759        }
760        if let Some(ref o) = options.ordering {
761            if o != "PRESERVE_INPUT" && o != "UNORDERED" {
762                panic!("ordering must be PRESERVE_INPUT or UNORDERED, got {o:?}");
763            }
764        }
765        check_failure(&options.on_failure);
766        let mut op = Map::new();
767        op.insert("type".into(), Value::String("mlPredict".into()));
768        op.insert("model".into(), Value::String(options.model));
769        op.insert(
770            "inputFields".into(),
771            Value::Array(
772                options
773                    .input_fields
774                    .into_iter()
775                    .map(Value::String)
776                    .collect(),
777            ),
778        );
779        op.insert("outputField".into(), Value::String(options.output_field));
780        if let Some(n) = options.parallelism {
781            op.insert("parallelism".into(), Value::Number(n.into()));
782        }
783        if let Some(o) = options.ordering {
784            op.insert("ordering".into(), Value::String(o));
785        }
786        if let Some(f) = options.on_failure {
787            op.insert("onFailure".into(), Value::String(f));
788        }
789        self.operators.push(op);
790        self
791    }
792
793    /// Broadcast join: enrich the stream against a fully-replicated table.
794    pub fn broadcast_join(mut self, options: BroadcastJoinOptions) -> Self {
795        require_nonblank("join_key_field", &options.join_key_field);
796        if let Some(ref m) = options.refresh_mode {
797            if m != "cdc" && m != "periodic" && m != "explicit" {
798                panic!("refresh_mode must be cdc, periodic, or explicit, got {m:?}");
799            }
800        }
801        let mut op = Map::new();
802        op.insert("type".into(), Value::String("broadcastJoin".into()));
803        op.insert("joinKeyField".into(), Value::String(options.join_key_field));
804        if let Some(t) = options.streaming_topic {
805            op.insert("streamingTopic".into(), Value::String(t));
806        }
807        if let Some(n) = options.name {
808            op.insert("name".into(), Value::String(n));
809        }
810        if let Some(b) = options.max_bytes {
811            op.insert("maxBytes".into(), Value::Number(b.into()));
812        }
813        if let Some(m) = options.refresh_mode {
814            op.insert("refreshMode".into(), Value::String(m));
815        }
816        if let Some(i) = options.interval_millis {
817            op.insert("intervalMillis".into(), Value::Number(i.into()));
818        }
819        self.operators.push(op);
820        self
821    }
822
823    /// CDC join: stream-table join against a CDC-fed state table.
824    pub fn cdc_join(mut self, options: CdcJoinOptions) -> Self {
825        require_nonblank("source", &options.source);
826        let mut op = Map::new();
827        op.insert("type".into(), Value::String("cdcJoin".into()));
828        op.insert("source".into(), Value::String(options.source));
829        if let Some(k) = options.join_key {
830            op.insert("joinKey".into(), Value::String(k));
831        }
832        if let Some(t) = options.table {
833            op.insert("table".into(), Value::String(t));
834        }
835        if let Some(b) = options.state_backend {
836            op.insert("stateBackend".into(), Value::String(b));
837        }
838        self.operators.push(op);
839        self
840    }
841
842    // ------------------------------------------------------------------
843    // Sink
844    // ------------------------------------------------------------------
845
846    /// Sets the output topic only. No sink node is emitted.
847    pub fn to_topic(mut self, topic: impl Into<String>) -> Self {
848        let topic = topic.into();
849        require_nonblank("topic", &topic);
850        self.output_topic = Some(topic);
851        self.sink_channel = None;
852        self
853    }
854
855    /// Sets the output topic + sink channel (emits a sink node).
856    pub fn to_topic_with_channel(
857        mut self,
858        topic: impl Into<String>,
859        channel: impl Into<String>,
860    ) -> Self {
861        let topic = topic.into();
862        let channel = channel.into();
863        require_nonblank("topic", &topic);
864        require_nonblank("channel", &channel);
865        self.output_topic = Some(topic);
866        self.sink_channel = Some(channel);
867        self
868    }
869
870    /// Terminate the stream in a connector sink (Segment, Kafka, Postgres, …) —
871    /// an ergonomic, connector-first alias for
872    /// [`to_topic_with_channel`](Self::to_topic_with_channel) using an
873    /// intermediate `<connector_type>-sink-out` topic. Chain
874    /// [`with_sink_config`](Self::with_sink_config) for the connector config.
875    /// `connector_type` is a subType from `client.connectors()`; bridged
876    /// connectors require the enterprise bridge JAR on the server.
877    pub fn to_connector(self, connector_type: impl Into<String>) -> Self {
878        let ct = connector_type.into();
879        require_nonblank("connector_type", &ct);
880        let topic = format!("{ct}-sink-out");
881        self.to_topic_with_channel(topic, ct)
882    }
883
884    /// Merges extra config into the sink node's `config` map.
885    pub fn with_sink_config(mut self, key: impl Into<String>, value: Value) -> Self {
886        self.sink_config.insert(key.into(), value);
887        self
888    }
889
890    /// Sets the display label for the sink node.
891    pub fn with_sink_label(mut self, label: impl Into<String>) -> Self {
892        self.sink_label = Some(label.into());
893        self
894    }
895
896    /// Terminate the stream in the agent's state store (queryable via B-106 IQ).
897    pub fn to_state(mut self) -> Self {
898        self.output_topic = None;
899        self.sink_channel = None;
900        self.sink_config = Map::new();
901        self.sink_label = None;
902        self
903    }
904
905    // ------------------------------------------------------------------
906    // Metadata
907    // ------------------------------------------------------------------
908
909    /// Sets / overrides the pipeline name.
910    pub fn named(mut self, name: impl Into<String>) -> Self {
911        let name = name.into();
912        require_nonblank("name", &name);
913        self.name = Some(name);
914        self
915    }
916
917    /// Sets the pipeline description.
918    pub fn described_as(mut self, description: impl Into<String>) -> Self {
919        self.description = Some(description.into());
920        self
921    }
922
923    /// Sets the display label for the streaming agent node.
924    pub fn with_agent_label(mut self, label: impl Into<String>) -> Self {
925        let label = label.into();
926        require_nonblank("label", &label);
927        self.agent_label = Some(label);
928        self
929    }
930
931    // ------------------------------------------------------------------
932    // Compilation
933    // ------------------------------------------------------------------
934
935    /// Returns a read-only view of the recorded operator chain.
936    pub fn operators(&self) -> &[Map<String, Value>] {
937        &self.operators
938    }
939
940    /// Compile the chain into a Pulse pipeline dict ready for POST.
941    pub fn build(&self) -> Result<Value, PulseError> {
942        self.build_inner(None)
943    }
944
945    /// Same as [`build`](Self::build) but overrides the pipeline name.
946    pub fn build_with_name(&self, name: &str) -> Result<Value, PulseError> {
947        require_nonblank("name", name);
948        self.build_inner(Some(name.to_string()))
949    }
950
951    fn build_inner(&self, override_name: Option<String>) -> Result<Value, PulseError> {
952        let pipeline_name = override_name.or_else(|| self.name.clone()).ok_or_else(|| {
953            PulseError::InvalidConfig(
954                "pipeline name required — pass to StreamBuilder::new or build_with_name".into(),
955            )
956        })?;
957        let input_topic = self.input_topic.as_ref().ok_or_else(|| {
958            PulseError::InvalidConfig("no source — call .from_topic(...) before build()".into())
959        })?;
960        if self.operators.is_empty() {
961            return Err(PulseError::InvalidConfig(
962                "no operators — chain at least one of .filter/.map/.key_by/... before build()"
963                    .into(),
964            ));
965        }
966
967        let source_engine = self.source_engine.as_deref().unwrap_or("kafka");
968
969        let mut nodes: Vec<Value> = Vec::with_capacity(3);
970
971        // Source node
972        let mut src_config = Map::new();
973        src_config.insert("engine".into(), Value::String(source_engine.to_string()));
974        src_config.insert("inputTopic".into(), Value::String(input_topic.clone()));
975        for (k, v) in &self.source_config {
976            src_config.insert(k.clone(), v.clone());
977        }
978        let src_label = self
979            .source_label
980            .clone()
981            .unwrap_or_else(|| format!("{source_engine} source"));
982        nodes.push(json!({
983            "type": "source",
984            "label": src_label,
985            "config": Value::Object(src_config),
986        }));
987
988        // Agent node
989        let mut agent_config = Map::new();
990        agent_config.insert("engine".into(), Value::String("streaming".into()));
991        agent_config.insert("inputTopic".into(), Value::String(input_topic.clone()));
992        let ops_value: Vec<Value> = self
993            .operators
994            .iter()
995            .map(|op| Value::Object(op.clone()))
996            .collect();
997        agent_config.insert("operators".into(), Value::Array(ops_value));
998        if let Some(ref out) = self.output_topic {
999            agent_config.insert("outputTopic".into(), Value::String(out.clone()));
1000        }
1001        let agent_label = self
1002            .agent_label
1003            .clone()
1004            .unwrap_or_else(|| pipeline_name.clone());
1005        nodes.push(json!({
1006            "type": "agent",
1007            "label": agent_label,
1008            "config": Value::Object(agent_config),
1009        }));
1010
1011        // Sink node — only when both output_topic AND sink_channel are set
1012        if let (Some(out), Some(ch)) = (self.output_topic.as_ref(), self.sink_channel.as_ref()) {
1013            let mut sink_conf = Map::new();
1014            sink_conf.insert("channel".into(), Value::String(ch.clone()));
1015            sink_conf.insert("inputTopic".into(), Value::String(out.clone()));
1016            for (k, v) in &self.sink_config {
1017                sink_conf.insert(k.clone(), v.clone());
1018            }
1019            let sink_label = self
1020                .sink_label
1021                .clone()
1022                .unwrap_or_else(|| format!("{ch} sink"));
1023            nodes.push(json!({
1024                "type": "sink",
1025                "label": sink_label,
1026                "config": Value::Object(sink_conf),
1027            }));
1028        }
1029
1030        let mut pipeline = Map::new();
1031        pipeline.insert("name".into(), Value::String(pipeline_name));
1032        pipeline.insert("nodes".into(), Value::Array(nodes));
1033        if let Some(ref desc) = self.description {
1034            pipeline.insert("description".into(), Value::String(desc.clone()));
1035        }
1036        Ok(Value::Object(pipeline))
1037    }
1038}
1039
1040// ---------------------------------------------------------------------------
1041// StreamsResource — the client.streams() accessor
1042// ---------------------------------------------------------------------------
1043
1044/// `client.streams()` — compile + deploy [`StreamBuilder`] pipelines.
1045///
1046/// Sugar over `client.pipelines().create()` — the compile happens client-side,
1047/// the deploy is the same POST.
1048pub struct StreamsResource<'c> {
1049    pub(crate) client: &'c PulseClient,
1050}
1051
1052impl<'c> StreamsResource<'c> {
1053    /// Compile the builder to a pipeline dict WITHOUT deploying.
1054    pub fn compile(&self, builder: &StreamBuilder) -> Result<Value, PulseError> {
1055        builder.build()
1056    }
1057
1058    /// Compile with a name override WITHOUT deploying.
1059    pub fn compile_with_name(
1060        &self,
1061        builder: &StreamBuilder,
1062        name: &str,
1063    ) -> Result<Value, PulseError> {
1064        builder.build_with_name(name)
1065    }
1066
1067    /// Compile + POST to `/api/pulse/pipelines`. Returns the server response.
1068    pub async fn deploy(&self, builder: &StreamBuilder) -> Result<Value, PulseError> {
1069        let definition = builder.build()?;
1070        self.client
1071            .request(
1072                Method::POST,
1073                "/api/pulse/pipelines",
1074                Some(&definition),
1075                true,
1076            )
1077            .await
1078    }
1079
1080    /// Compile with a name override + POST to `/api/pulse/pipelines`.
1081    pub async fn deploy_with_name(
1082        &self,
1083        builder: &StreamBuilder,
1084        name: &str,
1085    ) -> Result<Value, PulseError> {
1086        let definition = builder.build_with_name(name)?;
1087        self.client
1088            .request(
1089                Method::POST,
1090                "/api/pulse/pipelines",
1091                Some(&definition),
1092                true,
1093            )
1094            .await
1095    }
1096}
1097
1098impl std::fmt::Debug for StreamsResource<'_> {
1099    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1100        f.debug_struct("StreamsResource").finish()
1101    }
1102}
1103
1104// ---------------------------------------------------------------------------
1105// Internal helpers
1106// ---------------------------------------------------------------------------
1107
/// Panics unless `value` contains at least one non-whitespace character.
///
/// `name` identifies the offending argument in the panic message.
fn require_nonblank(name: &str, value: &str) {
    let has_content = value.chars().any(|c| !c.is_whitespace());
    if has_content {
        return;
    }
    panic!("{name} must be a non-empty string");
}
1113
/// Panics if `on_failure` is set to an invalid value (B-109).
///
/// `None` is accepted; a `Some` value must be exactly `EMIT_ERROR`, `DROP`,
/// or `PASS_THROUGH` (case-sensitive, no surrounding whitespace).
fn check_failure(on_failure: &Option<String>) {
    match on_failure.as_deref() {
        None | Some("EMIT_ERROR" | "DROP" | "PASS_THROUGH") => {}
        Some(f) => panic!("on_failure must be EMIT_ERROR, DROP, or PASS_THROUGH, got {f:?}"),
    }
}