Skip to main content

harn_vm/channels/
mod.rs

1use std::collections::BTreeMap;
2use std::sync::atomic::AtomicBool;
3use std::sync::{Arc, Mutex, OnceLock};
4
5use futures::StreamExt;
6use serde::{Deserialize, Serialize};
7use sha2::{Digest, Sha256};
8use time::format_description::well_known::Rfc3339;
9
10use crate::event_log::{
11    active_event_log, install_memory_for_current_thread, sanitize_topic_component, AnyEventLog,
12    ConsumerId, EventId, EventLog, LogEvent, Topic,
13};
14use crate::llm::vm_value_to_json;
15use crate::runtime_limits::RuntimeLimits;
16use crate::triggers::event::{ChannelEventPayload, KnownProviderPayload};
17use crate::triggers::{ProviderId, ProviderPayload, SignatureStatus, TenantId, TriggerEvent};
18use crate::value::{VmError, VmStream, VmValue};
19
/// Channel queue depth, taken from the default event-log queue depth in
/// `RuntimeLimits::DEFAULT`.
const CHANNEL_QUEUE_DEPTH: usize = RuntimeLimits::DEFAULT.default_event_log_queue_depth;
/// Event kind given to the `LogEvent` that `emit_channel_from_vm` appends to
/// the channel topic.
const CHANNEL_EVENT_KIND: &str = "channel.emit";
/// Header carrying the channel event id. The emit path also passes this
/// header name to `append_idempotent_by_header` as the idempotency key.
const IDEMPOTENCY_HEADER: &str = "harn.channel.id";
/// Header carrying the resolved channel name.
const NAME_HEADER: &str = "harn.channel.name";
/// Header carrying the channel scope string.
const SCOPE_HEADER: &str = "harn.channel.scope";
/// Header carrying the resolved scope id.
const SCOPE_ID_HEADER: &str = "harn.channel.scope_id";
/// Header carrying the emitter identity string.
const EMITTED_BY_HEADER: &str = "harn.channel.emitted_by";
27
/// CH-06 (#1877): topic for `transcript.channel.emit` / `transcript.channel.match`
/// transcript events. Consumers subscribe to this topic to render channel
/// activity alongside reminder/suspension lifecycle events.
pub(crate) const CHANNEL_TRANSCRIPT_TOPIC: &str = "transcript.channel.lifecycle";
/// Event kind for the emit-side transcript event.
pub(crate) const CHANNEL_EMIT_TRANSCRIPT_KIND: &str = "transcript.channel.emit";
/// Event kind for the match-side transcript event.
pub(crate) const CHANNEL_MATCH_TRANSCRIPT_KIND: &str = "transcript.channel.match";

/// CH-07 (#1878): durable audit topic for the replay-determinism receipts
/// emitted alongside every channel emit + every channel match. Consumers
/// drive replay/audit tooling off this topic instead of the lossy
/// `transcript.channel.lifecycle` summary topic — receipts carry the full
/// payload, signed timestamps, and cached match linkage so the
/// `replay_oracle` can byte-compare two runs of the same workload.
pub const CHANNEL_AUDIT_TOPIC: &str = "lifecycle.channel.audit";
/// Event kind for emit receipts on the audit topic.
pub(crate) const CHANNEL_EMIT_RECEIPT_KIND: &str = "channel_emit_receipt";
/// Event kind for match receipts on the audit topic.
pub(crate) const CHANNEL_MATCH_RECEIPT_KIND: &str = "channel_match_receipt";
/// CH-07 (#1878): receipt schema header so downstream tooling can
/// version-gate parsers (mirrors `harn.pool_submit.v1` from PL-06 / #1891).
const CHANNEL_EMIT_RECEIPT_SCHEMA: &str = "harn.channel_emit_receipt.v1";
/// Schema identifier for match receipts (the match-side counterpart of
/// `CHANNEL_EMIT_RECEIPT_SCHEMA`).
const CHANNEL_MATCH_RECEIPT_SCHEMA: &str = "harn.channel_match_receipt.v1";

/// CH-11 (#1911): event kinds + schema header for guardrail middleware
/// audit entries. Block + warn outcomes both write to the same
/// `lifecycle.channel.audit` topic so security review tooling reads a
/// single stream and discriminates on `kind`.
pub(crate) const CHANNEL_GUARDRAIL_BLOCKED_KIND: &str = "channel_guardrail_blocked";
/// Event kind for guardrail warn outcomes.
pub(crate) const CHANNEL_GUARDRAIL_WARNING_KIND: &str = "channel_guardrail_warning";
/// Schema identifier for guardrail audit entries.
const CHANNEL_GUARDRAIL_AUDIT_SCHEMA: &str = "harn.channel_guardrail_audit.v1";

/// CH-06 (#1877): per-event headers stamped on the `TriggerEvent` so the
/// channel-match dispatcher can link the `ChannelMatch` span back to the
/// originating `ChannelEmit` span across the async boundary (also through
/// the aggregation buffer for batched triggers).
const EMIT_TRACE_ID_HEADER: &str = "harn.channel.emit_trace_id";
/// Span-id counterpart of `EMIT_TRACE_ID_HEADER`.
const EMIT_SPAN_ID_HEADER: &str = "harn.channel.emit_span_id";
63
/// Lazily-initialised, mutex-guarded slot holding an optional shared event
/// log. `reset_channel_state` sets the slot back to `None`.
// NOTE(review): presumably the log returned by `log_for_scope` for
// session-scoped channels — confirm against `log_for_scope`.
static SESSION_CHANNEL_LOG: OnceLock<Mutex<Option<Arc<AnyEventLog>>>> = OnceLock::new();
/// Lazily-initialised byte string.
// NOTE(review): presumably the salt consumed by `signed_timestamp` — confirm.
static SIGNING_SALT: OnceLock<Vec<u8>> = OnceLock::new();
66
/// Scope a channel is resolved in.
///
/// Serialized in `snake_case`, i.e. the same strings that
/// `ChannelScope::parse` accepts and `ChannelScope::as_str` returns.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
enum ChannelScope {
    /// Channel keyed by a session id.
    Session,
    /// Channel keyed by a pipeline (workflow/run) id.
    Pipeline,
    /// Channel keyed by a tenant id; the default when no scope is given to
    /// `resolve_channel`.
    Tenant,
    /// Org-wide channel; `resolve_channel` currently rejects this scope.
    Org,
}
75
76impl ChannelScope {
77    fn parse(value: &str) -> Result<Self, ChannelError> {
78        match value.trim() {
79            "session" => Ok(Self::Session),
80            "pipeline" => Ok(Self::Pipeline),
81            "tenant" => Ok(Self::Tenant),
82            "org" => Ok(Self::Org),
83            other => Err(ChannelError::malformed(format!(
84                "HARN-CHN-003 malformed channel scope '{other}'"
85            ))),
86        }
87    }
88
89    fn as_str(self) -> &'static str {
90        match self {
91            Self::Session => "session",
92            Self::Pipeline => "pipeline",
93            Self::Tenant => "tenant",
94            Self::Org => "org",
95        }
96    }
97}
98
/// Snapshot of the runtime identifiers used to resolve channel scopes.
///
/// Built by `ChannelContext::current`; every field stays `None` when no
/// builtin context is available, except `agent_session_id`, which can still
/// be filled from `crate::agent_sessions::current_session_id`.
#[derive(Clone, Debug, Default)]
struct ChannelContext {
    /// Copied from `runtime_context.task_id` of the child VM.
    task_id: Option<String>,
    /// Copied from `runtime_context.root_task_id` of the child VM.
    root_task_id: Option<String>,
    /// Copied from `runtime_context.scope_id` of the child VM.
    scope_id: Option<String>,
    /// `workflow_id` entry of the runtime-context dict.
    workflow_id: Option<String>,
    /// `run_id` entry of the runtime-context dict.
    run_id: Option<String>,
    /// `worker_id` entry of the runtime-context dict.
    worker_id: Option<String>,
    /// `agent_session_id` entry of the runtime-context dict, falling back to
    /// the current agent session id.
    agent_session_id: Option<String>,
    /// `root_agent_session_id` entry of the runtime-context dict.
    root_agent_session_id: Option<String>,
    /// `tenant_id` entry of the runtime-context dict.
    tenant_id: Option<String>,
}
111
/// Caller-supplied options accepted by the channel builtins.
#[derive(Clone, Debug, Default)]
struct ChannelOptions {
    /// Explicit scope; must agree with a scope prefix in the channel name
    /// when both are present (checked in `resolve_channel`).
    scope: Option<ChannelScope>,
    /// Explicit event id for `emit_channel`; a fresh id is generated when
    /// absent.
    id: Option<String>,
    /// Explicit tenant id for tenant-scoped channels.
    tenant_id: Option<String>,
    /// Explicit session id for session-scoped channels.
    session_id: Option<String>,
    /// Explicit pipeline id for pipeline-scoped channels.
    pipeline_id: Option<String>,
    /// Cursor passed to `read_range` / `subscribe` as the starting point.
    from_cursor: Option<EventId>,
    /// Maximum number of events for `channel_events`; unbounded
    /// (`usize::MAX`) when absent.
    limit: Option<usize>,
    /// TTL in milliseconds, stored verbatim on the emitted event record.
    ttl_ms: Option<i64>,
}
123
/// Result of `resolve_channel`: a channel name bound to a concrete scope.
#[derive(Clone, Debug)]
struct ResolvedChannel {
    /// Effective scope (never `Org`, which `resolve_channel` rejects).
    scope: ChannelScope,
    /// Identifier within the scope (session, pipeline or tenant id).
    scope_id: String,
    /// Fully-qualified name in the form `<scope>:<scope_id>:<name>`.
    resolved_name: String,
    /// Event-log topic, `channels.<scope>.<scope_id>.<name>` with the last
    /// two components passed through `sanitize_topic_component`.
    topic: Topic,
    /// Retention label returned by `retention_for_scope` for `scope`.
    retention: &'static str,
}
132
/// Timestamp plus signature metadata attached to channel events and
/// receipts.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct SignedTimestamp {
    /// Timestamp in milliseconds; the emit path copies it into the journal
    /// event's `occurred_at_ms`.
    pub at_ms: i64,
    /// Textual form of the timestamp.
    // NOTE(review): presumably RFC 3339 (the file imports `Rfc3339`) — confirm
    // against `signed_timestamp`.
    pub at: String,
    /// Name of the signing algorithm.
    pub algorithm: String,
    /// Identifier of the signing key.
    pub key_id: String,
    /// Signature value.
    // NOTE(review): encoding (hex/base64) is not visible here — confirm.
    pub signature: String,
}
141
/// Record serialized as the payload of the journal `LogEvent` appended by
/// `emit_channel_from_vm`.
///
/// Optional fields are omitted from the serialized form when `None`.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct StoredChannelEvent {
    /// Channel event id (also the idempotency key).
    id: String,
    /// Resolved channel name.
    name: String,
    /// Emitted payload as JSON.
    payload: serde_json::Value,
    /// Signed emit timestamp.
    emitted_at: SignedTimestamp,
    /// Emitter identity string.
    emitted_by: String,
    /// Scope label (`ChannelScope::as_str`).
    scope: String,
    /// Resolved scope id.
    scope_id: String,
    /// Pipeline id from `ChannelContext::pipeline_id_for_receipt`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pipeline_id: Option<String>,
    /// Session id from `ChannelContext::session_id_for_receipt`.
    #[serde(skip_serializing_if = "Option::is_none")]
    session_id: Option<String>,
    /// Tenant id from `ChannelContext::tenant_id_for_receipt`.
    #[serde(skip_serializing_if = "Option::is_none")]
    tenant_id: Option<String>,
    /// Retention label of the resolved channel.
    retention: String,
    /// TTL in milliseconds, copied from the emit options.
    #[serde(skip_serializing_if = "Option::is_none")]
    ttl_ms: Option<i64>,
}
161
/// CH-07 (#1878): durable replay-determinism receipt for a single
/// `emit_channel(...)` call. Pairs 1:1 with the `ChannelEmit` span (CH-06
/// / #1877) and with the durable journal append. Persisted to the
/// `lifecycle.channel.audit` event-log topic so the `replay_oracle` can
/// reproduce the entire emit chain across two runs of the same workload.
///
/// `payload_hash` lets the oracle detect producer-side drift
/// (`HARN-REP-CHN-002`) without having to canonicalize the full payload
/// during comparison; `payload` carries the verbatim value for byte
/// equality + replay reconstruction.
///
/// Optional fields are omitted from the serialized form when `None`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ChannelEmitReceipt {
    /// Id of the emitted channel event.
    pub event_id: String,
    /// Resolved channel name.
    pub name_resolved: String,
    /// Scope label.
    pub scope: String,
    /// Resolved scope id.
    pub scope_id: String,
    /// Hash of the payload, used for drift detection.
    pub payload_hash: String,
    /// Verbatim payload.
    pub payload: serde_json::Value,
    /// Signed emit timestamp.
    pub emitted_at: SignedTimestamp,
    /// Emitter identity string.
    pub emitted_by: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pipeline_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tenant_id: Option<String>,
    /// Name of the topic the event belongs to.
    pub topic: String,
    // NOTE(review): presumably mirrors the journal append's `inserted` flag
    // (fresh append vs idempotent duplicate) — confirm against
    // `record_channel_emit_receipt`.
    pub inserted: bool,
    // NOTE(review): presumably the id of the `ChannelEmit` span — confirm
    // against `record_channel_emit_receipt`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub span_id: Option<u64>,
}
193
/// CH-07 (#1878): durable replay-determinism receipt for a single channel
/// match — one per `(binding, event)` pair the dispatcher fires. For
/// aggregation/batched triggers (CH-04 / #1875) the receipt carries
/// `batch.constituent_event_ids` so the replay oracle can verify the
/// full batch composition matches across runs (`HARN-REP-CHN-003`).
///
/// `event_id` doubles as the cached-match key: on replay the dispatcher
/// looks up the recorded match by `event_id` instead of re-evaluating
/// the filter spec, preserving "the journal IS the spec" determinism.
///
/// Optional fields are omitted from the serialized form when `None`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ChannelMatchReceipt {
    /// Id of the matched channel event; also the cached-match key.
    pub event_id: String,
    /// Id of the trigger that matched.
    pub trigger_id: String,
    /// Key of the binding that matched.
    pub binding_key: String,
    /// Resolved channel name.
    pub name_resolved: String,
    /// Scope label.
    pub scope: String,
    /// Resolved scope id.
    pub scope_id: String,
    /// Signed timestamp of the match.
    pub matched_at: SignedTimestamp,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub matched_in_session_id: Option<String>,
    /// Batch composition for aggregation/batched triggers.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub batch: Option<ChannelMatchBatchInfo>,
    /// Label for the kind of handler that was invoked.
    pub handler_kind: String,
    /// Summary of the handler invocation outcome.
    pub handler_result: ChannelMatchResultSummary,
    // NOTE(review): presumably the id of the `ChannelMatch` span — confirm
    // against the code that builds this receipt.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub span_id: Option<u64>,
}
221
/// CH-07 (#1878): batched-dispatch summary stamped onto
/// `ChannelMatchReceipt`. The `constituent_event_ids` list is the full
/// recorded composition; replay reconstructs the batch from those ids.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ChannelMatchBatchInfo {
    // NOTE(review): presumably equals `constituent_event_ids.len()` — confirm
    // against the code that builds the batch info.
    pub count: usize,
    /// Ids of every event that made up the batch.
    pub constituent_event_ids: Vec<String>,
}
230
/// CH-07 (#1878): summary of the handler invocation outcome. Mirrors the
/// shape of the dispatcher's `DispatchOutcome` but kept lightweight so
/// the receipt stays compact and self-contained — replay tooling never
/// needs to cross-reference the dispatcher log to interpret a match.
///
/// Optional fields are omitted from the serialized form when `None`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct ChannelMatchResultSummary {
    /// Dispatch status string, or `"dispatch_error"` when the dispatch call
    /// itself failed (see `from_dispatch`).
    pub status: String,
    /// Attempt count reported by the dispatcher; `0` on dispatch failure.
    pub attempt_count: u32,
    /// Error text from the outcome, or the dispatch error rendered as text.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    /// `Some(true)` only when the dispatch call itself returned an error.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub dispatch_failed: Option<bool>,
}
244
245impl ChannelMatchResultSummary {
246    fn from_dispatch(
247        outcome: &Result<crate::triggers::DispatchOutcome, crate::triggers::DispatchError>,
248    ) -> Self {
249        match outcome {
250            Ok(outcome) => Self {
251                status: outcome.status.as_str().to_string(),
252                attempt_count: outcome.attempt_count,
253                error: outcome.error.clone(),
254                dispatch_failed: None,
255            },
256            Err(error) => Self {
257                status: "dispatch_error".to_string(),
258                attempt_count: 0,
259                error: Some(error.to_string()),
260                dispatch_failed: Some(true),
261            },
262        }
263    }
264}
265
/// Error raised while resolving or validating a channel; wraps the full
/// human-readable message (including its `HARN-CHN-xxx` code).
#[derive(Debug)]
struct ChannelError(String);

