Skip to main content

noetl_directives/
lib.rs

1//! Header / attribute directive engine — the shared, lean implementation.
2//!
3//! Phase 2/3 of the subscription/listener RFC
4//! ([noetl/ai-meta#90](https://github.com/noetl/ai-meta/issues/90), RFC §7);
5//! extracted into a standalone crate by
6//! [noetl/ai-meta#92](https://github.com/noetl/ai-meta/issues/92) so the
7//! security-sensitive directive allowlist has ONE home consumed by both the
8//! `noetl-tools` worker/tools runtime AND the internet-facing `noetl-gateway`
9//! (which must not pull `duckdb`/`kube`/`tokio`). It was previously a serde-only
10//! port vendored into the gateway — the drift risk that motivated this crate.
11//!
12//! ### What this is
13//!
14//! Every message source carries a metadata channel alongside the payload —
15//! Pub/Sub **attributes**, Kafka/NATS **headers**, HTTP **headers** for
16//! webhook/push ingress — normalized into one uniform `headers` map
17//! (lowercased keys, RFC §7.1). This module turns selected, **allowlisted**
18//! headers into *instructions* that influence how a message is dispatched:
19//! redirect to a different target playbook, route to a different worker pool /
20//! command segment, supply an idempotency key, hint the content type, and
21//! carry a W3C distributed-trace context into the execution.
22//!
23//! ### Untrusted by default (RFC §7.5)
24//!
25//! Nothing here trusts an arbitrary inbound header. A header acts as a
26//! directive **only** if its key appears in the configured
27//! [`DirectiveSpec::directives`] allowlist, and even then a value allowlist
28//! (`allowed:` / `map:`) constrains what target it may select. A header not in
29//! the allowlist is data — it stays in `message.headers` and can never drive
30//! routing. (Auth-gated directive trust for *push* ingress is the gateway's
31//! concern: it runs this engine only after verification succeeds.)
32//!
33//! ### Output
34//!
35//! [`DirectiveSpec::resolve`] returns a [`DispatchPlan`] — the effective
36//! `dispatch.playbook` + `execution_pool` overrides, idempotency key, content
37//! hints, the extracted [`TraceContext`], and an `applied` audit list for the
38//! `subscription.message.directives_applied` event (RFC §7.6).
39
40use std::collections::BTreeMap;
41
42use serde::{Deserialize, Serialize};
43
/// Error type for directive parsing. Lean (no dependency on the noetl-tools
/// `ToolError` or `anyhow`) so both consumers can map it into their own error
/// channel via `Display` — the worker maps it into `anyhow`, the gateway logs
/// it and falls back to a no-op plan, and noetl-tools converts it via the
/// `From<DirectiveError>` impl on its `ToolError`.
///
/// Within this file the only producer is [`DirectiveSpec::parse`].
#[derive(Debug, thiserror::Error)]
pub enum DirectiveError {
    /// The `headers:` directive block was malformed or violated §7.5
    /// (a routing control without its required value constraint).
    ///
    /// The payload is the human-readable detail; `Display` renders it as
    /// `Configuration error: <detail>`.
    #[error("Configuration error: {0}")]
    Configuration(String),
}
56
57// ---------------------------------------------------------------------------
58// Controls — what a directive header may bind to (RFC §7.2 table)
59// ---------------------------------------------------------------------------
60
/// The dispatch concern a directive header controls.
///
/// Wire names are snake_case (`priority`, `idempotency_key`, `content_type`,
/// `schema_hint`) except the two routing controls, which carry explicit
/// dotted names (`dispatch.playbook`, `dispatch.execution_pool`). The same
/// strings are returned by [`Control::as_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum Control {
    /// Redirect — run a different target playbook than the subscription
    /// default. Constrained by an `allowed:` list (no arbitrary playbooks).
    #[serde(rename = "dispatch.playbook")]
    DispatchPlaybook,
    /// Route the run to a different worker pool / command segment.
    /// Constrained by an `allowed:` list.
    #[serde(rename = "dispatch.execution_pool")]
    DispatchExecutionPool,
    /// Map a priority class to a pool/segment via `map:` (value → pool).
    Priority,
    /// Feed the dedup window + the spool item key. Free value (a key, not a
    /// target).
    IdempotencyKey,
    /// Tell the dispatched playbook how to parse the body. Free hint.
    ContentType,
    /// Schema hint for the body. Free hint.
    SchemaHint,
}
83
84impl Control {
85    /// Stable wire string used in the applied-directive audit list.
86    pub fn as_str(&self) -> &'static str {
87        match self {
88            Control::DispatchPlaybook => "dispatch.playbook",
89            Control::DispatchExecutionPool => "dispatch.execution_pool",
90            Control::Priority => "priority",
91            Control::IdempotencyKey => "idempotency_key",
92            Control::ContentType => "content_type",
93            Control::SchemaHint => "schema_hint",
94        }
95    }
96}
97
98// ---------------------------------------------------------------------------
99// Spec — the configurable allowlist (parsed from the `headers:` block)
100// ---------------------------------------------------------------------------
101
/// One allowlisted directive rule: a header key, the concern it controls, and
/// an optional value constraint.
///
/// Which constraint is consulted depends on `controls`: the two routing
/// controls read `allowed`, `priority` reads `map`, and the free controls
/// (`idempotency_key`, `content_type`, `schema_hint`) read neither.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DirectiveRule {
    /// Header key (lowercased on parse to match the normalized headers map).
    pub header: String,
    /// The dispatch concern this header controls.
    pub controls: Control,
    /// Value allowlist for routing controls — only these values may be
    /// selected (`dispatch.playbook` / `dispatch.execution_pool`). A value
    /// not on the list is ignored (the directive does not apply).
    /// Omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub allowed: Option<Vec<String>>,
    /// Value → target map for `priority` (e.g. `{ high: priority, normal: shared }`).
    /// Omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub map: Option<BTreeMap<String, String>>,
}
119
/// How a header should be propagated as a distributed-trace context (RFC §7.4).
///
/// Serialized in lowercase: `none` / `w3c`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum TracePropagation {
    /// Do not extract trace context. This is the default.
    #[default]
    None,
    /// Honor W3C `traceparent` / `tracestate` / `baggage`.
    W3c,
}
130
131/// Trace-propagation configuration (the `headers.trace` block).
132#[derive(Debug, Clone, Serialize, Deserialize, Default)]
133pub struct TraceConfig {
134    /// Propagation mode. Default `none`.
135    #[serde(default)]
136    pub propagate: TracePropagation,
137    /// Baggage keys allowed to cross the boundary. Empty → no baggage.
138    #[serde(default)]
139    pub baggage_allowlist: Vec<String>,
140}
141
142impl TraceConfig {
143    fn is_enabled(&self) -> bool {
144        matches!(self.propagate, TracePropagation::W3c)
145    }
146}
147
148/// The parsed `headers:` directive block (RFC §7.2).
149///
150/// Default is fully off — no normalization influence, no directives, no
151/// trace — so a subscription without a `headers:` block behaves exactly as
152/// Phase 1.
153#[derive(Debug, Clone, Serialize, Deserialize, Default)]
154pub struct DirectiveSpec {
155    /// Build `message.headers` from the source channel. Phase 1 always
156    /// normalizes at the source client, so this is informational; the
157    /// directive engine operates on the already-normalized map regardless.
158    #[serde(default)]
159    pub normalize: bool,
160    /// The allowlist — only these header keys act as instructions.
161    #[serde(default)]
162    pub directives: Vec<DirectiveRule>,
163    /// Distributed-trace propagation config.
164    #[serde(default)]
165    pub trace: TraceConfig,
166    /// What to do with non-allowlisted headers. Always `data` in Phase 2
167    /// (they stay in `message.headers`); kept for forward-compat with a
168    /// future `drop`.
169    #[serde(default = "default_passthrough")]
170    pub passthrough: String,
171}
172
173fn default_passthrough() -> String {
174    "data".to_string()
175}
176
177// ---------------------------------------------------------------------------
178// Output — the resolved dispatch plan
179// ---------------------------------------------------------------------------
180
181/// W3C trace context extracted from a message's headers (RFC §7.4).
182///
183/// `execution_id` stays the primary NoETL trace key; this is the *external*
184/// join so cross-system traces stitch together. It rides in the execution's
185/// event `meta.trace` and is never a metric label (Observability P4).
186#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
187pub struct TraceContext {
188    /// W3C `traceparent` (`00-<trace-id>-<span-id>-<flags>`).
189    #[serde(skip_serializing_if = "Option::is_none")]
190    pub traceparent: Option<String>,
191    /// W3C `tracestate` (vendor list).
192    #[serde(skip_serializing_if = "Option::is_none")]
193    pub tracestate: Option<String>,
194    /// Allowlisted baggage key→value pairs.
195    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
196    pub baggage: BTreeMap<String, String>,
197}
198
199impl TraceContext {
200    /// True when no trace context was found (nothing to propagate).
201    pub fn is_empty(&self) -> bool {
202        self.traceparent.is_none() && self.tracestate.is_none() && self.baggage.is_empty()
203    }
204}
205
/// One directive that actually applied, for the audit event (RFC §7.6).
///
/// Entries are appended to [`DispatchPlan::applied`] in the order the rules
/// are declared in the spec.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AppliedDirective {
    /// The header key that drove the directive.
    pub header: String,
    /// The concern it controlled (the [`Control::as_str`] wire name).
    pub controls: String,
    /// The effective value selected (after `allowed:` / `map:`). For a
    /// `priority` directive this is the mapped pool, not the raw header value.
    pub effective_value: String,
}
216
217/// The resolved effect of a message's directives — what the runtime applies
218/// before `POST /api/execute`.
219#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
220pub struct DispatchPlan {
221    /// Effective target playbook override (`dispatch.playbook` redirect).
222    #[serde(skip_serializing_if = "Option::is_none")]
223    pub playbook_override: Option<String>,
224    /// Effective worker-pool / command-segment override
225    /// (`dispatch.execution_pool` or a `priority` map).
226    #[serde(skip_serializing_if = "Option::is_none")]
227    pub execution_pool_override: Option<String>,
228    /// Idempotency key for the dedup window + spool item key.
229    #[serde(skip_serializing_if = "Option::is_none")]
230    pub idempotency_key: Option<String>,
231    /// Content-type parse hint for the dispatched playbook.
232    #[serde(skip_serializing_if = "Option::is_none")]
233    pub content_type: Option<String>,
234    /// Schema hint for the body.
235    #[serde(skip_serializing_if = "Option::is_none")]
236    pub schema_hint: Option<String>,
237    /// Extracted W3C trace context.
238    #[serde(skip_serializing_if = "Option::is_none")]
239    pub trace: Option<TraceContext>,
240    /// Audit list of the directives that applied (RFC §7.6).
241    #[serde(default, skip_serializing_if = "Vec::is_empty")]
242    pub applied: Vec<AppliedDirective>,
243}
244
245impl DispatchPlan {
246    /// True when no directive applied and no trace was extracted (the common
247    /// case for a subscription with no `headers:` block).
248    pub fn is_noop(&self) -> bool {
249        self.playbook_override.is_none()
250            && self.execution_pool_override.is_none()
251            && self.idempotency_key.is_none()
252            && self.content_type.is_none()
253            && self.schema_hint.is_none()
254            && self.trace.is_none()
255            && self.applied.is_empty()
256    }
257}
258
259// ---------------------------------------------------------------------------
260// Engine
261// ---------------------------------------------------------------------------
262
263impl DirectiveSpec {
264    /// Parse a `headers:` block (a JSON object) into a [`DirectiveSpec`].
265    ///
266    /// Validates that routing controls carry the value constraint they
267    /// require: `dispatch.playbook` / `dispatch.execution_pool` need an
268    /// `allowed:` list, `priority` needs a `map:`. Without it an allowlisted
269    /// header could select an arbitrary target, defeating §7.5.
270    pub fn parse(value: &serde_json::Value) -> Result<DirectiveSpec, DirectiveError> {
271        let mut spec: DirectiveSpec = serde_json::from_value(value.clone()).map_err(|e| {
272            DirectiveError::Configuration(format!("Invalid subscription 'headers' block: {e}"))
273        })?;
274
275        for rule in spec.directives.iter_mut() {
276            // Normalize the header key to match the lowercased headers map.
277            rule.header = rule.header.to_ascii_lowercase();
278
279            match rule.controls {
280                Control::DispatchPlaybook | Control::DispatchExecutionPool => {
281                    let ok = rule.allowed.as_ref().map(|a| !a.is_empty()).unwrap_or(false);
282                    if !ok {
283                        return Err(DirectiveError::Configuration(format!(
284                            "directive header '{}' controls '{}' but declares no non-empty \
285                             'allowed:' value list — a routing directive must constrain its \
286                             targets (RFC §7.5)",
287                            rule.header,
288                            rule.controls.as_str()
289                        )));
290                    }
291                }
292                Control::Priority => {
293                    let ok = rule.map.as_ref().map(|m| !m.is_empty()).unwrap_or(false);
294                    if !ok {
295                        return Err(DirectiveError::Configuration(format!(
296                            "directive header '{}' controls 'priority' but declares no non-empty \
297                             'map:' (value → pool) — a priority directive must map to allowed \
298                             pools (RFC §7.5)",
299                            rule.header
300                        )));
301                    }
302                }
303                Control::IdempotencyKey | Control::ContentType | Control::SchemaHint => {}
304            }
305        }
306
307        Ok(spec)
308    }
309
310    /// Resolve this spec against a message's normalized headers map, producing
311    /// the [`DispatchPlan`] the runtime applies before dispatch.
312    ///
313    /// Only allowlisted keys are honored; routing controls are further
314    /// constrained by their `allowed:` / `map:` value lists. Multi-value
315    /// headers (Kafka allows duplicate keys; the array shape from the source
316    /// normalizer) are **last-wins** for directives (RFC §10 OQ7).
317    pub fn resolve(&self, headers: &serde_json::Map<String, serde_json::Value>) -> DispatchPlan {
318        let mut plan = DispatchPlan::default();
319
320        for rule in &self.directives {
321            let Some(raw) = headers.get(&rule.header) else {
322                continue;
323            };
324            let Some(value) = last_value(raw) else {
325                continue;
326            };
327
328            match rule.controls {
329                Control::DispatchPlaybook => {
330                    if value_allowed(rule.allowed.as_ref(), &value) {
331                        plan.playbook_override = Some(value.clone());
332                        plan.applied.push(applied(rule, &value));
333                    }
334                }
335                Control::DispatchExecutionPool => {
336                    if value_allowed(rule.allowed.as_ref(), &value) {
337                        plan.execution_pool_override = Some(value.clone());
338                        plan.applied.push(applied(rule, &value));
339                    }
340                }
341                Control::Priority => {
342                    if let Some(map) = rule.map.as_ref() {
343                        if let Some(pool) = map.get(&value) {
344                            // An explicit dispatch.execution_pool directive wins
345                            // over a priority mapping (RFC §10 OQ7 precedence).
346                            if plan.execution_pool_override.is_none() {
347                                plan.execution_pool_override = Some(pool.clone());
348                            }
349                            plan.applied.push(AppliedDirective {
350                                header: rule.header.clone(),
351                                controls: rule.controls.as_str().to_string(),
352                                effective_value: pool.clone(),
353                            });
354                        }
355                    }
356                }
357                Control::IdempotencyKey => {
358                    plan.idempotency_key = Some(value.clone());
359                    plan.applied.push(applied(rule, &value));
360                }
361                Control::ContentType => {
362                    plan.content_type = Some(value.clone());
363                    plan.applied.push(applied(rule, &value));
364                }
365                Control::SchemaHint => {
366                    plan.schema_hint = Some(value.clone());
367                    plan.applied.push(applied(rule, &value));
368                }
369            }
370        }
371
372        // Precedence fix-up: a `dispatch.execution_pool` directive must win
373        // over a `priority` map even when priority was declared first. Re-run
374        // the explicit-pool rules last.
375        for rule in &self.directives {
376            if rule.controls == Control::DispatchExecutionPool {
377                if let Some(raw) = headers.get(&rule.header) {
378                    if let Some(value) = last_value(raw) {
379                        if value_allowed(rule.allowed.as_ref(), &value) {
380                            plan.execution_pool_override = Some(value);
381                        }
382                    }
383                }
384            }
385        }
386
387        if self.trace.is_enabled() {
388            let trace = extract_w3c_trace(headers, &self.trace.baggage_allowlist);
389            if !trace.is_empty() {
390                plan.trace = Some(trace);
391            }
392        }
393
394        plan
395    }
396}
397
398// ---------------------------------------------------------------------------
399// Helpers
400// ---------------------------------------------------------------------------
401
402/// Pull the effective string value from a normalized header value. A
403/// single-value header is a `String`; a multi-value header is an `Array` —
404/// last-wins for directives (RFC §10 OQ7).
405fn last_value(raw: &serde_json::Value) -> Option<String> {
406    match raw {
407        serde_json::Value::String(s) => Some(s.clone()),
408        serde_json::Value::Array(arr) => arr
409            .iter()
410            .rev()
411            .find_map(|v| v.as_str().map(str::to_string)),
412        serde_json::Value::Number(n) => Some(n.to_string()),
413        serde_json::Value::Bool(b) => Some(b.to_string()),
414        _ => None,
415    }
416}
417
/// True when `value` is permitted by a routing control's value allowlist.
///
/// This guards the routing controls only (`dispatch.playbook` /
/// `dispatch.execution_pool`); free controls never consult it. It fails
/// closed: a missing allowlist permits nothing. `DirectiveSpec::parse`
/// rejects routing rules without a non-empty `allowed:` list, but the spec
/// types are `Deserialize` with public fields, so a spec can be built without
/// going through `parse`. Such a rule must not be able to select an
/// arbitrary target (RFC §7.5).
fn value_allowed(allowed: Option<&Vec<String>>, value: &str) -> bool {
    match allowed {
        Some(list) => list.iter().any(|candidate| candidate == value),
        // No allowlist means no permitted target.
        None => false,
    }
}
427
428fn applied(rule: &DirectiveRule, value: &str) -> AppliedDirective {
429    AppliedDirective {
430        header: rule.header.clone(),
431        controls: rule.controls.as_str().to_string(),
432        effective_value: value.to_string(),
433    }
434}
435
436/// Extract a W3C trace context from the normalized headers map.
437///
438/// Reads `traceparent` + `tracestate`, and parses the W3C `baggage` header
439/// (`k1=v1,k2=v2`) keeping only allowlisted keys. Validation of `traceparent`
440/// is loose — a malformed value is still carried (it is an external join, not
441/// a NoETL-authoritative id), but an obviously non-W3C shape is dropped so it
442/// never pollutes the join.
443pub fn extract_w3c_trace(
444    headers: &serde_json::Map<String, serde_json::Value>,
445    baggage_allowlist: &[String],
446) -> TraceContext {
447    let mut tc = TraceContext::default();
448
449    if let Some(tp) = headers.get("traceparent").and_then(last_value_ref) {
450        if is_plausible_traceparent(&tp) {
451            tc.traceparent = Some(tp);
452        }
453    }
454    if let Some(ts) = headers.get("tracestate").and_then(last_value_ref) {
455        tc.tracestate = Some(ts);
456    }
457    if !baggage_allowlist.is_empty() {
458        if let Some(raw) = headers.get("baggage").and_then(last_value_ref) {
459            for item in raw.split(',') {
460                let item = item.trim();
461                if let Some((k, v)) = item.split_once('=') {
462                    let key = k.trim();
463                    // A baggage member may carry `;`-delimited properties; keep
464                    // only the value.
465                    let val = v.split(';').next().unwrap_or("").trim();
466                    if baggage_allowlist.iter().any(|a| a == key) {
467                        tc.baggage.insert(key.to_string(), val.to_string());
468                    }
469                }
470            }
471        }
472    }
473
474    tc
475}
476
/// Adapter used by [`extract_w3c_trace`]: forwards to [`last_value`]
/// unchanged, so trace headers are read with the same last-wins rule as
/// directive headers.
fn last_value_ref(raw: &serde_json::Value) -> Option<String> {
    last_value(raw)
}
480
/// Loose shape check for a W3C `traceparent`: exactly four hyphen-separated
/// fields (`version-traceid-spanid-flags`) of 2, 32, 16 and 2 ASCII hex
/// digits. Upper- and lowercase digits both pass, and an all-zero id is not
/// rejected (that is the caller's concern) — the goal is only to weed out
/// strings that are obviously not W3C.
fn is_plausible_traceparent(s: &str) -> bool {
    // Expected byte width of each field, in order.
    const WIDTHS: [usize; 4] = [2, 32, 16, 2];

    let fields: Vec<&str> = s.split('-').collect();
    if fields.len() != WIDTHS.len() {
        return false;
    }

    fields.iter().zip(WIDTHS.iter()).all(|(field, width)| {
        field.len() == *width && field.bytes().all(|b| b.is_ascii_hexdigit())
    })
}
494
495/// Normalize an HTTP header set into the uniform lowercased `message.headers`
496/// map (RFC §7.1). Multi-value headers collapse to an array. This is the
497/// HTTP-channel normalizer the gateway uses for push/webhook ingress, the
498/// counterpart of the noetl-tools source-client `normalize_headers`.
499pub fn normalize_http_headers(
500    raw: &[(String, String)],
501) -> serde_json::Map<String, serde_json::Value> {
502    let mut acc: BTreeMap<String, Vec<String>> = BTreeMap::new();
503    for (k, v) in raw {
504        acc.entry(k.to_ascii_lowercase()).or_default().push(v.clone());
505    }
506    let mut out = serde_json::Map::new();
507    for (k, mut vals) in acc {
508        let value = if vals.len() == 1 {
509            serde_json::Value::String(vals.pop().unwrap())
510        } else {
511            serde_json::Value::Array(vals.into_iter().map(serde_json::Value::String).collect())
512        };
513        out.insert(k, value);
514    }
515    out
516}
517
// Unit tests for spec parsing, directive resolution, trace extraction and
// HTTP header normalization.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // Convert a `json!` object literal into the headers map `resolve` expects.
    // Panics if the value is not an object.
    fn headers(v: serde_json::Value) -> serde_json::Map<String, serde_json::Value> {
        v.as_object().unwrap().clone()
    }

    // A default spec has no rules and no trace, so any headers yield a no-op plan.
    #[test]
    fn empty_spec_is_noop() {
        let spec = DirectiveSpec::default();
        let plan = spec.resolve(&headers(json!({ "x-anything": "value" })));
        assert!(plan.is_noop());
    }

    // Routing controls must carry their value constraint: a missing or empty
    // `allowed:` list and a missing `map:` are all rejected at parse time.
    #[test]
    fn parse_requires_allowed_for_routing() {
        let err = DirectiveSpec::parse(&json!({
            "directives": [{ "header": "x-route", "controls": "dispatch.playbook" }]
        }))
        .unwrap_err();
        assert!(format!("{err}").contains("allowed"));

        let err = DirectiveSpec::parse(&json!({
            "directives": [{ "header": "x-pool", "controls": "dispatch.execution_pool", "allowed": [] }]
        }))
        .unwrap_err();
        assert!(format!("{err}").contains("allowed"));

        let err = DirectiveSpec::parse(&json!({
            "directives": [{ "header": "x-prio", "controls": "priority" }]
        }))
        .unwrap_err();
        assert!(format!("{err}").contains("map"));
    }

    // Rule header keys are lowercased during parse.
    #[test]
    fn parse_lowercases_header_keys() {
        let spec = DirectiveSpec::parse(&json!({
            "directives": [{ "header": "X-Idempotency-Key", "controls": "idempotency_key" }]
        }))
        .unwrap();
        assert_eq!(spec.directives[0].header, "x-idempotency-key");
    }

    #[test]
    fn redirect_playbook_respects_allowlist() {
        let spec = DirectiveSpec::parse(&json!({
            "directives": [{
                "header": "x-noetl-route",
                "controls": "dispatch.playbook",
                "allowed": ["domain/handle_billing", "domain/handle_fraud"]
            }]
        }))
        .unwrap();

        // Allowlisted value applies.
        let plan = spec.resolve(&headers(json!({ "x-noetl-route": "domain/handle_fraud" })));
        assert_eq!(plan.playbook_override.as_deref(), Some("domain/handle_fraud"));
        assert_eq!(plan.applied.len(), 1);
        assert_eq!(plan.applied[0].controls, "dispatch.playbook");

        // Non-allowlisted value is ignored — never routes to an arbitrary playbook.
        let plan = spec.resolve(&headers(json!({ "x-noetl-route": "domain/evil" })));
        assert!(plan.playbook_override.is_none());
        assert!(plan.applied.is_empty());
    }

    // The priority rule is declared before the explicit pool rule on purpose,
    // to show that precedence does not depend on declaration order.
    #[test]
    fn execution_pool_override_and_priority_precedence() {
        let spec = DirectiveSpec::parse(&json!({
            "directives": [
                { "header": "x-priority", "controls": "priority", "map": { "high": "priority", "normal": "shared" } },
                { "header": "x-noetl-pool", "controls": "dispatch.execution_pool", "allowed": ["iot", "priority", "shared"] }
            ]
        }))
        .unwrap();

        // Priority maps to a pool.
        let plan = spec.resolve(&headers(json!({ "x-priority": "high" })));
        assert_eq!(plan.execution_pool_override.as_deref(), Some("priority"));

        // Explicit pool wins over priority even when both present.
        let plan = spec.resolve(&headers(json!({ "x-priority": "high", "x-noetl-pool": "iot" })));
        assert_eq!(plan.execution_pool_override.as_deref(), Some("iot"));

        // Unmapped priority class does nothing.
        let plan = spec.resolve(&headers(json!({ "x-priority": "bogus" })));
        assert!(plan.execution_pool_override.is_none());
    }

    // Free controls need no `allowed:` / `map:` and pass the header value through.
    #[test]
    fn idempotency_content_schema_are_free_values() {
        let spec = DirectiveSpec::parse(&json!({
            "directives": [
                { "header": "x-idempotency-key", "controls": "idempotency_key" },
                { "header": "content-type", "controls": "content_type" },
                { "header": "x-schema", "controls": "schema_hint" }
            ]
        }))
        .unwrap();
        let plan = spec.resolve(&headers(json!({
            "x-idempotency-key": "abc-123",
            "content-type": "application/json",
            "x-schema": "order.v2"
        })));
        assert_eq!(plan.idempotency_key.as_deref(), Some("abc-123"));
        assert_eq!(plan.content_type.as_deref(), Some("application/json"));
        assert_eq!(plan.schema_hint.as_deref(), Some("order.v2"));
        assert_eq!(plan.applied.len(), 3);
    }

    #[test]
    fn non_allowlisted_headers_are_data_only() {
        let spec = DirectiveSpec::parse(&json!({
            "directives": [{ "header": "x-noetl-pool", "controls": "dispatch.execution_pool", "allowed": ["iot"] }]
        }))
        .unwrap();
        // A header NOT in the allowlist never drives anything.
        let plan = spec.resolve(&headers(json!({ "x-evil-route": "domain/evil", "x-random": "data" })));
        assert!(plan.is_noop());
    }

    // An array-valued header resolves to its last element.
    #[test]
    fn multi_value_header_is_last_wins() {
        let spec = DirectiveSpec::parse(&json!({
            "directives": [{ "header": "x-noetl-pool", "controls": "dispatch.execution_pool", "allowed": ["iot", "priority"] }]
        }))
        .unwrap();
        let plan = spec.resolve(&headers(json!({ "x-noetl-pool": ["iot", "priority"] })));
        assert_eq!(plan.execution_pool_override.as_deref(), Some("priority"));
    }

    // With `propagate: w3c`, traceparent/tracestate are carried and baggage is
    // filtered through the allowlist.
    #[test]
    fn w3c_trace_extracted_when_enabled() {
        let spec = DirectiveSpec::parse(&json!({
            "trace": { "propagate": "w3c", "baggage_allowlist": ["tenant", "request_id"] }
        }))
        .unwrap();
        let tp = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
        let plan = spec.resolve(&headers(json!({
            "traceparent": tp,
            "tracestate": "vendor=abc",
            "baggage": "tenant=acme,request_id=r-9, secret=nope"
        })));
        let trace = plan.trace.unwrap();
        assert_eq!(trace.traceparent.as_deref(), Some(tp));
        assert_eq!(trace.tracestate.as_deref(), Some("vendor=abc"));
        assert_eq!(trace.baggage.get("tenant").map(String::as_str), Some("acme"));
        assert_eq!(trace.baggage.get("request_id").map(String::as_str), Some("r-9"));
        // Non-allowlisted baggage dropped.
        assert!(!trace.baggage.contains_key("secret"));
    }

    // The default spec never extracts trace context, even from a valid traceparent.
    #[test]
    fn trace_disabled_by_default() {
        let spec = DirectiveSpec::default();
        let plan = spec.resolve(&headers(json!({
            "traceparent": "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
        })));
        assert!(plan.trace.is_none());
    }

    // A traceparent that fails the shape check is dropped rather than carried.
    #[test]
    fn malformed_traceparent_dropped() {
        let tc = extract_w3c_trace(&headers(json!({ "traceparent": "not-a-trace" })), &[]);
        assert!(tc.traceparent.is_none());
        assert!(tc.is_empty());
    }

    #[test]
    fn redirect_example_from_rfc_7_3() {
        // The RFC §7.3 worked example: x-noetl-route + x-noetl-pool, both allowlisted.
        let spec = DirectiveSpec::parse(&json!({
            "directives": [
                { "header": "x-noetl-route", "controls": "dispatch.playbook",
                  "allowed": ["domain/handle_billing", "domain/handle_fraud", "domain/handle_event"] },
                { "header": "x-noetl-pool", "controls": "dispatch.execution_pool",
                  "allowed": ["priority", "shared"] }
            ],
            "trace": { "propagate": "w3c" }
        }))
        .unwrap();
        let plan = spec.resolve(&headers(json!({
            "x-noetl-route": "domain/handle_fraud",
            "x-noetl-pool": "priority"
        })));
        assert_eq!(plan.playbook_override.as_deref(), Some("domain/handle_fraud"));
        assert_eq!(plan.execution_pool_override.as_deref(), Some("priority"));
        assert_eq!(plan.applied.len(), 2);
    }

    // Names are lowercased; a repeated name collapses into an array in arrival order.
    #[test]
    fn normalize_http_headers_lowercases_and_groups() {
        let raw = vec![
            ("X-Noetl-Route".to_string(), "domain/x".to_string()),
            ("Accept".to_string(), "a".to_string()),
            ("Accept".to_string(), "b".to_string()),
        ];
        let m = normalize_http_headers(&raw);
        assert_eq!(m.get("x-noetl-route").unwrap(), "domain/x");
        assert_eq!(m.get("accept").unwrap(), &json!(["a", "b"]));
    }
}