Skip to main content

harn_vm/
channels.rs

1use std::collections::BTreeMap;
2use std::sync::{Arc, Mutex, OnceLock};
3
4use serde::{Deserialize, Serialize};
5use sha2::{Digest, Sha256};
6use time::format_description::well_known::Rfc3339;
7
8use crate::event_log::{
9    active_event_log, install_memory_for_current_thread, sanitize_topic_component, AnyEventLog,
10    EventId, EventLog, LogEvent, Topic,
11};
12use crate::llm::vm_value_to_json;
13use crate::runtime_limits::RuntimeLimits;
14use crate::triggers::event::{ChannelEventPayload, KnownProviderPayload};
15use crate::triggers::{ProviderId, ProviderPayload, SignatureStatus, TenantId, TriggerEvent};
16use crate::value::{VmError, VmValue};
17
/// Queue depth for the in-memory event logs this module creates itself: the
/// session-scope log held in `SESSION_CHANNEL_LOG`, and the fallback log
/// installed via `install_memory_for_current_thread` when no event log is active.
const CHANNEL_QUEUE_DEPTH: usize = RuntimeLimits::DEFAULT.default_event_log_queue_depth;
/// Event-log `kind` stamped on every durable channel emit record.
const CHANNEL_EVENT_KIND: &str = "channel.emit";
/// Header carrying the channel event id. `emit_channel` appends idempotently on
/// this header, so re-emitting with the same id does not create a second record.
const IDEMPOTENCY_HEADER: &str = "harn.channel.id";
/// Header carrying the resolved `<scope>:<scope_id>:<name>` channel name.
const NAME_HEADER: &str = "harn.channel.name";
/// Header carrying the channel scope keyword.
const SCOPE_HEADER: &str = "harn.channel.scope";
/// Header carrying the resolved scope id.
const SCOPE_ID_HEADER: &str = "harn.channel.scope_id";
/// Header carrying the producer identity computed by `emitted_by`.
const EMITTED_BY_HEADER: &str = "harn.channel.emitted_by";

/// CH-06 (#1877): topic for `transcript.channel.emit` / `transcript.channel.match`
/// transcript events. Consumers subscribe to this topic to render channel
/// activity alongside reminder/suspension lifecycle events.
pub(crate) const CHANNEL_TRANSCRIPT_TOPIC: &str = "transcript.channel.lifecycle";
pub(crate) const CHANNEL_EMIT_TRANSCRIPT_KIND: &str = "transcript.channel.emit";
pub(crate) const CHANNEL_MATCH_TRANSCRIPT_KIND: &str = "transcript.channel.match";

/// CH-07 (#1878): durable audit topic for the replay-determinism receipts
/// emitted alongside every channel emit + every channel match. Consumers
/// drive replay/audit tooling off this topic instead of the lossy
/// `transcript.channel.lifecycle` summary topic — receipts carry the full
/// payload, signed timestamps, and cached match linkage so the
/// `replay_oracle` can byte-compare two runs of the same workload.
pub const CHANNEL_AUDIT_TOPIC: &str = "lifecycle.channel.audit";
pub(crate) const CHANNEL_EMIT_RECEIPT_KIND: &str = "channel_emit_receipt";
pub(crate) const CHANNEL_MATCH_RECEIPT_KIND: &str = "channel_match_receipt";
/// CH-07 (#1878): receipt schema header so downstream tooling can
/// version-gate parsers (mirrors `harn.pool_submit.v1` from PL-06 / #1891).
const CHANNEL_EMIT_RECEIPT_SCHEMA: &str = "harn.channel_emit_receipt.v1";
const CHANNEL_MATCH_RECEIPT_SCHEMA: &str = "harn.channel_match_receipt.v1";

/// CH-11 (#1911): event kinds + schema header for guardrail middleware
/// audit entries. Block + warn outcomes both write to the same
/// `lifecycle.channel.audit` topic so security review tooling reads a
/// single stream and discriminates on `kind`.
pub(crate) const CHANNEL_GUARDRAIL_BLOCKED_KIND: &str = "channel_guardrail_blocked";
pub(crate) const CHANNEL_GUARDRAIL_WARNING_KIND: &str = "channel_guardrail_warning";
const CHANNEL_GUARDRAIL_AUDIT_SCHEMA: &str = "harn.channel_guardrail_audit.v1";

/// CH-06 (#1877): per-event headers stamped on the `TriggerEvent` so the
/// channel-match dispatcher can link the `ChannelMatch` span back to the
/// originating `ChannelEmit` span across the async boundary (also through
/// the aggregation buffer for batched triggers).
const EMIT_TRACE_ID_HEADER: &str = "harn.channel.emit_trace_id";
const EMIT_SPAN_ID_HEADER: &str = "harn.channel.emit_span_id";

