use std::collections::BTreeMap;
use reqwest::Method;
use serde_json::{json, Map, Value};
use crate::client::PulseClient;
use crate::error::PulseError;
/// A window specification, stored as its textual form.
///
/// Values built through [`WindowSpec::new`] always contain at least one
/// non-whitespace character, because that constructor rejects blank input.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct WindowSpec {
    spec: String,
}

impl WindowSpec {
    /// Wraps `spec` without altering it (no trimming or normalisation).
    ///
    /// # Panics
    ///
    /// Panics when `spec` is empty or consists only of whitespace.
    pub fn new(spec: impl Into<String>) -> Self {
        let text: String = spec.into();
        assert!(
            !text.trim().is_empty(),
            "WindowSpec requires a non-empty spec string"
        );
        Self { spec: text }
    }

    /// Borrows the textual form of the specification.
    pub fn spec(&self) -> &str {
        self.spec.as_str()
    }
}

impl std::fmt::Display for WindowSpec {
    /// Writes the specification text verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.spec())
    }
}
/// Constructors for the window specification strings understood by the
/// `window` operator.
///
/// Every function panics on invalid arguments (blank strings or zero counts).
pub mod windows {
    use super::WindowSpec;

    /// Builds `tumbling(<size>)`.
    ///
    /// # Panics
    ///
    /// Panics if `size` is blank.
    pub fn tumbling(size: &str) -> WindowSpec {
        require_nonblank("size", size);
        let rendered = format!("tumbling({size})");
        WindowSpec::new(rendered)
    }

    /// Builds `sliding(<size>,<slide>)`.
    ///
    /// # Panics
    ///
    /// Panics if `size` or `slide` is blank (`size` is checked first).
    pub fn sliding(size: &str, slide: &str) -> WindowSpec {
        for (label, text) in [("size", size), ("slide", slide)] {
            require_nonblank(label, text);
        }
        let rendered = format!("sliding({size},{slide})");
        WindowSpec::new(rendered)
    }

    /// Builds `session(<timeout>)`.
    ///
    /// # Panics
    ///
    /// Panics if `timeout` is blank.
    pub fn session(timeout: &str) -> WindowSpec {
        require_nonblank("timeout", timeout);
        let rendered = format!("session({timeout})");
        WindowSpec::new(rendered)
    }

    /// Builds the `global` window specification.
    pub fn global() -> WindowSpec {
        WindowSpec::new(String::from("global"))
    }

    /// Builds `count(<n>)`.
    ///
    /// # Panics
    ///
    /// Panics if `n` is zero.
    pub fn count(n: u64) -> WindowSpec {
        match n {
            0 => panic!("count window size must be positive, got 0"),
            _ => WindowSpec::new(format!("count({n})")),
        }
    }

    /// Builds `count_sliding(<size>,<slide>)`.
    ///
    /// # Panics
    ///
    /// Panics if either argument is zero.
    pub fn count_sliding(size: u64, slide: u64) -> WindowSpec {
        // For unsigned values the smaller one is zero exactly when either is zero.
        if size.min(slide) == 0 {
            panic!("count_sliding requires positive size and slide, got {size}, {slide}");
        }
        let rendered = format!("count_sliding({size},{slide})");
        WindowSpec::new(rendered)
    }

    /// Panics unless `value` contains at least one non-whitespace character.
    fn require_nonblank(name: &str, value: &str) {
        let blank = value.trim().is_empty();
        if blank {
            panic!("{name} must be a non-empty string");
        }
    }
}
/// Constructors for aggregation expression strings such as `sum(amount)`.
///
/// Every function that takes a field panics when that field is blank.
pub mod aggs {
    /// Returns the argument-less `count()` expression.
    pub fn count() -> String {
        String::from("count()")
    }

    /// Returns `sum(<field>)`; panics if `field` is blank.
    pub fn sum(field: &str) -> String {
        over_field("sum", field)
    }

    /// Returns `avg(<field>)`; panics if `field` is blank.
    pub fn avg(field: &str) -> String {
        over_field("avg", field)
    }

