1use std::collections::BTreeMap;
49
50use reqwest::Method;
51use serde_json::{json, Map, Value};
52
53use crate::client::PulseClient;
54use crate::error::PulseError;
55
56#[derive(Debug, Clone, PartialEq, Eq, Hash)]
65pub struct WindowSpec {
66 spec: String,
67}
68
69impl WindowSpec {
70 pub fn new(spec: impl Into<String>) -> Self {
72 let spec = spec.into();
73 if spec.trim().is_empty() {
74 panic!("WindowSpec requires a non-empty spec string");
75 }
76 Self { spec }
77 }
78
79 pub fn spec(&self) -> &str {
81 &self.spec
82 }
83}
84
85impl std::fmt::Display for WindowSpec {
86 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87 f.write_str(&self.spec)
88 }
89}
90
91pub mod windows {
96 use super::WindowSpec;
97
98 pub fn tumbling(size: &str) -> WindowSpec {
100 require_nonblank("size", size);
101 WindowSpec::new(format!("tumbling({size})"))
102 }
103
104 pub fn sliding(size: &str, slide: &str) -> WindowSpec {
106 require_nonblank("size", size);
107 require_nonblank("slide", slide);
108 WindowSpec::new(format!("sliding({size},{slide})"))
109 }
110
111 pub fn session(timeout: &str) -> WindowSpec {
113 require_nonblank("timeout", timeout);
114 WindowSpec::new(format!("session({timeout})"))
115 }
116
117 pub fn global() -> WindowSpec {
119 WindowSpec::new("global")
120 }
121
122 pub fn count(n: u64) -> WindowSpec {
124 if n == 0 {
125 panic!("count window size must be positive, got 0");
126 }
127 WindowSpec::new(format!("count({n})"))
128 }
129
130 pub fn count_sliding(size: u64, slide: u64) -> WindowSpec {
132 if size == 0 || slide == 0 {
133 panic!("count_sliding requires positive size and slide, got {size}, {slide}");
134 }
135 WindowSpec::new(format!("count_sliding({size},{slide})"))
136 }
137
138 fn require_nonblank(name: &str, value: &str) {
139 if value.trim().is_empty() {
140 panic!("{name} must be a non-empty string");
141 }
142 }
143}
144
145pub mod aggs {
154 pub fn count() -> String {
156 "count()".into()
157 }
158
159 pub fn sum(field: &str) -> String {
161 require_nonblank("field", field);
162 format!("sum({field})")
163 }
164
165 pub fn avg(field: &str) -> String {
167 require_nonblank("field", field);
168 format!("avg({field})")
169 }
170
171 pub fn min(field: &str) -> String {
173 require_nonblank("field", field);
174 format!("min({field})")
175 }
176
177 pub fn max(field: &str) -> String {
179 require_nonblank("field", field);
180 format!("max({field})")
181 }
182
183 pub fn collect_list(field: &str) -> String {
185 require_nonblank("field", field);
186 format!("collect_list({field})")
187 }
188
189 pub fn distinct_count(field: &str) -> String {
191 require_nonblank("field", field);
192 format!("distinct_count({field})")
193 }
194
195 fn require_nonblank(name: &str, value: &str) {
196 if value.trim().is_empty() {
197 panic!("{name} must be a non-empty string");
198 }
199 }
200}
201
202#[derive(Debug, Clone, Default)]
208pub struct MapOptions {
209 pub fields: Option<BTreeMap<String, String>>,
211 pub target_type: Option<String>,
213}
214
215#[derive(Debug, Clone, Default)]
217pub struct WindowOptions {
218 pub aggregations: Option<BTreeMap<String, String>>,
220 pub output_topic: Option<String>,
222 pub trigger: Option<Value>,
224}
225
226#[derive(Debug, Clone)]
228pub struct BranchSpec {
229 pub condition: String,
230 pub topic: String,
231}
232
233impl BranchSpec {
234 pub fn new(condition: impl Into<String>, topic: impl Into<String>) -> Self {
235 Self {
236 condition: condition.into(),
237 topic: topic.into(),
238 }
239 }
240}
241
242#[derive(Debug, Clone, Default)]
244pub struct EnrichAsyncOptions {
245 pub url: String,
246 pub parallelism: Option<u32>,
247 pub queue_size: Option<u32>,
248 pub timeout_ms: Option<u32>,
249 pub max_retries: Option<u32>,
250 pub retry_backoff_ms: Option<u32>,
251 pub ordering: Option<String>,
253 pub on_failure: Option<String>,
255}
256
257#[derive(Debug, Clone, Default)]
259pub struct CepOptions {
260 pub within: Option<String>,
261 pub name: Option<String>,
262}
263
264#[derive(Debug, Clone, Default)]
266pub struct BroadcastJoinOptions {
267 pub join_key_field: String,
268 pub streaming_topic: Option<String>,
269 pub name: Option<String>,
270 pub max_bytes: Option<i64>,
271 pub refresh_mode: Option<String>,
273 pub interval_millis: Option<u32>,
274}
275
276#[derive(Debug, Clone, Default)]
278pub struct CdcJoinOptions {
279 pub source: String,
280 pub join_key: Option<String>,
281 pub table: Option<String>,
282 pub state_backend: Option<String>,
283}
284
285#[derive(Debug, Clone, Default)]
287pub struct MapLlmOptions {
288 pub output_field: String,
289 pub model: Option<String>,
290 pub temperature: Option<f64>,
291 pub max_tokens: Option<u32>,
292 pub parallelism: Option<u32>,
293 pub ordering: Option<String>,
295 pub on_failure: Option<String>,
297 pub max_calls_per_sec: Option<u32>,
298}
299
300#[derive(Debug, Clone, Default)]
302pub struct ExtractOptions {
303 pub instruction: String,
304 pub schema: BTreeMap<String, String>,
305 pub model: Option<String>,
306 pub temperature: Option<f64>,
307 pub max_tokens: Option<u32>,
308 pub on_failure: Option<String>,
309}
310
311#[derive(Debug, Clone, Default)]
313pub struct McpCallOptions {
314 pub args: Option<BTreeMap<String, Value>>,
315 pub output_field: Option<String>,
316 pub parallelism: Option<u32>,
317 pub ordering: Option<String>,
318 pub on_failure: Option<String>,
319}
320
321#[derive(Debug, Clone, Default)]
324pub struct MlPredictOptions {
325 pub model: String,
327 pub input_fields: Vec<String>,
330 pub output_field: String,
332 pub parallelism: Option<u32>,
333 pub ordering: Option<String>,
335 pub on_failure: Option<String>,
337}
338
339#[derive(Debug, Clone, Default)]
353pub struct StreamBuilder {
354 name: Option<String>,
355 description: Option<String>,
356 agent_label: Option<String>,
357 input_topic: Option<String>,
358 source_engine: Option<String>,
359 source_config: Map<String, Value>,
360 source_label: Option<String>,
361 output_topic: Option<String>,
362 sink_channel: Option<String>,
363 sink_config: Map<String, Value>,
364 sink_label: Option<String>,
365 operators: Vec<Map<String, Value>>,
366}
367
368impl StreamBuilder {
369 pub fn new(name: impl Into<String>) -> Self {
371 let name = name.into();
372 require_nonblank("name", &name);
373 Self {
374 name: Some(name),
375 ..Self::default()
376 }
377 }
378
379 pub fn anonymous() -> Self {
382 Self::default()
383 }
384
385 pub fn from_topic(mut self, topic: impl Into<String>) -> Self {
391 let topic = topic.into();
392 require_nonblank("topic", &topic);
393 self.input_topic = Some(topic);
394 self.source_engine = Some("kafka".into());
395 self
396 }
397
398 pub fn from_topic_with_engine(
400 mut self,
401 topic: impl Into<String>,
402 engine: impl Into<String>,
403 ) -> Self {
404 let topic = topic.into();
405 let engine = engine.into();
406 require_nonblank("topic", &topic);
407 require_nonblank("engine", &engine);
408 self.input_topic = Some(topic);
409 self.source_engine = Some(engine);
410 self
411 }
412
413 pub fn with_source_config(mut self, key: impl Into<String>, value: Value) -> Self {
415 self.source_config.insert(key.into(), value);
416 self
417 }
418
419 pub fn with_source_label(mut self, label: impl Into<String>) -> Self {
421 self.source_label = Some(label.into());
422 self
423 }
424
425 pub fn filter(mut self, condition: impl Into<String>) -> Self {
431 let condition = condition.into();
432 require_nonblank("condition", &condition);
433 let mut op = Map::new();
434 op.insert("type".into(), Value::String("filter".into()));
435 op.insert("condition".into(), Value::String(condition));
436 self.operators.push(op);
437 self
438 }
439
440 pub fn map(mut self, options: MapOptions) -> Self {
442 if options.fields.is_none() && options.target_type.is_none() {
443 panic!("map operator does nothing — provide `fields` or `target_type`");
444 }
445 let mut op = Map::new();
446 op.insert("type".into(), Value::String("map".into()));
447 if let Some(fields) = options.fields {
448 let mut m = Map::new();
449 for (k, v) in fields {
450 m.insert(k, Value::String(v));
451 }
452 op.insert("fields".into(), Value::Object(m));
453 }
454 if let Some(t) = options.target_type {
455 op.insert("targetType".into(), Value::String(t));
456 }
457 self.operators.push(op);
458 self
459 }
460
461 pub fn flat_map(mut self, split_field: impl Into<String>) -> Self {
463 let split_field = split_field.into();
464 require_nonblank("split_field", &split_field);
465 let mut op = Map::new();
466 op.insert("type".into(), Value::String("flatMap".into()));
467 op.insert("splitField".into(), Value::String(split_field));
468 self.operators.push(op);
469 self
470 }
471
472 pub fn key_by(mut self, field: impl Into<String>) -> Self {
474 let field = field.into();
475 require_nonblank("field", &field);
476 let mut op = Map::new();
477 op.insert("type".into(), Value::String("keyBy".into()));
478 op.insert("field".into(), Value::String(field));
479 self.operators.push(op);
480 self
481 }
482
483 pub fn window(self, spec: WindowSpec) -> Self {
485 self.window_full(spec, WindowOptions::default())
486 }
487
488 pub fn window_with_aggs(
490 self,
491 spec: WindowSpec,
492 aggregations: BTreeMap<String, String>,
493 ) -> Self {
494 self.window_full(
495 spec,
496 WindowOptions {
497 aggregations: Some(aggregations),
498 ..Default::default()
499 },
500 )
501 }
502
503 pub fn window_full(mut self, spec: WindowSpec, options: WindowOptions) -> Self {
505 let mut op = Map::new();
506 op.insert("type".into(), Value::String("window".into()));
507 op.insert("spec".into(), Value::String(spec.spec.clone()));
508 if let Some(aggs_map) = options.aggregations {
509 let mut m = Map::new();
510 for (k, v) in aggs_map {
511 m.insert(k, Value::String(v));
512 }
513 op.insert("aggregations".into(), Value::Object(m));
514 }
515 if let Some(out) = options.output_topic {
516 op.insert("outputTopic".into(), Value::String(out));
517 }
518 if let Some(trig) = options.trigger {
519 op.insert("trigger".into(), trig);
520 }
521 self.operators.push(op);
522 self
523 }
524
525 pub fn window_from_str(mut self, spec: &str, options: WindowOptions) -> Self {
528 require_nonblank("spec", spec);
529 self = self.window_full(WindowSpec::new(spec), options);
530 self
531 }
532
533 pub fn branch(mut self, branches: Vec<BranchSpec>) -> Self {
535 if branches.is_empty() {
536 panic!("branch operator requires at least one branch");
537 }
538 let mut normalised = Vec::with_capacity(branches.len());
539 for (i, b) in branches.iter().enumerate() {
540 if b.condition.trim().is_empty() {
541 panic!("branch[{i}] requires a non-empty `condition`");
542 }
543 if b.topic.trim().is_empty() {
544 panic!("branch[{i}] requires a non-empty `topic`");
545 }
546 normalised.push(json!({
547 "condition": b.condition,
548 "topic": b.topic,
549 }));
550 }
551 let mut op = Map::new();
552 op.insert("type".into(), Value::String("branch".into()));
553 op.insert("branches".into(), Value::Array(normalised));
554 self.operators.push(op);
555 self
556 }
557
558 pub fn enrich(mut self, lookup_topic: impl Into<String>, key_field: impl Into<String>) -> Self {
560 let lookup_topic = lookup_topic.into();
561 let key_field = key_field.into();
562 require_nonblank("lookup_topic", &lookup_topic);
563 require_nonblank("key_field", &key_field);
564 let mut op = Map::new();
565 op.insert("type".into(), Value::String("enrich".into()));
566 op.insert("lookupTopic".into(), Value::String(lookup_topic));
567 op.insert("keyField".into(), Value::String(key_field));
568 self.operators.push(op);
569 self
570 }
571
572 pub fn enrich_async(mut self, options: EnrichAsyncOptions) -> Self {
574 require_nonblank("url", &options.url);
575 if let Some(ref o) = options.ordering {
576 if o != "PRESERVE_INPUT" && o != "UNORDERED" {
577 panic!("ordering must be PRESERVE_INPUT or UNORDERED, got {o:?}");
578 }
579 }
580 if let Some(ref f) = options.on_failure {
581 if f != "EMIT_ERROR" && f != "DROP" && f != "PASS_THROUGH" {
582 panic!("on_failure must be EMIT_ERROR, DROP, or PASS_THROUGH, got {f:?}");
583 }
584 }
585 let mut op = Map::new();
586 op.insert("type".into(), Value::String("enrichAsync".into()));
587 op.insert("url".into(), Value::String(options.url));
588 if let Some(v) = options.parallelism {
589 op.insert("parallelism".into(), Value::Number(v.into()));
590 }
591 if let Some(v) = options.queue_size {
592 op.insert("queueSize".into(), Value::Number(v.into()));
593 }
594 if let Some(v) = options.timeout_ms {
595 op.insert("timeoutMs".into(), Value::Number(v.into()));
596 }
597 if let Some(v) = options.max_retries {
598 op.insert("maxRetries".into(), Value::Number(v.into()));
599 }
600 if let Some(v) = options.retry_backoff_ms {
601 op.insert("retryBackoffMs".into(), Value::Number(v.into()));
602 }
603 if let Some(o) = options.ordering {
604 op.insert("ordering".into(), Value::String(o));
605 }
606 if let Some(f) = options.on_failure {
607 op.insert("onFailure".into(), Value::String(f));
608 }
609 self.operators.push(op);
610 self
611 }
612
613 pub fn cep(mut self, sequence: Vec<Value>, options: CepOptions) -> Self {
615 if sequence.is_empty() {
616 panic!("cep operator requires a non-empty sequence");
617 }
618 let mut op = Map::new();
619 op.insert("type".into(), Value::String("cep".into()));
620 op.insert("sequence".into(), Value::Array(sequence));
621 if let Some(w) = options.within {
622 op.insert("within".into(), Value::String(w));
623 }
624 if let Some(n) = options.name {
625 op.insert("name".into(), Value::String(n));
626 }
627 self.operators.push(op);
628 self
629 }
630
631 pub fn map_llm(mut self, prompt: impl Into<String>, options: MapLlmOptions) -> Self {
635 let prompt = prompt.into();
636 require_nonblank("prompt", &prompt);
637 require_nonblank("output_field", &options.output_field);
638 if let Some(ref o) = options.ordering {
639 if o != "PRESERVE_INPUT" && o != "UNORDERED" {
640 panic!("ordering must be PRESERVE_INPUT or UNORDERED, got {o:?}");
641 }
642 }
643 check_failure(&options.on_failure);
644 let mut op = Map::new();
645 op.insert("type".into(), Value::String("mapLlm".into()));
646 op.insert("prompt".into(), Value::String(prompt));
647 op.insert("outputField".into(), Value::String(options.output_field));
648 if let Some(m) = options.model {
649 op.insert("model".into(), Value::String(m));
650 }
651 if let Some(t) = options.temperature {
652 op.insert("temperature".into(), json!(t));
653 }
654 if let Some(n) = options.max_tokens {
655 op.insert("maxTokens".into(), Value::Number(n.into()));
656 }
657 if let Some(n) = options.parallelism {
658 op.insert("parallelism".into(), Value::Number(n.into()));
659 }
660 if let Some(o) = options.ordering {
661 op.insert("ordering".into(), Value::String(o));
662 }
663 if let Some(f) = options.on_failure {
664 op.insert("onFailure".into(), Value::String(f));
665 }
666 if let Some(n) = options.max_calls_per_sec {
667 op.insert("maxCallsPerSec".into(), Value::Number(n.into()));
668 }
669 self.operators.push(op);
670 self
671 }
672
673 pub fn extract(mut self, options: ExtractOptions) -> Self {
677 require_nonblank("instruction", &options.instruction);
678 if options.schema.is_empty() {
679 panic!("extract operator requires a non-empty schema");
680 }
681 check_failure(&options.on_failure);
682 let mut schema = Map::new();
683 for (k, v) in options.schema {
684 schema.insert(k, Value::String(v));
685 }
686 let mut op = Map::new();
687 op.insert("type".into(), Value::String("extract".into()));
688 op.insert("instruction".into(), Value::String(options.instruction));
689 op.insert("schema".into(), Value::Object(schema));
690 if let Some(m) = options.model {
691 op.insert("model".into(), Value::String(m));
692 }
693 if let Some(t) = options.temperature {
694 op.insert("temperature".into(), json!(t));
695 }
696 if let Some(n) = options.max_tokens {
697 op.insert("maxTokens".into(), Value::Number(n.into()));
698 }
699 if let Some(f) = options.on_failure {
700 op.insert("onFailure".into(), Value::String(f));
701 }
702 self.operators.push(op);
703 self
704 }
705
706 pub fn mcp_call(mut self, tool: impl Into<String>, options: McpCallOptions) -> Self {
710 let tool = tool.into();
711 require_nonblank("tool", &tool);
712 if let Some(ref o) = options.ordering {
713 if o != "PRESERVE_INPUT" && o != "UNORDERED" {
714 panic!("ordering must be PRESERVE_INPUT or UNORDERED, got {o:?}");
715 }
716 }
717 check_failure(&options.on_failure);
718 let mut op = Map::new();
719 op.insert("type".into(), Value::String("mcpCall".into()));
720 op.insert("tool".into(), Value::String(tool));
721 if let Some(args) = options.args {
722 let mut m = Map::new();
723 for (k, v) in args {
724 m.insert(k, v);
725 }
726 op.insert("args".into(), Value::Object(m));
727 }
728 if let Some(f) = options.output_field {
729 op.insert("outputField".into(), Value::String(f));
730 }
731 if let Some(n) = options.parallelism {
732 op.insert("parallelism".into(), Value::Number(n.into()));
733 }
734 if let Some(o) = options.ordering {
735 op.insert("ordering".into(), Value::String(o));
736 }
737 if let Some(f) = options.on_failure {
738 op.insert("onFailure".into(), Value::String(f));
739 }
740 self.operators.push(op);
741 self
742 }
743
744 pub fn ml_predict(mut self, options: MlPredictOptions) -> Self {
753 require_nonblank("model", &options.model);
754 require_nonblank("output_field", &options.output_field);
755 if options.input_fields.is_empty()
756 || options.input_fields.iter().any(|f| f.trim().is_empty())
757 {
758 panic!("input_fields must be a non-empty list of non-blank strings");
759 }
760 if let Some(ref o) = options.ordering {
761 if o != "PRESERVE_INPUT" && o != "UNORDERED" {
762 panic!("ordering must be PRESERVE_INPUT or UNORDERED, got {o:?}");
763 }
764 }
765 check_failure(&options.on_failure);
766 let mut op = Map::new();
767 op.insert("type".into(), Value::String("mlPredict".into()));
768 op.insert("model".into(), Value::String(options.model));
769 op.insert(
770 "inputFields".into(),
771 Value::Array(
772 options
773 .input_fields
774 .into_iter()
775 .map(Value::String)
776 .collect(),
777 ),
778 );
779 op.insert("outputField".into(), Value::String(options.output_field));
780 if let Some(n) = options.parallelism {
781 op.insert("parallelism".into(), Value::Number(n.into()));
782 }
783 if let Some(o) = options.ordering {
784 op.insert("ordering".into(), Value::String(o));
785 }
786 if let Some(f) = options.on_failure {
787 op.insert("onFailure".into(), Value::String(f));
788 }
789 self.operators.push(op);
790 self
791 }
792
793 pub fn broadcast_join(mut self, options: BroadcastJoinOptions) -> Self {
795 require_nonblank("join_key_field", &options.join_key_field);
796 if let Some(ref m) = options.refresh_mode {
797 if m != "cdc" && m != "periodic" && m != "explicit" {
798 panic!("refresh_mode must be cdc, periodic, or explicit, got {m:?}");
799 }
800 }
801 let mut op = Map::new();
802 op.insert("type".into(), Value::String("broadcastJoin".into()));
803 op.insert("joinKeyField".into(), Value::String(options.join_key_field));
804 if let Some(t) = options.streaming_topic {
805 op.insert("streamingTopic".into(), Value::String(t));
806 }
807 if let Some(n) = options.name {
808 op.insert("name".into(), Value::String(n));
809 }
810 if let Some(b) = options.max_bytes {
811 op.insert("maxBytes".into(), Value::Number(b.into()));
812 }
813 if let Some(m) = options.refresh_mode {
814 op.insert("refreshMode".into(), Value::String(m));
815 }
816 if let Some(i) = options.interval_millis {
817 op.insert("intervalMillis".into(), Value::Number(i.into()));
818 }
819 self.operators.push(op);
820 self
821 }
822
823 pub fn cdc_join(mut self, options: CdcJoinOptions) -> Self {
825 require_nonblank("source", &options.source);
826 let mut op = Map::new();
827 op.insert("type".into(), Value::String("cdcJoin".into()));
828 op.insert("source".into(), Value::String(options.source));
829 if let Some(k) = options.join_key {
830 op.insert("joinKey".into(), Value::String(k));
831 }
832 if let Some(t) = options.table {
833 op.insert("table".into(), Value::String(t));
834 }
835 if let Some(b) = options.state_backend {
836 op.insert("stateBackend".into(), Value::String(b));
837 }
838 self.operators.push(op);
839 self
840 }
841
842 pub fn to_topic(mut self, topic: impl Into<String>) -> Self {
848 let topic = topic.into();
849 require_nonblank("topic", &topic);
850 self.output_topic = Some(topic);
851 self.sink_channel = None;
852 self
853 }
854
855 pub fn to_topic_with_channel(
857 mut self,
858 topic: impl Into<String>,
859 channel: impl Into<String>,
860 ) -> Self {
861 let topic = topic.into();
862 let channel = channel.into();
863 require_nonblank("topic", &topic);
864 require_nonblank("channel", &channel);
865 self.output_topic = Some(topic);
866 self.sink_channel = Some(channel);
867 self
868 }
869
870 pub fn to_connector(self, connector_type: impl Into<String>) -> Self {
878 let ct = connector_type.into();
879 require_nonblank("connector_type", &ct);
880 let topic = format!("{ct}-sink-out");
881 self.to_topic_with_channel(topic, ct)
882 }
883
884 pub fn with_sink_config(mut self, key: impl Into<String>, value: Value) -> Self {
886 self.sink_config.insert(key.into(), value);
887 self
888 }
889
890 pub fn with_sink_label(mut self, label: impl Into<String>) -> Self {
892 self.sink_label = Some(label.into());
893 self
894 }
895
896 pub fn to_state(mut self) -> Self {
898 self.output_topic = None;
899 self.sink_channel = None;
900 self.sink_config = Map::new();
901 self.sink_label = None;
902 self
903 }
904
905 pub fn named(mut self, name: impl Into<String>) -> Self {
911 let name = name.into();
912 require_nonblank("name", &name);
913 self.name = Some(name);
914 self
915 }
916
917 pub fn described_as(mut self, description: impl Into<String>) -> Self {
919 self.description = Some(description.into());
920 self
921 }
922
923 pub fn with_agent_label(mut self, label: impl Into<String>) -> Self {
925 let label = label.into();
926 require_nonblank("label", &label);
927 self.agent_label = Some(label);
928 self
929 }
930
931 pub fn operators(&self) -> &[Map<String, Value>] {
937 &self.operators
938 }
939
940 pub fn build(&self) -> Result<Value, PulseError> {
942 self.build_inner(None)
943 }
944
945 pub fn build_with_name(&self, name: &str) -> Result<Value, PulseError> {
947 require_nonblank("name", name);
948 self.build_inner(Some(name.to_string()))
949 }
950
951 fn build_inner(&self, override_name: Option<String>) -> Result<Value, PulseError> {
952 let pipeline_name = override_name.or_else(|| self.name.clone()).ok_or_else(|| {
953 PulseError::InvalidConfig(
954 "pipeline name required — pass to StreamBuilder::new or build_with_name".into(),
955 )
956 })?;
957 let input_topic = self.input_topic.as_ref().ok_or_else(|| {
958 PulseError::InvalidConfig("no source — call .from_topic(...) before build()".into())
959 })?;
960 if self.operators.is_empty() {
961 return Err(PulseError::InvalidConfig(
962 "no operators — chain at least one of .filter/.map/.key_by/... before build()"
963 .into(),
964 ));
965 }
966
967 let source_engine = self.source_engine.as_deref().unwrap_or("kafka");
968
969 let mut nodes: Vec<Value> = Vec::with_capacity(3);
970
971 let mut src_config = Map::new();
973 src_config.insert("engine".into(), Value::String(source_engine.to_string()));
974 src_config.insert("inputTopic".into(), Value::String(input_topic.clone()));
975 for (k, v) in &self.source_config {
976 src_config.insert(k.clone(), v.clone());
977 }
978 let src_label = self
979 .source_label
980 .clone()
981 .unwrap_or_else(|| format!("{source_engine} source"));
982 nodes.push(json!({
983 "type": "source",
984 "label": src_label,
985 "config": Value::Object(src_config),
986 }));
987
988 let mut agent_config = Map::new();
990 agent_config.insert("engine".into(), Value::String("streaming".into()));
991 agent_config.insert("inputTopic".into(), Value::String(input_topic.clone()));
992 let ops_value: Vec<Value> = self
993 .operators
994 .iter()
995 .map(|op| Value::Object(op.clone()))
996 .collect();
997 agent_config.insert("operators".into(), Value::Array(ops_value));
998 if let Some(ref out) = self.output_topic {
999 agent_config.insert("outputTopic".into(), Value::String(out.clone()));
1000 }
1001 let agent_label = self
1002 .agent_label
1003 .clone()
1004 .unwrap_or_else(|| pipeline_name.clone());
1005 nodes.push(json!({
1006 "type": "agent",
1007 "label": agent_label,
1008 "config": Value::Object(agent_config),
1009 }));
1010
1011 if let (Some(out), Some(ch)) = (self.output_topic.as_ref(), self.sink_channel.as_ref()) {
1013 let mut sink_conf = Map::new();
1014 sink_conf.insert("channel".into(), Value::String(ch.clone()));
1015 sink_conf.insert("inputTopic".into(), Value::String(out.clone()));
1016 for (k, v) in &self.sink_config {
1017 sink_conf.insert(k.clone(), v.clone());
1018 }
1019 let sink_label = self
1020 .sink_label
1021 .clone()
1022 .unwrap_or_else(|| format!("{ch} sink"));
1023 nodes.push(json!({
1024 "type": "sink",
1025 "label": sink_label,
1026 "config": Value::Object(sink_conf),
1027 }));
1028 }
1029
1030 let mut pipeline = Map::new();
1031 pipeline.insert("name".into(), Value::String(pipeline_name));
1032 pipeline.insert("nodes".into(), Value::Array(nodes));
1033 if let Some(ref desc) = self.description {
1034 pipeline.insert("description".into(), Value::String(desc.clone()));
1035 }
1036 Ok(Value::Object(pipeline))
1037 }
1038}
1039
1040pub struct StreamsResource<'c> {
1049 pub(crate) client: &'c PulseClient,
1050}
1051
1052impl<'c> StreamsResource<'c> {
1053 pub fn compile(&self, builder: &StreamBuilder) -> Result<Value, PulseError> {
1055 builder.build()
1056 }
1057
1058 pub fn compile_with_name(
1060 &self,
1061 builder: &StreamBuilder,
1062 name: &str,
1063 ) -> Result<Value, PulseError> {
1064 builder.build_with_name(name)
1065 }
1066
1067 pub async fn deploy(&self, builder: &StreamBuilder) -> Result<Value, PulseError> {
1069 let definition = builder.build()?;
1070 self.client
1071 .request(
1072 Method::POST,
1073 "/api/pulse/pipelines",
1074 Some(&definition),
1075 true,
1076 )
1077 .await
1078 }
1079
1080 pub async fn deploy_with_name(
1082 &self,
1083 builder: &StreamBuilder,
1084 name: &str,
1085 ) -> Result<Value, PulseError> {
1086 let definition = builder.build_with_name(name)?;
1087 self.client
1088 .request(
1089 Method::POST,
1090 "/api/pulse/pipelines",
1091 Some(&definition),
1092 true,
1093 )
1094 .await
1095 }
1096}
1097
1098impl std::fmt::Debug for StreamsResource<'_> {
1099 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1100 f.debug_struct("StreamsResource").finish()
1101 }
1102}
1103
1104fn require_nonblank(name: &str, value: &str) {
1109 if value.trim().is_empty() {
1110 panic!("{name} must be a non-empty string");
1111 }
1112}
1113
1114fn check_failure(on_failure: &Option<String>) {
1116 if let Some(f) = on_failure {
1117 if f != "EMIT_ERROR" && f != "DROP" && f != "PASS_THROUGH" {
1118 panic!("on_failure must be EMIT_ERROR, DROP, or PASS_THROUGH, got {f:?}");
1119 }
1120 }
1121}