impl ChannelError {
    /// Joins a diagnostic code and a detail message with a single space.
    fn prefixed(code: &str, detail: impl Into<String>) -> Self {
        let detail: String = detail.into();
        Self(format!("{code} {detail}"))
    }

    /// `HARN-CHN-001`: a pipeline-scoped channel was requested without any
    /// pipeline context.
    fn missing_pipeline() -> Self {
        Self(String::from(
            "HARN-CHN-001 missing pipeline context for pipeline-scoped channel",
        ))
    }

    /// `HARN-CHN-002`: the request crosses a tenant boundary.
    fn cross_tenant(message: impl Into<String>) -> Self {
        Self::prefixed("HARN-CHN-002", message)
    }

    /// Malformed input. The message is stored verbatim, so callers supply
    /// the diagnostic code themselves.
    fn malformed(message: impl Into<String>) -> Self {
        let text: String = message.into();
        Self(text)
    }

    /// `HARN-CHN-004`: the requested scope conflicts with the active one.
    fn scope_ambiguous(message: impl Into<String>) -> Self {
        Self::prefixed("HARN-CHN-004", message)
    }
}
286
287impl From<ChannelError> for VmError {
288    fn from(error: ChannelError) -> Self {
289        VmError::Runtime(error.0)
290    }
291}
292
293pub fn reset_channel_state() {
294    if let Some(slot) = SESSION_CHANNEL_LOG.get() {
295        *slot.lock().expect("channel session log poisoned") = None;
296    }
297    // CH-11 (#1911): clear the per-thread guardrail registry so the
298    // next pipeline starts with a clean slate. Mirrors how
299    // SESSION_CHANNEL_LOG is reset above.
300    crate::channel_guardrails::clear();
301}
302
/// VM builtin behind `emit_channel(name, payload, options?)`.
///
/// Steps, in order:
/// 1. Reads `name` (arg 0), the payload (arg 1, converted to JSON) and the
///    options (arg 2), then resolves the channel against the current
///    `ChannelContext`.
/// 2. Chooses the event id (`options.id` when given, otherwise a generated
///    `channel_evt_<uuid v7>`) and computes the signed emit timestamp.
/// 3. Evaluates the channel guardrails. A `Block` verdict returns whatever
///    `handle_blocked_emit` produces without touching the journal; otherwise
///    the fired guardrails are handed to `record_guardrail_warnings`.
/// 4. Appends the `StoredChannelEvent` to the scope's log, idempotently keyed
///    on `IDEMPOTENCY_HEADER` = event id.
/// 5. Emits the transcript event and the audit receipt for both fresh and
///    duplicate appends.
/// 6. Fans the event out to channel-source triggers only when the append was
///    fresh.
///
/// Returns the receipt built by `receipt_value`, converted to a VM value.
///
/// # Errors
///
/// Propagates argument/option parsing errors, channel resolution errors,
/// guardrail evaluation errors, event encoding or log append errors, and
/// trigger fan-out errors.
pub(crate) async fn emit_channel_from_vm(
    ctx: Option<&crate::vm::AsyncBuiltinCtx>,
    args: Vec<VmValue>,
) -> Result<VmValue, VmError> {
    let name = required_string(args.first(), "emit_channel", "name")?;
    let payload = vm_value_to_json(
        args.get(1)
            .ok_or_else(|| VmError::TypeError("emit_channel: missing payload".to_string()))?,
    );
    let options = parse_options(args.get(2), "emit_channel")?;
    let context = ChannelContext::current(ctx);
    let resolved = resolve_channel(&name, &options, &context)?;
    // A caller-supplied id doubles as the idempotency key for the append.
    let event_id = options
        .id
        .clone()
        .unwrap_or_else(|| format!("channel_evt_{}", uuid::Uuid::now_v7()));
    let emitted_by = emitted_by(&context);
    let emitted_at = signed_timestamp(&resolved, &event_id, &emitted_by);
    let occurred_at_ms = emitted_at.at_ms;

    // CH-11 (#1911): channel guardrails middleware. Runs BEFORE the
    // durable journal append so a blocked payload never gets persisted
    // (the block itself IS persisted on the lifecycle.channel.audit
    // topic so the audit trail is durable). Warn verdicts proceed but
    // record an audit. The aggregate decision is the worst verdict
    // across every registered guardrail.
    let guardrail_context = serde_json::json!({
        "name": name,
        "name_resolved": resolved.resolved_name,
        "scope": resolved.scope.as_str(),
        "scope_id": resolved.scope_id,
        "event_id": event_id,
        "emitted_by": emitted_by,
    });
    let decision = crate::channel_guardrails::evaluate(
        ctx,
        &payload,
        &guardrail_context,
        &resolved.resolved_name,
    )
    .await?;
    if matches!(
        decision.verdict,
        crate::channel_guardrails::Verdict::Block { .. }
    ) {
        return handle_blocked_emit(
            &name,
            &resolved,
            &event_id,
            &emitted_by,
            &emitted_at,
            &payload,
            &decision,
        )
        .await;
    }
    record_guardrail_warnings(
        &resolved,
        &event_id,
        &emitted_by,
        &payload,
        decision.fired.as_slice(),
    )
    .await;
    // From here on `payload` and `emitted_at` are owned by the record.
    let record = StoredChannelEvent {
        id: event_id.clone(),
        name: resolved.resolved_name.clone(),
        payload,
        emitted_at,
        emitted_by: emitted_by.clone(),
        scope: resolved.scope.as_str().to_string(),
        scope_id: resolved.scope_id.clone(),
        pipeline_id: context.pipeline_id_for_receipt(&resolved),
        session_id: context.session_id_for_receipt(&resolved),
        tenant_id: context.tenant_id_for_receipt(&resolved),
        retention: resolved.retention.to_string(),
        ttl_ms: options.ttl_ms,
    };

    // CH-06 (#1877): open the ChannelEmit span around the durable append +
    // trigger fan-out. The span captures emit-time metadata (scope, name,
    // payload summary) and its trace/span id is stashed on the trigger
    // event headers so the downstream ChannelMatch span can link back.
    let mut emit_span = ChannelSpanGuard::start(
        crate::tracing::SpanKind::ChannelEmit,
        format!("channel.emit {}", resolved.resolved_name),
        Vec::new(),
    );
    emit_span.set_metadata("event_id", serde_json::json!(record.id));
    emit_span.set_metadata("scope", serde_json::json!(resolved.scope.as_str()));
    emit_span.set_metadata("scope_id", serde_json::json!(resolved.scope_id));
    emit_span.set_metadata("name_resolved", serde_json::json!(resolved.resolved_name));
    emit_span.set_metadata("payload_summary", summarize_payload(&record.payload));
    // Falls back to 0 when no span is current.
    let emit_span_id = crate::tracing::current_span_id().unwrap_or(0);
    let emit_link = emit_span.link();

    let mut headers = BTreeMap::new();
    headers.insert(IDEMPOTENCY_HEADER.to_string(), event_id.clone());
    headers.insert(NAME_HEADER.to_string(), resolved.resolved_name.clone());
    headers.insert(
        SCOPE_HEADER.to_string(),
        resolved.scope.as_str().to_string(),
    );
    headers.insert(SCOPE_ID_HEADER.to_string(), resolved.scope_id.clone());
    headers.insert(EMITTED_BY_HEADER.to_string(), emitted_by.clone());

    let log = log_for_scope(resolved.scope);
    let mut log_event = LogEvent::new(
        CHANNEL_EVENT_KIND,
        serde_json::to_value(&record)
            .map_err(|error| VmError::Runtime(format!("emit_channel: encode event: {error}")))?,
    )
    .with_headers(headers);
    // Keep the journal timestamp identical to the signed emit timestamp.
    log_event.occurred_at_ms = occurred_at_ms;
    let outcome = log
        .append_idempotent_by_header(&resolved.topic, IDEMPOTENCY_HEADER, &event_id, log_event)
        .await
        .map_err(channel_log_error)?;
    // The receipt is built from the event the log returned, not from `record`.
    let receipt = receipt_value(
        &resolved.topic,
        outcome.event_id,
        &outcome.event,
        outcome.inserted,
    )?;
    // CH-06 (#1877): emit the transcript event whether the append was fresh
    // or idempotent — both outcomes are first-class observability signals.
    emit_channel_emit_transcript(&record, &resolved, outcome.inserted, emit_span_id);
    // CH-07 (#1878): persist the durable emit receipt on the
    // `lifecycle.channel.audit` topic so the replay oracle has the full
    // emit chain (payload hash, signed timestamp, scope correlation)
    // independently of the lossy transcript summary topic.
    record_channel_emit_receipt(&record, &resolved, outcome.inserted, emit_span_id).await;
    // CH-02 (#1872): fan out the emit to channel-source triggers. Only fresh
    // appends fan out; idempotent duplicates short-circuit because the producer
    // already saw the original delivery.
    if outcome.inserted {
        // Use the payload as stored in the journal event; `Null` when the
        // stored event has no `payload` key.
        let payload_json = outcome
            .event
            .payload
            .get("payload")
            .cloned()
            .unwrap_or(serde_json::Value::Null);
        // NOTE(review): this rebuilds the context although `context` from the
        // top of the function is still in scope — confirm whether the re-read
        // is intentional (the stored record above used the earlier snapshot).
        let context_for_fanout = ChannelContext::current(ctx);
        let fanout_payload = ChannelEventPayload {
            id: event_id.clone(),
            // Unprefixed name from the raw input; falls back to the resolved
            // name if the raw input no longer parses.
            name: parse_name(&name)
                .map(|parsed| parsed.name)
                .unwrap_or_else(|_| resolved.resolved_name.clone()),
            name_resolved: resolved.resolved_name.clone(),
            scope: resolved.scope.as_str().to_string(),
            scope_id: resolved.scope_id.clone(),
            payload: payload_json,
            emitted_by: emitted_by.clone(),
            tenant_id: context_for_fanout.tenant_id_for_receipt(&resolved),
            session_id: context_for_fanout.session_id_for_receipt(&resolved),
            pipeline_id: context_for_fanout.pipeline_id_for_receipt(&resolved),
        };
        dispatch_channel_emit_to_triggers(ctx, &resolved, fanout_payload, emit_link).await?;
    }
    // NOTE(review): the `?` early returns above skip this explicit `end()`;
    // presumably `ChannelSpanGuard` closes the span on drop — TODO confirm.
    emit_span.end();
    Ok(crate::stdlib::json_to_vm_value(&receipt))
}
465
466pub(crate) async fn channel_events_from_vm(
467    ctx: Option<&crate::vm::AsyncBuiltinCtx>,
468    args: Vec<VmValue>,
469) -> Result<VmValue, VmError> {
470    let name = required_string(args.first(), "channel_events", "name")?;
471    let options = parse_options(args.get(1), "channel_events")?;
472    let context = ChannelContext::current(ctx);
473    let resolved = resolve_channel(&name, &options, &context)?;
474    let events = log_for_scope(resolved.scope)
475        .read_range(
476            &resolved.topic,
477            options.from_cursor,
478            options.limit.unwrap_or(usize::MAX),
479        )
480        .await
481        .map_err(channel_log_error)?;
482    let values = events
483        .into_iter()
484        .map(|(event_id, event)| event_value(&resolved.topic, event_id, event))
485        .collect::<Result<Vec<_>, _>>()?;
486    Ok(crate::stdlib::json_to_vm_value(&serde_json::Value::Array(
487        values,
488    )))
489}
490
491pub(crate) async fn channel_subscribe_from_vm(
492    ctx: Option<&crate::vm::AsyncBuiltinCtx>,
493    args: Vec<VmValue>,
494) -> Result<VmValue, VmError> {
495    let name = required_string(args.first(), "channel_subscribe", "name")?;
496    let options = parse_options(args.get(1), "channel_subscribe")?;
497    let context = ChannelContext::current(ctx);
498    let resolved = resolve_channel(&name, &options, &context)?;
499    let topic = resolved.topic.clone();
500    let mut events = log_for_scope(resolved.scope)
501        .subscribe(&topic, options.from_cursor)
502        .await
503        .map_err(channel_log_error)?;
504    let (tx, rx) = tokio::sync::mpsc::channel::<Result<VmValue, VmError>>(1);
505    tokio::task::spawn_local(async move {
506        while let Some(next) = events.next().await {
507            let value = match next {
508                Ok((event_id, event)) => event_value(&topic, event_id, event)
509                    .map(|value| crate::stdlib::json_to_vm_value(&value)),
510                Err(error) => Err(channel_log_error(error)),
511            };
512            if tx.send(value).await.is_err() {
513                return;
514            }
515        }
516    });
517    Ok(VmValue::stream(VmStream {
518        done: Arc::new(AtomicBool::new(false)),
519        receiver: Arc::new(tokio::sync::Mutex::new(rx)),
520        cancel: None,
521    }))
522}
523
524pub(crate) async fn channel_consumer_cursor_from_vm(
525    ctx: Option<&crate::vm::AsyncBuiltinCtx>,
526    args: Vec<VmValue>,
527) -> Result<VmValue, VmError> {
528    let name = required_string(args.first(), "channel_consumer_cursor", "name")?;
529    let consumer = required_consumer_id(args.get(1), "channel_consumer_cursor")?;
530    let options = parse_options(args.get(2), "channel_consumer_cursor")?;
531    let context = ChannelContext::current(ctx);
532    let resolved = resolve_channel(&name, &options, &context)?;
533    let cursor = log_for_scope(resolved.scope)
534        .consumer_cursor(&resolved.topic, &consumer)
535        .await
536        .map_err(channel_log_error)?;
537    match cursor {
538        Some(event_id) => Ok(VmValue::Int(event_id_to_i64(
539            event_id,
540            "channel_consumer_cursor",
541        )?)),
542        None => Ok(VmValue::Nil),
543    }
544}
545
546pub(crate) async fn channel_ack_from_vm(
547    ctx: Option<&crate::vm::AsyncBuiltinCtx>,
548    args: Vec<VmValue>,
549) -> Result<VmValue, VmError> {
550    let name = required_string(args.first(), "channel_ack", "name")?;
551    let consumer = required_consumer_id(args.get(1), "channel_ack")?;
552    let cursor = required_event_id(args.get(2), "channel_ack", "cursor")?;
553    let options = parse_options(args.get(3), "channel_ack")?;
554    let context = ChannelContext::current(ctx);
555    let resolved = resolve_channel(&name, &options, &context)?;
556    log_for_scope(resolved.scope)
557        .ack(&resolved.topic, &consumer, cursor)
558        .await
559        .map_err(channel_log_error)?;
560    Ok(crate::stdlib::json_to_vm_value(&serde_json::json!({
561        "name": name,
562        "name_resolved": resolved.resolved_name,
563        "scope": resolved.scope.as_str(),
564        "scope_id": resolved.scope_id,
565        "topic": resolved.topic.as_str(),
566        "consumer_id": consumer.as_str(),
567        "cursor": cursor,
568    })))
569}
570
571impl ChannelContext {
572    fn current(ctx: Option<&crate::vm::AsyncBuiltinCtx>) -> Self {
573        let mut context = Self::default();
574        if let Some(vm) = ctx.map(crate::vm::AsyncBuiltinCtx::child_vm) {
575            context.task_id = Some(vm.runtime_context.task_id.clone());
576            context.root_task_id = Some(vm.runtime_context.root_task_id.clone());
577            context.scope_id = vm.runtime_context.scope_id.clone();
578            if let VmValue::Dict(values) = crate::runtime_context::runtime_context_value(&vm) {
579                context.workflow_id = dict_string(&values, "workflow_id");
580                context.run_id = dict_string(&values, "run_id");
581                context.worker_id = dict_string(&values, "worker_id");
582                context.agent_session_id = dict_string(&values, "agent_session_id");
583                context.root_agent_session_id = dict_string(&values, "root_agent_session_id");
584                context.tenant_id = dict_string(&values, "tenant_id");
585            }
586        }
587        context.agent_session_id = context
588            .agent_session_id
589            .or_else(crate::agent_sessions::current_session_id);
590        context
591    }
592
593    fn session_id(&self, options: &ChannelOptions) -> Result<String, ChannelError> {
594        // CH-03 (#1874): an explicit `options.session_id` that disagrees with the
595        // active session is HARN-CHN-004 ambiguity, not a silent override. The
596        // resolver MUST be deterministic for a given runtime context.
597        if let Some(requested) = options.session_id.as_deref() {
598            if let Some(active) = self.agent_session_id.as_deref() {
599                if active != requested {
600                    return Err(ChannelError::scope_ambiguous(format!(
601                        "session scope ambiguous: options.session_id '{requested}' \
602                         conflicts with active session '{active}'"
603                    )));
604                }
605            }
606        }
607        Ok(options
608            .session_id
609            .clone()
610            .or_else(|| self.agent_session_id.clone())
611            .or_else(|| self.root_agent_session_id.clone())
612            .or_else(|| self.scope_id.clone())
613            .or_else(|| self.root_task_id.clone())
614            .unwrap_or_else(|| "session".to_string()))
615    }
616
617    fn pipeline_id(&self, options: &ChannelOptions) -> Result<String, ChannelError> {
618        // CH-03 (#1874): explicit `options.pipeline_id` that conflicts with the
619        // active workflow/run is HARN-CHN-004. Two pipelines cannot share a
620        // resolved channel without an explicit disambiguation.
621        let active = self.workflow_id.clone().or_else(|| self.run_id.clone());
622        if let (Some(requested), Some(active)) = (options.pipeline_id.as_deref(), active.as_deref())
623        {
624            if requested != active {
625                return Err(ChannelError::scope_ambiguous(format!(
626                    "pipeline scope ambiguous: options.pipeline_id '{requested}' \
627                     conflicts with active pipeline '{active}'"
628                )));
629            }
630        }
631        options
632            .pipeline_id
633            .clone()
634            .or(active)
635            .ok_or_else(ChannelError::missing_pipeline)
636    }
637
638    fn tenant_id(
639        &self,
640        options: &ChannelOptions,
641        requested: Option<&str>,
642    ) -> Result<String, ChannelError> {
643        let current = self.tenant_id.as_deref();
644        let requested = requested
645            .map(ToOwned::to_owned)
646            .or_else(|| options.tenant_id.clone());
647        if let (Some(current), Some(requested)) = (current, requested.as_deref()) {
648            if current != requested {
649                return Err(ChannelError::cross_tenant(format!(
650                    "cross-tenant channel emit requires a grant: current tenant '{current}', requested tenant '{requested}'"
651                )));
652            }
653        }
654        Ok(requested
655            .or_else(|| self.tenant_id.clone())
656            .unwrap_or_else(|| "default".to_string()))
657    }
658
659    fn pipeline_id_for_receipt(&self, resolved: &ResolvedChannel) -> Option<String> {
660        match resolved.scope {
661            ChannelScope::Pipeline => Some(resolved.scope_id.clone()),
662            _ => self.workflow_id.clone().or_else(|| self.run_id.clone()),
663        }
664    }
665
666    fn session_id_for_receipt(&self, resolved: &ResolvedChannel) -> Option<String> {
667        match resolved.scope {
668            ChannelScope::Session => Some(resolved.scope_id.clone()),
669            _ => self
670                .agent_session_id
671                .clone()
672                .or_else(|| self.root_agent_session_id.clone()),
673        }
674    }
675
676    fn tenant_id_for_receipt(&self, resolved: &ResolvedChannel) -> Option<String> {
677        match resolved.scope {
678            ChannelScope::Tenant => Some(resolved.scope_id.clone()),
679            _ => self.tenant_id.clone(),
680        }
681    }
682}
683
684fn resolve_channel(
685    raw_name: &str,
686    options: &ChannelOptions,
687    context: &ChannelContext,
688) -> Result<ResolvedChannel, ChannelError> {
689    let parsed = parse_name(raw_name)?;
690    if let Some(option_scope) = options.scope {
691        if let Some(prefix_scope) = parsed.scope {
692            if prefix_scope != option_scope {
693                return Err(ChannelError::malformed(format!(
694                    "HARN-CHN-003 channel scope prefix '{}' conflicts with options.scope '{}'",
695                    prefix_scope.as_str(),
696                    option_scope.as_str()
697                )));
698            }
699        }
700    }
701
702    let scope = parsed
703        .scope
704        .or(options.scope)
705        .unwrap_or(ChannelScope::Tenant);
706    if scope == ChannelScope::Org {
707        return Err(ChannelError::cross_tenant(
708            "org-scoped channels are disabled until org grants are available",
709        ));
710    }
711
712    validate_channel_name(&parsed.name)?;
713    let scope_id = match scope {
714        ChannelScope::Session => match parsed.scope_id.clone() {
715            Some(id) => id,
716            None => context.session_id(options)?,
717        },
718        ChannelScope::Pipeline => context.pipeline_id(options)?,
719        ChannelScope::Tenant => context.tenant_id(options, parsed.scope_id.as_deref())?,
720        ChannelScope::Org => unreachable!("org scope returned above"),
721    };
722    validate_scope_id(scope, &scope_id)?;
723    let resolved_name = format!("{}:{}:{}", scope.as_str(), scope_id, parsed.name);
724    let topic = Topic::new(format!(
725        "channels.{}.{}.{}",
726        scope.as_str(),
727        sanitize_topic_component(&scope_id),
728        sanitize_topic_component(&parsed.name)
729    ))
730    .map_err(|error| ChannelError::malformed(format!("HARN-CHN-003 {error}")))?;
731    Ok(ResolvedChannel {
732        scope,
733        scope_id,
734        resolved_name,
735        topic,
736        retention: retention_for_scope(scope),
737    })
738}
739
/// Components of a raw channel name as split by `parse_name`.
#[derive(Clone, Debug)]
struct ParsedName {
    /// Scope taken from the `<scope>:` prefix; `None` when the name contains
    /// no `:`.
    scope: Option<ChannelScope>,
    /// Scope id embedded in the name (e.g. `tenant:<id>:<name>`), if any.
    scope_id: Option<String>,
    /// Channel name with any scope prefix (and embedded scope id) removed.
    name: String,
}
746
747fn parse_name(raw_name: &str) -> Result<ParsedName, ChannelError> {
748    let raw_name = raw_name.trim();
749    if raw_name.is_empty() {
750        return Err(ChannelError::malformed(
751            "HARN-CHN-003 channel name cannot be empty",
752        ));
753    }
754    let Some((prefix, rest)) = raw_name.split_once(':') else {
755        return Ok(ParsedName {
756            scope: None,
757            scope_id: None,
758            name: raw_name.to_string(),
759        });
760    };
761    let scope = ChannelScope::parse(prefix)?;
762    match scope {
763        ChannelScope::Session | ChannelScope::Pipeline => {
764            if rest.is_empty() || rest.contains(':') {
765                return Err(ChannelError::malformed(format!(
766                    "HARN-CHN-003 malformed {} channel name '{raw_name}'",
767                    scope.as_str()
768                )));
769            }
770            Ok(ParsedName {
771                scope: Some(scope),
772                scope_id: None,
773                name: rest.to_string(),
774            })
775        }
776        ChannelScope::Tenant => {
777            if rest.is_empty() {
778                return Err(ChannelError::malformed(
779                    "HARN-CHN-003 tenant channel name cannot be empty",
780                ));
781            }
782            let (scope_id, name) = match rest.split_once(':') {
783                Some((tenant_id, name)) if !tenant_id.is_empty() && !name.is_empty() => {
784                    (Some(tenant_id.to_string()), name.to_string())
785                }
786                Some(_) => {
787                    return Err(ChannelError::malformed(format!(
788                        "HARN-CHN-003 malformed tenant channel name '{raw_name}'"
789                    )))
790                }
791                None => (None, rest.to_string()),
792            };
793            Ok(ParsedName {
794                scope: Some(scope),
795                scope_id,
796                name,
797            })
798        }
799        ChannelScope::Org => {
800            let Some((org_id, name)) = rest.split_once(':') else {
801                return Err(ChannelError::malformed(format!(
802                    "HARN-CHN-003 org channel names must be org:<org_id>:<name>, got '{raw_name}'"
803                )));
804            };
805            if org_id.is_empty() || name.is_empty() {
806                return Err(ChannelError::malformed(format!(
807                    "HARN-CHN-003 malformed org channel name '{raw_name}'"
808                )));
809            }
810            Ok(ParsedName {
811                scope: Some(scope),
812                scope_id: Some(org_id.to_string()),
813                name: name.to_string(),
814            })
815        }
816    }
817}
818
819fn validate_channel_name(name: &str) -> Result<(), ChannelError> {
820    if name.trim().is_empty()
821        || name.contains(':')
822        || name.chars().any(|ch| ch.is_control() || ch.is_whitespace())
823    {
824        return Err(ChannelError::malformed(format!(
825            "HARN-CHN-003 malformed channel name '{name}'"
826        )));
827    }
828    Ok(())
829}
830
831fn validate_scope_id(scope: ChannelScope, scope_id: &str) -> Result<(), ChannelError> {
832    if scope_id.trim().is_empty()
833        || scope_id
834            .chars()
835            .any(|ch| ch.is_control() || ch.is_whitespace() || ch == ':')
836    {
837        return Err(ChannelError::malformed(format!(
838            "HARN-CHN-003 malformed {} scope id '{scope_id}'",
839            scope.as_str()
840        )));
841    }
842    Ok(())
843}
844
845fn log_for_scope(scope: ChannelScope) -> Arc<AnyEventLog> {
846    match scope {
847        ChannelScope::Session => {
848            let slot = SESSION_CHANNEL_LOG.get_or_init(|| Mutex::new(None));
849            let mut guard = slot.lock().expect("channel session log poisoned");
850            guard
851                .get_or_insert_with(|| {
852                    Arc::new(AnyEventLog::Memory(crate::event_log::MemoryEventLog::new(
853                        CHANNEL_QUEUE_DEPTH,
854                    )))
855                })
856                .clone()
857        }
858        ChannelScope::Pipeline | ChannelScope::Tenant => active_event_log()
859            .unwrap_or_else(|| install_memory_for_current_thread(CHANNEL_QUEUE_DEPTH)),
860        ChannelScope::Org => unreachable!("org-scoped channel log is disabled"),
861    }
862}
863
864fn signed_timestamp(
865    resolved: &ResolvedChannel,
866    event_id: &str,
867    emitted_by: &str,
868) -> SignedTimestamp {
869    let at = crate::clock_mock::now_utc();
870    let at_ms = harn_clock::offset_datetime_to_ms(at);
871    let at_text = at.format(&Rfc3339).unwrap_or_else(|_| at.to_string());
872    let material = format!(
873        "harn.channel.timestamp.v1\nat_ms={at_ms}\nid={event_id}\nname={}\nscope={}\nscope_id={}\nemitted_by={emitted_by}\n",
874        resolved.resolved_name,
875        resolved.scope.as_str(),
876        resolved.scope_id
877    );
878    let signature = hex::encode(crate::connectors::hmac::hmac_sha256(
879        signing_salt(),
880        material.as_bytes(),
881    ));
882    SignedTimestamp {
883        at_ms,
884        at: at_text,
885        algorithm: "hmac-sha256".to_string(),
886        key_id: "local-session".to_string(),
887        signature: format!("sha256:{signature}"),
888    }
889}
890
891fn signing_salt() -> &'static [u8] {
892    SIGNING_SALT
893        .get_or_init(|| {
894            format!(
895                "harn-channel-signing-salt:{}:{}",
896                std::process::id(),
897                uuid::Uuid::now_v7()
898            )
899            .into_bytes()
900        })
901        .as_slice()
902}
903
904/// CH-07 (#1878): signed timestamp for a channel match. Mirrors the emit
905/// timestamp algorithm in `signed_timestamp` so the replay oracle can
906/// verify match timestamps with the same `signing_salt()` key material.
907/// Including `event_id` + `trigger_id` in the signing material binds the
908/// timestamp to this specific (emit, binding) pair so a replayed match
909/// can't be re-stamped onto a different binding.
910fn signed_match_timestamp(
911    resolved: &ResolvedChannel,
912    event_id: &str,
913    trigger_id: &str,
914) -> SignedTimestamp {
915    let at = crate::clock_mock::now_utc();
916    let at_ms = harn_clock::offset_datetime_to_ms(at);
917    let at_text = at.format(&Rfc3339).unwrap_or_else(|_| at.to_string());
918    let material = format!(
919        "harn.channel.match_timestamp.v1\nat_ms={at_ms}\nevent_id={event_id}\ntrigger_id={trigger_id}\nname={}\nscope={}\nscope_id={}\n",
920        resolved.resolved_name,
921        resolved.scope.as_str(),
922        resolved.scope_id
923    );
924    let signature = hex::encode(crate::connectors::hmac::hmac_sha256(
925        signing_salt(),
926        material.as_bytes(),
927    ));
928    SignedTimestamp {
929        at_ms,
930        at: at_text,
931        algorithm: "hmac-sha256".to_string(),
932        key_id: "local-session".to_string(),
933        signature: format!("sha256:{signature}"),
934    }
935}
936
937/// CH-07 (#1878): SHA-256 of the canonical JSON encoding of a channel
938/// emit payload. `serde_json::to_string` sorts object keys
939/// deterministically when the value originates from `serde_json::Value`
940/// (BTreeMap-backed `Map` with `preserve_order` disabled — the default
941/// for this crate). Used by the replay oracle to detect producer-side
942/// drift (`HARN-REP-CHN-002`).
943pub fn channel_payload_hash(payload: &serde_json::Value) -> String {
944    let canonical = canonical_json_string(payload);
945    let digest = Sha256::digest(canonical.as_bytes());
946    format!("sha256:{}", hex::encode(digest))
947}
948
949/// CH-07 (#1878): canonicalize a JSON value to a deterministic string so
950/// the SHA-256 hash on `ChannelEmitReceipt.payload_hash` is stable
951/// across reruns. Recursively sorts object keys; arrays preserve their
952/// (semantically meaningful) order. Mirrors the canonicalization rule
953/// `replay_oracle::canonicalize_run` relies on for cross-run diffs.
954fn canonical_json_string(value: &serde_json::Value) -> String {
955    match value {
956        serde_json::Value::Object(map) => {
957            let mut sorted: std::collections::BTreeMap<&String, &serde_json::Value> =
958                std::collections::BTreeMap::new();
959            for (key, value) in map {
960                sorted.insert(key, value);
961            }
962            let parts: Vec<String> = sorted
963                .iter()
964                .map(|(key, value)| {
965                    format!(
966                        "{}:{}",
967                        serde_json::to_string(key).unwrap_or_else(|_| (*key).clone()),
968                        canonical_json_string(value)
969                    )
970                })
971                .collect();
972            format!("{{{}}}", parts.join(","))
973        }
974        serde_json::Value::Array(items) => {
975            let parts: Vec<String> = items.iter().map(canonical_json_string).collect();
976            format!("[{}]", parts.join(","))
977        }
978        other => serde_json::to_string(other).unwrap_or_else(|_| "null".to_string()),
979    }
980}
981
982/// CH-07 (#1878): append a channel audit receipt to the durable
983/// `lifecycle.channel.audit` topic. Mirrors `emit_pool_*_receipt`
984/// (PL-06 / #1891). The audit log uses `active_event_log()` so receipts
985/// inherit the pipeline's durability (in-memory in tests; durable
986/// SQLite/etc. in production). Best-effort: receipt append errors do not
987/// fail the emit/match — the emit's user-visible receipt is the source
988/// of truth for caller behavior. Audit consumers learn about gaps via
989/// the canonical event-log read API.
990async fn append_channel_audit_event(
991    kind: &'static str,
992    schema: &'static str,
993    payload: serde_json::Value,
994) {
995    let topic = match Topic::new(CHANNEL_AUDIT_TOPIC) {
996        Ok(topic) => topic,
997        Err(_) => return,
998    };
999    let log = active_event_log()
1000        .unwrap_or_else(|| install_memory_for_current_thread(CHANNEL_QUEUE_DEPTH));
1001    let mut headers = BTreeMap::new();
1002    headers.insert("schema".to_string(), schema.to_string());
1003    let _ = log
1004        .append(&topic, LogEvent::new(kind, payload).with_headers(headers))
1005        .await;
1006}
1007
1008/// CH-11 (#1911): emit a guardrail-blocked / -warned audit entry to
1009/// the durable channel-audit topic AND to the in-process lifecycle
1010/// audit log. The lifecycle entry is what `pipeline_lifecycle_audit_log_take()`
1011/// surfaces to test fixtures; the event-log entry is what production
1012/// audit consumers tail. Both echo the verbatim payload (callers
1013/// concerned about PII should layer `redact::*` on top of the
1014/// guardrail). Best-effort; audit write errors do not propagate.
1015async fn record_guardrail_audit(
1016    kind: &'static str,
1017    resolved: &ResolvedChannel,
1018    event_id: &str,
1019    emitted_by: &str,
1020    payload: &serde_json::Value,
1021    fired: &[crate::channel_guardrails::FiredGuardrail],
1022) {
1023    let fired_json: Vec<serde_json::Value> = fired
1024        .iter()
1025        .map(|entry| {
1026            serde_json::json!({
1027                "id": entry.id,
1028                "kind": entry.kind,
1029                "verdict_label": entry.verdict_label,
1030                "reason": entry.reason,
1031            })
1032        })
1033        .collect();
1034    let audit_payload = serde_json::json!({
1035        "event_id": event_id,
1036        "name_resolved": resolved.resolved_name,
1037        "scope": resolved.scope.as_str(),
1038        "scope_id": resolved.scope_id,
1039        "emitted_by": emitted_by,
1040        "payload_hash": channel_payload_hash(payload),
1041        "payload": payload,
1042        "fired": fired_json,
1043    });
1044    append_channel_audit_event(kind, CHANNEL_GUARDRAIL_AUDIT_SCHEMA, audit_payload.clone()).await;
1045    // Mirror to the lifecycle audit log so tests using
1046    // `pipeline_lifecycle_audit_log_take()` see the entry without
1047    // having to scan the event log directly.
1048    crate::orchestration::record_lifecycle_audit(kind, audit_payload);
1049}
1050
1051/// CH-11 (#1911): record a warning audit for every Warn verdict that
1052/// fired. A no-op when there were no Warn-level fires (the common
1053/// case). Called BEFORE the durable append so the warning order
1054/// matches the dispatch order.
1055async fn record_guardrail_warnings(
1056    resolved: &ResolvedChannel,
1057    event_id: &str,
1058    emitted_by: &str,
1059    payload: &serde_json::Value,
1060    fired: &[crate::channel_guardrails::FiredGuardrail],
1061) {
1062    if fired.is_empty() {
1063        return;
1064    }
1065    record_guardrail_audit(
1066        CHANNEL_GUARDRAIL_WARNING_KIND,
1067        resolved,
1068        event_id,
1069        emitted_by,
1070        payload,
1071        fired,
1072    )
1073    .await;
1074}
1075
1076/// CH-11 (#1911): record a `channel_guardrail_blocked` audit and
1077/// return the synthetic "blocked" receipt so the caller can
1078/// distinguish a guardrail block from a successful append or an
1079/// idempotent dedupe. No durable journal append happens for a blocked
1080/// emit; the audit log entry IS the durable artifact.
1081async fn handle_blocked_emit(
1082    raw_name: &str,
1083    resolved: &ResolvedChannel,
1084    event_id: &str,
1085    emitted_by: &str,
1086    emitted_at: &SignedTimestamp,
1087    payload: &serde_json::Value,
1088    decision: &crate::channel_guardrails::GuardrailDecision,
1089) -> Result<VmValue, VmError> {
1090    record_guardrail_audit(
1091        CHANNEL_GUARDRAIL_BLOCKED_KIND,
1092        resolved,
1093        event_id,
1094        emitted_by,
1095        payload,
1096        decision.fired.as_slice(),
1097    )
1098    .await;
1099    let block_reason = decision
1100        .fired
1101        .iter()
1102        .rev()
1103        .find_map(|f| {
1104            if f.verdict_label == CHANNEL_GUARDRAIL_BLOCKED_KIND
1105                || f.verdict_label.contains("block")
1106            {
1107                Some(f.reason.clone())
1108            } else {
1109                None
1110            }
1111        })
1112        .unwrap_or_else(|| "guardrail blocked".to_string());
1113    let fired_json: Vec<serde_json::Value> = decision
1114        .fired
1115        .iter()
1116        .map(|entry| {
1117            serde_json::json!({
1118                "id": entry.id,
1119                "kind": entry.kind,
1120                "verdict_label": entry.verdict_label,
1121                "reason": entry.reason,
1122            })
1123        })
1124        .collect();
1125    let receipt = serde_json::json!({
1126        "event_id": event_id,
1127        "cursor": serde_json::Value::Null,
1128        "id": event_id,
1129        "name": raw_name,
1130        "name_resolved": resolved.resolved_name,
1131        "scope": resolved.scope.as_str(),
1132        "scope_id": resolved.scope_id,
1133        "emitted_at": emitted_at,
1134        "emitted_by": emitted_by,
1135        "retention": resolved.retention,
1136        "topic": resolved.topic.as_str(),
1137        "inserted": false,
1138        "duplicate": false,
1139        "blocked": true,
1140        "block_reason": block_reason,
1141        "guardrail_fired": fired_json,
1142    });
1143    Ok(crate::stdlib::json_to_vm_value(&receipt))
1144}
1145
1146/// CH-07 (#1878): build + persist the emit receipt on the audit topic.
1147/// Called from `emit_channel_from_vm` immediately after the durable
1148/// journal append succeeds (whether the append was fresh or
1149/// idempotent-suppressed — both outcomes are first-class audit signals).
1150async fn record_channel_emit_receipt(
1151    record: &StoredChannelEvent,
1152    resolved: &ResolvedChannel,
1153    inserted: bool,
1154    span_id: u64,
1155) {
1156    let receipt = ChannelEmitReceipt {
1157        event_id: record.id.clone(),
1158        name_resolved: resolved.resolved_name.clone(),
1159        scope: resolved.scope.as_str().to_string(),
1160        scope_id: resolved.scope_id.clone(),
1161        payload_hash: channel_payload_hash(&record.payload),
1162        payload: record.payload.clone(),
1163        emitted_at: record.emitted_at.clone(),
1164        emitted_by: record.emitted_by.clone(),
1165        pipeline_id: record.pipeline_id.clone(),
1166        session_id: record.session_id.clone(),
1167        tenant_id: record.tenant_id.clone(),
1168        topic: resolved.topic.as_str().to_string(),
1169        inserted,
1170        span_id: if span_id == 0 { None } else { Some(span_id) },
1171    };
1172    let payload = match serde_json::to_value(&receipt) {
1173        Ok(value) => value,
1174        Err(_) => return,
1175    };
1176    append_channel_audit_event(
1177        CHANNEL_EMIT_RECEIPT_KIND,
1178        CHANNEL_EMIT_RECEIPT_SCHEMA,
1179        payload,
1180    )
1181    .await;
1182}
1183
1184/// CH-07 (#1878): build + persist the match receipt on the audit topic.
1185/// Called from `fire_channel_match` after the dispatcher returns, so
1186/// `handler_result` reflects the recorded outcome (succeeded / failed /
1187/// dlq / cancelled / ...) and replay tooling can verify the same handler
1188/// shape fires on every replay.
1189#[allow(clippy::too_many_arguments)]
1190async fn record_channel_match_receipt(
1191    trigger_id: &str,
1192    binding_key: &str,
1193    handler_kind: &str,
1194    resolved: &ResolvedChannel,
1195    event_id: &str,
1196    matched_in_session_id: Option<&str>,
1197    batch: Option<ChannelMatchBatchInfo>,
1198    span_id: u64,
1199    dispatch_outcome: &Result<crate::triggers::DispatchOutcome, crate::triggers::DispatchError>,
1200) {
1201    let receipt = ChannelMatchReceipt {
1202        event_id: event_id.to_string(),
1203        trigger_id: trigger_id.to_string(),
1204        binding_key: binding_key.to_string(),
1205        name_resolved: resolved.resolved_name.clone(),
1206        scope: resolved.scope.as_str().to_string(),
1207        scope_id: resolved.scope_id.clone(),
1208        matched_at: signed_match_timestamp(resolved, event_id, trigger_id),
1209        matched_in_session_id: matched_in_session_id.map(|s| s.to_string()),
1210        batch,
1211        handler_kind: handler_kind.to_string(),
1212        handler_result: ChannelMatchResultSummary::from_dispatch(dispatch_outcome),
1213        span_id: if span_id == 0 { None } else { Some(span_id) },
1214    };
1215    let payload = match serde_json::to_value(&receipt) {
1216        Ok(value) => value,
1217        Err(_) => return,
1218    };
1219    append_channel_audit_event(
1220        CHANNEL_MATCH_RECEIPT_KIND,
1221        CHANNEL_MATCH_RECEIPT_SCHEMA,
1222        payload,
1223    )
1224    .await;
1225}
1226
1227/// CH-07 (#1878): extract `ChannelMatchBatchInfo` from the transcript
1228/// `batch_summary` JSON the dispatcher already builds for batched
1229/// triggers. Returns `None` for non-batched dispatch.
1230fn batch_info_from_summary(
1231    batch_summary: Option<&serde_json::Value>,
1232) -> Option<ChannelMatchBatchInfo> {
1233    let summary = batch_summary?.as_object()?;
1234    let count = summary
1235        .get("count")
1236        .and_then(|v| v.as_u64())
1237        .map(|n| n as usize)?;
1238    let constituent_event_ids = summary
1239        .get("constituent_event_ids")
1240        .and_then(|v| v.as_array())
1241        .map(|arr| {
1242            arr.iter()
1243                .filter_map(|v| v.as_str().map(|s| s.to_string()))
1244                .collect()
1245        })
1246        .unwrap_or_default();
1247    Some(ChannelMatchBatchInfo {
1248        count,
1249        constituent_event_ids,
1250    })
1251}
1252
1253fn emitted_by(context: &ChannelContext) -> String {
1254    context
1255        .worker_id
1256        .clone()
1257        .or_else(|| context.agent_session_id.clone())
1258        .or_else(|| context.task_id.clone())
1259        .unwrap_or_else(|| "harn".to_string())
1260}
1261
1262fn retention_for_scope(scope: ChannelScope) -> &'static str {
1263    match scope {
1264        ChannelScope::Session => "in_process_session",
1265        ChannelScope::Pipeline => "pipeline_event_log",
1266        ChannelScope::Tenant => "tenant_event_log",
1267        ChannelScope::Org => "org_event_log",
1268    }
1269}
1270
1271fn receipt_value(
1272    topic: &Topic,
1273    event_id: EventId,
1274    event: &LogEvent,
1275    inserted: bool,
1276) -> Result<serde_json::Value, VmError> {
1277    let record = stored_record(event)?;
1278    Ok(serde_json::json!({
1279        "event_id": event_id,
1280        "cursor": event_id,
1281        "id": record.id,
1282        "name": record.name,
1283        "name_resolved": record.name,
1284        "scope": record.scope,
1285        "scope_id": record.scope_id,
1286        "payload": record.payload,
1287        "emitted_at": record.emitted_at,
1288        "emitted_by": record.emitted_by,
1289        "pipeline_id": record.pipeline_id,
1290        "session_id": record.session_id,
1291        "tenant_id": record.tenant_id,
1292        "retention": record.retention,
1293        "ttl_ms": record.ttl_ms,
1294        "topic": topic.as_str(),
1295        "inserted": inserted,
1296        "duplicate": !inserted,
1297    }))
1298}
1299
1300fn event_value(
1301    topic: &Topic,
1302    event_id: EventId,
1303    event: LogEvent,
1304) -> Result<serde_json::Value, VmError> {
1305    let record = stored_record(&event)?;
1306    Ok(serde_json::json!({
1307        "event_id": event_id,
1308        "cursor": event_id,
1309        "topic": topic.as_str(),
1310        "kind": event.kind,
1311        "headers": event.headers,
1312        "occurred_at_ms": event.occurred_at_ms,
1313        "id": record.id,
1314        "name": record.name,
1315        "name_resolved": record.name,
1316        "scope": record.scope,
1317        "scope_id": record.scope_id,
1318        "payload": record.payload,
1319        "emitted_at": record.emitted_at,
1320        "emitted_by": record.emitted_by,
1321        "pipeline_id": record.pipeline_id,
1322        "session_id": record.session_id,
1323        "tenant_id": record.tenant_id,
1324        "retention": record.retention,
1325        "ttl_ms": record.ttl_ms,
1326    }))
1327}
1328
1329fn stored_record(event: &LogEvent) -> Result<StoredChannelEvent, VmError> {
1330    serde_json::from_value(event.payload.clone()).map_err(|error| {
1331        VmError::Runtime(format!(
1332            "channel event store contained malformed channel payload: {error}"
1333        ))
1334    })
1335}
1336
1337fn parse_options(value: Option<&VmValue>, builtin: &str) -> Result<ChannelOptions, VmError> {
1338    let Some(value) = value else {
1339        return Ok(ChannelOptions::default());
1340    };
1341    match value {
1342        VmValue::Nil => Ok(ChannelOptions::default()),
1343        VmValue::Dict(options) => Ok(ChannelOptions {
1344            scope: option_string(options, "scope", builtin)?
1345                .map(|scope| ChannelScope::parse(&scope))
1346                .transpose()
1347                .map_err(VmError::from)?,
1348            id: option_string(options, "id", builtin)?,
1349            tenant_id: option_string(options, "tenant_id", builtin)?,
1350            session_id: option_string(options, "session_id", builtin)?,
1351            pipeline_id: option_string(options, "pipeline_id", builtin)?,
1352            from_cursor: option_non_negative_int(options, "from_cursor", builtin)?
1353                .or(option_non_negative_int(options, "cursor", builtin)?)
1354                .map(|value| value as EventId),
1355            limit: option_non_negative_int(options, "limit", builtin)?.map(|value| value as usize),
1356            ttl_ms: option_duration_ms(options, "ttl", builtin)?,
1357        }),
1358        other => Err(VmError::TypeError(format!(
1359            "{builtin}: options must be a dict or nil, got {}",
1360            other.type_name()
1361        ))),
1362    }
1363}
1364
1365fn required_string(value: Option<&VmValue>, builtin: &str, name: &str) -> Result<String, VmError> {
1366    match value {
1367        Some(VmValue::String(value)) => Ok(value.to_string()),
1368        Some(other) => Err(VmError::TypeError(format!(
1369            "{builtin}: {name} must be a string, got {}",
1370            other.type_name()
1371        ))),
1372        None => Err(VmError::TypeError(format!("{builtin}: missing {name}"))),
1373    }
1374}
1375
1376fn required_consumer_id(value: Option<&VmValue>, builtin: &str) -> Result<ConsumerId, VmError> {
1377    ConsumerId::new(required_string(value, builtin, "consumer_id")?).map_err(channel_log_error)
1378}
1379
1380fn required_event_id(
1381    value: Option<&VmValue>,
1382    builtin: &str,
1383    name: &str,
1384) -> Result<EventId, VmError> {
1385    match value {
1386        Some(VmValue::Int(value)) if *value >= 0 => Ok(*value as EventId),
1387        Some(other) => Err(VmError::TypeError(format!(
1388            "{builtin}: {name} must be a non-negative int, got {}",
1389            other.type_name()
1390        ))),
1391        None => Err(VmError::TypeError(format!("{builtin}: missing {name}"))),
1392    }
1393}
1394
1395fn event_id_to_i64(value: EventId, builtin: &str) -> Result<i64, VmError> {
1396    i64::try_from(value)
1397        .map_err(|_| VmError::Runtime(format!("{builtin}: event id {value} exceeds int range")))
1398}
1399
1400fn option_string(
1401    options: &crate::value::DictMap,
1402    key: &str,
1403    builtin: &str,
1404) -> Result<Option<String>, VmError> {
1405    match options.get(key) {
1406        None | Some(VmValue::Nil) => Ok(None),
1407        Some(VmValue::String(value)) if !value.trim().is_empty() => Ok(Some(value.to_string())),
1408        Some(VmValue::String(_)) => Err(VmError::TypeError(format!(
1409            "{builtin}: options.{key} cannot be empty"
1410        ))),
1411        Some(other) => Err(VmError::TypeError(format!(
1412            "{builtin}: options.{key} must be a string or nil, got {}",
1413            other.type_name()
1414        ))),
1415    }
1416}
1417
1418fn option_non_negative_int(
1419    options: &crate::value::DictMap,
1420    key: &str,
1421    builtin: &str,
1422) -> Result<Option<u64>, VmError> {
1423    match options.get(key) {
1424        None | Some(VmValue::Nil) => Ok(None),
1425        Some(VmValue::Int(value)) if *value >= 0 => Ok(Some(*value as u64)),
1426        Some(other) => Err(VmError::TypeError(format!(
1427            "{builtin}: options.{key} must be a non-negative int or nil, got {}",
1428            other.type_name()
1429        ))),
1430    }
1431}
1432
1433fn option_duration_ms(
1434    options: &crate::value::DictMap,
1435    key: &str,
1436    builtin: &str,
1437) -> Result<Option<i64>, VmError> {
1438    match options.get(key) {
1439        None | Some(VmValue::Nil) => Ok(None),
1440        Some(VmValue::Duration(value)) if *value >= 0 => Ok(Some(*value)),
1441        Some(VmValue::Int(value)) if *value >= 0 => Ok(Some(*value)),
1442        Some(other) => Err(VmError::TypeError(format!(
1443            "{builtin}: options.{key} must be a non-negative duration, int, or nil, got {}",
1444            other.type_name()
1445        ))),
1446    }
1447}
1448
1449fn dict_string(values: &crate::value::DictMap, key: &str) -> Option<String> {
1450    match values.get(key) {
1451        Some(VmValue::String(value)) if !value.is_empty() => Some(value.to_string()),
1452        _ => None,
1453    }
1454}
1455
1456fn channel_log_error(error: crate::event_log::LogError) -> VmError {
1457    VmError::Runtime(format!("channel event log: {error}"))
1458}
1459
1460/// CH-06 (#1877): RAII guard around channel emit/match tracing spans.
1461///
1462/// Mirrors `PoolSpanGuard` (PL-06 / #1891): opens both a thread-local
1463/// Harn span (visible to `trace_spans()`) and an OTel `tracing::Span`
1464/// (visible to the exporter), wires OTel span links via
1465/// `crate::observability::otel::set_span_link`, and closes them both on
1466/// `end()` / `Drop`. Disabled-tracing path is a no-op because
1467/// `crate::tracing::span_start_*` returns id 0 and short-circuits.
1468struct ChannelSpanGuard {
1469    span_id: u64,
1470    otel_span: tracing::Span,
1471}
1472
1473impl ChannelSpanGuard {
1474    fn start(
1475        kind: crate::tracing::SpanKind,
1476        name: String,
1477        links: Vec<crate::tracing::SpanLink>,
1478    ) -> Self {
1479        Self::start_with_parenting(kind, name, links, true)
1480    }
1481
1482    fn start_detached(
1483        kind: crate::tracing::SpanKind,
1484        name: String,
1485        links: Vec<crate::tracing::SpanLink>,
1486    ) -> Self {
1487        Self::start_with_parenting(kind, name, links, false)
1488    }
1489
1490    fn start_with_parenting(
1491        kind: crate::tracing::SpanKind,
1492        name: String,
1493        links: Vec<crate::tracing::SpanLink>,
1494        inherit_parent: bool,
1495    ) -> Self {
1496        let span_id = if inherit_parent {
1497            crate::tracing::span_start_with_links(kind, name.clone(), links.clone())
1498        } else {
1499            crate::tracing::span_start_detached_with_links(kind, name.clone(), links.clone())
1500        };
1501        let otel_span = tracing::info_span!(
1502            target: "harn.vm.channel",
1503            "harn.channel",
1504            harn.kind = kind.as_str(),
1505            harn.name = %name,
1506        );
1507        for link in links {
1508            let trace_id = crate::TraceId(link.trace_id);
1509            let mut attributes: std::collections::HashMap<String, String> =
1510                link.attributes.into_iter().collect();
1511            attributes
1512                .entry("harn.link.kind".to_string())
1513                .or_insert_with(|| "channel_emit".to_string());
1514            let _ = crate::observability::otel::set_span_link(
1515                &otel_span,
1516                &trace_id,
1517                &link.span_id,
1518                Some(attributes),
1519            );
1520        }
1521        Self { span_id, otel_span }
1522    }
1523
1524    fn link(&self) -> Option<crate::tracing::SpanLink> {
1525        crate::observability::otel::current_span_context_hex(&self.otel_span)
1526            .map(|(trace_id, span_id)| crate::tracing::SpanLink::new(trace_id, span_id))
1527            .or_else(|| crate::tracing::span_link(self.span_id))
1528    }
1529
1530    fn set_metadata(&self, key: &str, value: serde_json::Value) {
1531        crate::tracing::span_set_metadata(self.span_id, key, value);
1532    }
1533
1534    fn end(&mut self) {
1535        if self.span_id != 0 {
1536            crate::tracing::span_end(self.span_id);
1537            self.span_id = 0;
1538        }
1539    }
1540}
1541
1542impl Drop for ChannelSpanGuard {
1543    fn drop(&mut self) {
1544        self.end();
1545    }
1546}
1547
1548/// CH-06 (#1877): summarize an emit payload for the transcript event so
1549/// downstream renderers / audit feeds don't have to ingest the full
1550/// payload. Numbers/bools render verbatim; strings are truncated; dicts
1551/// and lists collapse to their top-level field count. Keeps the
1552/// transcript log compact while preserving enough context for human
1553/// inspection.
1554fn summarize_payload(payload: &serde_json::Value) -> serde_json::Value {
1555    const MAX_STRING_LEN: usize = 120;
1556    match payload {
1557        serde_json::Value::Null => serde_json::json!({"kind": "null"}),
1558        serde_json::Value::Bool(value) => serde_json::json!({"kind": "bool", "value": value}),
1559        serde_json::Value::Number(value) => serde_json::json!({"kind": "number", "value": value}),
1560        serde_json::Value::String(value) => {
1561            let truncated: String = value.chars().take(MAX_STRING_LEN).collect();
1562            let len = value.chars().count();
1563            serde_json::json!({
1564                "kind": "string",
1565                "value": truncated,
1566                "truncated": len > MAX_STRING_LEN,
1567                "length": len,
1568            })
1569        }
1570        serde_json::Value::Array(items) => {
1571            serde_json::json!({"kind": "array", "length": items.len()})
1572        }
1573        serde_json::Value::Object(map) => {
1574            let fields: Vec<&String> = map.keys().take(8).collect();
1575            serde_json::json!({
1576                "kind": "object",
1577                "field_count": map.len(),
1578                "fields": fields,
1579            })
1580        }
1581    }
1582}
1583
1584/// CH-06 (#1877): append a channel transcript lifecycle event onto the
1585/// active event log. No-op when no log is installed (e.g. unit tests
1586/// running outside a VM context) so emission stays infallible from the
1587/// caller's perspective.
1588fn emit_channel_transcript_event(kind: &'static str, payload: serde_json::Value) {
1589    let Some(log) = active_event_log() else {
1590        return;
1591    };
1592    let Ok(topic) = Topic::new(CHANNEL_TRANSCRIPT_TOPIC) else {
1593        return;
1594    };
1595    let event = LogEvent::new(kind, payload);
1596    if tokio::runtime::Handle::try_current().is_ok() {
1597        if let Ok(join) = std::thread::Builder::new()
1598            .name("harn-channel-transcript".to_string())
1599            .spawn(move || {
1600                let _ = futures::executor::block_on(log.append(&topic, event));
1601            })
1602        {
1603            let _ = join.join();
1604        }
1605    } else {
1606        let _ = futures::executor::block_on(log.append(&topic, event));
1607    }
1608}
1609
1610/// CH-06 (#1877): emit the `transcript.channel.emit` lifecycle event the
1611/// moment the durable append succeeds (whether the append was fresh or
1612/// idempotent). Carries the emit span id when tracing is on so the
1613/// transcript log can be stitched against the OTel trace.
1614fn emit_channel_emit_transcript(
1615    record: &StoredChannelEvent,
1616    resolved: &ResolvedChannel,
1617    inserted: bool,
1618    span_id: u64,
1619) {
1620    let payload = serde_json::json!({
1621        "event_id": record.id,
1622        "name": record.name,
1623        "name_resolved": resolved.resolved_name,
1624        "scope": record.scope,
1625        "scope_id": record.scope_id,
1626        "payload_summary": summarize_payload(&record.payload),
1627        "emitted_at": record.emitted_at,
1628        "emitted_at_ms": record.emitted_at.at_ms,
1629        "emitted_by": record.emitted_by,
1630        "session_id": record.session_id,
1631        "pipeline_id": record.pipeline_id,
1632        "tenant_id": record.tenant_id,
1633        "inserted": inserted,
1634        "duplicate": !inserted,
1635        "span_id": if span_id == 0 { serde_json::Value::Null } else { serde_json::json!(span_id) },
1636    });
1637    emit_channel_transcript_event(CHANNEL_EMIT_TRANSCRIPT_KIND, payload);
1638}
1639
1640/// CH-06 (#1877): emit the `transcript.channel.match` lifecycle event
1641/// just before the dispatcher invokes the handler. Carries the match
1642/// span id and, for batched triggers, the constituent event ids so the
1643/// transcript can render the full batch context inline.
1644#[allow(clippy::too_many_arguments)]
1645fn emit_channel_match_transcript(
1646    trigger_id: &str,
1647    handler_kind: &str,
1648    resolved: &ResolvedChannel,
1649    event_id: &str,
1650    matched_at_ms: i64,
1651    matched_in_session_id: Option<&str>,
1652    span_id: u64,
1653    batch: Option<serde_json::Value>,
1654) {
1655    let mut payload = serde_json::json!({
1656        "event_id": event_id,
1657        "name_resolved": resolved.resolved_name,
1658        "scope": resolved.scope.as_str(),
1659        "scope_id": resolved.scope_id,
1660        "trigger_id": trigger_id,
1661        "handler_kind": handler_kind,
1662        "matched_at_ms": matched_at_ms,
1663        "matched_in_session_id": matched_in_session_id,
1664        "span_id": if span_id == 0 { serde_json::Value::Null } else { serde_json::json!(span_id) },
1665    });
1666    if let Some(batch) = batch {
1667        if let Some(map) = payload.as_object_mut() {
1668            map.insert("batch".to_string(), batch);
1669        }
1670    }
1671    emit_channel_transcript_event(CHANNEL_MATCH_TRANSCRIPT_KIND, payload);
1672}
1673
1674/// CH-06 (#1877): collect the originating-emit span links stashed on a
1675/// batch of `TriggerEvent`s. The non-batched dispatch path uses the
1676/// single-event variant directly; the aggregated/batched path threads
1677/// the per-event headers through the buffer and rebuilds the link list
1678/// here so the resulting `ChannelMatch` span multi-links to every
1679/// constituent emit.
1680fn emit_links_from_event(event: &TriggerEvent) -> Vec<crate::tracing::SpanLink> {
1681    let mut links = Vec::new();
1682    if let (Some(trace_id), Some(span_id)) = (
1683        event.headers.get(EMIT_TRACE_ID_HEADER),
1684        event.headers.get(EMIT_SPAN_ID_HEADER),
1685    ) {
1686        links.push(
1687            crate::tracing::SpanLink::new(trace_id.clone(), span_id.clone()).with_attributes(
1688                BTreeMap::from([("harn.link.kind".to_string(), "channel_emit".to_string())]),
1689            ),
1690        );
1691    }
1692    links
1693}
1694
1695fn emit_links_from_batch(events: &[TriggerEvent]) -> Vec<crate::tracing::SpanLink> {
1696    let mut links = Vec::new();
1697    for event in events {
1698        links.extend(emit_links_from_event(event));
1699    }
1700    links
1701}
1702
1703fn batch_summary_for_transcript(events: &[TriggerEvent]) -> serde_json::Value {
1704    let constituent_ids: Vec<String> = events.iter().map(|event| event.id.0.clone()).collect();
1705    serde_json::json!({
1706        "count": events.len(),
1707        "constituent_event_ids": constituent_ids,
1708    })
1709}
1710
/// Parsed channel-source trigger selector.
///
/// Trigger DSL strings look like `channel:<scope>:<scope-id>:<name>` with
/// shorthand forms for tenant-default and session/pipeline scopes. The
/// `scope_id_pattern` is `ScopeIdPattern::Current` when the selector
/// string carries no scope id (e.g. `channel:foo`),
/// `ScopeIdPattern::Wildcard` for an explicit `*`
/// (`channel:tenant:*:foo`), and `ScopeIdPattern::Exact` for any other
/// explicit scope id.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ChannelSelector {
    // Scope the selector targets.
    scope: ChannelScope,
    // How the scope id of an emit is compared (current / exact / wildcard).
    scope_id_pattern: ScopeIdPattern,
    // Channel name; compared for exact equality when matching.
    name: String,
}
1724
/// How a `ChannelSelector` constrains the scope id of an emitted event.
#[derive(Clone, Debug, PartialEq, Eq)]
enum ScopeIdPattern {
    /// Match the current tenant/session/pipeline of the trigger registry
    /// (no explicit scope id supplied in the selector string).
    Current,
    /// Explicit scope id from the selector string; compared for equality.
    Exact(String),
    /// Wildcard, e.g. `tenant:*:foo` — intended to match any scope id
    /// within the trigger's entitled boundary (today: the current tenant
    /// only).
    Wildcard,
}
1736
1737impl ChannelSelector {
1738    /// Parse a `channel:...` trigger source string.
1739    ///
1740    /// Accepted shapes:
1741    /// - `channel:<name>` — tenant scope (current tenant), exact `<name>`
1742    /// - `channel:session:<name>` — session scope, exact `<name>`
1743    /// - `channel:pipeline:<name>` — pipeline scope, exact `<name>`
1744    /// - `channel:tenant:<tenant-id>:<name>` — explicit tenant
1745    /// - `channel:tenant:*:<name>` — tenant wildcard (within entitlement)
1746    /// - `channel:org:<org-id>:<name>` — explicit org (currently disabled)
1747    pub fn parse(input: &str) -> Result<Self, String> {
1748        let input = input.trim();
1749        let rest = input
1750            .strip_prefix("channel:")
1751            .ok_or_else(|| format!("channel selector must start with `channel:`, got `{input}`"))?;
1752        if rest.is_empty() {
1753            return Err("channel selector cannot be empty after `channel:` prefix".to_string());
1754        }
1755
1756        let (head, tail_opt) = match rest.split_once(':') {
1757            Some((head, tail)) => (head, Some(tail)),
1758            None => (rest, None),
1759        };
1760        let parsed_scope = ChannelScope::parse(head).ok();
1761        match (parsed_scope, tail_opt) {
1762            // `channel:<name>` — tenant default.
1763            (None, _) => {
1764                let name = rest.to_string();
1765                validate_selector_name(&name)?;
1766                Ok(Self {
1767                    scope: ChannelScope::Tenant,
1768                    scope_id_pattern: ScopeIdPattern::Current,
1769                    name,
1770                })
1771            }
1772            (Some(scope @ (ChannelScope::Session | ChannelScope::Pipeline)), Some(name))
1773                if !name.is_empty() =>
1774            {
1775                if name.contains(':') {
1776                    return Err(format!(
1777                        "channel selector `{input}`: {} scope expects `<name>` with no extra colons",
1778                        scope.as_str()
1779                    ));
1780                }
1781                validate_selector_name(name)?;
1782                Ok(Self {
1783                    scope,
1784                    scope_id_pattern: ScopeIdPattern::Current,
1785                    name: name.to_string(),
1786                })
1787            }
1788            (Some(scope @ (ChannelScope::Tenant | ChannelScope::Org)), Some(tail))
1789                if !tail.is_empty() =>
1790            {
1791                let Some((scope_id, name)) = tail.split_once(':') else {
1792                    // `channel:tenant:foo` — treat as `<name>` in tenant default.
1793                    if matches!(scope, ChannelScope::Tenant) {
1794                        validate_selector_name(tail)?;
1795                        return Ok(Self {
1796                            scope,
1797                            scope_id_pattern: ScopeIdPattern::Current,
1798                            name: tail.to_string(),
1799                        });
1800                    }
1801                    return Err(format!(
1802                        "channel selector `{input}`: org scope requires `<org-id>:<name>`"
1803                    ));
1804                };
1805                if scope_id.is_empty() || name.is_empty() {
1806                    return Err(format!(
1807                        "channel selector `{input}`: scope id and name must be non-empty"
1808                    ));
1809                }
1810                validate_selector_name(name)?;
1811                let pattern = if scope_id == "*" {
1812                    ScopeIdPattern::Wildcard
1813                } else {
1814                    ScopeIdPattern::Exact(scope_id.to_string())
1815                };
1816                Ok(Self {
1817                    scope,
1818                    scope_id_pattern: pattern,
1819                    name: name.to_string(),
1820                })
1821            }
1822            (Some(scope), _) => Err(format!(
1823                "channel selector `{input}`: {} scope requires `<name>` segment",
1824                scope.as_str()
1825            )),
1826        }
1827    }
1828
1829    pub fn scope(&self) -> &'static str {
1830        self.scope.as_str()
1831    }
1832
1833    pub fn name(&self) -> &str {
1834        &self.name
1835    }
1836
1837    /// Returns true if the supplied emit (scope, scope_id, name) matches this
1838    /// selector. `current_tenant` lets the matcher resolve the implicit
1839    /// "current tenant" boundary used by both `Current` and `Wildcard` modes.
1840    pub fn matches(&self, scope: &str, scope_id: &str, name: &str, current_tenant: &str) -> bool {
1841        if self.scope.as_str() != scope || self.name != name {
1842            return false;
1843        }
1844        match &self.scope_id_pattern {
1845            ScopeIdPattern::Current => match self.scope {
1846                ChannelScope::Tenant => scope_id == current_tenant,
1847                ChannelScope::Session | ChannelScope::Pipeline => {
1848                    // For session/pipeline, "current" means trigger and emit
1849                    // share a runtime context. In v1 (in-process registry)
1850                    // this is implicit: both producer and consumer run in the
1851                    // same VM, so any scope_id within this scope type matches.
1852                    true
1853                }
1854                ChannelScope::Org => false,
1855            },
1856            ScopeIdPattern::Exact(value) => scope_id == value,
1857            ScopeIdPattern::Wildcard => match self.scope {
1858                ChannelScope::Tenant => true,
1859                // Session/pipeline wildcards aren't entitled yet; org wildcards disabled.
1860                _ => false,
1861            },
1862        }
1863    }
1864}
1865
/// Check that a channel name taken from a selector string is well formed.
///
/// A name is rejected when it is empty or whitespace-only, or when it
/// contains a `:` (the selector separator), any whitespace, or any control
/// character.
///
/// # Errors
///
/// Returns a message quoting the offending name.
fn validate_selector_name(name: &str) -> Result<(), String> {
    let is_blank = name.trim().is_empty();
    let has_forbidden_char = name
        .chars()
        .any(|ch| ch == ':' || ch.is_control() || ch.is_whitespace());
    if is_blank || has_forbidden_char {
        Err(format!("channel selector name `{name}` is malformed"))
    } else {
        Ok(())
    }
}
1875
1876/// Dispatch a freshly emitted channel event to any registered triggers whose
1877/// `channel:` source selector matches the (scope, scope_id, name) tuple.
1878///
1879/// This is the consumer side of the `emit_channel` ↔ trigger plumbing
1880/// (CH-02 / #1872). Errors from individual handlers do not abort the
1881/// emit; they surface in the dispatcher's DLQ + retry pipeline.
1882///
1883/// CH-04 (#1875): bindings with `batch { count, window, key, expire_action }`
1884/// route events through an aggregation buffer instead of dispatching one
1885/// event at a time. When the buffer hits `count`, the dispatcher fires
1886/// the handler with a batched event (`event.batch` populated). When the
1887/// `window` elapses with fewer than `count` events, the dispatcher
1888/// either fires a partial batch (default) or discards the buffer.
1889async fn dispatch_channel_emit_to_triggers(
1890    ctx: Option<&crate::vm::AsyncBuiltinCtx>,
1891    resolved: &ResolvedChannel,
1892    payload: ChannelEventPayload,
1893    emit_link: Option<crate::tracing::SpanLink>,
1894) -> Result<(), VmError> {
1895    // Snapshot matching bindings outside of any async work so the registry
1896    // borrow is short-lived.
1897    let bindings = crate::triggers::registry::channel_bindings_matching(
1898        resolved.scope.as_str(),
1899        &resolved.scope_id,
1900        &payload.name,
1901    );
1902
1903    // CH-04 (#1875): flush any aggregation buffers whose window has
1904    // elapsed BEFORE this emit so old batches go out in order. The
1905    // implicit sweep runs even for emits that don't match any binding so
1906    // a stale buffer can't outlive the trigger lifecycle. Tests can also
1907    // call `flush_trigger_aggregations()` directly for deterministic
1908    // window-expire coverage.
1909    flush_expired_aggregations_inner(ctx).await;
1910
1911    if bindings.is_empty() {
1912        return Ok(());
1913    }
1914    let Some(base_vm) = ctx.map(crate::vm::AsyncBuiltinCtx::child_vm) else {
1915        // No host VM (e.g. raw test path); nothing to dispatch.
1916        return Ok(());
1917    };
1918    let log = active_event_log()
1919        .unwrap_or_else(|| install_memory_for_current_thread(CHANNEL_QUEUE_DEPTH));
1920    let dispatcher = crate::triggers::Dispatcher::with_event_log(base_vm, log);
1921    for binding in bindings {
1922        // Filter (#1872 acceptance): JSON-path-equality on payload.
1923        // Applied BEFORE aggregation so the buffer only collects events
1924        // the handler actually cares about.
1925        if let Some(filter_str) = binding.filter.as_ref() {
1926            if !channel_filter_matches(filter_str, &payload.payload) {
1927                continue;
1928            }
1929        }
1930        let event = build_channel_trigger_event(&payload, emit_link.as_ref());
1931
1932        // CH-04 (#1875): aggregation path. When the binding declared
1933        // `batch`, accumulate into the per-(binding, partition_key)
1934        // buffer; only dispatch once the threshold is reached.
1935        if let Some(aggregation_config) = binding.aggregation.as_ref() {
1936            let partition_key = crate::triggers::aggregation::partition_key_for_event(
1937                aggregation_config,
1938                &payload.payload,
1939            );
1940            let binding_key = binding.binding_key();
1941            let outcome = crate::triggers::aggregation::accumulate(
1942                &binding_key,
1943                aggregation_config,
1944                partition_key.as_deref(),
1945                event,
1946            );
1947            if let crate::triggers::aggregation::AccumulateOutcome::Ready(events) = outcome {
1948                // CH-06 (#1877): the ChannelMatch span for a batched
1949                // trigger multi-links to ALL constituent ChannelEmit
1950                // spans so the trace tree shows the aggregation fan-in.
1951                let links = emit_links_from_batch(&events);
1952                let batch_summary = batch_summary_for_transcript(&events);
1953                let batched = match crate::triggers::dispatcher::build_batched_event_public(events)
1954                {
1955                    Ok(batched) => batched,
1956                    Err(error) => {
1957                        return Err(VmError::Runtime(format!(
1958                            "emit_channel aggregation batch: {error}"
1959                        )));
1960                    }
1961                };
1962                fire_channel_match(
1963                    &dispatcher,
1964                    binding.clone(),
1965                    batched,
1966                    resolved,
1967                    links,
1968                    Some(batch_summary),
1969                )
1970                .await;
1971            }
1972            continue;
1973        }
1974
1975        let links = emit_links_from_event(&event);
1976        fire_channel_match(&dispatcher, binding.clone(), event, resolved, links, None).await;
1977    }
1978    Ok(())
1979}
1980
1981/// CH-06 (#1877): open a `ChannelMatch` span, emit the transcript
1982/// lifecycle event, and dispatch the trigger handler. The span links
1983/// back to the originating ChannelEmit span (multi-link for batched
1984/// triggers) via `set_span_link` from P-05 (#1858).
1985async fn fire_channel_match(
1986    dispatcher: &crate::triggers::Dispatcher,
1987    binding: std::sync::Arc<crate::triggers::registry::TriggerBinding>,
1988    event: TriggerEvent,
1989    resolved: &ResolvedChannel,
1990    links: Vec<crate::tracing::SpanLink>,
1991    batch_summary: Option<serde_json::Value>,
1992) {
1993    let trigger_id = binding.id.as_str().to_string();
1994    let handler_kind = binding.handler.kind().to_string();
1995    // CH-07 (#1878): use the channel emit's id (carried as the trigger
1996    // event's `dedupe_key`) so the match receipt links back to the
1997    // original `ChannelEmitReceipt.event_id`. `TriggerEvent::new` mints
1998    // a fresh `trigger_evt_*` id for its own `event.id` field that does
1999    // not correlate to the emit chain.
2000    let event_id = if event.dedupe_key.is_empty() {
2001        event.id.0.clone()
2002    } else {
2003        event.dedupe_key.clone()
2004    };
2005    let mut match_span = ChannelSpanGuard::start_detached(
2006        crate::tracing::SpanKind::ChannelMatch,
2007        format!("channel.match {}", resolved.resolved_name),
2008        links,
2009    );
2010    match_span.set_metadata("event_id", serde_json::json!(event_id));
2011    match_span.set_metadata("trigger_id", serde_json::json!(trigger_id));
2012    match_span.set_metadata("handler_kind", serde_json::json!(handler_kind));
2013    match_span.set_metadata("name_resolved", serde_json::json!(resolved.resolved_name));
2014    if let Some(summary) = batch_summary.as_ref() {
2015        match_span.set_metadata("batch", summary.clone());
2016    }
2017    let span_id = crate::tracing::current_span_id().unwrap_or(0);
2018    let matched_at_ms = harn_clock::offset_datetime_to_ms(crate::clock_mock::now_utc());
2019    let matched_in_session_id = crate::agent_sessions::current_session_id()
2020        .or_else(|| event.tenant_id.as_ref().map(|t| t.0.clone()));
2021    emit_channel_match_transcript(
2022        &trigger_id,
2023        &handler_kind,
2024        resolved,
2025        &event_id,
2026        matched_at_ms,
2027        matched_in_session_id.as_deref(),
2028        span_id,
2029        batch_summary.clone(),
2030    );
2031    // CH-07 (#1878): capture the dispatch outcome BEFORE writing the
2032    // match receipt so the receipt records the recorded handler result
2033    // (succeeded/failed/dlq/...) — replay tooling treats the receipt as
2034    // the cached match: on replay the dispatcher looks up the receipt
2035    // by `event_id` instead of re-evaluating the filter spec.
2036    let dispatch_outcome = dispatcher.dispatch(&binding, event).await;
2037    let binding_key = binding.binding_key();
2038    let batch_info = batch_info_from_summary(batch_summary.as_ref());
2039    record_channel_match_receipt(
2040        &trigger_id,
2041        &binding_key,
2042        &handler_kind,
2043        resolved,
2044        &event_id,
2045        matched_in_session_id.as_deref(),
2046        batch_info,
2047        span_id,
2048        &dispatch_outcome,
2049    )
2050    .await;
2051    // Pre-CH-07 the dispatch error was discarded with `let _ = ...`. The
2052    // CH-07 match receipt now records the failure with
2053    // `dispatch_failed: true` so audit consumers don't lose the signal —
2054    // we keep the same fire-and-forget callsite semantics here.
2055    drop(dispatch_outcome);
2056    match_span.end();
2057}
2058
2059/// Drain all expired aggregation buffers and dispatch them. Exposed as
2060/// the `flush_trigger_aggregations()` builtin so Harn scripts (and the
2061/// production runtime) can deterministically advance window-expire
2062/// processing.
2063pub(crate) async fn flush_expired_aggregations_inner(ctx: Option<&crate::vm::AsyncBuiltinCtx>) {
2064    let expirations = crate::triggers::aggregation::drain_expired_aggregations();
2065    if expirations.is_empty() {
2066        return;
2067    }
2068    let Some(base_vm) = ctx.map(crate::vm::AsyncBuiltinCtx::child_vm) else {
2069        return;
2070    };
2071    let log = active_event_log()
2072        .unwrap_or_else(|| install_memory_for_current_thread(CHANNEL_QUEUE_DEPTH));
2073    let dispatcher = crate::triggers::Dispatcher::with_event_log(base_vm, log);
2074    for expired in expirations {
2075        if matches!(
2076            expired.action,
2077            crate::triggers::aggregation::ExpireAction::Discard
2078        ) {
2079            continue;
2080        }
2081        // Resolve the binding by parsing the binding_key (id@vN). Skip
2082        // if the binding has been terminated since the buffer was opened.
2083        let Some((trigger_id, version_str)) = expired.binding_key.rsplit_once("@v") else {
2084            continue;
2085        };
2086        let Ok(version) = version_str.parse::<u32>() else {
2087            continue;
2088        };
2089        let Ok(binding) =
2090            crate::triggers::registry::resolve_live_trigger_binding(trigger_id, Some(version))
2091        else {
2092            continue;
2093        };
2094        // CH-06 (#1877): rebuild the channel-resolved metadata from the
2095        // first buffered event so the ChannelMatch span carries the
2096        // right scope/name even for window-expire flushes.
2097        let resolved_for_match = resolved_from_first_event(&expired.events);
2098        let links = emit_links_from_batch(&expired.events);
2099        let batch_summary = batch_summary_for_transcript(&expired.events);
2100        let batched = match crate::triggers::dispatcher::build_batched_event_public(expired.events)
2101        {
2102            Ok(batched) => batched,
2103            Err(_) => continue,
2104        };
2105        match resolved_for_match {
2106            Some(resolved) => {
2107                fire_channel_match(
2108                    &dispatcher,
2109                    binding,
2110                    batched,
2111                    &resolved,
2112                    links,
2113                    Some(batch_summary),
2114                )
2115                .await;
2116            }
2117            None => {
2118                let _ = dispatcher.dispatch(&binding, batched).await;
2119            }
2120        }
2121    }
2122}
2123
2124/// CH-06 (#1877): synthesize a `ResolvedChannel` from the first buffered
2125/// trigger event when a window-expire flush dispatches without going
2126/// back through `resolve_channel`. Returns `None` if the event payload
2127/// isn't a known channel payload (defensive — should not happen in
2128/// practice since the dispatcher only buffers channel events).
2129fn resolved_from_first_event(events: &[TriggerEvent]) -> Option<ResolvedChannel> {
2130    let first = events.first()?;
2131    let ProviderPayload::Known(KnownProviderPayload::Channel(payload)) = &first.provider_payload
2132    else {
2133        return None;
2134    };
2135    let scope = ChannelScope::parse(&payload.scope).ok()?;
2136    let topic = Topic::new(format!(
2137        "channels.{}.{}.{}",
2138        payload.scope,
2139        sanitize_topic_component(&payload.scope_id),
2140        sanitize_topic_component(&payload.name),
2141    ))
2142    .ok()?;
2143    Some(ResolvedChannel {
2144        scope,
2145        scope_id: payload.scope_id.clone(),
2146        resolved_name: payload.name_resolved.clone(),
2147        topic,
2148        retention: retention_for_scope(scope),
2149    })
2150}
2151
2152fn build_channel_trigger_event(
2153    payload: &ChannelEventPayload,
2154    emit_link: Option<&crate::tracing::SpanLink>,
2155) -> TriggerEvent {
2156    let mut event = TriggerEvent::new(
2157        ProviderId::from("channel"),
2158        "channel.emit",
2159        None,
2160        payload.id.clone(),
2161        payload.tenant_id.clone().map(TenantId::new),
2162        BTreeMap::new(),
2163        ProviderPayload::Known(KnownProviderPayload::Channel(payload.clone())),
2164        SignatureStatus::Unsigned,
2165    );
2166    event.headers.insert(
2167        "harn_channel_name".to_string(),
2168        payload.name_resolved.clone(),
2169    );
2170    event
2171        .headers
2172        .insert("harn_channel_scope".to_string(), payload.scope.clone());
2173    event.headers.insert(
2174        "harn_channel_scope_id".to_string(),
2175        payload.scope_id.clone(),
2176    );
2177    // CH-06 (#1877): stash the ChannelEmit span coordinates on the trigger
2178    // event so the downstream ChannelMatch span can link back via
2179    // `set_span_link` — even after travelling through an aggregation
2180    // buffer or being serialized into a batched envelope.
2181    if let Some(link) = emit_link {
2182        event
2183            .headers
2184            .insert(EMIT_TRACE_ID_HEADER.to_string(), link.trace_id.clone());
2185        event
2186            .headers
2187            .insert(EMIT_SPAN_ID_HEADER.to_string(), link.span_id.clone());
2188    }
2189    event
2190}
2191
2192/// Evaluate the trigger filter spec (CH-02 / #1872) against the channel payload.
2193///
2194/// Supported syntax v1: JSON dict (`{"repo": "harn"}`) — each key is a
2195/// dot-path into the payload that must equality-match the value. Missing
2196/// path = no match. Non-dict filter strings are treated as no-op (return
2197/// true) so we don't regress pre-existing trigger `filter:` semantics that
2198/// reuse this field for other purposes.
2199fn channel_filter_matches(filter_raw: &str, payload: &serde_json::Value) -> bool {
2200    let trimmed = filter_raw.trim();
2201    if trimmed.is_empty() {
2202        return true;
2203    }
2204    let parsed: serde_json::Value = match serde_json::from_str(trimmed) {
2205        Ok(value) => value,
2206        Err(_) => return true,
2207    };
2208    let Some(map) = parsed.as_object() else {
2209        return true;
2210    };
2211    map.iter()
2212        .all(|(key, expected)| match payload_path(payload, key) {
2213            Some(actual) => actual == expected,
2214            None => false,
2215        })
2216}
2217
2218fn payload_path<'a>(value: &'a serde_json::Value, path: &str) -> Option<&'a serde_json::Value> {
2219    let mut current = value;
2220    for segment in path.split('.') {
2221        if segment.is_empty() {
2222            return None;
2223        }
2224        current = match current {
2225            serde_json::Value::Object(map) => map.get(segment)?,
2226            _ => return None,
2227        };
2228    }
2229    Some(current)
2230}
2231
2232#[cfg(test)]
2233mod tests;