    /// Returns `min(<field>)`; panics if `field` is blank.
    pub fn min(field: &str) -> String {
        over_field("min", field)
    }

    /// Returns `max(<field>)`; panics if `field` is blank.
    pub fn max(field: &str) -> String {
        over_field("max", field)
    }

    /// Returns `collect_list(<field>)`; panics if `field` is blank.
    pub fn collect_list(field: &str) -> String {
        over_field("collect_list", field)
    }

    /// Returns `distinct_count(<field>)`; panics if `field` is blank.
    pub fn distinct_count(field: &str) -> String {
        over_field("distinct_count", field)
    }

    /// Validates `field` and renders `<function>(<field>)`.
    fn over_field(function: &str, field: &str) -> String {
        require_nonblank("field", field);
        format!("{function}({field})")
    }

    /// Panics unless `value` contains at least one non-whitespace character.
    fn require_nonblank(name: &str, value: &str) {
        let blank = value.trim().is_empty();
        if blank {
            panic!("{name} must be a non-empty string");
        }
    }
}
/// Options for the `map` operator added by `StreamBuilder::map`.
///
/// `StreamBuilder::map` panics when both fields are `None`.
#[derive(Debug, Clone, Default)]
pub struct MapOptions {
/// Entries emitted as string values under the operator's `fields` key.
pub fields: Option<BTreeMap<String, String>>,
/// Emitted as the operator's `targetType` string when present.
pub target_type: Option<String>,
}
/// Optional settings for the `window` operator added by
/// `StreamBuilder::window_full`. Each `None` field is omitted from the operator.
#[derive(Debug, Clone, Default)]
pub struct WindowOptions {
/// Entries emitted as string values under the operator's `aggregations` key.
pub aggregations: Option<BTreeMap<String, String>>,
/// Emitted as the operator's `outputTopic` string.
pub output_topic: Option<String>,
/// Arbitrary JSON inserted unchanged under the operator's `trigger` key.
pub trigger: Option<Value>,
}
/// One arm of a `branch` operator: a condition paired with a topic.
///
/// Construction performs no validation; `StreamBuilder::branch` is what
/// rejects blank conditions or topics.
#[derive(Debug, Clone)]
pub struct BranchSpec {
    /// Condition text emitted under the branch's `condition` key.
    pub condition: String,
    /// Topic name emitted under the branch's `topic` key.
    pub topic: String,
}