/// Process-wide, lazily created in-memory log that backs every session-scoped
/// channel. Session channels never use the active event log;
/// `reset_channel_state` empties this slot so the next pipeline starts clean.
static SESSION_CHANNEL_LOG: OnceLock<Mutex<Option<Arc<AnyEventLog>>>> = OnceLock::new();
/// HMAC key for channel signed timestamps. `signing_salt` builds it once per
/// process from the pid and a v7 UUID; it is not derived from a persisted key.
static SIGNING_SALT: OnceLock<Vec<u8>> = OnceLock::new();
64
65#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
66#[serde(rename_all = "snake_case")]
67enum ChannelScope {
68    Session,
69    Pipeline,
70    Tenant,
71    Org,
72}
73
74impl ChannelScope {
75    fn parse(value: &str) -> Result<Self, ChannelError> {
76        match value.trim() {
77            "session" => Ok(Self::Session),
78            "pipeline" => Ok(Self::Pipeline),
79            "tenant" => Ok(Self::Tenant),
80            "org" => Ok(Self::Org),
81            other => Err(ChannelError::malformed(format!(
82                "HARN-CHN-003 malformed channel scope '{other}'"
83            ))),
84        }
85    }
86
87    fn as_str(self) -> &'static str {
88        match self {
89            Self::Session => "session",
90            Self::Pipeline => "pipeline",
91            Self::Tenant => "tenant",
92            Self::Org => "org",
93        }
94    }
95}
96
/// Runtime identifiers captured when a channel builtin starts (see
/// `ChannelContext::current`). Each field stays `None` when no async builtin
/// child VM is available or the runtime context lacks the corresponding key.
#[derive(Clone, Debug, Default)]
struct ChannelContext {
    /// Current task id from the VM runtime context.
    task_id: Option<String>,
    /// Root task id; last-resort fallback for the session scope id.
    root_task_id: Option<String>,
    /// VM runtime-context scope id; a fallback for the session scope id.
    scope_id: Option<String>,
    /// `workflow_id` runtime-context key; preferred pipeline id.
    workflow_id: Option<String>,
    /// `run_id` runtime-context key; pipeline id when there is no workflow id.
    run_id: Option<String>,
    /// `worker_id` runtime-context key.
    worker_id: Option<String>,
    /// Active agent session (runtime context, else `agent_sessions::current_session_id`).
    agent_session_id: Option<String>,
    /// Root agent session; a fallback for the session scope id.
    root_agent_session_id: Option<String>,
    /// Caller's tenant; requested tenants are checked against it.
    tenant_id: Option<String>,
}
109
/// Options dict accepted by `emit_channel` / `channel_events` (parsed by
/// `parse_options`).
#[derive(Clone, Debug, Default)]
struct ChannelOptions {
    /// Explicit scope; must agree with any `<scope>:` prefix in the channel name.
    scope: Option<ChannelScope>,
    /// Caller-supplied event id (the idempotency key); generated when absent.
    id: Option<String>,
    /// Tenant to address when the channel name does not embed one; checked
    /// against the caller's tenant.
    tenant_id: Option<String>,
    /// Session to address; a mismatch with the active session is HARN-CHN-004.
    session_id: Option<String>,
    /// Pipeline to address; a mismatch with the active workflow/run is HARN-CHN-004.
    pipeline_id: Option<String>,
    /// Read cursor passed to `read_range` by `channel_events`.
    from_cursor: Option<EventId>,
    /// Maximum events returned by `channel_events`; unbounded when absent.
    limit: Option<usize>,
    /// Copied verbatim into the stored event record.
    ttl_ms: Option<i64>,
}
121
/// A channel name fully resolved against options and runtime context; this is
/// the output of `resolve_channel`.
#[derive(Clone, Debug)]
struct ResolvedChannel {
    /// Effective scope (never `Org`, which resolution rejects).
    scope: ChannelScope,
    /// Concrete id within the scope (session, pipeline, or tenant id).
    scope_id: String,
    /// Canonical `<scope>:<scope_id>:<name>` name.
    resolved_name: String,
    /// Event-log topic `channels.<scope>.<scope_id>.<name>` with sanitized components.
    topic: Topic,
    /// Retention label from `retention_for_scope`.
    retention: &'static str,
}
130
/// Wall-clock timestamp bound to a channel event by an HMAC signature.
///
/// Produced by `signed_timestamp` for emits and by `signed_match_timestamp`
/// for matches.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct SignedTimestamp {
    /// Milliseconds since the Unix epoch.
    pub at_ms: i64,
    /// The same instant as RFC 3339 text (the `Display` form if formatting fails).
    pub at: String,
    /// Signature algorithm label (`"hmac-sha256"` for locally produced stamps).
    pub algorithm: String,
    /// Signing-key identifier (`"local-session"` for the per-process salt).
    pub key_id: String,
    /// Hex-encoded HMAC prefixed with `sha256:`.
    pub signature: String,
}
139
/// JSON body of a durable `channel.emit` event-log record, built by
/// `emit_channel_from_vm`. Serde serializes fields in declaration order, so
/// reordering them changes the stored bytes. Trigger fan-out reads the
/// `payload` field back from the stored record.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct StoredChannelEvent {
    /// Channel event id (also the idempotency header value).
    id: String,
    /// Resolved `<scope>:<scope_id>:<name>` name, not the raw caller name.
    name: String,
    /// Emitted payload converted to JSON.
    payload: serde_json::Value,
    /// Signed emit timestamp.
    emitted_at: SignedTimestamp,
    /// Producer identity.
    emitted_by: String,
    /// Scope keyword.
    scope: String,
    /// Resolved scope id.
    scope_id: String,
    /// Pipeline correlation id; equals `scope_id` for pipeline scope.
    #[serde(skip_serializing_if = "Option::is_none")]
    pipeline_id: Option<String>,
    /// Session correlation id; equals `scope_id` for session scope.
    #[serde(skip_serializing_if = "Option::is_none")]
    session_id: Option<String>,
    /// Tenant correlation id; equals `scope_id` for tenant scope.
    #[serde(skip_serializing_if = "Option::is_none")]
    tenant_id: Option<String>,
    /// Retention label for the scope.
    retention: String,
    /// `options.ttl_ms`, copied through.
    #[serde(skip_serializing_if = "Option::is_none")]
    ttl_ms: Option<i64>,
}
159
/// CH-07 (#1878): durable replay-determinism receipt for a single
/// `emit_channel(...)` call. Pairs 1:1 with the `ChannelEmit` span (CH-06
/// / #1877) and with the durable journal append. Persisted to the
/// `lifecycle.channel.audit` event-log topic so the `replay_oracle` can
/// reproduce the entire emit chain across two runs of the same workload.
///
/// `payload_hash` lets the oracle detect producer-side drift
/// (`HARN-REP-CHN-002`) without having to canonicalize the full payload
/// during comparison; `payload` carries the verbatim value for byte
/// equality + replay reconstruction.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct ChannelEmitReceipt {
    /// Channel event id.
    pub event_id: String,
    /// Canonical `<scope>:<scope_id>:<name>` channel name.
    pub name_resolved: String,
    /// Scope keyword.
    pub scope: String,
    /// Resolved scope id.
    pub scope_id: String,
    /// Digest of `payload` used for drift detection.
    pub payload_hash: String,
    /// Emitted payload, verbatim.
    pub payload: serde_json::Value,
    /// Signed emit timestamp.
    pub emitted_at: SignedTimestamp,
    /// Producer identity.
    pub emitted_by: String,
    /// Pipeline correlation id, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pipeline_id: Option<String>,
    /// Session correlation id, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    /// Tenant correlation id, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tenant_id: Option<String>,
    /// Event-log topic of the channel.
    pub topic: String,
    /// Whether the durable append created a new record (false for an
    /// idempotent duplicate).
    pub inserted: bool,
    /// `ChannelEmit` span id, when one was recorded.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub span_id: Option<u64>,
}
191
/// CH-07 (#1878): durable replay-determinism receipt for a single channel
/// match — one per `(binding, event)` pair the dispatcher fires. For
/// aggregation/batched triggers (CH-04 / #1875) the receipt carries
/// `batch.constituent_event_ids` so the replay oracle can verify the
/// full batch composition matches across runs (`HARN-REP-CHN-003`).
///
/// `event_id` doubles as the cached-match key: on replay the dispatcher
/// looks up the recorded match by `event_id` instead of re-evaluating
/// the filter spec, preserving "the journal IS the spec" determinism.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct ChannelMatchReceipt {
    /// Matched channel event id (also the cached-match key).
    pub event_id: String,
    /// Trigger that fired.
    pub trigger_id: String,
    /// Key of the trigger binding that matched.
    pub binding_key: String,
    /// Canonical `<scope>:<scope_id>:<name>` channel name.
    pub name_resolved: String,
    /// Scope keyword.
    pub scope: String,
    /// Resolved scope id.
    pub scope_id: String,
    /// Signed match timestamp; its signature material includes the event and trigger ids.
    pub matched_at: SignedTimestamp,
    /// Session the match ran in, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub matched_in_session_id: Option<String>,
    /// Batch composition for aggregated dispatch; absent for single-event matches.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub batch: Option<ChannelMatchBatchInfo>,
    /// Kind of handler invoked.
    pub handler_kind: String,
    /// Compact summary of the handler invocation outcome.
    pub handler_result: ChannelMatchResultSummary,
    /// `ChannelMatch` span id, when one was recorded.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub span_id: Option<u64>,
}
219
/// CH-07 (#1878): batched-dispatch summary stamped onto
/// `ChannelMatchReceipt`. The `constituent_event_ids` list is the full
/// recorded composition; replay reconstructs the batch from those ids.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct ChannelMatchBatchInfo {
    /// Number of events in the batch (presumably `constituent_event_ids.len()` — confirm at the producer).
    pub count: usize,
    /// Ids of every channel event folded into the batch, as recorded.
    pub constituent_event_ids: Vec<String>,
}
228
/// CH-07 (#1878): summary of the handler invocation outcome. Mirrors the
/// shape of the dispatcher's `DispatchOutcome` but kept lightweight so
/// the receipt stays compact and self-contained — replay tooling never
/// needs to cross-reference the dispatcher log to interpret a match.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct ChannelMatchResultSummary {
    /// Dispatch outcome status, or `"dispatch_error"` when dispatch itself failed.
    pub status: String,
    /// Handler attempts made; `0` when dispatch itself failed.
    pub attempt_count: u32,
    /// Error text from the outcome or from the dispatch error.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    /// `Some(true)` only when the dispatcher returned an error; otherwise omitted.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub dispatch_failed: Option<bool>,
}
242
243impl ChannelMatchResultSummary {
244    fn from_dispatch(
245        outcome: &Result<crate::triggers::DispatchOutcome, crate::triggers::DispatchError>,
246    ) -> Self {
247        match outcome {
248            Ok(outcome) => Self {
249                status: outcome.status.as_str().to_string(),
250                attempt_count: outcome.attempt_count,
251                error: outcome.error.clone(),
252                dispatch_failed: None,
253            },
254            Err(error) => Self {
255                status: "dispatch_error".to_string(),
256                attempt_count: 0,
257                error: Some(error.to_string()),
258                dispatch_failed: Some(true),
259            },
260        }
261    }
262}
263
264#[derive(Debug)]
265struct ChannelError(String);
266
267impl ChannelError {
268    fn missing_pipeline() -> Self {
269        Self("HARN-CHN-001 missing pipeline context for pipeline-scoped channel".to_string())
270    }
271
272    fn cross_tenant(message: impl Into<String>) -> Self {
273        Self(format!("HARN-CHN-002 {}", message.into()))
274    }
275
276    fn malformed(message: impl Into<String>) -> Self {
277        Self(message.into())
278    }
279
280    fn scope_ambiguous(message: impl Into<String>) -> Self {
281        Self(format!("HARN-CHN-004 {}", message.into()))
282    }
283}
284
285impl From<ChannelError> for VmError {
286    fn from(error: ChannelError) -> Self {
287        VmError::Runtime(error.0)
288    }
289}
290
291pub fn reset_channel_state() {
292    if let Some(slot) = SESSION_CHANNEL_LOG.get() {
293        *slot.lock().expect("channel session log poisoned") = None;
294    }
295    // CH-11 (#1911): clear the per-thread guardrail registry so the
296    // next pipeline starts with a clean slate. Mirrors how
297    // SESSION_CHANNEL_LOG is reset above.
298    crate::channel_guardrails::clear();
299}
300
301pub(crate) async fn emit_channel_from_vm(args: Vec<VmValue>) -> Result<VmValue, VmError> {
302    let name = required_string(args.first(), "emit_channel", "name")?;
303    let payload = vm_value_to_json(
304        args.get(1)
305            .ok_or_else(|| VmError::TypeError("emit_channel: missing payload".to_string()))?,
306    );
307    let options = parse_options(args.get(2), "emit_channel")?;
308    let context = ChannelContext::current();
309    let resolved = resolve_channel(&name, &options, &context)?;
310    let event_id = options
311        .id
312        .clone()
313        .unwrap_or_else(|| format!("channel_evt_{}", uuid::Uuid::now_v7()));
314    let emitted_by = emitted_by(&context);
315    let emitted_at = signed_timestamp(&resolved, &event_id, &emitted_by);
316    let occurred_at_ms = emitted_at.at_ms;
317
318    // CH-11 (#1911): channel guardrails middleware. Runs BEFORE the
319    // durable journal append so a blocked payload never gets persisted
320    // (the block itself IS persisted on the lifecycle.channel.audit
321    // topic so the audit trail is durable). Warn verdicts proceed but
322    // record an audit. The aggregate decision is the worst verdict
323    // across every registered guardrail.
324    let guardrail_context = serde_json::json!({
325        "name": name,
326        "name_resolved": resolved.resolved_name,
327        "scope": resolved.scope.as_str(),
328        "scope_id": resolved.scope_id,
329        "event_id": event_id,
330        "emitted_by": emitted_by,
331    });
332    let decision =
333        crate::channel_guardrails::evaluate(&payload, &guardrail_context, &resolved.resolved_name)
334            .await?;
335    if matches!(
336        decision.verdict,
337        crate::channel_guardrails::Verdict::Block { .. }
338    ) {
339        return handle_blocked_emit(
340            &name,
341            &resolved,
342            &event_id,
343            &emitted_by,
344            &emitted_at,
345            &payload,
346            &decision,
347        )
348        .await;
349    }
350    record_guardrail_warnings(
351        &resolved,
352        &event_id,
353        &emitted_by,
354        &payload,
355        decision.fired.as_slice(),
356    )
357    .await;
358    let record = StoredChannelEvent {
359        id: event_id.clone(),
360        name: resolved.resolved_name.clone(),
361        payload,
362        emitted_at,
363        emitted_by: emitted_by.clone(),
364        scope: resolved.scope.as_str().to_string(),
365        scope_id: resolved.scope_id.clone(),
366        pipeline_id: context.pipeline_id_for_receipt(&resolved),
367        session_id: context.session_id_for_receipt(&resolved),
368        tenant_id: context.tenant_id_for_receipt(&resolved),
369        retention: resolved.retention.to_string(),
370        ttl_ms: options.ttl_ms,
371    };
372
373    // CH-06 (#1877): open the ChannelEmit span around the durable append +
374    // trigger fan-out. The span captures emit-time metadata (scope, name,
375    // payload summary) and its trace/span id is stashed on the trigger
376    // event headers so the downstream ChannelMatch span can link back.
377    let mut emit_span = ChannelSpanGuard::start(
378        crate::tracing::SpanKind::ChannelEmit,
379        format!("channel.emit {}", resolved.resolved_name),
380        Vec::new(),
381    );
382    emit_span.set_metadata("event_id", serde_json::json!(record.id));
383    emit_span.set_metadata("scope", serde_json::json!(resolved.scope.as_str()));
384    emit_span.set_metadata("scope_id", serde_json::json!(resolved.scope_id));
385    emit_span.set_metadata("name_resolved", serde_json::json!(resolved.resolved_name));
386    emit_span.set_metadata("payload_summary", summarize_payload(&record.payload));
387    let emit_span_id = crate::tracing::current_span_id().unwrap_or(0);
388    let emit_link = emit_span.link();
389
390    let mut headers = BTreeMap::new();
391    headers.insert(IDEMPOTENCY_HEADER.to_string(), event_id.clone());
392    headers.insert(NAME_HEADER.to_string(), resolved.resolved_name.clone());
393    headers.insert(
394        SCOPE_HEADER.to_string(),
395        resolved.scope.as_str().to_string(),
396    );
397    headers.insert(SCOPE_ID_HEADER.to_string(), resolved.scope_id.clone());
398    headers.insert(EMITTED_BY_HEADER.to_string(), emitted_by.clone());
399
400    let log = log_for_scope(resolved.scope);
401    let mut log_event = LogEvent::new(
402        CHANNEL_EVENT_KIND,
403        serde_json::to_value(&record)
404            .map_err(|error| VmError::Runtime(format!("emit_channel: encode event: {error}")))?,
405    )
406    .with_headers(headers);
407    log_event.occurred_at_ms = occurred_at_ms;
408    let outcome = log
409        .append_idempotent_by_header(&resolved.topic, IDEMPOTENCY_HEADER, &event_id, log_event)
410        .await
411        .map_err(channel_log_error)?;
412    let receipt = receipt_value(
413        &resolved.topic,
414        outcome.event_id,
415        &outcome.event,
416        outcome.inserted,
417    )?;
418    // CH-06 (#1877): emit the transcript event whether the append was fresh
419    // or idempotent — both outcomes are first-class observability signals.
420    emit_channel_emit_transcript(&record, &resolved, outcome.inserted, emit_span_id);
421    // CH-07 (#1878): persist the durable emit receipt on the
422    // `lifecycle.channel.audit` topic so the replay oracle has the full
423    // emit chain (payload hash, signed timestamp, scope correlation)
424    // independently of the lossy transcript summary topic.
425    record_channel_emit_receipt(&record, &resolved, outcome.inserted, emit_span_id).await;
426    // CH-02 (#1872): fan out the emit to channel-source triggers. Only fresh
427    // appends fan out; idempotent duplicates short-circuit because the producer
428    // already saw the original delivery.
429    if outcome.inserted {
430        let payload_json = outcome
431            .event
432            .payload
433            .get("payload")
434            .cloned()
435            .unwrap_or(serde_json::Value::Null);
436        let context_for_fanout = ChannelContext::current();
437        let fanout_payload = ChannelEventPayload {
438            id: event_id.clone(),
439            name: parse_name(&name)
440                .map(|parsed| parsed.name)
441                .unwrap_or_else(|_| resolved.resolved_name.clone()),
442            name_resolved: resolved.resolved_name.clone(),
443            scope: resolved.scope.as_str().to_string(),
444            scope_id: resolved.scope_id.clone(),
445            payload: payload_json,
446            emitted_by: emitted_by.clone(),
447            tenant_id: context_for_fanout.tenant_id_for_receipt(&resolved),
448            session_id: context_for_fanout.session_id_for_receipt(&resolved),
449            pipeline_id: context_for_fanout.pipeline_id_for_receipt(&resolved),
450        };
451        dispatch_channel_emit_to_triggers(&resolved, fanout_payload, emit_link).await?;
452    }
453    emit_span.end();
454    Ok(crate::stdlib::json_to_vm_value(&receipt))
455}
456
457pub(crate) async fn channel_events_from_vm(args: Vec<VmValue>) -> Result<VmValue, VmError> {
458    let name = required_string(args.first(), "channel_events", "name")?;
459    let options = parse_options(args.get(1), "channel_events")?;
460    let context = ChannelContext::current();
461    let resolved = resolve_channel(&name, &options, &context)?;
462    let events = log_for_scope(resolved.scope)
463        .read_range(
464            &resolved.topic,
465            options.from_cursor,
466            options.limit.unwrap_or(usize::MAX),
467        )
468        .await
469        .map_err(channel_log_error)?;
470    let values = events
471        .into_iter()
472        .map(|(event_id, event)| event_value(&resolved.topic, event_id, event))
473        .collect::<Result<Vec<_>, _>>()?;
474    Ok(crate::stdlib::json_to_vm_value(&serde_json::Value::Array(
475        values,
476    )))
477}
478
479impl ChannelContext {
480    fn current() -> Self {
481        let mut context = Self::default();
482        if let Some(vm) = crate::vm::clone_async_builtin_child_vm() {
483            context.task_id = Some(vm.runtime_context.task_id.clone());
484            context.root_task_id = Some(vm.runtime_context.root_task_id.clone());
485            context.scope_id = vm.runtime_context.scope_id.clone();
486            if let VmValue::Dict(values) = crate::runtime_context::runtime_context_value(&vm) {
487                context.workflow_id = dict_string(&values, "workflow_id");
488                context.run_id = dict_string(&values, "run_id");
489                context.worker_id = dict_string(&values, "worker_id");
490                context.agent_session_id = dict_string(&values, "agent_session_id");
491                context.root_agent_session_id = dict_string(&values, "root_agent_session_id");
492                context.tenant_id = dict_string(&values, "tenant_id");
493            }
494        }
495        context.agent_session_id = context
496            .agent_session_id
497            .or_else(crate::agent_sessions::current_session_id);
498        context
499    }
500
501    fn session_id(&self, options: &ChannelOptions) -> Result<String, ChannelError> {
502        // CH-03 (#1874): an explicit `options.session_id` that disagrees with the
503        // active session is HARN-CHN-004 ambiguity, not a silent override. The
504        // resolver MUST be deterministic for a given runtime context.
505        if let Some(requested) = options.session_id.as_deref() {
506            if let Some(active) = self.agent_session_id.as_deref() {
507                if active != requested {
508                    return Err(ChannelError::scope_ambiguous(format!(
509                        "session scope ambiguous: options.session_id '{requested}' \
510                         conflicts with active session '{active}'"
511                    )));
512                }
513            }
514        }
515        Ok(options
516            .session_id
517            .clone()
518            .or_else(|| self.agent_session_id.clone())
519            .or_else(|| self.root_agent_session_id.clone())
520            .or_else(|| self.scope_id.clone())
521            .or_else(|| self.root_task_id.clone())
522            .unwrap_or_else(|| "session".to_string()))
523    }
524
525    fn pipeline_id(&self, options: &ChannelOptions) -> Result<String, ChannelError> {
526        // CH-03 (#1874): explicit `options.pipeline_id` that conflicts with the
527        // active workflow/run is HARN-CHN-004. Two pipelines cannot share a
528        // resolved channel without an explicit disambiguation.
529        let active = self.workflow_id.clone().or_else(|| self.run_id.clone());
530        if let (Some(requested), Some(active)) = (options.pipeline_id.as_deref(), active.as_deref())
531        {
532            if requested != active {
533                return Err(ChannelError::scope_ambiguous(format!(
534                    "pipeline scope ambiguous: options.pipeline_id '{requested}' \
535                     conflicts with active pipeline '{active}'"
536                )));
537            }
538        }
539        options
540            .pipeline_id
541            .clone()
542            .or(active)
543            .ok_or_else(ChannelError::missing_pipeline)
544    }
545
546    fn tenant_id(
547        &self,
548        options: &ChannelOptions,
549        requested: Option<&str>,
550    ) -> Result<String, ChannelError> {
551        let current = self.tenant_id.as_deref();
552        let requested = requested
553            .map(ToOwned::to_owned)
554            .or_else(|| options.tenant_id.clone());
555        if let (Some(current), Some(requested)) = (current, requested.as_deref()) {
556            if current != requested {
557                return Err(ChannelError::cross_tenant(format!(
558                    "cross-tenant channel emit requires a grant: current tenant '{current}', requested tenant '{requested}'"
559                )));
560            }
561        }
562        Ok(requested
563            .or_else(|| self.tenant_id.clone())
564            .unwrap_or_else(|| "default".to_string()))
565    }
566
567    fn pipeline_id_for_receipt(&self, resolved: &ResolvedChannel) -> Option<String> {
568        match resolved.scope {
569            ChannelScope::Pipeline => Some(resolved.scope_id.clone()),
570            _ => self.workflow_id.clone().or_else(|| self.run_id.clone()),
571        }
572    }
573
574    fn session_id_for_receipt(&self, resolved: &ResolvedChannel) -> Option<String> {
575        match resolved.scope {
576            ChannelScope::Session => Some(resolved.scope_id.clone()),
577            _ => self
578                .agent_session_id
579                .clone()
580                .or_else(|| self.root_agent_session_id.clone()),
581        }
582    }
583
584    fn tenant_id_for_receipt(&self, resolved: &ResolvedChannel) -> Option<String> {
585        match resolved.scope {
586            ChannelScope::Tenant => Some(resolved.scope_id.clone()),
587            _ => self.tenant_id.clone(),
588        }
589    }
590}
591
592fn resolve_channel(
593    raw_name: &str,
594    options: &ChannelOptions,
595    context: &ChannelContext,
596) -> Result<ResolvedChannel, ChannelError> {
597    let parsed = parse_name(raw_name)?;
598    if let Some(option_scope) = options.scope {
599        if let Some(prefix_scope) = parsed.scope {
600            if prefix_scope != option_scope {
601                return Err(ChannelError::malformed(format!(
602                    "HARN-CHN-003 channel scope prefix '{}' conflicts with options.scope '{}'",
603                    prefix_scope.as_str(),
604                    option_scope.as_str()
605                )));
606            }
607        }
608    }
609
610    let scope = parsed
611        .scope
612        .or(options.scope)
613        .unwrap_or(ChannelScope::Tenant);
614    if scope == ChannelScope::Org {
615        return Err(ChannelError::cross_tenant(
616            "org-scoped channels are disabled until org grants are available",
617        ));
618    }
619
620    validate_channel_name(&parsed.name)?;
621    let scope_id = match scope {
622        ChannelScope::Session => match parsed.scope_id.clone() {
623            Some(id) => id,
624            None => context.session_id(options)?,
625        },
626        ChannelScope::Pipeline => context.pipeline_id(options)?,
627        ChannelScope::Tenant => context.tenant_id(options, parsed.scope_id.as_deref())?,
628        ChannelScope::Org => unreachable!("org scope returned above"),
629    };
630    validate_scope_id(scope, &scope_id)?;
631    let resolved_name = format!("{}:{}:{}", scope.as_str(), scope_id, parsed.name);
632    let topic = Topic::new(format!(
633        "channels.{}.{}.{}",
634        scope.as_str(),
635        sanitize_topic_component(&scope_id),
636        sanitize_topic_component(&parsed.name)
637    ))
638    .map_err(|error| ChannelError::malformed(format!("HARN-CHN-003 {error}")))?;
639    Ok(ResolvedChannel {
640        scope,
641        scope_id,
642        resolved_name,
643        topic,
644        retention: retention_for_scope(scope),
645    })
646}
647
/// Components of a raw channel name as written by the caller, before any
/// context resolution (output of `parse_name`).
#[derive(Clone, Debug)]
struct ParsedName {
    /// Scope from a `<scope>:` prefix, if one was present.
    scope: Option<ChannelScope>,
    /// Scope id embedded in the name; set only by the `tenant:<id>:<name>` and
    /// `org:<id>:<name>` forms.
    scope_id: Option<String>,
    /// Bare channel name (validated later by `validate_channel_name`).
    name: String,
}
654
655fn parse_name(raw_name: &str) -> Result<ParsedName, ChannelError> {
656    let raw_name = raw_name.trim();
657    if raw_name.is_empty() {
658        return Err(ChannelError::malformed(
659            "HARN-CHN-003 channel name cannot be empty",
660        ));
661    }
662    let Some((prefix, rest)) = raw_name.split_once(':') else {
663        return Ok(ParsedName {
664            scope: None,
665            scope_id: None,
666            name: raw_name.to_string(),
667        });
668    };
669    let scope = ChannelScope::parse(prefix)?;
670    match scope {
671        ChannelScope::Session | ChannelScope::Pipeline => {
672            if rest.is_empty() || rest.contains(':') {
673                return Err(ChannelError::malformed(format!(
674                    "HARN-CHN-003 malformed {} channel name '{raw_name}'",
675                    scope.as_str()
676                )));
677            }
678            Ok(ParsedName {
679                scope: Some(scope),
680                scope_id: None,
681                name: rest.to_string(),
682            })
683        }
684        ChannelScope::Tenant => {
685            if rest.is_empty() {
686                return Err(ChannelError::malformed(
687                    "HARN-CHN-003 tenant channel name cannot be empty",
688                ));
689            }
690            let (scope_id, name) = match rest.split_once(':') {
691                Some((tenant_id, name)) if !tenant_id.is_empty() && !name.is_empty() => {
692                    (Some(tenant_id.to_string()), name.to_string())
693                }
694                Some(_) => {
695                    return Err(ChannelError::malformed(format!(
696                        "HARN-CHN-003 malformed tenant channel name '{raw_name}'"
697                    )))
698                }
699                None => (None, rest.to_string()),
700            };
701            Ok(ParsedName {
702                scope: Some(scope),
703                scope_id,
704                name,
705            })
706        }
707        ChannelScope::Org => {
708            let Some((org_id, name)) = rest.split_once(':') else {
709                return Err(ChannelError::malformed(format!(
710                    "HARN-CHN-003 org channel names must be org:<org_id>:<name>, got '{raw_name}'"
711                )));
712            };
713            if org_id.is_empty() || name.is_empty() {
714                return Err(ChannelError::malformed(format!(
715                    "HARN-CHN-003 malformed org channel name '{raw_name}'"
716                )));
717            }
718            Ok(ParsedName {
719                scope: Some(scope),
720                scope_id: Some(org_id.to_string()),
721                name: name.to_string(),
722            })
723        }
724    }
725}
726
727fn validate_channel_name(name: &str) -> Result<(), ChannelError> {
728    if name.trim().is_empty()
729        || name.contains(':')
730        || name.chars().any(|ch| ch.is_control() || ch.is_whitespace())
731    {
732        return Err(ChannelError::malformed(format!(
733            "HARN-CHN-003 malformed channel name '{name}'"
734        )));
735    }
736    Ok(())
737}
738
739fn validate_scope_id(scope: ChannelScope, scope_id: &str) -> Result<(), ChannelError> {
740    if scope_id.trim().is_empty()
741        || scope_id
742            .chars()
743            .any(|ch| ch.is_control() || ch.is_whitespace() || ch == ':')
744    {
745        return Err(ChannelError::malformed(format!(
746            "HARN-CHN-003 malformed {} scope id '{scope_id}'",
747            scope.as_str()
748        )));
749    }
750    Ok(())
751}
752
753fn log_for_scope(scope: ChannelScope) -> Arc<AnyEventLog> {
754    match scope {
755        ChannelScope::Session => {
756            let slot = SESSION_CHANNEL_LOG.get_or_init(|| Mutex::new(None));
757            let mut guard = slot.lock().expect("channel session log poisoned");
758            guard
759                .get_or_insert_with(|| {
760                    Arc::new(AnyEventLog::Memory(crate::event_log::MemoryEventLog::new(
761                        CHANNEL_QUEUE_DEPTH,
762                    )))
763                })
764                .clone()
765        }
766        ChannelScope::Pipeline | ChannelScope::Tenant => active_event_log()
767            .unwrap_or_else(|| install_memory_for_current_thread(CHANNEL_QUEUE_DEPTH)),
768        ChannelScope::Org => unreachable!("org-scoped channel log is disabled"),
769    }
770}
771
772fn signed_timestamp(
773    resolved: &ResolvedChannel,
774    event_id: &str,
775    emitted_by: &str,
776) -> SignedTimestamp {
777    let at = crate::clock_mock::now_utc();
778    let at_ms = (at.unix_timestamp_nanos() / 1_000_000) as i64;
779    let at_text = at.format(&Rfc3339).unwrap_or_else(|_| at.to_string());
780    let material = format!(
781        "harn.channel.timestamp.v1\nat_ms={at_ms}\nid={event_id}\nname={}\nscope={}\nscope_id={}\nemitted_by={emitted_by}\n",
782        resolved.resolved_name,
783        resolved.scope.as_str(),
784        resolved.scope_id
785    );
786    let signature = hex::encode(crate::connectors::hmac::hmac_sha256(
787        signing_salt(),
788        material.as_bytes(),
789    ));
790    SignedTimestamp {
791        at_ms,
792        at: at_text,
793        algorithm: "hmac-sha256".to_string(),
794        key_id: "local-session".to_string(),
795        signature: format!("sha256:{signature}"),
796    }
797}
798
799fn signing_salt() -> &'static [u8] {
800    SIGNING_SALT
801        .get_or_init(|| {
802            format!(
803                "harn-channel-signing-salt:{}:{}",
804                std::process::id(),
805                uuid::Uuid::now_v7()
806            )
807            .into_bytes()
808        })
809        .as_slice()
810}
811
812/// CH-07 (#1878): signed timestamp for a channel match. Mirrors the emit
813/// timestamp algorithm in `signed_timestamp` so the replay oracle can
814/// verify match timestamps with the same `signing_salt()` key material.
815/// Including `event_id` + `trigger_id` in the signing material binds the
816/// timestamp to this specific (emit, binding) pair so a replayed match
817/// can't be re-stamped onto a different binding.
818fn signed_match_timestamp(
819    resolved: &ResolvedChannel,
820    event_id: &str,
821    trigger_id: &str,
822) -> SignedTimestamp {
823    let at = crate::clock_mock::now_utc();
824    let at_ms = (at.unix_timestamp_nanos() / 1_000_000) as i64;
825    let at_text = at.format(&Rfc3339).unwrap_or_else(|_| at.to_string());
826    let material = format!(
827        "harn.channel.match_timestamp.v1\nat_ms={at_ms}\nevent_id={event_id}\ntrigger_id={trigger_id}\nname={}\nscope={}\nscope_id={}\n",
828        resolved.resolved_name,
829        resolved.scope.as_str(),
830        resolved.scope_id
831    );
832    let signature = hex::encode(crate::connectors::hmac::hmac_sha256(
833        signing_salt(),
834        material.as_bytes(),
835    ));
836    SignedTimestamp {
837        at_ms,
838        at: at_text,
839        algorithm: "hmac-sha256".to_string(),
840        key_id: "local-session".to_string(),
841        signature: format!("sha256:{signature}"),
842    }
843}
844
845/// CH-07 (#1878): SHA-256 of the canonical JSON encoding of a channel
846/// emit payload. `serde_json::to_string` sorts object keys
847/// deterministically when the value originates from `serde_json::Value`
848/// (BTreeMap-backed `Map` with `preserve_order` disabled — the default
849/// for this crate). Used by the replay oracle to detect producer-side
850/// drift (`HARN-REP-CHN-002`).
851pub fn channel_payload_hash(payload: &serde_json::Value) -> String {
852    let canonical = canonical_json_string(payload);
853    let digest = Sha256::digest(canonical.as_bytes());
854    format!("sha256:{}", hex::encode(digest))
855}
856
857/// CH-07 (#1878): canonicalize a JSON value to a deterministic string so
858/// the SHA-256 hash on `ChannelEmitReceipt.payload_hash` is stable
859/// across reruns. Recursively sorts object keys; arrays preserve their
860/// (semantically meaningful) order. Mirrors the canonicalization rule
861/// `replay_oracle::canonicalize_run` relies on for cross-run diffs.
862fn canonical_json_string(value: &serde_json::Value) -> String {
863    match value {
864        serde_json::Value::Object(map) => {
865            let mut sorted: std::collections::BTreeMap<&String, &serde_json::Value> =
866                std::collections::BTreeMap::new();
867            for (key, value) in map {
868                sorted.insert(key, value);
869            }
870            let parts: Vec<String> = sorted
871                .iter()
872                .map(|(key, value)| {
873                    format!(
874                        "{}:{}",
875                        serde_json::to_string(key).unwrap_or_else(|_| key.to_string()),
876                        canonical_json_string(value)
877                    )
878                })
879                .collect();
880            format!("{{{}}}", parts.join(","))
881        }
882        serde_json::Value::Array(items) => {
883            let parts: Vec<String> = items.iter().map(canonical_json_string).collect();
884            format!("[{}]", parts.join(","))
885        }
886        other => serde_json::to_string(other).unwrap_or_else(|_| "null".to_string()),
887    }
888}
889
890/// CH-07 (#1878): append a channel audit receipt to the durable
891/// `lifecycle.channel.audit` topic. Mirrors `emit_pool_*_receipt`
892/// (PL-06 / #1891). The audit log uses `active_event_log()` so receipts
893/// inherit the pipeline's durability (in-memory in tests; durable
894/// SQLite/etc. in production). Best-effort: receipt append errors do not
895/// fail the emit/match — the emit's user-visible receipt is the source
896/// of truth for caller behavior. Audit consumers learn about gaps via
897/// the canonical event-log read API.
898async fn append_channel_audit_event(
899    kind: &'static str,
900    schema: &'static str,
901    payload: serde_json::Value,
902) {
903    let topic = match Topic::new(CHANNEL_AUDIT_TOPIC) {
904        Ok(topic) => topic,
905        Err(_) => return,
906    };
907    let log = active_event_log()
908        .unwrap_or_else(|| install_memory_for_current_thread(CHANNEL_QUEUE_DEPTH));
909    let mut headers = BTreeMap::new();
910    headers.insert("schema".to_string(), schema.to_string());
911    let _ = log
912        .append(&topic, LogEvent::new(kind, payload).with_headers(headers))
913        .await;
914}
915
916/// CH-11 (#1911): emit a guardrail-blocked / -warned audit entry to
917/// the durable channel-audit topic AND to the in-process lifecycle
918/// audit log. The lifecycle entry is what `pipeline_lifecycle_audit_log_take()`
919/// surfaces to test fixtures; the event-log entry is what production
920/// audit consumers tail. Both echo the verbatim payload (callers
921/// concerned about PII should layer `redact::*` on top of the
922/// guardrail). Best-effort; audit write errors do not propagate.
923async fn record_guardrail_audit(
924    kind: &'static str,
925    resolved: &ResolvedChannel,
926    event_id: &str,
927    emitted_by: &str,
928    payload: &serde_json::Value,
929    fired: &[crate::channel_guardrails::FiredGuardrail],
930) {
931    let fired_json: Vec<serde_json::Value> = fired
932        .iter()
933        .map(|entry| {
934            serde_json::json!({
935                "id": entry.id,
936                "kind": entry.kind,
937                "verdict_label": entry.verdict_label,
938                "reason": entry.reason,
939            })
940        })
941        .collect();
942    let audit_payload = serde_json::json!({
943        "event_id": event_id,
944        "name_resolved": resolved.resolved_name,
945        "scope": resolved.scope.as_str(),
946        "scope_id": resolved.scope_id,
947        "emitted_by": emitted_by,
948        "payload_hash": channel_payload_hash(payload),
949        "payload": payload,
950        "fired": fired_json,
951    });
952    append_channel_audit_event(kind, CHANNEL_GUARDRAIL_AUDIT_SCHEMA, audit_payload.clone()).await;
953    // Mirror to the lifecycle audit log so tests using
954    // `pipeline_lifecycle_audit_log_take()` see the entry without
955    // having to scan the event log directly.
956    crate::orchestration::record_lifecycle_audit(kind, audit_payload);
957}
958
959/// CH-11 (#1911): record a warning audit for every Warn verdict that
960/// fired. A no-op when there were no Warn-level fires (the common
961/// case). Called BEFORE the durable append so the warning order
962/// matches the dispatch order.
963async fn record_guardrail_warnings(
964    resolved: &ResolvedChannel,
965    event_id: &str,
966    emitted_by: &str,
967    payload: &serde_json::Value,
968    fired: &[crate::channel_guardrails::FiredGuardrail],
969) {
970    if fired.is_empty() {
971        return;
972    }
973    record_guardrail_audit(
974        CHANNEL_GUARDRAIL_WARNING_KIND,
975        resolved,
976        event_id,
977        emitted_by,
978        payload,
979        fired,
980    )
981    .await;
982}
983
984/// CH-11 (#1911): record a `channel_guardrail_blocked` audit and
985/// return the synthetic "blocked" receipt so the caller can
986/// distinguish a guardrail block from a successful append or an
987/// idempotent dedupe. No durable journal append happens for a blocked
988/// emit; the audit log entry IS the durable artifact.
989async fn handle_blocked_emit(
990    raw_name: &str,
991    resolved: &ResolvedChannel,
992    event_id: &str,
993    emitted_by: &str,
994    emitted_at: &SignedTimestamp,
995    payload: &serde_json::Value,
996    decision: &crate::channel_guardrails::GuardrailDecision,
997) -> Result<VmValue, VmError> {
998    record_guardrail_audit(
999        CHANNEL_GUARDRAIL_BLOCKED_KIND,
1000        resolved,
1001        event_id,
1002        emitted_by,
1003        payload,
1004        decision.fired.as_slice(),
1005    )
1006    .await;
1007    let block_reason = decision
1008        .fired
1009        .iter()
1010        .rev()
1011        .find_map(|f| {
1012            if f.verdict_label == CHANNEL_GUARDRAIL_BLOCKED_KIND
1013                || f.verdict_label.contains("block")
1014            {
1015                Some(f.reason.clone())
1016            } else {
1017                None
1018            }
1019        })
1020        .unwrap_or_else(|| "guardrail blocked".to_string());
1021    let fired_json: Vec<serde_json::Value> = decision
1022        .fired
1023        .iter()
1024        .map(|entry| {
1025            serde_json::json!({
1026                "id": entry.id,
1027                "kind": entry.kind,
1028                "verdict_label": entry.verdict_label,
1029                "reason": entry.reason,
1030            })
1031        })
1032        .collect();
1033    let receipt = serde_json::json!({
1034        "event_id": event_id,
1035        "cursor": serde_json::Value::Null,
1036        "id": event_id,
1037        "name": raw_name,
1038        "name_resolved": resolved.resolved_name,
1039        "scope": resolved.scope.as_str(),
1040        "scope_id": resolved.scope_id,
1041        "emitted_at": emitted_at,
1042        "emitted_by": emitted_by,
1043        "retention": resolved.retention,
1044        "topic": resolved.topic.as_str(),
1045        "inserted": false,
1046        "duplicate": false,
1047        "blocked": true,
1048        "block_reason": block_reason,
1049        "guardrail_fired": fired_json,
1050    });
1051    Ok(crate::stdlib::json_to_vm_value(&receipt))
1052}
1053
1054/// CH-07 (#1878): build + persist the emit receipt on the audit topic.
1055/// Called from `emit_channel_from_vm` immediately after the durable
1056/// journal append succeeds (whether the append was fresh or
1057/// idempotent-suppressed — both outcomes are first-class audit signals).
1058async fn record_channel_emit_receipt(
1059    record: &StoredChannelEvent,
1060    resolved: &ResolvedChannel,
1061    inserted: bool,
1062    span_id: u64,
1063) {
1064    let receipt = ChannelEmitReceipt {
1065        event_id: record.id.clone(),
1066        name_resolved: resolved.resolved_name.clone(),
1067        scope: resolved.scope.as_str().to_string(),
1068        scope_id: resolved.scope_id.clone(),
1069        payload_hash: channel_payload_hash(&record.payload),
1070        payload: record.payload.clone(),
1071        emitted_at: record.emitted_at.clone(),
1072        emitted_by: record.emitted_by.clone(),
1073        pipeline_id: record.pipeline_id.clone(),
1074        session_id: record.session_id.clone(),
1075        tenant_id: record.tenant_id.clone(),
1076        topic: resolved.topic.as_str().to_string(),
1077        inserted,
1078        span_id: if span_id == 0 { None } else { Some(span_id) },
1079    };
1080    let payload = match serde_json::to_value(&receipt) {
1081        Ok(value) => value,
1082        Err(_) => return,
1083    };
1084    append_channel_audit_event(
1085        CHANNEL_EMIT_RECEIPT_KIND,
1086        CHANNEL_EMIT_RECEIPT_SCHEMA,
1087        payload,
1088    )
1089    .await;
1090}
1091
1092/// CH-07 (#1878): build + persist the match receipt on the audit topic.
1093/// Called from `fire_channel_match` after the dispatcher returns, so
1094/// `handler_result` reflects the recorded outcome (succeeded / failed /
1095/// dlq / cancelled / ...) and replay tooling can verify the same handler
1096/// shape fires on every replay.
1097#[allow(clippy::too_many_arguments)]
1098async fn record_channel_match_receipt(
1099    trigger_id: &str,
1100    binding_key: &str,
1101    handler_kind: &str,
1102    resolved: &ResolvedChannel,
1103    event_id: &str,
1104    matched_in_session_id: Option<&str>,
1105    batch: Option<ChannelMatchBatchInfo>,
1106    span_id: u64,
1107    dispatch_outcome: &Result<crate::triggers::DispatchOutcome, crate::triggers::DispatchError>,
1108) {
1109    let receipt = ChannelMatchReceipt {
1110        event_id: event_id.to_string(),
1111        trigger_id: trigger_id.to_string(),
1112        binding_key: binding_key.to_string(),
1113        name_resolved: resolved.resolved_name.clone(),
1114        scope: resolved.scope.as_str().to_string(),
1115        scope_id: resolved.scope_id.clone(),
1116        matched_at: signed_match_timestamp(resolved, event_id, trigger_id),
1117        matched_in_session_id: matched_in_session_id.map(|s| s.to_string()),
1118        batch,
1119        handler_kind: handler_kind.to_string(),
1120        handler_result: ChannelMatchResultSummary::from_dispatch(dispatch_outcome),
1121        span_id: if span_id == 0 { None } else { Some(span_id) },
1122    };
1123    let payload = match serde_json::to_value(&receipt) {
1124        Ok(value) => value,
1125        Err(_) => return,
1126    };
1127    append_channel_audit_event(
1128        CHANNEL_MATCH_RECEIPT_KIND,
1129        CHANNEL_MATCH_RECEIPT_SCHEMA,
1130        payload,
1131    )
1132    .await;
1133}
1134
1135/// CH-07 (#1878): extract `ChannelMatchBatchInfo` from the transcript
1136/// `batch_summary` JSON the dispatcher already builds for batched
1137/// triggers. Returns `None` for non-batched dispatch.
1138fn batch_info_from_summary(
1139    batch_summary: Option<&serde_json::Value>,
1140) -> Option<ChannelMatchBatchInfo> {
1141    let summary = batch_summary?.as_object()?;
1142    let count = summary
1143        .get("count")
1144        .and_then(|v| v.as_u64())
1145        .map(|n| n as usize)?;
1146    let constituent_event_ids = summary
1147        .get("constituent_event_ids")
1148        .and_then(|v| v.as_array())
1149        .map(|arr| {
1150            arr.iter()
1151                .filter_map(|v| v.as_str().map(|s| s.to_string()))
1152                .collect()
1153        })
1154        .unwrap_or_default();
1155    Some(ChannelMatchBatchInfo {
1156        count,
1157        constituent_event_ids,
1158    })
1159}
1160
1161fn emitted_by(context: &ChannelContext) -> String {
1162    context
1163        .worker_id
1164        .clone()
1165        .or_else(|| context.agent_session_id.clone())
1166        .or_else(|| context.task_id.clone())
1167        .unwrap_or_else(|| "harn".to_string())
1168}
1169
1170fn retention_for_scope(scope: ChannelScope) -> &'static str {
1171    match scope {
1172        ChannelScope::Session => "in_process_session",
1173        ChannelScope::Pipeline => "pipeline_event_log",
1174        ChannelScope::Tenant => "tenant_event_log",
1175        ChannelScope::Org => "org_event_log",
1176    }
1177}
1178
1179fn receipt_value(
1180    topic: &Topic,
1181    event_id: EventId,
1182    event: &LogEvent,
1183    inserted: bool,
1184) -> Result<serde_json::Value, VmError> {
1185    let record = stored_record(event)?;
1186    Ok(serde_json::json!({
1187        "event_id": event_id,
1188        "cursor": event_id,
1189        "id": record.id,
1190        "name": record.name,
1191        "name_resolved": record.name,
1192        "scope": record.scope,
1193        "scope_id": record.scope_id,
1194        "payload": record.payload,
1195        "emitted_at": record.emitted_at,
1196        "emitted_by": record.emitted_by,
1197        "pipeline_id": record.pipeline_id,
1198        "session_id": record.session_id,
1199        "tenant_id": record.tenant_id,
1200        "retention": record.retention,
1201        "ttl_ms": record.ttl_ms,
1202        "topic": topic.as_str(),
1203        "inserted": inserted,
1204        "duplicate": !inserted,
1205    }))
1206}
1207
1208fn event_value(
1209    topic: &Topic,
1210    event_id: EventId,
1211    event: LogEvent,
1212) -> Result<serde_json::Value, VmError> {
1213    let record = stored_record(&event)?;
1214    Ok(serde_json::json!({
1215        "event_id": event_id,
1216        "cursor": event_id,
1217        "topic": topic.as_str(),
1218        "kind": event.kind,
1219        "headers": event.headers,
1220        "occurred_at_ms": event.occurred_at_ms,
1221        "id": record.id,
1222        "name": record.name,
1223        "name_resolved": record.name,
1224        "scope": record.scope,
1225        "scope_id": record.scope_id,
1226        "payload": record.payload,
1227        "emitted_at": record.emitted_at,
1228        "emitted_by": record.emitted_by,
1229        "pipeline_id": record.pipeline_id,
1230        "session_id": record.session_id,
1231        "tenant_id": record.tenant_id,
1232        "retention": record.retention,
1233        "ttl_ms": record.ttl_ms,
1234    }))
1235}
1236
1237fn stored_record(event: &LogEvent) -> Result<StoredChannelEvent, VmError> {
1238    serde_json::from_value(event.payload.clone()).map_err(|error| {
1239        VmError::Runtime(format!(
1240            "channel event store contained malformed channel payload: {error}"
1241        ))
1242    })
1243}
1244
1245fn parse_options(value: Option<&VmValue>, builtin: &str) -> Result<ChannelOptions, VmError> {
1246    let Some(value) = value else {
1247        return Ok(ChannelOptions::default());
1248    };
1249    match value {
1250        VmValue::Nil => Ok(ChannelOptions::default()),
1251        VmValue::Dict(options) => Ok(ChannelOptions {
1252            scope: option_string(options, "scope", builtin)?
1253                .map(|scope| ChannelScope::parse(&scope))
1254                .transpose()
1255                .map_err(VmError::from)?,
1256            id: option_string(options, "id", builtin)?,
1257            tenant_id: option_string(options, "tenant_id", builtin)?,
1258            session_id: option_string(options, "session_id", builtin)?,
1259            pipeline_id: option_string(options, "pipeline_id", builtin)?,
1260            from_cursor: option_non_negative_int(options, "from_cursor", builtin)?
1261                .or(option_non_negative_int(options, "cursor", builtin)?)
1262                .map(|value| value as EventId),
1263            limit: option_non_negative_int(options, "limit", builtin)?.map(|value| value as usize),
1264            ttl_ms: option_duration_ms(options, "ttl", builtin)?,
1265        }),
1266        other => Err(VmError::TypeError(format!(
1267            "{builtin}: options must be a dict or nil, got {}",
1268            other.type_name()
1269        ))),
1270    }
1271}
1272
1273fn required_string(value: Option<&VmValue>, builtin: &str, name: &str) -> Result<String, VmError> {
1274    match value {
1275        Some(VmValue::String(value)) => Ok(value.to_string()),
1276        Some(other) => Err(VmError::TypeError(format!(
1277            "{builtin}: {name} must be a string, got {}",
1278            other.type_name()
1279        ))),
1280        None => Err(VmError::TypeError(format!("{builtin}: missing {name}"))),
1281    }
1282}
1283
1284fn option_string(
1285    options: &BTreeMap<String, VmValue>,
1286    key: &str,
1287    builtin: &str,
1288) -> Result<Option<String>, VmError> {
1289    match options.get(key) {
1290        None | Some(VmValue::Nil) => Ok(None),
1291        Some(VmValue::String(value)) if !value.trim().is_empty() => Ok(Some(value.to_string())),
1292        Some(VmValue::String(_)) => Err(VmError::TypeError(format!(
1293            "{builtin}: options.{key} cannot be empty"
1294        ))),
1295        Some(other) => Err(VmError::TypeError(format!(
1296            "{builtin}: options.{key} must be a string or nil, got {}",
1297            other.type_name()
1298        ))),
1299    }
1300}
1301
1302fn option_non_negative_int(
1303    options: &BTreeMap<String, VmValue>,
1304    key: &str,
1305    builtin: &str,
1306) -> Result<Option<u64>, VmError> {
1307    match options.get(key) {
1308        None | Some(VmValue::Nil) => Ok(None),
1309        Some(VmValue::Int(value)) if *value >= 0 => Ok(Some(*value as u64)),
1310        Some(other) => Err(VmError::TypeError(format!(
1311            "{builtin}: options.{key} must be a non-negative int or nil, got {}",
1312            other.type_name()
1313        ))),
1314    }
1315}
1316
1317fn option_duration_ms(
1318    options: &BTreeMap<String, VmValue>,
1319    key: &str,
1320    builtin: &str,
1321) -> Result<Option<i64>, VmError> {
1322    match options.get(key) {
1323        None | Some(VmValue::Nil) => Ok(None),
1324        Some(VmValue::Duration(value)) if *value >= 0 => Ok(Some(*value)),
1325        Some(VmValue::Int(value)) if *value >= 0 => Ok(Some(*value)),
1326        Some(other) => Err(VmError::TypeError(format!(
1327            "{builtin}: options.{key} must be a non-negative duration, int, or nil, got {}",
1328            other.type_name()
1329        ))),
1330    }
1331}
1332
1333fn dict_string(values: &BTreeMap<String, VmValue>, key: &str) -> Option<String> {
1334    match values.get(key) {
1335        Some(VmValue::String(value)) if !value.is_empty() => Some(value.to_string()),
1336        _ => None,
1337    }
1338}
1339
1340fn channel_log_error(error: crate::event_log::LogError) -> VmError {
1341    VmError::Runtime(format!("channel event log: {error}"))
1342}
1343
/// CH-06 (#1877): guard around channel emit/match tracing spans.
///
/// Mirrors `PoolSpanGuard` (PL-06 / #1891). On construction it opens two
/// spans:
/// - a thread-local Harn span through `crate::tracing::span_start_*`
///   (intended to be visible to `trace_spans()`), and
/// - an OTel-facing `tracing::Span`, which receives the supplied span
///   links via `crate::observability::otel::set_span_link`.
///
/// Lifecycle: `end()` (also called from `Drop`) closes only the Harn
/// span, and only once, guarded by `span_id != 0`. The `tracing::Span`
/// handle is released when the guard itself is dropped. An explicit
/// `end()` therefore leaves the OTel span open until the guard goes out
/// of scope. NOTE(review): confirm that is intended.
///
/// When tracing is disabled, `span_start_*` presumably returns id 0, in
/// which case `end()` skips `span_end` (unverified here; confirm in
/// `crate::tracing`).
struct ChannelSpanGuard {
    // Harn span id; 0 means "no open Harn span" (never opened, or already
    // ended by `end()`).
    span_id: u64,
    // OTel-facing span. The guard never enters it; it is used as the
    // target for span links and as the source for `link()`.
    otel_span: tracing::Span,
}

