1use std::collections::BTreeMap;
41
42use serde::{Deserialize, Serialize};
43
/// Errors produced while loading a directive specification.
#[derive(Debug, thiserror::Error)]
pub enum DirectiveError {
    /// The configuration could not be accepted; the payload is the
    /// human-readable reason, rendered after the `Configuration error: ` prefix.
    #[error("Configuration error: {0}")]
    Configuration(String),
}
56
/// The aspect of dispatch that a directive header is permitted to influence.
///
/// Variants (de)serialize under their snake_case names, except the two
/// dispatch variants, which carry explicit dotted names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum Control {
    /// Wire name: `dispatch.playbook`.
    #[serde(rename = "dispatch.playbook")]
    DispatchPlaybook,
    /// Wire name: `dispatch.execution_pool`.
    #[serde(rename = "dispatch.execution_pool")]
    DispatchExecutionPool,
    /// Wire name: `priority`.
    Priority,
    /// Wire name: `idempotency_key`.
    IdempotencyKey,
    /// Wire name: `content_type`.
    ContentType,
    /// Wire name: `schema_hint`.
    SchemaHint,
}
83
84impl Control {
85 pub fn as_str(&self) -> &'static str {
87 match self {
88 Control::DispatchPlaybook => "dispatch.playbook",
89 Control::DispatchExecutionPool => "dispatch.execution_pool",
90 Control::Priority => "priority",
91 Control::IdempotencyKey => "idempotency_key",
92 Control::ContentType => "content_type",
93 Control::SchemaHint => "schema_hint",
94 }
95 }
96}
97
/// One configured directive: a header name bound to the control it drives.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DirectiveRule {
    /// Name of the header this rule reads.
    pub header: String,
    /// Which aspect of dispatch the header value controls.
    pub controls: Control,
    /// Optional list of accepted header values. Defaults to `None` when
    /// absent from the input and is omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub allowed: Option<Vec<String>>,
    /// Optional lookup table from header value to a target string. Defaults
    /// to `None` when absent and is omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub map: Option<BTreeMap<String, String>>,
}
119
/// Trace-propagation mode. Variants (de)serialize in lowercase
/// (`none`, `w3c`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum TracePropagation {
    /// No propagation; this is the default.
    #[default]
    None,
    /// W3C-style propagation.
    W3c,
}
130
/// Trace-related settings of a directive specification.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct TraceConfig {
    /// Propagation mode; falls back to the enum default when absent.
    #[serde(default)]
    pub propagate: TracePropagation,
    /// Baggage keys that may be propagated; empty when absent.
    #[serde(default)]
    pub baggage_allowlist: Vec<String>,
}
141
142impl TraceConfig {
143 fn is_enabled(&self) -> bool {
144 matches!(self.propagate, TracePropagation::W3c)
145 }
146}
147
148#[derive(Debug, Clone, Serialize, Deserialize, Default)]
154pub struct DirectiveSpec {
155 #[serde(default)]
159 pub normalize: bool,
160 #[serde(default)]
162 pub directives: Vec<DirectiveRule>,
163 #[serde(default)]
165 pub trace: TraceConfig,
166 #[serde(default = "default_passthrough")]
170 pub passthrough: String,
171}
172
/// Serde default for the `passthrough` field: the string `"data"`.
fn default_passthrough() -> String {
    String::from("data")
}
176
/// Trace context extracted from headers. Unset or empty fields are omitted
/// from serialized output.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct TraceContext {
    /// Value of the `traceparent` header, when one was accepted.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub traceparent: Option<String>,
    /// Value of the `tracestate` header, when propagated.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tracestate: Option<String>,
    /// Propagated baggage entries, keyed by baggage key.
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub baggage: BTreeMap<String, String>,
}
198
199impl TraceContext {
200 pub fn is_empty(&self) -> bool {
202 self.traceparent.is_none() && self.tracestate.is_none() && self.baggage.is_empty()
203 }
204}
205
/// Record of a single directive that took part in building a plan.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AppliedDirective {
    /// Header name of the rule that matched.
    pub header: String,
    /// Textual name of the control the rule drives.
    pub controls: String,
    /// The value recorded for the directive.
    pub effective_value: String,
}
216
/// Outcome of resolving a set of headers against a directive specification.
/// Unset options and an empty `applied` list are omitted from serialized output.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct DispatchPlan {
    /// Playbook selected by a directive, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub playbook_override: Option<String>,
    /// Execution pool selected by a directive, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub execution_pool_override: Option<String>,
    /// Idempotency key taken from a directive header, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub idempotency_key: Option<String>,
    /// Content type taken from a directive header, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub content_type: Option<String>,
    /// Schema hint taken from a directive header, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub schema_hint: Option<String>,
    /// Extracted trace context, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub trace: Option<TraceContext>,
    /// Directives that contributed to this plan, in evaluation order.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub applied: Vec<AppliedDirective>,
}
244
245impl DispatchPlan {
246 pub fn is_noop(&self) -> bool {
249 self.playbook_override.is_none()
250 && self.execution_pool_override.is_none()
251 && self.idempotency_key.is_none()
252 && self.content_type.is_none()
253 && self.schema_hint.is_none()
254 && self.trace.is_none()
255 && self.applied.is_empty()
256 }
257}
258
259impl DirectiveSpec {
264 pub fn parse(value: &serde_json::Value) -> Result<DirectiveSpec, DirectiveError> {
271 let mut spec: DirectiveSpec = serde_json::from_value(value.clone()).map_err(|e| {
272 DirectiveError::Configuration(format!("Invalid subscription 'headers' block: {e}"))
273 })?;
274
275 for rule in spec.directives.iter_mut() {
276 rule.header = rule.header.to_ascii_lowercase();
278
279 match rule.controls {
280 Control::DispatchPlaybook | Control::DispatchExecutionPool => {
281 let ok = rule.allowed.as_ref().map(|a| !a.is_empty()).unwrap_or(false);
282 if !ok {
283 return Err(DirectiveError::Configuration(format!(
284 "directive header '{}' controls '{}' but declares no non-empty \
285 'allowed:' value list — a routing directive must constrain its \
286 targets (RFC §7.5)",
287 rule.header,
288 rule.controls.as_str()
289 )));
290 }
291 }
292 Control::Priority => {
293 let ok = rule.map.as_ref().map(|m| !m.is_empty()).unwrap_or(false);
294 if !ok {
295 return Err(DirectiveError::Configuration(format!(
296 "directive header '{}' controls 'priority' but declares no non-empty \
297 'map:' (value → pool) — a priority directive must map to allowed \
298 pools (RFC §7.5)",
299 rule.header
300 )));
301 }
302 }
303 Control::IdempotencyKey | Control::ContentType | Control::SchemaHint => {}
304 }
305 }
306
307 Ok(spec)
308 }
309
310 pub fn resolve(&self, headers: &serde_json::Map<String, serde_json::Value>) -> DispatchPlan {
318 let mut plan = DispatchPlan::default();
319
320 for rule in &self.directives {
321 let Some(raw) = headers.get(&rule.header) else {
322 continue;
323 };
324 let Some(value) = last_value(raw) else {
325 continue;
326 };
327
328 match rule.controls {
329 Control::DispatchPlaybook => {
330 if value_allowed(rule.allowed.as_ref(), &value) {
331 plan.playbook_override = Some(value.clone());
332 plan.applied.push(applied(rule, &value));
333 }
334 }
335 Control::DispatchExecutionPool => {
336 if value_allowed(rule.allowed.as_ref(), &value) {
337 plan.execution_pool_override = Some(value.clone());
338 plan.applied.push(applied(rule, &value));
339 }
340 }
341 Control::Priority => {
342 if let Some(map) = rule.map.as_ref() {
343 if let Some(pool) = map.get(&value) {
344 if plan.execution_pool_override.is_none() {
347 plan.execution_pool_override = Some(pool.clone());
348 }
349 plan.applied.push(AppliedDirective {
350 header: rule.header.clone(),
351 controls: rule.controls.as_str().to_string(),
352 effective_value: pool.clone(),
353 });
354 }
355 }
356 }
357 Control::IdempotencyKey => {
358 plan.idempotency_key = Some(value.clone());
359 plan.applied.push(applied(rule, &value));
360 }
361 Control::ContentType => {
362 plan.content_type = Some(value.clone());
363 plan.applied.push(applied(rule, &value));
364 }
365 Control::SchemaHint => {
366 plan.schema_hint = Some(value.clone());
367 plan.applied.push(applied(rule, &value));
368 }
369 }
370 }
371
372 for rule in &self.directives {
376 if rule.controls == Control::DispatchExecutionPool {
377 if let Some(raw) = headers.get(&rule.header) {
378 if let Some(value) = last_value(raw) {
379 if value_allowed(rule.allowed.as_ref(), &value) {
380 plan.execution_pool_override = Some(value);
381 }
382 }
383 }
384 }
385 }
386
387 if self.trace.is_enabled() {
388 let trace = extract_w3c_trace(headers, &self.trace.baggage_allowlist);
389 if !trace.is_empty() {
390 plan.trace = Some(trace);
391 }
392 }
393
394 plan
395 }
396}
397
398fn last_value(raw: &serde_json::Value) -> Option<String> {
406 match raw {
407 serde_json::Value::String(s) => Some(s.clone()),
408 serde_json::Value::Array(arr) => arr
409 .iter()
410 .rev()
411 .find_map(|v| v.as_str().map(str::to_string)),
412 serde_json::Value::Number(n) => Some(n.to_string()),
413 serde_json::Value::Bool(b) => Some(b.to_string()),
414 _ => None,
415 }
416}
417
/// Checks a routing header value against a rule's allowlist.
///
/// Returns `true` only when an allowlist is present and contains `value`
/// (exact, case-sensitive match). A missing allowlist fails closed: this
/// function gates routing overrides, and a rule that reaches resolution
/// without an `allowed` list must not let an arbitrary header value
/// redirect dispatch.
fn value_allowed(allowed: Option<&Vec<String>>, value: &str) -> bool {
    allowed.map_or(false, |list| list.iter().any(|candidate| candidate == value))
}
427
428fn applied(rule: &DirectiveRule, value: &str) -> AppliedDirective {
429 AppliedDirective {
430 header: rule.header.clone(),
431 controls: rule.controls.as_str().to_string(),
432 effective_value: value.to_string(),
433 }
434}
435
436pub fn extract_w3c_trace(
444 headers: &serde_json::Map<String, serde_json::Value>,
445 baggage_allowlist: &[String],
446) -> TraceContext {
447 let mut tc = TraceContext::default();
448
449 if let Some(tp) = headers.get("traceparent").and_then(last_value_ref) {
450 if is_plausible_traceparent(&tp) {
451 tc.traceparent = Some(tp);
452 }
453 }
454 if let Some(ts) = headers.get("tracestate").and_then(last_value_ref) {
455 tc.tracestate = Some(ts);
456 }
457 if !baggage_allowlist.is_empty() {
458 if let Some(raw) = headers.get("baggage").and_then(last_value_ref) {
459 for item in raw.split(',') {
460 let item = item.trim();
461 if let Some((k, v)) = item.split_once('=') {
462 let key = k.trim();
463 let val = v.split(';').next().unwrap_or("").trim();
466 if baggage_allowlist.iter().any(|a| a == key) {
467 tc.baggage.insert(key.to_string(), val.to_string());
468 }
469 }
470 }
471 }
472 }
473
474 tc
475}
476
/// Thin wrapper that forwards to `last_value`; used as the callback passed to
/// `Option::and_then` on header lookups.
fn last_value_ref(raw: &serde_json::Value) -> Option<String> {
    last_value(raw)
}
480
/// Checks whether `s` looks like a usable W3C `traceparent` value.
///
/// Accepts exactly four `-`-separated hex fields of lengths 2 / 32 / 16 / 2
/// (version, trace-id, parent-id, flags). It additionally rejects the values
/// the W3C Trace Context specification declares invalid: version `ff`, an
/// all-zero trace-id, and an all-zero parent-id. Hex digits are accepted in
/// either case, which is more lenient than the specification.
fn is_plausible_traceparent(s: &str) -> bool {
    /// `field` is exactly `len` ASCII hex digits.
    fn is_hex_field(field: &str, len: usize) -> bool {
        field.len() == len && field.bytes().all(|b| b.is_ascii_hexdigit())
    }
    /// `field` consists solely of `0` characters.
    fn is_all_zero(field: &str) -> bool {
        field.bytes().all(|b| b == b'0')
    }

    let mut fields = s.split('-');
    // Exactly four fields: the fifth `next()` must come back empty.
    let (Some(version), Some(trace_id), Some(parent_id), Some(flags), None) = (
        fields.next(),
        fields.next(),
        fields.next(),
        fields.next(),
        fields.next(),
    ) else {
        return false;
    };

    is_hex_field(version, 2)
        && !version.eq_ignore_ascii_case("ff")
        && is_hex_field(trace_id, 32)
        && !is_all_zero(trace_id)
        && is_hex_field(parent_id, 16)
        && !is_all_zero(parent_id)
        && is_hex_field(flags, 2)
}
494
495pub fn normalize_http_headers(
500 raw: &[(String, String)],
501) -> serde_json::Map<String, serde_json::Value> {
502 let mut acc: BTreeMap<String, Vec<String>> = BTreeMap::new();
503 for (k, v) in raw {
504 acc.entry(k.to_ascii_lowercase()).or_default().push(v.clone());
505 }
506 let mut out = serde_json::Map::new();
507 for (k, mut vals) in acc {
508 let value = if vals.len() == 1 {
509 serde_json::Value::String(vals.pop().unwrap())
510 } else {
511 serde_json::Value::Array(vals.into_iter().map(serde_json::Value::String).collect())
512 };
513 out.insert(k, value);
514 }
515 out
516}
517
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // Helper: turns a `json!` object literal into the header-map shape taken
    // by `resolve` / `extract_w3c_trace`. Panics if `v` is not an object.
    fn headers(v: serde_json::Value) -> serde_json::Map<String, serde_json::Value> {
        v.as_object().unwrap().clone()
    }

    // A spec with no rules and tracing off must ignore every header.
    #[test]
    fn empty_spec_is_noop() {
        let spec = DirectiveSpec::default();
        let plan = spec.resolve(&headers(json!({ "x-anything": "value" })));
        assert!(plan.is_noop());
    }

    // `parse` must reject routing rules without a non-empty `allowed` list
    // and priority rules without a `map`.
    #[test]
    fn parse_requires_allowed_for_routing() {
        // `allowed` missing entirely.
        let err = DirectiveSpec::parse(&json!({
            "directives": [{ "header": "x-route", "controls": "dispatch.playbook" }]
        }))
        .unwrap_err();
        assert!(format!("{err}").contains("allowed"));

        // `allowed` present but empty.
        let err = DirectiveSpec::parse(&json!({
            "directives": [{ "header": "x-pool", "controls": "dispatch.execution_pool", "allowed": [] }]
        }))
        .unwrap_err();
        assert!(format!("{err}").contains("allowed"));

        // Priority rule without `map`.
        let err = DirectiveSpec::parse(&json!({
            "directives": [{ "header": "x-prio", "controls": "priority" }]
        }))
        .unwrap_err();
        assert!(format!("{err}").contains("map"));
    }

    // Header names in rules are stored lower-cased after parsing.
    #[test]
    fn parse_lowercases_header_keys() {
        let spec = DirectiveSpec::parse(&json!({
            "directives": [{ "header": "X-Idempotency-Key", "controls": "idempotency_key" }]
        }))
        .unwrap();
        assert_eq!(spec.directives[0].header, "x-idempotency-key");
    }

    // A playbook override is honoured only for allowlisted values.
    #[test]
    fn redirect_playbook_respects_allowlist() {
        let spec = DirectiveSpec::parse(&json!({
            "directives": [{
                "header": "x-noetl-route",
                "controls": "dispatch.playbook",
                "allowed": ["domain/handle_billing", "domain/handle_fraud"]
            }]
        }))
        .unwrap();

        // Allowlisted value: override applied and recorded.
        let plan = spec.resolve(&headers(json!({ "x-noetl-route": "domain/handle_fraud" })));
        assert_eq!(plan.playbook_override.as_deref(), Some("domain/handle_fraud"));
        assert_eq!(plan.applied.len(), 1);
        assert_eq!(plan.applied[0].controls, "dispatch.playbook");

        // Value outside the allowlist: ignored, nothing recorded.
        let plan = spec.resolve(&headers(json!({ "x-noetl-route": "domain/evil" })));
        assert!(plan.playbook_override.is_none());
        assert!(plan.applied.is_empty());
    }

    // Priority maps to a pool, but an explicit pool directive takes precedence.
    #[test]
    fn execution_pool_override_and_priority_precedence() {
        let spec = DirectiveSpec::parse(&json!({
            "directives": [
                { "header": "x-priority", "controls": "priority", "map": { "high": "priority", "normal": "shared" } },
                { "header": "x-noetl-pool", "controls": "dispatch.execution_pool", "allowed": ["iot", "priority", "shared"] }
            ]
        }))
        .unwrap();

        // Priority alone selects the mapped pool.
        let plan = spec.resolve(&headers(json!({ "x-priority": "high" })));
        assert_eq!(plan.execution_pool_override.as_deref(), Some("priority"));

        // Explicit pool wins over the priority mapping.
        let plan = spec.resolve(&headers(json!({ "x-priority": "high", "x-noetl-pool": "iot" })));
        assert_eq!(plan.execution_pool_override.as_deref(), Some("iot"));

        // Priority value missing from the map: no override.
        let plan = spec.resolve(&headers(json!({ "x-priority": "bogus" })));
        assert!(plan.execution_pool_override.is_none());
    }

    // Idempotency key, content type and schema hint accept any value.
    #[test]
    fn idempotency_content_schema_are_free_values() {
        let spec = DirectiveSpec::parse(&json!({
            "directives": [
                { "header": "x-idempotency-key", "controls": "idempotency_key" },
                { "header": "content-type", "controls": "content_type" },
                { "header": "x-schema", "controls": "schema_hint" }
            ]
        }))
        .unwrap();
        let plan = spec.resolve(&headers(json!({
            "x-idempotency-key": "abc-123",
            "content-type": "application/json",
            "x-schema": "order.v2"
        })));
        assert_eq!(plan.idempotency_key.as_deref(), Some("abc-123"));
        assert_eq!(plan.content_type.as_deref(), Some("application/json"));
        assert_eq!(plan.schema_hint.as_deref(), Some("order.v2"));
        assert_eq!(plan.applied.len(), 3);
    }

    // Headers not named by any rule never influence the plan.
    #[test]
    fn non_allowlisted_headers_are_data_only() {
        let spec = DirectiveSpec::parse(&json!({
            "directives": [{ "header": "x-noetl-pool", "controls": "dispatch.execution_pool", "allowed": ["iot"] }]
        }))
        .unwrap();
        // Neither header is configured as a directive.
        let plan = spec.resolve(&headers(json!({ "x-evil-route": "domain/evil", "x-random": "data" })));
        assert!(plan.is_noop());
    }

    // For a repeated header (JSON array) the last value is the effective one.
    #[test]
    fn multi_value_header_is_last_wins() {
        let spec = DirectiveSpec::parse(&json!({
            "directives": [{ "header": "x-noetl-pool", "controls": "dispatch.execution_pool", "allowed": ["iot", "priority"] }]
        }))
        .unwrap();
        let plan = spec.resolve(&headers(json!({ "x-noetl-pool": ["iot", "priority"] })));
        assert_eq!(plan.execution_pool_override.as_deref(), Some("priority"));
    }

    // With propagation set to `w3c`, trace headers and allowlisted baggage
    // keys are captured; other baggage keys are dropped.
    #[test]
    fn w3c_trace_extracted_when_enabled() {
        let spec = DirectiveSpec::parse(&json!({
            "trace": { "propagate": "w3c", "baggage_allowlist": ["tenant", "request_id"] }
        }))
        .unwrap();
        let tp = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
        let plan = spec.resolve(&headers(json!({
            "traceparent": tp,
            "tracestate": "vendor=abc",
            "baggage": "tenant=acme,request_id=r-9, secret=nope"
        })));
        let trace = plan.trace.unwrap();
        assert_eq!(trace.traceparent.as_deref(), Some(tp));
        assert_eq!(trace.tracestate.as_deref(), Some("vendor=abc"));
        assert_eq!(trace.baggage.get("tenant").map(String::as_str), Some("acme"));
        assert_eq!(trace.baggage.get("request_id").map(String::as_str), Some("r-9"));
        // `secret` is not in the allowlist.
        assert!(!trace.baggage.contains_key("secret"));
    }

    // The default spec has propagation off, so trace headers are ignored.
    #[test]
    fn trace_disabled_by_default() {
        let spec = DirectiveSpec::default();
        let plan = spec.resolve(&headers(json!({
            "traceparent": "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
        })));
        assert!(plan.trace.is_none());
    }

    // A `traceparent` that fails the shape check is discarded.
    #[test]
    fn malformed_traceparent_dropped() {
        let tc = extract_w3c_trace(&headers(json!({ "traceparent": "not-a-trace" })), &[]);
        assert!(tc.traceparent.is_none());
        assert!(tc.is_empty());
    }

    // End-to-end scenario: playbook and pool redirect in a single message.
    #[test]
    fn redirect_example_from_rfc_7_3() {
        let spec = DirectiveSpec::parse(&json!({
            // Two routing directives plus W3C propagation.
            "directives": [
                { "header": "x-noetl-route", "controls": "dispatch.playbook",
                  "allowed": ["domain/handle_billing", "domain/handle_fraud", "domain/handle_event"] },
                { "header": "x-noetl-pool", "controls": "dispatch.execution_pool",
                  "allowed": ["priority", "shared"] }
            ],
            "trace": { "propagate": "w3c" }
        }))
        .unwrap();
        let plan = spec.resolve(&headers(json!({
            "x-noetl-route": "domain/handle_fraud",
            "x-noetl-pool": "priority"
        })));
        assert_eq!(plan.playbook_override.as_deref(), Some("domain/handle_fraud"));
        assert_eq!(plan.execution_pool_override.as_deref(), Some("priority"));
        assert_eq!(plan.applied.len(), 2);
    }

    // Raw header pairs are lower-cased and repeated names become arrays.
    #[test]
    fn normalize_http_headers_lowercases_and_groups() {
        let raw = vec![
            ("X-Noetl-Route".to_string(), "domain/x".to_string()),
            ("Accept".to_string(), "a".to_string()),
            ("Accept".to_string(), "b".to_string()),
        ];
        let m = normalize_http_headers(&raw);
        assert_eq!(m.get("x-noetl-route").unwrap(), "domain/x");
        assert_eq!(m.get("accept").unwrap(), &json!(["a", "b"]));
    }
}