impl BranchSpec {
    /// Creates a branch from anything convertible into owned strings.
    pub fn new(condition: impl Into<String>, topic: impl Into<String>) -> Self {
        let condition: String = condition.into();
        let topic: String = topic.into();
        BranchSpec { condition, topic }
    }
}
/// Settings for the `enrichAsync` operator added by `StreamBuilder::enrich_async`.
///
/// Every `Option` field that is `None` is omitted from the emitted operator.
#[derive(Debug, Clone, Default)]
pub struct EnrichAsyncOptions {
/// Required; `enrich_async` panics when this is blank. Emitted as `url`.
pub url: String,
/// Emitted as the numeric `parallelism` key.
pub parallelism: Option<u32>,
/// Emitted as the numeric `queueSize` key.
pub queue_size: Option<u32>,
/// Emitted as the numeric `timeoutMs` key.
pub timeout_ms: Option<u32>,
/// Emitted as the numeric `maxRetries` key.
pub max_retries: Option<u32>,
/// Emitted as the numeric `retryBackoffMs` key.
pub retry_backoff_ms: Option<u32>,
/// Must be `PRESERVE_INPUT` or `UNORDERED` when set, otherwise
/// `enrich_async` panics. Emitted as `ordering`.
pub ordering: Option<String>,
/// Must be `EMIT_ERROR`, `DROP`, or `PASS_THROUGH` when set, otherwise
/// `enrich_async` panics. Emitted as `onFailure`.
pub on_failure: Option<String>,
}
/// Optional settings for the `cep` operator added by `StreamBuilder::cep`.
#[derive(Debug, Clone, Default)]
pub struct CepOptions {
/// Emitted as the operator's `within` string when present.
pub within: Option<String>,
/// Emitted as the operator's `name` string when present.
pub name: Option<String>,
}
/// Settings for the `broadcastJoin` operator added by
/// `StreamBuilder::broadcast_join`. `None` fields are omitted from the operator.
#[derive(Debug, Clone, Default)]
pub struct BroadcastJoinOptions {
/// Required; `broadcast_join` panics when this is blank. Emitted as `joinKeyField`.
pub join_key_field: String,
/// Emitted as the `streamingTopic` string.
pub streaming_topic: Option<String>,
/// Emitted as the `name` string.
pub name: Option<String>,
/// Emitted as the numeric `maxBytes` key.
pub max_bytes: Option<i64>,
/// Must be `cdc`, `periodic`, or `explicit` when set, otherwise
/// `broadcast_join` panics. Emitted as `refreshMode`.
pub refresh_mode: Option<String>,
/// Emitted as the numeric `intervalMillis` key.
pub interval_millis: Option<u32>,
}
/// Settings for the `cdcJoin` operator added by `StreamBuilder::cdc_join`.
/// `None` fields are omitted from the operator.
#[derive(Debug, Clone, Default)]
pub struct CdcJoinOptions {
/// Required; `cdc_join` panics when this is blank. Emitted as `source`.
pub source: String,
/// Emitted as the `joinKey` string.
pub join_key: Option<String>,
/// Emitted as the `table` string.
pub table: Option<String>,
/// Emitted as the `stateBackend` string.
pub state_backend: Option<String>,
}
/// Fluent builder that accumulates a source, a list of operators and an
/// optional sink, and renders them as a JSON pipeline definition via `build`.
#[derive(Debug, Clone, Default)]
pub struct StreamBuilder {
// Pipeline name; set by `new`/`named`, may be overridden by `build_with_name`.
name: Option<String>,
// Emitted as the pipeline's `description` when present.
description: Option<String>,
// Label of the agent node; `build` falls back to the pipeline name.
agent_label: Option<String>,
// Source topic; `build` fails with `InvalidConfig` when unset.
input_topic: Option<String>,
// Source engine; `build` falls back to "kafka" when unset.
source_engine: Option<String>,
// Extra entries copied into the source node's config after the defaults.
source_config: Map<String, Value>,
// Label of the source node; defaults to "<engine> source".
source_label: Option<String>,
// Agent output topic; also the sink node's input topic.
output_topic: Option<String>,
// Sink channel; a sink node is emitted only when this and `output_topic` are set.
sink_channel: Option<String>,
// Extra entries copied into the sink node's config after the defaults.
sink_config: Map<String, Value>,
// Label of the sink node; defaults to "<channel> sink".
sink_label: Option<String>,
// Operators in the order they were chained.
operators: Vec<Map<String, Value>>,
}
impl StreamBuilder {
/// Creates an otherwise empty builder for a pipeline called `name`.
///
/// # Panics
///
/// Panics if `name` is empty or whitespace only.
pub fn new(name: impl Into<String>) -> Self {
    let pipeline_name: String = name.into();
    require_nonblank("name", &pipeline_name);
    let mut builder = Self::default();
    builder.name = Some(pipeline_name);
    builder
}
pub fn anonymous() -> Self {
Self::default()
}
/// Reads from `topic` and sets the source engine to `kafka`.
///
/// # Panics
///
/// Panics if `topic` is blank.
pub fn from_topic(mut self, topic: impl Into<String>) -> Self {
    let source_topic: String = topic.into();
    require_nonblank("topic", &source_topic);
    self.source_engine = Some(String::from("kafka"));
    self.input_topic = Some(source_topic);
    self
}
pub fn from_topic_with_engine(
mut self,
topic: impl Into<String>,
engine: impl Into<String>,
) -> Self {
let topic = topic.into();
let engine = engine.into();
require_nonblank("topic", &topic);
require_nonblank("engine", &engine);
self.input_topic = Some(topic);
self.source_engine = Some(engine);
self
}
/// Adds (or replaces) one entry of the source node's extra configuration.
///
/// These entries are copied into the source config after the built-in
/// `engine` and `inputTopic` entries when the pipeline is built.
pub fn with_source_config(mut self, key: impl Into<String>, value: Value) -> Self {
    let config_key: String = key.into();
    self.source_config.insert(config_key, value);
    self
}
pub fn with_source_label(mut self, label: impl Into<String>) -> Self {
self.source_label = Some(label.into());
self
}
pub fn filter(mut self, condition: impl Into<String>) -> Self {
let condition = condition.into();
require_nonblank("condition", &condition);
let mut op = Map::new();
op.insert("type".into(), Value::String("filter".into()));
op.insert("condition".into(), Value::String(condition));
self.operators.push(op);
self
}
pub fn map(mut self, options: MapOptions) -> Self {
if options.fields.is_none() && options.target_type.is_none() {
panic!("map operator does nothing — provide `fields` or `target_type`");
}
let mut op = Map::new();
op.insert("type".into(), Value::String("map".into()));
if let Some(fields) = options.fields {
let mut m = Map::new();
for (k, v) in fields {
m.insert(k, Value::String(v));
}
op.insert("fields".into(), Value::Object(m));
}
if let Some(t) = options.target_type {
op.insert("targetType".into(), Value::String(t));
}
self.operators.push(op);
self
}
pub fn flat_map(mut self, split_field: impl Into<String>) -> Self {
let split_field = split_field.into();
require_nonblank("split_field", &split_field);
let mut op = Map::new();
op.insert("type".into(), Value::String("flatMap".into()));
op.insert("splitField".into(), Value::String(split_field));
self.operators.push(op);
self
}
pub fn key_by(mut self, field: impl Into<String>) -> Self {
let field = field.into();
require_nonblank("field", &field);
let mut op = Map::new();
op.insert("type".into(), Value::String("keyBy".into()));
op.insert("field".into(), Value::String(field));
self.operators.push(op);
self
}
/// Appends a `window` operator for `spec` with default (empty) options.
pub fn window(self, spec: WindowSpec) -> Self {
    let defaults = WindowOptions::default();
    self.window_full(spec, defaults)
}
pub fn window_with_aggs(
self,
spec: WindowSpec,
aggregations: BTreeMap<String, String>,
) -> Self {
self.window_full(
spec,
WindowOptions {
aggregations: Some(aggregations),
..Default::default()
},
)
}
pub fn window_full(mut self, spec: WindowSpec, options: WindowOptions) -> Self {
let mut op = Map::new();
op.insert("type".into(), Value::String("window".into()));
op.insert("spec".into(), Value::String(spec.spec.clone()));
if let Some(aggs_map) = options.aggregations {
let mut m = Map::new();
for (k, v) in aggs_map {
m.insert(k, Value::String(v));
}
op.insert("aggregations".into(), Value::Object(m));
}
if let Some(out) = options.output_topic {
op.insert("outputTopic".into(), Value::String(out));
}
if let Some(trig) = options.trigger {
op.insert("trigger".into(), trig);
}
self.operators.push(op);
self
}
/// Appends a `window` operator whose specification is given as raw text.
///
/// # Panics
///
/// Panics if `spec` is blank.
pub fn window_from_str(self, spec: &str, options: WindowOptions) -> Self {
    require_nonblank("spec", spec);
    let parsed = WindowSpec::new(spec);
    self.window_full(parsed, options)
}
pub fn branch(mut self, branches: Vec<BranchSpec>) -> Self {
if branches.is_empty() {
panic!("branch operator requires at least one branch");
}
let mut normalised = Vec::with_capacity(branches.len());
for (i, b) in branches.iter().enumerate() {
if b.condition.trim().is_empty() {
panic!("branch[{i}] requires a non-empty `condition`");
}
if b.topic.trim().is_empty() {
panic!("branch[{i}] requires a non-empty `topic`");
}
normalised.push(json!({
"condition": b.condition,
"topic": b.topic,
}));
}
let mut op = Map::new();
op.insert("type".into(), Value::String("branch".into()));
op.insert("branches".into(), Value::Array(normalised));
self.operators.push(op);
self
}
pub fn enrich(mut self, lookup_topic: impl Into<String>, key_field: impl Into<String>) -> Self {
let lookup_topic = lookup_topic.into();
let key_field = key_field.into();
require_nonblank("lookup_topic", &lookup_topic);
require_nonblank("key_field", &key_field);
let mut op = Map::new();
op.insert("type".into(), Value::String("enrich".into()));
op.insert("lookupTopic".into(), Value::String(lookup_topic));
op.insert("keyField".into(), Value::String(key_field));
self.operators.push(op);
self
}
pub fn enrich_async(mut self, options: EnrichAsyncOptions) -> Self {
require_nonblank("url", &options.url);
if let Some(ref o) = options.ordering {
if o != "PRESERVE_INPUT" && o != "UNORDERED" {
panic!("ordering must be PRESERVE_INPUT or UNORDERED, got {o:?}");
}
}
if let Some(ref f) = options.on_failure {
if f != "EMIT_ERROR" && f != "DROP" && f != "PASS_THROUGH" {
panic!("on_failure must be EMIT_ERROR, DROP, or PASS_THROUGH, got {f:?}");
}
}
let mut op = Map::new();
op.insert("type".into(), Value::String("enrichAsync".into()));
op.insert("url".into(), Value::String(options.url));
if let Some(v) = options.parallelism {
op.insert("parallelism".into(), Value::Number(v.into()));
}
if let Some(v) = options.queue_size {
op.insert("queueSize".into(), Value::Number(v.into()));
}
if let Some(v) = options.timeout_ms {
op.insert("timeoutMs".into(), Value::Number(v.into()));
}
if let Some(v) = options.max_retries {
op.insert("maxRetries".into(), Value::Number(v.into()));
}
if let Some(v) = options.retry_backoff_ms {
op.insert("retryBackoffMs".into(), Value::Number(v.into()));
}
if let Some(o) = options.ordering {
op.insert("ordering".into(), Value::String(o));
}
if let Some(f) = options.on_failure {
op.insert("onFailure".into(), Value::String(f));
}
self.operators.push(op);
self
}
pub fn cep(mut self, sequence: Vec<Value>, options: CepOptions) -> Self {
if sequence.is_empty() {
panic!("cep operator requires a non-empty sequence");
}
let mut op = Map::new();
op.insert("type".into(), Value::String("cep".into()));
op.insert("sequence".into(), Value::Array(sequence));
if let Some(w) = options.within {
op.insert("within".into(), Value::String(w));
}
if let Some(n) = options.name {
op.insert("name".into(), Value::String(n));
}
self.operators.push(op);
self
}
pub fn broadcast_join(mut self, options: BroadcastJoinOptions) -> Self {
require_nonblank("join_key_field", &options.join_key_field);
if let Some(ref m) = options.refresh_mode {
if m != "cdc" && m != "periodic" && m != "explicit" {
panic!("refresh_mode must be cdc, periodic, or explicit, got {m:?}");
}
}
let mut op = Map::new();
op.insert("type".into(), Value::String("broadcastJoin".into()));
op.insert("joinKeyField".into(), Value::String(options.join_key_field));
if let Some(t) = options.streaming_topic {
op.insert("streamingTopic".into(), Value::String(t));
}
if let Some(n) = options.name {
op.insert("name".into(), Value::String(n));
}
if let Some(b) = options.max_bytes {
op.insert("maxBytes".into(), Value::Number(b.into()));
}
if let Some(m) = options.refresh_mode {
op.insert("refreshMode".into(), Value::String(m));
}
if let Some(i) = options.interval_millis {
op.insert("intervalMillis".into(), Value::Number(i.into()));
}
self.operators.push(op);
self
}
pub fn cdc_join(mut self, options: CdcJoinOptions) -> Self {
require_nonblank("source", &options.source);
let mut op = Map::new();
op.insert("type".into(), Value::String("cdcJoin".into()));
op.insert("source".into(), Value::String(options.source));
if let Some(k) = options.join_key {
op.insert("joinKey".into(), Value::String(k));
}
if let Some(t) = options.table {
op.insert("table".into(), Value::String(t));
}
if let Some(b) = options.state_backend {
op.insert("stateBackend".into(), Value::String(b));
}
self.operators.push(op);
self
}
pub fn to_topic(mut self, topic: impl Into<String>) -> Self {
let topic = topic.into();
require_nonblank("topic", &topic);
self.output_topic = Some(topic);
self.sink_channel = None;
self
}
pub fn to_topic_with_channel(
mut self,
topic: impl Into<String>,
channel: impl Into<String>,
) -> Self {
let topic = topic.into();
let channel = channel.into();
require_nonblank("topic", &topic);
require_nonblank("channel", &channel);
self.output_topic = Some(topic);
self.sink_channel = Some(channel);
self
}
/// Adds (or replaces) one entry of the sink node's extra configuration.
///
/// These entries are copied into the sink config after the built-in
/// `channel` and `inputTopic` entries when a sink node is emitted.
pub fn with_sink_config(mut self, key: impl Into<String>, value: Value) -> Self {
    let config_key: String = key.into();
    self.sink_config.insert(config_key, value);
    self
}
pub fn with_sink_label(mut self, label: impl Into<String>) -> Self {
self.sink_label = Some(label.into());
self
}
pub fn to_state(mut self) -> Self {
self.output_topic = None;
self.sink_channel = None;
self.sink_config = Map::new();
self.sink_label = None;
self
}
pub fn named(mut self, name: impl Into<String>) -> Self {
let name = name.into();
require_nonblank("name", &name);
self.name = Some(name);
self
}
pub fn described_as(mut self, description: impl Into<String>) -> Self {
self.description = Some(description.into());
self
}
pub fn with_agent_label(mut self, label: impl Into<String>) -> Self {
let label = label.into();
require_nonblank("label", &label);
self.agent_label = Some(label);
self
}
/// Returns the operators chained so far, in insertion order.
pub fn operators(&self) -> &[Map<String, Value>] {
    self.operators.as_slice()
}
/// Renders the pipeline definition using the builder's own name.
///
/// # Errors
///
/// Returns `PulseError::InvalidConfig` when the name, the source, or the
/// operator list is missing.
pub fn build(&self) -> Result<Value, PulseError> {
    let no_override: Option<String> = None;
    self.build_inner(no_override)
}
pub fn build_with_name(&self, name: &str) -> Result<Value, PulseError> {
require_nonblank("name", name);
self.build_inner(Some(name.to_string()))
}
fn build_inner(&self, override_name: Option<String>) -> Result<Value, PulseError> {
let pipeline_name = override_name.or_else(|| self.name.clone()).ok_or_else(|| {
PulseError::InvalidConfig(
"pipeline name required — pass to StreamBuilder::new or build_with_name".into(),
)
})?;
let input_topic = self.input_topic.as_ref().ok_or_else(|| {
PulseError::InvalidConfig("no source — call .from_topic(...) before build()".into())
})?;
if self.operators.is_empty() {
return Err(PulseError::InvalidConfig(
"no operators — chain at least one of .filter/.map/.key_by/... before build()"
.into(),
));
}
let source_engine = self.source_engine.as_deref().unwrap_or("kafka");
let mut nodes: Vec<Value> = Vec::with_capacity(3);
let mut src_config = Map::new();
src_config.insert("engine".into(), Value::String(source_engine.to_string()));
src_config.insert("inputTopic".into(), Value::String(input_topic.clone()));
for (k, v) in &self.source_config {
src_config.insert(k.clone(), v.clone());
}
let src_label = self
.source_label
.clone()
.unwrap_or_else(|| format!("{source_engine} source"));
nodes.push(json!({
"type": "source",
"label": src_label,
"config": Value::Object(src_config),
}));
let mut agent_config = Map::new();
agent_config.insert("engine".into(), Value::String("streaming".into()));
agent_config.insert("inputTopic".into(), Value::String(input_topic.clone()));
let ops_value: Vec<Value> = self
.operators
.iter()
.map(|op| Value::Object(op.clone()))
.collect();
agent_config.insert("operators".into(), Value::Array(ops_value));
if let Some(ref out) = self.output_topic {
agent_config.insert("outputTopic".into(), Value::String(out.clone()));
}
let agent_label = self
.agent_label
.clone()
.unwrap_or_else(|| pipeline_name.clone());
nodes.push(json!({
"type": "agent",
"label": agent_label,
"config": Value::Object(agent_config),
}));
if let (Some(out), Some(ch)) = (self.output_topic.as_ref(), self.sink_channel.as_ref()) {
let mut sink_conf = Map::new();
sink_conf.insert("channel".into(), Value::String(ch.clone()));
sink_conf.insert("inputTopic".into(), Value::String(out.clone()));
for (k, v) in &self.sink_config {
sink_conf.insert(k.clone(), v.clone());
}
let sink_label = self
.sink_label
.clone()
.unwrap_or_else(|| format!("{ch} sink"));
nodes.push(json!({
"type": "sink",
"label": sink_label,
"config": Value::Object(sink_conf),
}));
}
let mut pipeline = Map::new();
pipeline.insert("name".into(), Value::String(pipeline_name));
pipeline.insert("nodes".into(), Value::Array(nodes));
if let Some(ref desc) = self.description {
pipeline.insert("description".into(), Value::String(desc.clone()));
}
Ok(Value::Object(pipeline))
}
}
/// Handle for compiling and deploying stream pipelines through a borrowed
/// [`PulseClient`].
pub struct StreamsResource<'c> {
    pub(crate) client: &'c PulseClient,
}

