1use std::collections::BTreeMap;
49
50use reqwest::Method;
51use serde_json::{json, Map, Value};
52
53use crate::client::PulseClient;
54use crate::error::PulseError;
55
/// A validated window specification string such as `tumbling(5m)`.
///
/// The wrapped text is guaranteed to be non-blank when the value is created
/// through [`WindowSpec::new`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct WindowSpec {
    /// Raw specification text exactly as supplied by the caller.
    spec: String,
}

impl WindowSpec {
    /// Wraps `spec` without modifying it.
    ///
    /// # Panics
    ///
    /// Panics if `spec` is empty or consists only of whitespace.
    pub fn new(spec: impl Into<String>) -> Self {
        let text: String = spec.into();
        assert!(
            !text.trim().is_empty(),
            "WindowSpec requires a non-empty spec string"
        );
        Self { spec: text }
    }

    /// Returns the specification text.
    pub fn spec(&self) -> &str {
        self.spec.as_str()
    }
}

impl std::fmt::Display for WindowSpec {
    /// Writes the specification text verbatim (formatter width/padding is not applied).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.spec())
    }
}
90
91pub mod windows {
96 use super::WindowSpec;
97
98 pub fn tumbling(size: &str) -> WindowSpec {
100 require_nonblank("size", size);
101 WindowSpec::new(format!("tumbling({size})"))
102 }
103
104 pub fn sliding(size: &str, slide: &str) -> WindowSpec {
106 require_nonblank("size", size);
107 require_nonblank("slide", slide);
108 WindowSpec::new(format!("sliding({size},{slide})"))
109 }
110
111 pub fn session(timeout: &str) -> WindowSpec {
113 require_nonblank("timeout", timeout);
114 WindowSpec::new(format!("session({timeout})"))
115 }
116
117 pub fn global() -> WindowSpec {
119 WindowSpec::new("global")
120 }
121
122 pub fn count(n: u64) -> WindowSpec {
124 if n == 0 {
125 panic!("count window size must be positive, got 0");
126 }
127 WindowSpec::new(format!("count({n})"))
128 }
129
130 pub fn count_sliding(size: u64, slide: u64) -> WindowSpec {
132 if size == 0 || slide == 0 {
133 panic!("count_sliding requires positive size and slide, got {size}, {slide}");
134 }
135 WindowSpec::new(format!("count_sliding({size},{slide})"))
136 }
137
138 fn require_nonblank(name: &str, value: &str) {
139 if value.trim().is_empty() {
140 panic!("{name} must be a non-empty string");
141 }
142 }
143}
144
/// Helpers that render aggregation expressions as `function(field)` strings.
pub mod aggs {
    /// Returns the literal `count()` expression.
    pub fn count() -> String {
        String::from("count()")
    }

    /// Returns `sum(<field>)`. Panics if `field` is blank.
    pub fn sum(field: &str) -> String {
        render("sum", field)
    }

    /// Returns `avg(<field>)`. Panics if `field` is blank.
    pub fn avg(field: &str) -> String {
        render("avg", field)
    }

    /// Returns `min(<field>)`. Panics if `field` is blank.
    pub fn min(field: &str) -> String {
        render("min", field)
    }

    /// Returns `max(<field>)`. Panics if `field` is blank.
    pub fn max(field: &str) -> String {
        render("max", field)
    }

    /// Returns `collect_list(<field>)`. Panics if `field` is blank.
    pub fn collect_list(field: &str) -> String {
        render("collect_list", field)
    }

    /// Returns `distinct_count(<field>)`. Panics if `field` is blank.
    pub fn distinct_count(field: &str) -> String {
        render("distinct_count", field)
    }

    /// Validates `field` and wraps it, untrimmed, in a call to `function`.
    fn render(function: &str, field: &str) -> String {
        assert!(
            !field.trim().is_empty(),
            "field must be a non-empty string"
        );
        format!("{function}({field})")
    }
}
201
/// Options accepted by `StreamBuilder::map`.
///
/// At least one of the two fields must be `Some`; `StreamBuilder::map`
/// panics otherwise.
#[derive(Debug, Clone, Default)]
pub struct MapOptions {
    /// Entries emitted as a JSON object under the operator's `fields` key.
    pub fields: Option<BTreeMap<String, String>>,
    /// Value emitted under the operator's `targetType` key.
    pub target_type: Option<String>,
}
214
215#[derive(Debug, Clone, Default)]
217pub struct WindowOptions {
218 pub aggregations: Option<BTreeMap<String, String>>,
220 pub output_topic: Option<String>,
222 pub trigger: Option<Value>,
224}
225
/// One arm of a `branch` operator: a condition paired with a topic.
///
/// Blank values are not rejected here; `StreamBuilder::branch` validates
/// both fields when the operator is added.
#[derive(Debug, Clone)]
pub struct BranchSpec {
    /// Emitted under the branch entry's `condition` key.
    pub condition: String,
    /// Emitted under the branch entry's `topic` key.
    pub topic: String,
}