impl ChannelSpanGuard {
    /// Open a span with `inherit_parent = true` (presumably nested under
    /// the currently active Harn span).
    fn start(
        kind: crate::tracing::SpanKind,
        name: String,
        links: Vec<crate::tracing::SpanLink>,
    ) -> Self {
        Self::start_with_parenting(kind, name, links, true)
    }

    /// Open a span with `inherit_parent = false`, via
    /// `span_start_detached_with_links` (presumably not nested under the
    /// current Harn span).
    fn start_detached(
        kind: crate::tracing::SpanKind,
        name: String,
        links: Vec<crate::tracing::SpanLink>,
    ) -> Self {
        Self::start_with_parenting(kind, name, links, false)
    }

    /// Shared constructor. Opens the Harn span first, then the
    /// `harn.channel` OTel span tagged with `harn.kind` / `harn.name`.
    /// Each entry in `links` is then attached to the OTel span, with
    /// `harn.link.kind` defaulting to `channel_emit` when the link does
    /// not already set it. Link-attachment failures are ignored.
    fn start_with_parenting(
        kind: crate::tracing::SpanKind,
        name: String,
        links: Vec<crate::tracing::SpanLink>,
        inherit_parent: bool,
    ) -> Self {
        // `name` and `links` are cloned here because both are reused below
        // for the OTel span.
        let span_id = if inherit_parent {
            crate::tracing::span_start_with_links(kind, name.clone(), links.clone())
        } else {
            crate::tracing::span_start_detached_with_links(kind, name.clone(), links.clone())
        };
        let otel_span = tracing::info_span!(
            target: "harn.vm.channel",
            "harn.channel",
            harn.kind = kind.as_str(),
            harn.name = %name,
        );
        for link in links {
            let trace_id = crate::TraceId(link.trace_id);
            let mut attributes: std::collections::HashMap<String, String> =
                link.attributes.into_iter().collect();
            // Only a default: an explicit `harn.link.kind` on the link wins.
            attributes
                .entry("harn.link.kind".to_string())
                .or_insert_with(|| "channel_emit".to_string());
            // Best-effort: a failed OTel link does not abort span creation.
            let _ = crate::observability::otel::set_span_link(
                &otel_span,
                &trace_id,
                &link.span_id,
                Some(attributes),
            );
        }
        Self { span_id, otel_span }
    }