impl<'c> StreamsResource<'c> {
    /// Renders `builder` into its JSON definition without contacting the
    /// server.
    ///
    /// # Errors
    ///
    /// Propagates any error from `StreamBuilder::build`.
    pub fn compile(&self, builder: &StreamBuilder) -> Result<Value, PulseError> {
        let definition = builder.build()?;
        Ok(definition)
    }

    /// Like [`compile`](Self::compile), but uses `name` as the pipeline name.
    ///
    /// # Errors
    ///
    /// Propagates any error from `StreamBuilder::build_with_name`.
    pub fn compile_with_name(
        &self,
        builder: &StreamBuilder,
        name: &str,
    ) -> Result<Value, PulseError> {
        let definition = builder.build_with_name(name)?;
        Ok(definition)
    }

    /// Builds `builder` and POSTs the definition to `/api/pulse/pipelines`.
    ///
    /// # Errors
    ///
    /// Returns the build error if the definition is invalid, otherwise
    /// whatever the client request returns.
    pub async fn deploy(&self, builder: &StreamBuilder) -> Result<Value, PulseError> {
        let definition = builder.build()?;
        self.post_definition(&definition).await
    }

    /// Like [`deploy`](Self::deploy), but uses `name` as the pipeline name.
    ///
    /// # Errors
    ///
    /// Returns the build error if the definition is invalid, otherwise
    /// whatever the client request returns.
    pub async fn deploy_with_name(
        &self,
        builder: &StreamBuilder,
        name: &str,
    ) -> Result<Value, PulseError> {
        let definition = builder.build_with_name(name)?;
        self.post_definition(&definition).await
    }

    /// Sends `definition` as the body of a POST to the pipelines endpoint.
    async fn post_definition(&self, definition: &Value) -> Result<Value, PulseError> {
        // NOTE(review): the trailing `true` is passed through unchanged; its
        // meaning is defined by `PulseClient::request` — confirm there.
        self.client
            .request(
                Method::POST,
                "/api/pulse/pipelines",
                Some(definition),
                true,
            )
            .await
    }
}

impl std::fmt::Debug for StreamsResource<'_> {
    /// Prints only the type name; the borrowed client is not shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut shape = f.debug_struct("StreamsResource");
        shape.finish()
    }
}
/// Panics unless `value` contains at least one non-whitespace character.
///
/// `name` identifies the offending argument in the panic message.
fn require_nonblank(name: &str, value: &str) {
    let blank = value.trim().is_empty();
    if blank {
        panic!("{name} must be a non-empty string");
    }
}