impl BranchSpec {
    /// Creates a branch from anything convertible into owned strings.
    pub fn new(condition: impl Into<String>, topic: impl Into<String>) -> Self {
        let (condition, topic) = (condition.into(), topic.into());
        Self { condition, topic }
    }
}
241
/// Settings accepted by `StreamBuilder::enrich_async`.
///
/// `url` is required (the builder panics when it is blank); every `None`
/// field is omitted from the emitted operator.
#[derive(Debug, Clone, Default)]
pub struct EnrichAsyncOptions {
    /// Emitted under `url`; must be non-blank.
    pub url: String,
    /// Emitted under `parallelism`.
    pub parallelism: Option<u32>,
    /// Emitted under `queueSize`.
    pub queue_size: Option<u32>,
    /// Emitted under `timeoutMs`.
    pub timeout_ms: Option<u32>,
    /// Emitted under `maxRetries`.
    pub max_retries: Option<u32>,
    /// Emitted under `retryBackoffMs`.
    pub retry_backoff_ms: Option<u32>,
    /// Emitted under `ordering`; the builder accepts only `PRESERVE_INPUT` or `UNORDERED`.
    pub ordering: Option<String>,
    /// Emitted under `onFailure`; the builder accepts only `EMIT_ERROR`, `DROP`
    /// or `PASS_THROUGH`.
    pub on_failure: Option<String>,
}
256
/// Optional settings accepted by `StreamBuilder::cep`.
#[derive(Debug, Clone, Default)]
pub struct CepOptions {
    /// Emitted under the operator's `within` key when present.
    pub within: Option<String>,
    /// Emitted under the operator's `name` key when present.
    pub name: Option<String>,
}
263
/// Settings accepted by `StreamBuilder::broadcast_join`.
///
/// `join_key_field` is required (the builder panics when it is blank);
/// every `None` field is omitted from the emitted operator.
#[derive(Debug, Clone, Default)]
pub struct BroadcastJoinOptions {
    /// Emitted under `joinKeyField`; must be non-blank.
    pub join_key_field: String,
    /// Emitted under `streamingTopic`.
    pub streaming_topic: Option<String>,
    /// Emitted under `name`.
    pub name: Option<String>,
    /// Emitted under `maxBytes`.
    pub max_bytes: Option<i64>,
    /// Emitted under `refreshMode`; the builder accepts only `cdc`, `periodic`
    /// or `explicit`.
    pub refresh_mode: Option<String>,
    /// Emitted under `intervalMillis`.
    pub interval_millis: Option<u32>,
}
275
/// Settings accepted by `StreamBuilder::cdc_join`.
///
/// `source` is required (the builder panics when it is blank); every `None`
/// field is omitted from the emitted operator.
#[derive(Debug, Clone, Default)]
pub struct CdcJoinOptions {
    /// Emitted under `source`; must be non-blank.
    pub source: String,
    /// Emitted under `joinKey`.
    pub join_key: Option<String>,
    /// Emitted under `table`.
    pub table: Option<String>,
    /// Emitted under `stateBackend`.
    pub state_backend: Option<String>,
}
284
285#[derive(Debug, Clone, Default)]
299pub struct StreamBuilder {
300 name: Option<String>,
301 description: Option<String>,
302 agent_label: Option<String>,
303 input_topic: Option<String>,
304 source_engine: Option<String>,
305 source_config: Map<String, Value>,
306 source_label: Option<String>,
307 output_topic: Option<String>,
308 sink_channel: Option<String>,
309 sink_config: Map<String, Value>,
310 sink_label: Option<String>,
311 operators: Vec<Map<String, Value>>,
312}
313
314impl StreamBuilder {
315 pub fn new(name: impl Into<String>) -> Self {
317 let name = name.into();
318 require_nonblank("name", &name);
319 Self {
320 name: Some(name),
321 ..Self::default()
322 }
323 }
324
325 pub fn anonymous() -> Self {
328 Self::default()
329 }
330
331 pub fn from_topic(mut self, topic: impl Into<String>) -> Self {
337 let topic = topic.into();
338 require_nonblank("topic", &topic);
339 self.input_topic = Some(topic);
340 self.source_engine = Some("kafka".into());
341 self
342 }
343
344 pub fn from_topic_with_engine(
346 mut self,
347 topic: impl Into<String>,
348 engine: impl Into<String>,
349 ) -> Self {
350 let topic = topic.into();
351 let engine = engine.into();
352 require_nonblank("topic", &topic);
353 require_nonblank("engine", &engine);
354 self.input_topic = Some(topic);
355 self.source_engine = Some(engine);
356 self
357 }
358
359 pub fn with_source_config(mut self, key: impl Into<String>, value: Value) -> Self {
361 self.source_config.insert(key.into(), value);
362 self
363 }
364
365 pub fn with_source_label(mut self, label: impl Into<String>) -> Self {
367 self.source_label = Some(label.into());
368 self
369 }
370
371 pub fn filter(mut self, condition: impl Into<String>) -> Self {
377 let condition = condition.into();
378 require_nonblank("condition", &condition);
379 let mut op = Map::new();
380 op.insert("type".into(), Value::String("filter".into()));
381 op.insert("condition".into(), Value::String(condition));
382 self.operators.push(op);
383 self
384 }
385
386 pub fn map(mut self, options: MapOptions) -> Self {
388 if options.fields.is_none() && options.target_type.is_none() {
389 panic!("map operator does nothing — provide `fields` or `target_type`");
390 }
391 let mut op = Map::new();
392 op.insert("type".into(), Value::String("map".into()));
393 if let Some(fields) = options.fields {
394 let mut m = Map::new();
395 for (k, v) in fields {
396 m.insert(k, Value::String(v));
397 }
398 op.insert("fields".into(), Value::Object(m));
399 }
400 if let Some(t) = options.target_type {
401 op.insert("targetType".into(), Value::String(t));
402 }
403 self.operators.push(op);
404 self
405 }
406
407 pub fn flat_map(mut self, split_field: impl Into<String>) -> Self {
409 let split_field = split_field.into();
410 require_nonblank("split_field", &split_field);
411 let mut op = Map::new();
412 op.insert("type".into(), Value::String("flatMap".into()));
413 op.insert("splitField".into(), Value::String(split_field));
414 self.operators.push(op);
415 self
416 }
417
418 pub fn key_by(mut self, field: impl Into<String>) -> Self {
420 let field = field.into();
421 require_nonblank("field", &field);
422 let mut op = Map::new();
423 op.insert("type".into(), Value::String("keyBy".into()));
424 op.insert("field".into(), Value::String(field));
425 self.operators.push(op);
426 self
427 }
428
429 pub fn window(self, spec: WindowSpec) -> Self {
431 self.window_full(spec, WindowOptions::default())
432 }
433
434 pub fn window_with_aggs(
436 self,
437 spec: WindowSpec,
438 aggregations: BTreeMap<String, String>,
439 ) -> Self {
440 self.window_full(
441 spec,
442 WindowOptions {
443 aggregations: Some(aggregations),
444 ..Default::default()
445 },
446 )
447 }
448
449 pub fn window_full(mut self, spec: WindowSpec, options: WindowOptions) -> Self {
451 let mut op = Map::new();
452 op.insert("type".into(), Value::String("window".into()));
453 op.insert("spec".into(), Value::String(spec.spec.clone()));
454 if let Some(aggs_map) = options.aggregations {
455 let mut m = Map::new();
456 for (k, v) in aggs_map {
457 m.insert(k, Value::String(v));
458 }
459 op.insert("aggregations".into(), Value::Object(m));
460 }
461 if let Some(out) = options.output_topic {
462 op.insert("outputTopic".into(), Value::String(out));
463 }
464 if let Some(trig) = options.trigger {
465 op.insert("trigger".into(), trig);
466 }
467 self.operators.push(op);
468 self
469 }
470
471 pub fn window_from_str(mut self, spec: &str, options: WindowOptions) -> Self {
474 require_nonblank("spec", spec);
475 self = self.window_full(WindowSpec::new(spec), options);
476 self
477 }
478
479 pub fn branch(mut self, branches: Vec<BranchSpec>) -> Self {
481 if branches.is_empty() {
482 panic!("branch operator requires at least one branch");
483 }
484 let mut normalised = Vec::with_capacity(branches.len());
485 for (i, b) in branches.iter().enumerate() {
486 if b.condition.trim().is_empty() {
487 panic!("branch[{i}] requires a non-empty `condition`");
488 }
489 if b.topic.trim().is_empty() {
490 panic!("branch[{i}] requires a non-empty `topic`");
491 }
492 normalised.push(json!({
493 "condition": b.condition,
494 "topic": b.topic,
495 }));
496 }
497 let mut op = Map::new();
498 op.insert("type".into(), Value::String("branch".into()));
499 op.insert("branches".into(), Value::Array(normalised));
500 self.operators.push(op);
501 self
502 }
503
504 pub fn enrich(mut self, lookup_topic: impl Into<String>, key_field: impl Into<String>) -> Self {
506 let lookup_topic = lookup_topic.into();
507 let key_field = key_field.into();
508 require_nonblank("lookup_topic", &lookup_topic);
509 require_nonblank("key_field", &key_field);
510 let mut op = Map::new();
511 op.insert("type".into(), Value::String("enrich".into()));
512 op.insert("lookupTopic".into(), Value::String(lookup_topic));
513 op.insert("keyField".into(), Value::String(key_field));
514 self.operators.push(op);
515 self
516 }
517
518 pub fn enrich_async(mut self, options: EnrichAsyncOptions) -> Self {
520 require_nonblank("url", &options.url);
521 if let Some(ref o) = options.ordering {
522 if o != "PRESERVE_INPUT" && o != "UNORDERED" {
523 panic!("ordering must be PRESERVE_INPUT or UNORDERED, got {o:?}");
524 }
525 }
526 if let Some(ref f) = options.on_failure {
527 if f != "EMIT_ERROR" && f != "DROP" && f != "PASS_THROUGH" {
528 panic!("on_failure must be EMIT_ERROR, DROP, or PASS_THROUGH, got {f:?}");
529 }
530 }
531 let mut op = Map::new();
532 op.insert("type".into(), Value::String("enrichAsync".into()));
533 op.insert("url".into(), Value::String(options.url));
534 if let Some(v) = options.parallelism {
535 op.insert("parallelism".into(), Value::Number(v.into()));
536 }
537 if let Some(v) = options.queue_size {
538 op.insert("queueSize".into(), Value::Number(v.into()));
539 }
540 if let Some(v) = options.timeout_ms {
541 op.insert("timeoutMs".into(), Value::Number(v.into()));
542 }
543 if let Some(v) = options.max_retries {
544 op.insert("maxRetries".into(), Value::Number(v.into()));
545 }
546 if let Some(v) = options.retry_backoff_ms {
547 op.insert("retryBackoffMs".into(), Value::Number(v.into()));
548 }
549 if let Some(o) = options.ordering {
550 op.insert("ordering".into(), Value::String(o));
551 }
552 if let Some(f) = options.on_failure {
553 op.insert("onFailure".into(), Value::String(f));
554 }
555 self.operators.push(op);
556 self
557 }
558
559 pub fn cep(mut self, sequence: Vec<Value>, options: CepOptions) -> Self {
561 if sequence.is_empty() {
562 panic!("cep operator requires a non-empty sequence");
563 }
564 let mut op = Map::new();
565 op.insert("type".into(), Value::String("cep".into()));
566 op.insert("sequence".into(), Value::Array(sequence));
567 if let Some(w) = options.within {
568 op.insert("within".into(), Value::String(w));
569 }
570 if let Some(n) = options.name {
571 op.insert("name".into(), Value::String(n));
572 }
573 self.operators.push(op);
574 self
575 }
576
577 pub fn broadcast_join(mut self, options: BroadcastJoinOptions) -> Self {
579 require_nonblank("join_key_field", &options.join_key_field);
580 if let Some(ref m) = options.refresh_mode {
581 if m != "cdc" && m != "periodic" && m != "explicit" {
582 panic!("refresh_mode must be cdc, periodic, or explicit, got {m:?}");
583 }
584 }
585 let mut op = Map::new();
586 op.insert("type".into(), Value::String("broadcastJoin".into()));
587 op.insert("joinKeyField".into(), Value::String(options.join_key_field));
588 if let Some(t) = options.streaming_topic {
589 op.insert("streamingTopic".into(), Value::String(t));
590 }
591 if let Some(n) = options.name {
592 op.insert("name".into(), Value::String(n));
593 }
594 if let Some(b) = options.max_bytes {
595 op.insert("maxBytes".into(), Value::Number(b.into()));
596 }
597 if let Some(m) = options.refresh_mode {
598 op.insert("refreshMode".into(), Value::String(m));
599 }
600 if let Some(i) = options.interval_millis {
601 op.insert("intervalMillis".into(), Value::Number(i.into()));
602 }
603 self.operators.push(op);
604 self
605 }
606
607 pub fn cdc_join(mut self, options: CdcJoinOptions) -> Self {
609 require_nonblank("source", &options.source);
610 let mut op = Map::new();
611 op.insert("type".into(), Value::String("cdcJoin".into()));
612 op.insert("source".into(), Value::String(options.source));
613 if let Some(k) = options.join_key {
614 op.insert("joinKey".into(), Value::String(k));
615 }
616 if let Some(t) = options.table {
617 op.insert("table".into(), Value::String(t));
618 }
619 if let Some(b) = options.state_backend {
620 op.insert("stateBackend".into(), Value::String(b));
621 }
622 self.operators.push(op);
623 self
624 }
625
626 pub fn to_topic(mut self, topic: impl Into<String>) -> Self {
632 let topic = topic.into();
633 require_nonblank("topic", &topic);
634 self.output_topic = Some(topic);
635 self.sink_channel = None;
636 self
637 }
638
639 pub fn to_topic_with_channel(
641 mut self,
642 topic: impl Into<String>,
643 channel: impl Into<String>,
644 ) -> Self {
645 let topic = topic.into();
646 let channel = channel.into();
647 require_nonblank("topic", &topic);
648 require_nonblank("channel", &channel);
649 self.output_topic = Some(topic);
650 self.sink_channel = Some(channel);
651 self
652 }
653
654 pub fn with_sink_config(mut self, key: impl Into<String>, value: Value) -> Self {
656 self.sink_config.insert(key.into(), value);
657 self
658 }
659
660 pub fn with_sink_label(mut self, label: impl Into<String>) -> Self {
662 self.sink_label = Some(label.into());
663 self
664 }
665
666 pub fn to_state(mut self) -> Self {
668 self.output_topic = None;
669 self.sink_channel = None;
670 self.sink_config = Map::new();
671 self.sink_label = None;
672 self
673 }
674
675 pub fn named(mut self, name: impl Into<String>) -> Self {
681 let name = name.into();
682 require_nonblank("name", &name);
683 self.name = Some(name);
684 self
685 }
686
687 pub fn described_as(mut self, description: impl Into<String>) -> Self {
689 self.description = Some(description.into());
690 self
691 }
692
693 pub fn with_agent_label(mut self, label: impl Into<String>) -> Self {
695 let label = label.into();
696 require_nonblank("label", &label);
697 self.agent_label = Some(label);
698 self
699 }
700
701 pub fn operators(&self) -> &[Map<String, Value>] {
707 &self.operators
708 }
709
710 pub fn build(&self) -> Result<Value, PulseError> {
712 self.build_inner(None)
713 }
714
715 pub fn build_with_name(&self, name: &str) -> Result<Value, PulseError> {
717 require_nonblank("name", name);
718 self.build_inner(Some(name.to_string()))
719 }
720
721 fn build_inner(&self, override_name: Option<String>) -> Result<Value, PulseError> {
722 let pipeline_name = override_name.or_else(|| self.name.clone()).ok_or_else(|| {
723 PulseError::InvalidConfig(
724 "pipeline name required — pass to StreamBuilder::new or build_with_name".into(),
725 )
726 })?;
727 let input_topic = self.input_topic.as_ref().ok_or_else(|| {
728 PulseError::InvalidConfig("no source — call .from_topic(...) before build()".into())
729 })?;
730 if self.operators.is_empty() {
731 return Err(PulseError::InvalidConfig(
732 "no operators — chain at least one of .filter/.map/.key_by/... before build()"
733 .into(),
734 ));
735 }
736
737 let source_engine = self.source_engine.as_deref().unwrap_or("kafka");
738
739 let mut nodes: Vec<Value> = Vec::with_capacity(3);
740
741 let mut src_config = Map::new();
743 src_config.insert("engine".into(), Value::String(source_engine.to_string()));
744 src_config.insert("inputTopic".into(), Value::String(input_topic.clone()));
745 for (k, v) in &self.source_config {
746 src_config.insert(k.clone(), v.clone());
747 }
748 let src_label = self
749 .source_label
750 .clone()
751 .unwrap_or_else(|| format!("{source_engine} source"));
752 nodes.push(json!({
753 "type": "source",
754 "label": src_label,
755 "config": Value::Object(src_config),
756 }));
757
758 let mut agent_config = Map::new();
760 agent_config.insert("engine".into(), Value::String("streaming".into()));
761 agent_config.insert("inputTopic".into(), Value::String(input_topic.clone()));
762 let ops_value: Vec<Value> = self
763 .operators
764 .iter()
765 .map(|op| Value::Object(op.clone()))
766 .collect();
767 agent_config.insert("operators".into(), Value::Array(ops_value));
768 if let Some(ref out) = self.output_topic {
769 agent_config.insert("outputTopic".into(), Value::String(out.clone()));
770 }
771 let agent_label = self
772 .agent_label
773 .clone()
774 .unwrap_or_else(|| pipeline_name.clone());
775 nodes.push(json!({
776 "type": "agent",
777 "label": agent_label,
778 "config": Value::Object(agent_config),
779 }));
780
781 if let (Some(out), Some(ch)) = (self.output_topic.as_ref(), self.sink_channel.as_ref()) {
783 let mut sink_conf = Map::new();
784 sink_conf.insert("channel".into(), Value::String(ch.clone()));
785 sink_conf.insert("inputTopic".into(), Value::String(out.clone()));
786 for (k, v) in &self.sink_config {
787 sink_conf.insert(k.clone(), v.clone());
788 }
789 let sink_label = self
790 .sink_label
791 .clone()
792 .unwrap_or_else(|| format!("{ch} sink"));
793 nodes.push(json!({
794 "type": "sink",
795 "label": sink_label,
796 "config": Value::Object(sink_conf),
797 }));
798 }
799
800 let mut pipeline = Map::new();
801 pipeline.insert("name".into(), Value::String(pipeline_name));
802 pipeline.insert("nodes".into(), Value::Array(nodes));
803 if let Some(ref desc) = self.description {
804 pipeline.insert("description".into(), Value::String(desc.clone()));
805 }
806 Ok(Value::Object(pipeline))
807 }
808}
809
810pub struct StreamsResource<'c> {
819 pub(crate) client: &'c PulseClient,
820}
821
822impl<'c> StreamsResource<'c> {
823 pub fn compile(&self, builder: &StreamBuilder) -> Result<Value, PulseError> {
825 builder.build()
826 }
827
828 pub fn compile_with_name(
830 &self,
831 builder: &StreamBuilder,
832 name: &str,
833 ) -> Result<Value, PulseError> {
834 builder.build_with_name(name)
835 }
836
837 pub async fn deploy(&self, builder: &StreamBuilder) -> Result<Value, PulseError> {
839 let definition = builder.build()?;
840 self.client
841 .request(
842 Method::POST,
843 "/api/pulse/pipelines",
844 Some(&definition),
845 true,
846 )
847 .await
848 }
849
850 pub async fn deploy_with_name(
852 &self,
853 builder: &StreamBuilder,
854 name: &str,
855 ) -> Result<Value, PulseError> {
856 let definition = builder.build_with_name(name)?;
857 self.client
858 .request(
859 Method::POST,
860 "/api/pulse/pipelines",
861 Some(&definition),
862 true,
863 )
864 .await
865 }
866}
867
868impl std::fmt::Debug for StreamsResource<'_> {
869 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
870 f.debug_struct("StreamsResource").finish()
871 }
872}
873
/// Guards a builder argument against blank input.
///
/// # Panics
///
/// Panics with `"<name> must be a non-empty string"` when `value` is empty
/// or contains only whitespace.
fn require_nonblank(name: &str, value: &str) {
    let has_content = !value.trim().is_empty();
    assert!(has_content, "{name} must be a non-empty string");
}