    /// Span link pointing at this guard's span, so downstream spans can
    /// link back to it. Prefers the OTel span context (hex trace/span
    /// ids) and falls back to the Harn span's link when no OTel context
    /// is available.
    fn link(&self) -> Option<crate::tracing::SpanLink> {
        crate::observability::otel::current_span_context_hex(&self.otel_span)
            .map(|(trace_id, span_id)| crate::tracing::SpanLink::new(trace_id, span_id))
            .or_else(|| crate::tracing::span_link(self.span_id))
    }

    /// Attach a metadata key/value to the Harn span (not the OTel span).
    fn set_metadata(&self, key: &str, value: serde_json::Value) {
        crate::tracing::span_set_metadata(self.span_id, key, value);
    }

    /// Close the Harn span exactly once; later calls are no-ops. Does not
    /// release `otel_span` (see the type-level note).
    fn end(&mut self) {
        if self.span_id != 0 {
            crate::tracing::span_end(self.span_id);
            self.span_id = 0;
        }
    }
}

impl Drop for ChannelSpanGuard {
    // Ensures the Harn span is closed even on early return or panic
    // unwinding. `otel_span` is dropped with the struct afterwards.
    fn drop(&mut self) {
        self.end();
    }
}
1431
1432/// CH-06 (#1877): summarize an emit payload for the transcript event so
1433/// downstream renderers / audit feeds don't have to ingest the full
1434/// payload. Numbers/bools render verbatim; strings are truncated; dicts
1435/// and lists collapse to their top-level field count. Keeps the
1436/// transcript log compact while preserving enough context for human
1437/// inspection.
1438fn summarize_payload(payload: &serde_json::Value) -> serde_json::Value {
1439    const MAX_STRING_LEN: usize = 120;
1440    match payload {
1441        serde_json::Value::Null => serde_json::json!({"kind": "null"}),
1442        serde_json::Value::Bool(value) => serde_json::json!({"kind": "bool", "value": value}),
1443        serde_json::Value::Number(value) => serde_json::json!({"kind": "number", "value": value}),
1444        serde_json::Value::String(value) => {
1445            let truncated: String = value.chars().take(MAX_STRING_LEN).collect();
1446            let len = value.chars().count();
1447            serde_json::json!({
1448                "kind": "string",
1449                "value": truncated,
1450                "truncated": len > MAX_STRING_LEN,
1451                "length": len,
1452            })
1453        }
1454        serde_json::Value::Array(items) => {
1455            serde_json::json!({"kind": "array", "length": items.len()})
1456        }
1457        serde_json::Value::Object(map) => {
1458            let fields: Vec<&String> = map.keys().take(8).collect();
1459            serde_json::json!({
1460                "kind": "object",
1461                "field_count": map.len(),
1462                "fields": fields,
1463            })
1464        }
1465    }
1466}
1467
/// CH-06 (#1877): append a channel transcript lifecycle event (`kind`,
/// `payload`) to `CHANNEL_TRANSCRIPT_TOPIC` on the active event log.
///
/// Infallible from the caller's perspective. It is a no-op when no event
/// log is active (e.g. unit tests running outside a VM context) or when
/// the topic cannot be constructed. Append errors are discarded.
///
/// Always synchronous: the call returns only after the append attempt
/// finishes.
fn emit_channel_transcript_event(kind: &'static str, payload: serde_json::Value) {
    let Some(log) = active_event_log() else {
        return;
    };
    let Ok(topic) = Topic::new(CHANNEL_TRANSCRIPT_TOPIC) else {
        return;
    };
    let event = LogEvent::new(kind, payload);
    if tokio::runtime::Handle::try_current().is_ok() {
        // Inside a Tokio runtime: run `block_on` on a dedicated OS thread
        // rather than on the runtime thread itself. Presumably this avoids
        // nesting an executor on a runtime worker (TODO confirm the exact
        // motivation). The calling thread still blocks on `join`.
        // If the thread cannot be spawned, the event is silently dropped
        // (the closure, and with it `log`/`topic`/`event`, is consumed).
        if let Ok(join) = std::thread::Builder::new()
            .name("harn-channel-transcript".to_string())
            .spawn(move || {
                let _ = futures::executor::block_on(log.append(&topic, event));
            })
        {
            let _ = join.join();
        }
    } else {
        // No runtime on this thread: block on the append directly.
        let _ = futures::executor::block_on(log.append(&topic, event));
    }
}
1493
1494/// CH-06 (#1877): emit the `transcript.channel.emit` lifecycle event the
1495/// moment the durable append succeeds (whether the append was fresh or
1496/// idempotent). Carries the emit span id when tracing is on so the
1497/// transcript log can be stitched against the OTel trace.
1498fn emit_channel_emit_transcript(
1499    record: &StoredChannelEvent,
1500    resolved: &ResolvedChannel,
1501    inserted: bool,
1502    span_id: u64,
1503) {
1504    let payload = serde_json::json!({
1505        "event_id": record.id,
1506        "name": record.name,
1507        "name_resolved": resolved.resolved_name,
1508        "scope": record.scope,
1509        "scope_id": record.scope_id,
1510        "payload_summary": summarize_payload(&record.payload),
1511        "emitted_at": record.emitted_at,
1512        "emitted_at_ms": record.emitted_at.at_ms,
1513        "emitted_by": record.emitted_by,
1514        "session_id": record.session_id,
1515        "pipeline_id": record.pipeline_id,
1516        "tenant_id": record.tenant_id,
1517        "inserted": inserted,
1518        "duplicate": !inserted,
1519        "span_id": if span_id == 0 { serde_json::Value::Null } else { serde_json::json!(span_id) },
1520    });
1521    emit_channel_transcript_event(CHANNEL_EMIT_TRANSCRIPT_KIND, payload);
1522}
1523
1524/// CH-06 (#1877): emit the `transcript.channel.match` lifecycle event
1525/// just before the dispatcher invokes the handler. Carries the match
1526/// span id and, for batched triggers, the constituent event ids so the
1527/// transcript can render the full batch context inline.
1528#[allow(clippy::too_many_arguments)]
1529fn emit_channel_match_transcript(
1530    trigger_id: &str,
1531    handler_kind: &str,
1532    resolved: &ResolvedChannel,
1533    event_id: &str,
1534    matched_at_ms: i64,
1535    matched_in_session_id: Option<&str>,
1536    span_id: u64,
1537    batch: Option<serde_json::Value>,
1538) {
1539    let mut payload = serde_json::json!({
1540        "event_id": event_id,
1541        "name_resolved": resolved.resolved_name,
1542        "scope": resolved.scope.as_str(),
1543        "scope_id": resolved.scope_id,
1544        "trigger_id": trigger_id,
1545        "handler_kind": handler_kind,
1546        "matched_at_ms": matched_at_ms,
1547        "matched_in_session_id": matched_in_session_id,
1548        "span_id": if span_id == 0 { serde_json::Value::Null } else { serde_json::json!(span_id) },
1549    });
1550    if let Some(batch) = batch {
1551        if let Some(map) = payload.as_object_mut() {
1552            map.insert("batch".to_string(), batch);
1553        }
1554    }
1555    emit_channel_transcript_event(CHANNEL_MATCH_TRANSCRIPT_KIND, payload);
1556}
1557
1558/// CH-06 (#1877): collect the originating-emit span links stashed on a
1559/// batch of `TriggerEvent`s. The non-batched dispatch path uses the
1560/// single-event variant directly; the aggregated/batched path threads
1561/// the per-event headers through the buffer and rebuilds the link list
1562/// here so the resulting `ChannelMatch` span multi-links to every
1563/// constituent emit.
1564fn emit_links_from_event(event: &TriggerEvent) -> Vec<crate::tracing::SpanLink> {
1565    let mut links = Vec::new();
1566    if let (Some(trace_id), Some(span_id)) = (
1567        event.headers.get(EMIT_TRACE_ID_HEADER),
1568        event.headers.get(EMIT_SPAN_ID_HEADER),
1569    ) {
1570        links.push(
1571            crate::tracing::SpanLink::new(trace_id.clone(), span_id.clone()).with_attributes(
1572                BTreeMap::from([("harn.link.kind".to_string(), "channel_emit".to_string())]),
1573            ),
1574        );
1575    }
1576    links
1577}
1578
1579fn emit_links_from_batch(events: &[TriggerEvent]) -> Vec<crate::tracing::SpanLink> {
1580    let mut links = Vec::new();
1581    for event in events {
1582        links.extend(emit_links_from_event(event));
1583    }
1584    links
1585}
1586
1587fn batch_summary_for_transcript(events: &[TriggerEvent]) -> serde_json::Value {
1588    let constituent_ids: Vec<String> = events.iter().map(|event| event.id.0.clone()).collect();
1589    serde_json::json!({
1590        "count": events.len(),
1591        "constituent_event_ids": constituent_ids,
1592    })
1593}
1594
1595/// Parsed channel-source trigger selector.
1596///
1597/// Trigger DSL strings look like `channel:<scope>:<scope-id>:<name>` with
1598/// shorthand forms for tenant-default and session/pipeline scopes. The
1599/// `scope_id_pattern` is `None` for "current" (e.g. `channel:foo` against
1600/// the trigger's tenant) or `Some("*")` for an explicit wildcard
1601/// (`channel:tenant:*:foo`).
1602#[derive(Clone, Debug, PartialEq, Eq)]
1603pub struct ChannelSelector {
1604    scope: ChannelScope,
1605    scope_id_pattern: ScopeIdPattern,
1606    name: String,
1607}
1608
/// How a [`ChannelSelector`] constrains the scope id of a candidate emit.
#[derive(Clone, Debug, PartialEq, Eq)]
enum ScopeIdPattern {
    /// No explicit scope id in the selector string. For tenant scope this
    /// means "the trigger registry's current tenant"; for session/pipeline
    /// scope `ChannelSelector::matches` accepts any scope id.
    Current,
    /// Explicit scope id from the selector string; must match exactly.
    Exact(String),
    /// Wildcard, e.g. `tenant:*:foo`. For tenant scope this matches ANY
    /// tenant id: `ChannelSelector::matches` does not compare it against
    /// the current tenant. Wildcards on other scopes never match.
    /// NOTE(review): tenant isolation presumably relies on emit-side
    /// resolution rejecting cross-tenant emits (HARN-CHN-002) — confirm.
    Wildcard,
}
1620
1621impl ChannelSelector {
1622    /// Parse a `channel:...` trigger source string.
1623    ///
1624    /// Accepted shapes:
1625    /// - `channel:<name>` — tenant scope (current tenant), exact `<name>`
1626    /// - `channel:session:<name>` — session scope, exact `<name>`
1627    /// - `channel:pipeline:<name>` — pipeline scope, exact `<name>`
1628    /// - `channel:tenant:<tenant-id>:<name>` — explicit tenant
1629    /// - `channel:tenant:*:<name>` — tenant wildcard (within entitlement)
1630    /// - `channel:org:<org-id>:<name>` — explicit org (currently disabled)
1631    pub fn parse(input: &str) -> Result<Self, String> {
1632        let input = input.trim();
1633        let rest = input
1634            .strip_prefix("channel:")
1635            .ok_or_else(|| format!("channel selector must start with `channel:`, got `{input}`"))?;
1636        if rest.is_empty() {
1637            return Err("channel selector cannot be empty after `channel:` prefix".to_string());
1638        }
1639
1640        let (head, tail_opt) = match rest.split_once(':') {
1641            Some((head, tail)) => (head, Some(tail)),
1642            None => (rest, None),
1643        };
1644        let parsed_scope = ChannelScope::parse(head).ok();
1645        match (parsed_scope, tail_opt) {
1646            // `channel:<name>` — tenant default.
1647            (None, _) => {
1648                let name = rest.to_string();
1649                validate_selector_name(&name)?;
1650                Ok(Self {
1651                    scope: ChannelScope::Tenant,
1652                    scope_id_pattern: ScopeIdPattern::Current,
1653                    name,
1654                })
1655            }
1656            (Some(scope @ (ChannelScope::Session | ChannelScope::Pipeline)), Some(name))
1657                if !name.is_empty() =>
1658            {
1659                if name.contains(':') {
1660                    return Err(format!(
1661                        "channel selector `{input}`: {} scope expects `<name>` with no extra colons",
1662                        scope.as_str()
1663                    ));
1664                }
1665                validate_selector_name(name)?;
1666                Ok(Self {
1667                    scope,
1668                    scope_id_pattern: ScopeIdPattern::Current,
1669                    name: name.to_string(),
1670                })
1671            }
1672            (Some(scope @ (ChannelScope::Tenant | ChannelScope::Org)), Some(tail))
1673                if !tail.is_empty() =>
1674            {
1675                let Some((scope_id, name)) = tail.split_once(':') else {
1676                    // `channel:tenant:foo` — treat as `<name>` in tenant default.
1677                    if matches!(scope, ChannelScope::Tenant) {
1678                        validate_selector_name(tail)?;
1679                        return Ok(Self {
1680                            scope,
1681                            scope_id_pattern: ScopeIdPattern::Current,
1682                            name: tail.to_string(),
1683                        });
1684                    }
1685                    return Err(format!(
1686                        "channel selector `{input}`: org scope requires `<org-id>:<name>`"
1687                    ));
1688                };
1689                if scope_id.is_empty() || name.is_empty() {
1690                    return Err(format!(
1691                        "channel selector `{input}`: scope id and name must be non-empty"
1692                    ));
1693                }
1694                validate_selector_name(name)?;
1695                let pattern = if scope_id == "*" {
1696                    ScopeIdPattern::Wildcard
1697                } else {
1698                    ScopeIdPattern::Exact(scope_id.to_string())
1699                };
1700                Ok(Self {
1701                    scope,
1702                    scope_id_pattern: pattern,
1703                    name: name.to_string(),
1704                })
1705            }
1706            (Some(scope), _) => Err(format!(
1707                "channel selector `{input}`: {} scope requires `<name>` segment",
1708                scope.as_str()
1709            )),
1710        }
1711    }
1712
1713    pub fn scope(&self) -> &'static str {
1714        self.scope.as_str()
1715    }
1716
1717    pub fn name(&self) -> &str {
1718        &self.name
1719    }
1720
1721    /// Returns true if the supplied emit (scope, scope_id, name) matches this
1722    /// selector. `current_tenant` lets the matcher resolve the implicit
1723    /// "current tenant" boundary used by both `Current` and `Wildcard` modes.
1724    pub fn matches(&self, scope: &str, scope_id: &str, name: &str, current_tenant: &str) -> bool {
1725        if self.scope.as_str() != scope || self.name != name {
1726            return false;
1727        }
1728        match &self.scope_id_pattern {
1729            ScopeIdPattern::Current => match self.scope {
1730                ChannelScope::Tenant => scope_id == current_tenant,
1731                ChannelScope::Session | ChannelScope::Pipeline => {
1732                    // For session/pipeline, "current" means trigger and emit
1733                    // share a runtime context. In v1 (in-process registry)
1734                    // this is implicit: both producer and consumer run in the
1735                    // same VM, so any scope_id within this scope type matches.
1736                    true
1737                }
1738                ChannelScope::Org => false,
1739            },
1740            ScopeIdPattern::Exact(value) => scope_id == value,
1741            ScopeIdPattern::Wildcard => match self.scope {
1742                ChannelScope::Tenant => true,
1743                // Session/pipeline wildcards aren't entitled yet; org wildcards disabled.
1744                _ => false,
1745            },
1746        }
1747    }
1748}
1749
/// Check that a selector's `<name>` segment is usable as a channel name.
///
/// A name is rejected when it is empty, or contains a `:` (the selector
/// segment separator), any whitespace, or any control character.
///
/// # Errors
///
/// Returns a `String` message quoting the offending name.
fn validate_selector_name(name: &str) -> Result<(), String> {
    // An all-whitespace name is already caught by the per-character scan,
    // so plain emptiness is the only extra case to test.
    let has_forbidden_char = name
        .chars()
        .any(|ch| ch == ':' || ch.is_whitespace() || ch.is_control());
    if name.is_empty() || has_forbidden_char {
        Err(format!("channel selector name `{name}` is malformed"))
    } else {
        Ok(())
    }
}
1759
1760/// Dispatch a freshly emitted channel event to any registered triggers whose
1761/// `channel:` source selector matches the (scope, scope_id, name) tuple.
1762///
1763/// This is the consumer side of the `emit_channel` ↔ trigger plumbing
1764/// (CH-02 / #1872). Errors from individual handlers do not abort the
1765/// emit; they surface in the dispatcher's DLQ + retry pipeline.
1766///
1767/// CH-04 (#1875): bindings with `batch { count, window, key, expire_action }`
1768/// route events through an aggregation buffer instead of dispatching one
1769/// event at a time. When the buffer hits `count`, the dispatcher fires
1770/// the handler with a batched event (`event.batch` populated). When the
1771/// `window` elapses with fewer than `count` events, the dispatcher
1772/// either fires a partial batch (default) or discards the buffer.
1773async fn dispatch_channel_emit_to_triggers(
1774    resolved: &ResolvedChannel,
1775    payload: ChannelEventPayload,
1776    emit_link: Option<crate::tracing::SpanLink>,
1777) -> Result<(), VmError> {
1778    // Snapshot matching bindings outside of any async work so the registry
1779    // borrow is short-lived.
1780    let bindings = crate::triggers::registry::channel_bindings_matching(
1781        resolved.scope.as_str(),
1782        &resolved.scope_id,
1783        &payload.name,
1784    );
1785
1786    // CH-04 (#1875): flush any aggregation buffers whose window has
1787    // elapsed BEFORE this emit so old batches go out in order. The
1788    // implicit sweep runs even for emits that don't match any binding so
1789    // a stale buffer can't outlive the trigger lifecycle. Tests can also
1790    // call `flush_trigger_aggregations()` directly for deterministic
1791    // window-expire coverage.
1792    flush_expired_aggregations_inner().await;
1793
1794    if bindings.is_empty() {
1795        return Ok(());
1796    }
1797    let Some(base_vm) = crate::vm::clone_async_builtin_child_vm() else {
1798        // No host VM (e.g. raw test path); nothing to dispatch.
1799        return Ok(());
1800    };
1801    let log = active_event_log()
1802        .unwrap_or_else(|| install_memory_for_current_thread(CHANNEL_QUEUE_DEPTH));
1803    let dispatcher = crate::triggers::Dispatcher::with_event_log(base_vm, log);
1804    for binding in bindings {
1805        // Filter (#1872 acceptance): JSON-path-equality on payload.
1806        // Applied BEFORE aggregation so the buffer only collects events
1807        // the handler actually cares about.
1808        if let Some(filter_str) = binding.filter.as_ref() {
1809            if !channel_filter_matches(filter_str, &payload.payload) {
1810                continue;
1811            }
1812        }
1813        let event = build_channel_trigger_event(&payload, emit_link.as_ref());
1814
1815        // CH-04 (#1875): aggregation path. When the binding declared
1816        // `batch`, accumulate into the per-(binding, partition_key)
1817        // buffer; only dispatch once the threshold is reached.
1818        if let Some(aggregation_config) = binding.aggregation.as_ref() {
1819            let partition_key = crate::triggers::aggregation::partition_key_for_event(
1820                aggregation_config,
1821                &payload.payload,
1822            );
1823            let binding_key = binding.binding_key();
1824            let outcome = crate::triggers::aggregation::accumulate(
1825                &binding_key,
1826                aggregation_config,
1827                partition_key.as_deref(),
1828                event,
1829            );
1830            if let crate::triggers::aggregation::AccumulateOutcome::Ready(events) = outcome {
1831                // CH-06 (#1877): the ChannelMatch span for a batched
1832                // trigger multi-links to ALL constituent ChannelEmit
1833                // spans so the trace tree shows the aggregation fan-in.
1834                let links = emit_links_from_batch(&events);
1835                let batch_summary = batch_summary_for_transcript(&events);
1836                let batched = match crate::triggers::dispatcher::build_batched_event_public(events)
1837                {
1838                    Ok(batched) => batched,
1839                    Err(error) => {
1840                        return Err(VmError::Runtime(format!(
1841                            "emit_channel aggregation batch: {error}"
1842                        )));
1843                    }
1844                };
1845                fire_channel_match(
1846                    &dispatcher,
1847                    binding.clone(),
1848                    batched,
1849                    resolved,
1850                    links,
1851                    Some(batch_summary),
1852                )
1853                .await;
1854            }
1855            continue;
1856        }
1857
1858        let links = emit_links_from_event(&event);
1859        fire_channel_match(&dispatcher, binding.clone(), event, resolved, links, None).await;
1860    }
1861    Ok(())
1862}
1863
1864/// CH-06 (#1877): open a `ChannelMatch` span, emit the transcript
1865/// lifecycle event, and dispatch the trigger handler. The span links
1866/// back to the originating ChannelEmit span (multi-link for batched
1867/// triggers) via `set_span_link` from P-05 (#1858).
1868async fn fire_channel_match(
1869    dispatcher: &crate::triggers::Dispatcher,
1870    binding: std::sync::Arc<crate::triggers::registry::TriggerBinding>,
1871    event: TriggerEvent,
1872    resolved: &ResolvedChannel,
1873    links: Vec<crate::tracing::SpanLink>,
1874    batch_summary: Option<serde_json::Value>,
1875) {
1876    let trigger_id = binding.id.as_str().to_string();
1877    let handler_kind = binding.handler.kind().to_string();
1878    // CH-07 (#1878): use the channel emit's id (carried as the trigger
1879    // event's `dedupe_key`) so the match receipt links back to the
1880    // original `ChannelEmitReceipt.event_id`. `TriggerEvent::new` mints
1881    // a fresh `trigger_evt_*` id for its own `event.id` field that does
1882    // not correlate to the emit chain.
1883    let event_id = if event.dedupe_key.is_empty() {
1884        event.id.0.clone()
1885    } else {
1886        event.dedupe_key.clone()
1887    };
1888    let mut match_span = ChannelSpanGuard::start_detached(
1889        crate::tracing::SpanKind::ChannelMatch,
1890        format!("channel.match {}", resolved.resolved_name),
1891        links,
1892    );
1893    match_span.set_metadata("event_id", serde_json::json!(event_id));
1894    match_span.set_metadata("trigger_id", serde_json::json!(trigger_id));
1895    match_span.set_metadata("handler_kind", serde_json::json!(handler_kind));
1896    match_span.set_metadata("name_resolved", serde_json::json!(resolved.resolved_name));
1897    if let Some(summary) = batch_summary.as_ref() {
1898        match_span.set_metadata("batch", summary.clone());
1899    }
1900    let span_id = crate::tracing::current_span_id().unwrap_or(0);
1901    let matched_at_ms = crate::clock_mock::now_utc().unix_timestamp_nanos() as i64 / 1_000_000;
1902    let matched_in_session_id = crate::agent_sessions::current_session_id()
1903        .or_else(|| event.tenant_id.as_ref().map(|t| t.0.clone()));
1904    emit_channel_match_transcript(
1905        &trigger_id,
1906        &handler_kind,
1907        resolved,
1908        &event_id,
1909        matched_at_ms,
1910        matched_in_session_id.as_deref(),
1911        span_id,
1912        batch_summary.clone(),
1913    );
1914    // CH-07 (#1878): capture the dispatch outcome BEFORE writing the
1915    // match receipt so the receipt records the recorded handler result
1916    // (succeeded/failed/dlq/...) — replay tooling treats the receipt as
1917    // the cached match: on replay the dispatcher looks up the receipt
1918    // by `event_id` instead of re-evaluating the filter spec.
1919    let dispatch_outcome = dispatcher.dispatch(&binding, event).await;
1920    let binding_key = binding.binding_key();
1921    let batch_info = batch_info_from_summary(batch_summary.as_ref());
1922    record_channel_match_receipt(
1923        &trigger_id,
1924        &binding_key,
1925        &handler_kind,
1926        resolved,
1927        &event_id,
1928        matched_in_session_id.as_deref(),
1929        batch_info,
1930        span_id,
1931        &dispatch_outcome,
1932    )
1933    .await;
1934    // Pre-CH-07 the dispatch error was discarded with `let _ = ...`. The
1935    // CH-07 match receipt now records the failure with
1936    // `dispatch_failed: true` so audit consumers don't lose the signal —
1937    // we keep the same fire-and-forget callsite semantics here.
1938    drop(dispatch_outcome);
1939    match_span.end();
1940}
1941
1942/// Drain all expired aggregation buffers and dispatch them. Exposed as
1943/// the `flush_trigger_aggregations()` builtin so Harn scripts (and the
1944/// production runtime) can deterministically advance window-expire
1945/// processing.
1946pub(crate) async fn flush_expired_aggregations_inner() {
1947    let expirations = crate::triggers::aggregation::drain_expired_aggregations();
1948    if expirations.is_empty() {
1949        return;
1950    }
1951    let Some(base_vm) = crate::vm::clone_async_builtin_child_vm() else {
1952        return;
1953    };
1954    let log = active_event_log()
1955        .unwrap_or_else(|| install_memory_for_current_thread(CHANNEL_QUEUE_DEPTH));
1956    let dispatcher = crate::triggers::Dispatcher::with_event_log(base_vm, log);
1957    for expired in expirations {
1958        if matches!(
1959            expired.action,
1960            crate::triggers::aggregation::ExpireAction::Discard
1961        ) {
1962            continue;
1963        }
1964        // Resolve the binding by parsing the binding_key (id@vN). Skip
1965        // if the binding has been terminated since the buffer was opened.
1966        let Some((trigger_id, version_str)) = expired.binding_key.rsplit_once("@v") else {
1967            continue;
1968        };
1969        let Ok(version) = version_str.parse::<u32>() else {
1970            continue;
1971        };
1972        let Ok(binding) =
1973            crate::triggers::registry::resolve_live_trigger_binding(trigger_id, Some(version))
1974        else {
1975            continue;
1976        };
1977        // CH-06 (#1877): rebuild the channel-resolved metadata from the
1978        // first buffered event so the ChannelMatch span carries the
1979        // right scope/name even for window-expire flushes.
1980        let resolved_for_match = resolved_from_first_event(&expired.events);
1981        let links = emit_links_from_batch(&expired.events);
1982        let batch_summary = batch_summary_for_transcript(&expired.events);
1983        let batched = match crate::triggers::dispatcher::build_batched_event_public(expired.events)
1984        {
1985            Ok(batched) => batched,
1986            Err(_) => continue,
1987        };
1988        match resolved_for_match {
1989            Some(resolved) => {
1990                fire_channel_match(
1991                    &dispatcher,
1992                    binding,
1993                    batched,
1994                    &resolved,
1995                    links,
1996                    Some(batch_summary),
1997                )
1998                .await;
1999            }
2000            None => {
2001                let _ = dispatcher.dispatch(&binding, batched).await;
2002            }
2003        }
2004    }
2005}
2006
2007/// CH-06 (#1877): synthesize a `ResolvedChannel` from the first buffered
2008/// trigger event when a window-expire flush dispatches without going
2009/// back through `resolve_channel`. Returns `None` if the event payload
2010/// isn't a known channel payload (defensive — should not happen in
2011/// practice since the dispatcher only buffers channel events).
2012fn resolved_from_first_event(events: &[TriggerEvent]) -> Option<ResolvedChannel> {
2013    let first = events.first()?;
2014    let ProviderPayload::Known(KnownProviderPayload::Channel(payload)) = &first.provider_payload
2015    else {
2016        return None;
2017    };
2018    let scope = ChannelScope::parse(&payload.scope).ok()?;
2019    let topic = Topic::new(format!(
2020        "channels.{}.{}.{}",
2021        payload.scope,
2022        sanitize_topic_component(&payload.scope_id),
2023        sanitize_topic_component(&payload.name),
2024    ))
2025    .ok()?;
2026    Some(ResolvedChannel {
2027        scope,
2028        scope_id: payload.scope_id.clone(),
2029        resolved_name: payload.name_resolved.clone(),
2030        topic,
2031        retention: retention_for_scope(scope),
2032    })
2033}
2034
2035fn build_channel_trigger_event(
2036    payload: &ChannelEventPayload,
2037    emit_link: Option<&crate::tracing::SpanLink>,
2038) -> TriggerEvent {
2039    let mut event = TriggerEvent::new(
2040        ProviderId::from("channel"),
2041        "channel.emit",
2042        None,
2043        payload.id.clone(),
2044        payload.tenant_id.clone().map(TenantId::new),
2045        BTreeMap::new(),
2046        ProviderPayload::Known(KnownProviderPayload::Channel(payload.clone())),
2047        SignatureStatus::Unsigned,
2048    );
2049    event.headers.insert(
2050        "harn_channel_name".to_string(),
2051        payload.name_resolved.clone(),
2052    );
2053    event
2054        .headers
2055        .insert("harn_channel_scope".to_string(), payload.scope.clone());
2056    event.headers.insert(
2057        "harn_channel_scope_id".to_string(),
2058        payload.scope_id.clone(),
2059    );
2060    // CH-06 (#1877): stash the ChannelEmit span coordinates on the trigger
2061    // event so the downstream ChannelMatch span can link back via
2062    // `set_span_link` — even after travelling through an aggregation
2063    // buffer or being serialized into a batched envelope.
2064    if let Some(link) = emit_link {
2065        event
2066            .headers
2067            .insert(EMIT_TRACE_ID_HEADER.to_string(), link.trace_id.clone());
2068        event
2069            .headers
2070            .insert(EMIT_SPAN_ID_HEADER.to_string(), link.span_id.clone());
2071    }
2072    event
2073}
2074
2075/// Evaluate the trigger filter spec (CH-02 / #1872) against the channel payload.
2076///
2077/// Supported syntax v1: JSON dict (`{"repo": "harn"}`) — each key is a
2078/// dot-path into the payload that must equality-match the value. Missing
2079/// path = no match. Non-dict filter strings are treated as no-op (return
2080/// true) so we don't regress pre-existing trigger `filter:` semantics that
2081/// reuse this field for other purposes.
2082fn channel_filter_matches(filter_raw: &str, payload: &serde_json::Value) -> bool {
2083    let trimmed = filter_raw.trim();
2084    if trimmed.is_empty() {
2085        return true;
2086    }
2087    let parsed: serde_json::Value = match serde_json::from_str(trimmed) {
2088        Ok(value) => value,
2089        Err(_) => return true,
2090    };
2091    let Some(map) = parsed.as_object() else {
2092        return true;
2093    };
2094    map.iter()
2095        .all(|(key, expected)| match payload_path(payload, key) {
2096            Some(actual) => actual == expected,
2097            None => false,
2098        })
2099}
2100
2101fn payload_path<'a>(value: &'a serde_json::Value, path: &str) -> Option<&'a serde_json::Value> {
2102    let mut current = value;
2103    for segment in path.split('.') {
2104        if segment.is_empty() {
2105            return None;
2106        }
2107        current = match current {
2108            serde_json::Value::Object(map) => map.get(segment)?,
2109            _ => return None,
2110        };
2111    }
2112    Some(current)
2113}
2114
2115#[cfg(test)]
2116mod tests {
2117    use super::*;
2118
2119    fn context() -> ChannelContext {
2120        ChannelContext {
2121            task_id: Some("task".to_string()),
2122            root_task_id: Some("root".to_string()),
2123            ..ChannelContext::default()
2124        }
2125    }
2126
2127    #[test]
2128    fn resolves_bare_name_to_default_tenant() {
2129        let resolved =
2130            resolve_channel("pr.merged", &ChannelOptions::default(), &context()).unwrap();
2131        assert_eq!(resolved.scope, ChannelScope::Tenant);
2132        assert_eq!(resolved.resolved_name, "tenant:default:pr.merged");
2133        assert_eq!(resolved.topic.as_str(), "channels.tenant.default.pr.merged");
2134    }
2135
2136    #[test]
2137    fn resolves_session_prefix_from_context() {
2138        let resolved =
2139            resolve_channel("session:agent.done", &ChannelOptions::default(), &context()).unwrap();
2140        assert_eq!(resolved.scope, ChannelScope::Session);
2141        assert_eq!(resolved.resolved_name, "session:root:agent.done");
2142    }
2143
2144    #[test]
2145    fn missing_pipeline_context_reports_channel_error() {
2146        let err = resolve_channel(
2147            "pipeline:stage.done",
2148            &ChannelOptions::default(),
2149            &context(),
2150        )
2151        .unwrap_err();
2152        assert!(err.0.contains("HARN-CHN-001"));
2153    }
2154
2155    #[test]
2156    fn org_scope_is_disabled() {
2157        let err = resolve_channel(
2158            "org:burin-labs:pr.merged",
2159            &ChannelOptions::default(),
2160            &context(),
2161        )
2162        .unwrap_err();
2163        assert!(err.0.contains("HARN-CHN-002"));
2164    }
2165
2166    #[test]
2167    fn explicit_session_id_matching_context_resolves() {
2168        let ctx = ChannelContext {
2169            agent_session_id: Some("sess-A".to_string()),
2170            ..context()
2171        };
2172        let options = ChannelOptions {
2173            session_id: Some("sess-A".to_string()),
2174            ..ChannelOptions::default()
2175        };
2176        let resolved = resolve_channel("session:agent.done", &options, &ctx).unwrap();
2177        assert_eq!(resolved.scope, ChannelScope::Session);
2178        assert_eq!(resolved.resolved_name, "session:sess-A:agent.done");
2179    }
2180
2181    #[test]
2182    fn explicit_session_id_conflict_reports_ambiguity() {
2183        let ctx = ChannelContext {
2184            agent_session_id: Some("sess-A".to_string()),
2185            ..context()
2186        };
2187        let options = ChannelOptions {
2188            session_id: Some("sess-B".to_string()),
2189            ..ChannelOptions::default()
2190        };
2191        let err = resolve_channel("session:agent.done", &options, &ctx).unwrap_err();
2192        assert!(
2193            err.0.contains("HARN-CHN-004"),
2194            "expected HARN-CHN-004, got: {}",
2195            err.0
2196        );
2197    }
2198
2199    #[test]
2200    fn explicit_pipeline_id_conflict_reports_ambiguity() {
2201        let ctx = ChannelContext {
2202            workflow_id: Some("pipe-A".to_string()),
2203            ..context()
2204        };
2205        let options = ChannelOptions {
2206            pipeline_id: Some("pipe-B".to_string()),
2207            ..ChannelOptions::default()
2208        };
2209        let err = resolve_channel("pipeline:stage.done", &options, &ctx).unwrap_err();
2210        assert!(
2211            err.0.contains("HARN-CHN-004"),
2212            "expected HARN-CHN-004, got: {}",
2213            err.0
2214        );
2215    }
2216
2217    #[test]
2218    fn explicit_tenant_mismatch_reports_cross_tenant() {
2219        let ctx = ChannelContext {
2220            tenant_id: Some("tenant-A".to_string()),
2221            ..context()
2222        };
2223        let options = ChannelOptions {
2224            tenant_id: Some("tenant-B".to_string()),
2225            ..ChannelOptions::default()
2226        };
2227        let err = resolve_channel("pr.merged", &options, &ctx).unwrap_err();
2228        assert!(
2229            err.0.contains("HARN-CHN-002"),
2230            "expected HARN-CHN-002, got: {}",
2231            err.0
2232        );
2233    }
2234
2235    #[test]
2236    fn explicit_tenant_in_name_matching_context_resolves() {
2237        let ctx = ChannelContext {
2238            tenant_id: Some("tenant-A".to_string()),
2239            ..context()
2240        };
2241        let resolved = resolve_channel(
2242            "tenant:tenant-A:pr.merged",
2243            &ChannelOptions::default(),
2244            &ctx,
2245        )
2246        .unwrap();
2247        assert_eq!(resolved.scope, ChannelScope::Tenant);
2248        assert_eq!(resolved.resolved_name, "tenant:tenant-A:pr.merged");
2249    }
2250
2251    #[test]
2252    fn cross_tenant_via_name_prefix_is_rejected() {
2253        let ctx = ChannelContext {
2254            tenant_id: Some("tenant-A".to_string()),
2255            ..context()
2256        };
2257        let err = resolve_channel(
2258            "tenant:tenant-B:pr.merged",
2259            &ChannelOptions::default(),
2260            &ctx,
2261        )
2262        .unwrap_err();
2263        assert!(err.0.contains("HARN-CHN-002"));
2264    }
2265
2266    #[test]
2267    fn channel_selector_parses_tenant_default_shorthand() {
2268        let selector = ChannelSelector::parse("channel:pr.merged").expect("parses");
2269        assert_eq!(selector.scope(), "tenant");
2270        assert_eq!(selector.name(), "pr.merged");
2271        assert!(selector.matches("tenant", "default", "pr.merged", "default"));
2272        assert!(!selector.matches("tenant", "default", "other.event", "default"));
2273        assert!(!selector.matches("session", "default", "pr.merged", "default"));
2274    }
2275
2276    #[test]
2277    fn channel_selector_parses_session_scope() {
2278        let selector = ChannelSelector::parse("channel:session:my-event").expect("parses");
2279        assert_eq!(selector.scope(), "session");
2280        assert_eq!(selector.name(), "my-event");
2281        assert!(selector.matches("session", "any-session-id", "my-event", "any-session-id"));
2282        assert!(!selector.matches("tenant", "any", "my-event", "any"));
2283    }
2284
2285    #[test]
2286    fn channel_selector_parses_explicit_tenant() {
2287        let selector =
2288            ChannelSelector::parse("channel:tenant:burin-labs:pr.merged").expect("parses");
2289        assert!(selector.matches("tenant", "burin-labs", "pr.merged", "default"));
2290        assert!(!selector.matches("tenant", "other-tenant", "pr.merged", "default"));
2291    }
2292
2293    #[test]
2294    fn channel_selector_parses_tenant_wildcard() {
2295        let selector = ChannelSelector::parse("channel:tenant:*:pr.merged").expect("parses");
2296        assert!(selector.matches("tenant", "burin-labs", "pr.merged", "default"));
2297        assert!(selector.matches("tenant", "other-tenant", "pr.merged", "default"));
2298        assert!(!selector.matches("tenant", "any", "different-event", "default"));
2299        assert!(!selector.matches("session", "any", "pr.merged", "default"));
2300    }
2301
2302    #[test]
2303    fn channel_selector_rejects_malformed_inputs() {
2304        assert!(ChannelSelector::parse("not-a-channel").is_err());
2305        assert!(ChannelSelector::parse("channel:").is_err());
2306        assert!(ChannelSelector::parse("channel:session:").is_err());
2307        assert!(ChannelSelector::parse("channel:session:has:extra:colons").is_err());
2308        assert!(ChannelSelector::parse("channel:org:no-name").is_err());
2309        assert!(ChannelSelector::parse("channel:tenant::missing-id").is_err());
2310        assert!(ChannelSelector::parse("channel:tenant:foo:").is_err());
2311    }
2312
2313    #[test]
2314    fn channel_filter_matches_equality_paths() {
2315        let payload = serde_json::json!({"repo": "harn", "nested": {"k": "v"}});
2316        assert!(channel_filter_matches("{\"repo\": \"harn\"}", &payload));
2317        assert!(!channel_filter_matches("{\"repo\": \"other\"}", &payload));
2318        assert!(channel_filter_matches("{\"nested.k\": \"v\"}", &payload));
2319        assert!(!channel_filter_matches("{\"nested.k\": \"x\"}", &payload));
2320        // Missing path → no match.
2321        assert!(!channel_filter_matches("{\"missing\": \"x\"}", &payload));
2322        // Empty filter → permissive.
2323        assert!(channel_filter_matches("", &payload));
2324        // Non-dict filter → permissive (back-compat with legacy `filter:` field).
2325        assert!(channel_filter_matches("just-a-string", &payload));
2326    }
2327
    // CH-07 (#1878): `channel_payload_hash` is the byte signal the
    // replay-oracle's `HARN-REP-CHN-002` diagnostic compares across
    // runs. The hash MUST be deterministic regardless of object-key
    // iteration order (the test in `replay_oracle` and the conformance
    // fixture both rely on canonical ordering), and it MUST change when
    // the payload changes.
2334    #[test]
2335    fn channel_payload_hash_is_deterministic_across_key_order() {
2336        let a = serde_json::json!({"a": 1, "b": 2, "nested": {"x": 10, "y": 20}});
2337        let b = serde_json::json!({"nested": {"y": 20, "x": 10}, "b": 2, "a": 1});
2338        assert_eq!(channel_payload_hash(&a), channel_payload_hash(&b));
2339    }
2340
2341    #[test]
2342    fn channel_payload_hash_changes_with_value_drift() {
2343        let baseline = serde_json::json!({"repo": "harn", "attempt": 1});
2344        let drifted = serde_json::json!({"repo": "harn", "attempt": 2});
2345        assert_ne!(
2346            channel_payload_hash(&baseline),
2347            channel_payload_hash(&drifted)
2348        );
2349    }
2350
2351    #[test]
2352    fn channel_payload_hash_is_sha256_prefixed_hex() {
2353        let value = serde_json::json!({"k": "v"});
2354        let hash = channel_payload_hash(&value);
2355        assert!(hash.starts_with("sha256:"));
2356        assert_eq!(hash.len(), "sha256:".len() + 64);
2357    }
2358}