// harn_vm/channels/mod.rs

1use std::collections::BTreeMap;
2use std::sync::{Arc, Mutex, OnceLock};
3
4use serde::{Deserialize, Serialize};
5use sha2::{Digest, Sha256};
6use time::format_description::well_known::Rfc3339;
7
8use crate::event_log::{
9    active_event_log, install_memory_for_current_thread, sanitize_topic_component, AnyEventLog,
10    EventId, EventLog, LogEvent, Topic,
11};
12use crate::llm::vm_value_to_json;
13use crate::runtime_limits::RuntimeLimits;
14use crate::triggers::event::{ChannelEventPayload, KnownProviderPayload};
15use crate::triggers::{ProviderId, ProviderPayload, SignatureStatus, TenantId, TriggerEvent};
16use crate::value::{VmError, VmValue};
17
18const CHANNEL_QUEUE_DEPTH: usize = RuntimeLimits::DEFAULT.default_event_log_queue_depth;
19const CHANNEL_EVENT_KIND: &str = "channel.emit";
20const IDEMPOTENCY_HEADER: &str = "harn.channel.id";
21const NAME_HEADER: &str = "harn.channel.name";
22const SCOPE_HEADER: &str = "harn.channel.scope";
23const SCOPE_ID_HEADER: &str = "harn.channel.scope_id";
24const EMITTED_BY_HEADER: &str = "harn.channel.emitted_by";
25
/// CH-06 (#1877): event-log topic carrying the `transcript.channel.emit` and
/// `transcript.channel.match` transcript events. Subscribers use it to render
/// channel activity next to reminder/suspension lifecycle events.
pub(crate) const CHANNEL_TRANSCRIPT_TOPIC: &str = "transcript.channel.lifecycle";
pub(crate) const CHANNEL_EMIT_TRANSCRIPT_KIND: &str = "transcript.channel.emit";
pub(crate) const CHANNEL_MATCH_TRANSCRIPT_KIND: &str = "transcript.channel.match";

/// CH-07 (#1878): durable audit topic that receives the replay-determinism
/// receipts written for every channel emit and every channel match.
///
/// Replay and audit tooling reads this topic rather than the lossy
/// `transcript.channel.lifecycle` summary topic. Receipts carry the full
/// payload, the signed timestamp, and the cached match linkage that lets the
/// `replay_oracle` byte-compare two runs of the same workload.
pub const CHANNEL_AUDIT_TOPIC: &str = "lifecycle.channel.audit";
pub(crate) const CHANNEL_EMIT_RECEIPT_KIND: &str = "channel_emit_receipt";
pub(crate) const CHANNEL_MATCH_RECEIPT_KIND: &str = "channel_match_receipt";

/// CH-07 (#1878): versioned schema tags carried by receipts, so downstream
/// parsers can gate on the version. This follows the same convention as
/// `harn.pool_submit.v1` from PL-06 / #1891.
const CHANNEL_EMIT_RECEIPT_SCHEMA: &str = "harn.channel_emit_receipt.v1";
const CHANNEL_MATCH_RECEIPT_SCHEMA: &str = "harn.channel_match_receipt.v1";

/// CH-11 (#1911): event kinds and schema tag for guardrail-middleware audit
/// entries. Block and warn outcomes both go to `lifecycle.channel.audit`, so
/// security review tooling reads one stream and tells them apart by `kind`.
pub(crate) const CHANNEL_GUARDRAIL_BLOCKED_KIND: &str = "channel_guardrail_blocked";
pub(crate) const CHANNEL_GUARDRAIL_WARNING_KIND: &str = "channel_guardrail_warning";
const CHANNEL_GUARDRAIL_AUDIT_SCHEMA: &str = "harn.channel_guardrail_audit.v1";

/// CH-06 (#1877): per-event headers stamped onto the `TriggerEvent`. They let
/// the channel-match dispatcher link its `ChannelMatch` span back to the
/// originating `ChannelEmit` span across the async boundary, including when
/// the event passes through the aggregation buffer for batched triggers.
const EMIT_TRACE_ID_HEADER: &str = "harn.channel.emit_trace_id";
const EMIT_SPAN_ID_HEADER: &str = "harn.channel.emit_span_id";
61
62static SESSION_CHANNEL_LOG: OnceLock<Mutex<Option<Arc<AnyEventLog>>>> = OnceLock::new();
63static SIGNING_SALT: OnceLock<Vec<u8>> = OnceLock::new();
64
65#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
66#[serde(rename_all = "snake_case")]
67enum ChannelScope {
68    Session,
69    Pipeline,
70    Tenant,
71    Org,
72}
73
74impl ChannelScope {
75    fn parse(value: &str) -> Result<Self, ChannelError> {
76        match value.trim() {
77            "session" => Ok(Self::Session),
78            "pipeline" => Ok(Self::Pipeline),
79            "tenant" => Ok(Self::Tenant),
80            "org" => Ok(Self::Org),
81            other => Err(ChannelError::malformed(format!(
82                "HARN-CHN-003 malformed channel scope '{other}'"
83            ))),
84        }
85    }
86
87    fn as_str(self) -> &'static str {
88        match self {
89            Self::Session => "session",
90            Self::Pipeline => "pipeline",
91            Self::Tenant => "tenant",
92            Self::Org => "org",
93        }
94    }
95}
96
97#[derive(Clone, Debug, Default)]
98struct ChannelContext {
99    task_id: Option<String>,
100    root_task_id: Option<String>,
101    scope_id: Option<String>,
102    workflow_id: Option<String>,
103    run_id: Option<String>,
104    worker_id: Option<String>,
105    agent_session_id: Option<String>,
106    root_agent_session_id: Option<String>,
107    tenant_id: Option<String>,
108}
109
110#[derive(Clone, Debug, Default)]
111struct ChannelOptions {
112    scope: Option<ChannelScope>,
113    id: Option<String>,
114    tenant_id: Option<String>,
115    session_id: Option<String>,
116    pipeline_id: Option<String>,
117    from_cursor: Option<EventId>,
118    limit: Option<usize>,
119    ttl_ms: Option<i64>,
120}
121
122#[derive(Clone, Debug)]
123struct ResolvedChannel {
124    scope: ChannelScope,
125    scope_id: String,
126    resolved_name: String,
127    topic: Topic,
128    retention: &'static str,
129}
130
131#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
132pub struct SignedTimestamp {
133    pub at_ms: i64,
134    pub at: String,
135    pub algorithm: String,
136    pub key_id: String,
137    pub signature: String,
138}
139
140#[derive(Clone, Debug, Serialize, Deserialize)]
141struct StoredChannelEvent {
142    id: String,
143    name: String,
144    payload: serde_json::Value,
145    emitted_at: SignedTimestamp,
146    emitted_by: String,
147    scope: String,
148    scope_id: String,
149    #[serde(skip_serializing_if = "Option::is_none")]
150    pipeline_id: Option<String>,
151    #[serde(skip_serializing_if = "Option::is_none")]
152    session_id: Option<String>,
153    #[serde(skip_serializing_if = "Option::is_none")]
154    tenant_id: Option<String>,
155    retention: String,
156    #[serde(skip_serializing_if = "Option::is_none")]
157    ttl_ms: Option<i64>,
158}
159
160/// CH-07 (#1878): durable replay-determinism receipt for a single
161/// `emit_channel(...)` call. Pairs 1:1 with the `ChannelEmit` span (CH-06
162/// / #1877) and with the durable journal append. Persisted to the
163/// `lifecycle.channel.audit` event-log topic so the `replay_oracle` can
164/// reproduce the entire emit chain across two runs of the same workload.
165///
166/// `payload_hash` lets the oracle detect producer-side drift
167/// (`HARN-REP-CHN-002`) without having to canonicalize the full payload
168/// during comparison; `payload` carries the verbatim value for byte
169/// equality + replay reconstruction.
170#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
171pub struct ChannelEmitReceipt {
172    pub event_id: String,
173    pub name_resolved: String,
174    pub scope: String,
175    pub scope_id: String,
176    pub payload_hash: String,
177    pub payload: serde_json::Value,
178    pub emitted_at: SignedTimestamp,
179    pub emitted_by: String,
180    #[serde(skip_serializing_if = "Option::is_none")]
181    pub pipeline_id: Option<String>,
182    #[serde(skip_serializing_if = "Option::is_none")]
183    pub session_id: Option<String>,
184    #[serde(skip_serializing_if = "Option::is_none")]
185    pub tenant_id: Option<String>,
186    pub topic: String,
187    pub inserted: bool,
188    #[serde(skip_serializing_if = "Option::is_none")]
189    pub span_id: Option<u64>,
190}
191
192/// CH-07 (#1878): durable replay-determinism receipt for a single channel
193/// match — one per `(binding, event)` pair the dispatcher fires. For
194/// aggregation/batched triggers (CH-04 / #1875) the receipt carries
195/// `batch.constituent_event_ids` so the replay oracle can verify the
196/// full batch composition matches across runs (`HARN-REP-CHN-003`).
197///
198/// `event_id` doubles as the cached-match key: on replay the dispatcher
199/// looks up the recorded match by `event_id` instead of re-evaluating
200/// the filter spec, preserving "the journal IS the spec" determinism.
201#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
202pub struct ChannelMatchReceipt {
203    pub event_id: String,
204    pub trigger_id: String,
205    pub binding_key: String,
206    pub name_resolved: String,
207    pub scope: String,
208    pub scope_id: String,
209    pub matched_at: SignedTimestamp,
210    #[serde(skip_serializing_if = "Option::is_none")]
211    pub matched_in_session_id: Option<String>,
212    #[serde(skip_serializing_if = "Option::is_none")]
213    pub batch: Option<ChannelMatchBatchInfo>,
214    pub handler_kind: String,
215    pub handler_result: ChannelMatchResultSummary,
216    #[serde(skip_serializing_if = "Option::is_none")]
217    pub span_id: Option<u64>,
218}
219
220/// CH-07 (#1878): batched-dispatch summary stamped onto
221/// `ChannelMatchReceipt`. The `constituent_event_ids` list is the full
222/// recorded composition; replay reconstructs the batch from those ids.
223#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
224pub struct ChannelMatchBatchInfo {
225    pub count: usize,
226    pub constituent_event_ids: Vec<String>,
227}
228
229/// CH-07 (#1878): summary of the handler invocation outcome. Mirrors the
230/// shape of the dispatcher's `DispatchOutcome` but kept lightweight so
231/// the receipt stays compact and self-contained — replay tooling never
232/// needs to cross-reference the dispatcher log to interpret a match.
233#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
234pub struct ChannelMatchResultSummary {
235    pub status: String,
236    pub attempt_count: u32,
237    #[serde(skip_serializing_if = "Option::is_none")]
238    pub error: Option<String>,
239    #[serde(skip_serializing_if = "Option::is_none")]
240    pub dispatch_failed: Option<bool>,
241}
242
243impl ChannelMatchResultSummary {
244    fn from_dispatch(
245        outcome: &Result<crate::triggers::DispatchOutcome, crate::triggers::DispatchError>,
246    ) -> Self {
247        match outcome {
248            Ok(outcome) => Self {
249                status: outcome.status.as_str().to_string(),
250                attempt_count: outcome.attempt_count,
251                error: outcome.error.clone(),
252                dispatch_failed: None,
253            },
254            Err(error) => Self {
255                status: "dispatch_error".to_string(),
256                attempt_count: 0,
257                error: Some(error.to_string()),
258                dispatch_failed: Some(true),
259            },
260        }
261    }
262}
263
264#[derive(Debug)]
265struct ChannelError(String);
266
267impl ChannelError {
268    fn missing_pipeline() -> Self {
269        Self("HARN-CHN-001 missing pipeline context for pipeline-scoped channel".to_string())
270    }
271
272    fn cross_tenant(message: impl Into<String>) -> Self {
273        Self(format!("HARN-CHN-002 {}", message.into()))
274    }
275
276    fn malformed(message: impl Into<String>) -> Self {
277        Self(message.into())
278    }
279
280    fn scope_ambiguous(message: impl Into<String>) -> Self {
281        Self(format!("HARN-CHN-004 {}", message.into()))
282    }
283}
284
285impl From<ChannelError> for VmError {
286    fn from(error: ChannelError) -> Self {
287        VmError::Runtime(error.0)
288    }
289}
290
291pub fn reset_channel_state() {
292    if let Some(slot) = SESSION_CHANNEL_LOG.get() {
293        *slot.lock().expect("channel session log poisoned") = None;
294    }
295    // CH-11 (#1911): clear the per-thread guardrail registry so the
296    // next pipeline starts with a clean slate. Mirrors how
297    // SESSION_CHANNEL_LOG is reset above.
298    crate::channel_guardrails::clear();
299}
300
301pub(crate) async fn emit_channel_from_vm(
302    ctx: Option<&crate::vm::AsyncBuiltinCtx>,
303    args: Vec<VmValue>,
304) -> Result<VmValue, VmError> {
305    let name = required_string(args.first(), "emit_channel", "name")?;
306    let payload = vm_value_to_json(
307        args.get(1)
308            .ok_or_else(|| VmError::TypeError("emit_channel: missing payload".to_string()))?,
309    );
310    let options = parse_options(args.get(2), "emit_channel")?;
311    let context = ChannelContext::current(ctx);
312    let resolved = resolve_channel(&name, &options, &context)?;
313    let event_id = options
314        .id
315        .clone()
316        .unwrap_or_else(|| format!("channel_evt_{}", uuid::Uuid::now_v7()));
317    let emitted_by = emitted_by(&context);
318    let emitted_at = signed_timestamp(&resolved, &event_id, &emitted_by);
319    let occurred_at_ms = emitted_at.at_ms;
320
321    // CH-11 (#1911): channel guardrails middleware. Runs BEFORE the
322    // durable journal append so a blocked payload never gets persisted
323    // (the block itself IS persisted on the lifecycle.channel.audit
324    // topic so the audit trail is durable). Warn verdicts proceed but
325    // record an audit. The aggregate decision is the worst verdict
326    // across every registered guardrail.
327    let guardrail_context = serde_json::json!({
328        "name": name,
329        "name_resolved": resolved.resolved_name,
330        "scope": resolved.scope.as_str(),
331        "scope_id": resolved.scope_id,
332        "event_id": event_id,
333        "emitted_by": emitted_by,
334    });
335    let decision = crate::channel_guardrails::evaluate(
336        ctx,
337        &payload,
338        &guardrail_context,
339        &resolved.resolved_name,
340    )
341    .await?;
342    if matches!(
343        decision.verdict,
344        crate::channel_guardrails::Verdict::Block { .. }
345    ) {
346        return handle_blocked_emit(
347            &name,
348            &resolved,
349            &event_id,
350            &emitted_by,
351            &emitted_at,
352            &payload,
353            &decision,
354        )
355        .await;
356    }
357    record_guardrail_warnings(
358        &resolved,
359        &event_id,
360        &emitted_by,
361        &payload,
362        decision.fired.as_slice(),
363    )
364    .await;
365    let record = StoredChannelEvent {
366        id: event_id.clone(),
367        name: resolved.resolved_name.clone(),
368        payload,
369        emitted_at,
370        emitted_by: emitted_by.clone(),
371        scope: resolved.scope.as_str().to_string(),
372        scope_id: resolved.scope_id.clone(),
373        pipeline_id: context.pipeline_id_for_receipt(&resolved),
374        session_id: context.session_id_for_receipt(&resolved),
375        tenant_id: context.tenant_id_for_receipt(&resolved),
376        retention: resolved.retention.to_string(),
377        ttl_ms: options.ttl_ms,
378    };
379
380    // CH-06 (#1877): open the ChannelEmit span around the durable append +
381    // trigger fan-out. The span captures emit-time metadata (scope, name,
382    // payload summary) and its trace/span id is stashed on the trigger
383    // event headers so the downstream ChannelMatch span can link back.
384    let mut emit_span = ChannelSpanGuard::start(
385        crate::tracing::SpanKind::ChannelEmit,
386        format!("channel.emit {}", resolved.resolved_name),
387        Vec::new(),
388    );
389    emit_span.set_metadata("event_id", serde_json::json!(record.id));
390    emit_span.set_metadata("scope", serde_json::json!(resolved.scope.as_str()));
391    emit_span.set_metadata("scope_id", serde_json::json!(resolved.scope_id));
392    emit_span.set_metadata("name_resolved", serde_json::json!(resolved.resolved_name));
393    emit_span.set_metadata("payload_summary", summarize_payload(&record.payload));
394    let emit_span_id = crate::tracing::current_span_id().unwrap_or(0);
395    let emit_link = emit_span.link();
396
397    let mut headers = BTreeMap::new();
398    headers.insert(IDEMPOTENCY_HEADER.to_string(), event_id.clone());
399    headers.insert(NAME_HEADER.to_string(), resolved.resolved_name.clone());
400    headers.insert(
401        SCOPE_HEADER.to_string(),
402        resolved.scope.as_str().to_string(),
403    );
404    headers.insert(SCOPE_ID_HEADER.to_string(), resolved.scope_id.clone());
405    headers.insert(EMITTED_BY_HEADER.to_string(), emitted_by.clone());
406
407    let log = log_for_scope(resolved.scope);
408    let mut log_event = LogEvent::new(
409        CHANNEL_EVENT_KIND,
410        serde_json::to_value(&record)
411            .map_err(|error| VmError::Runtime(format!("emit_channel: encode event: {error}")))?,
412    )
413    .with_headers(headers);
414    log_event.occurred_at_ms = occurred_at_ms;
415    let outcome = log
416        .append_idempotent_by_header(&resolved.topic, IDEMPOTENCY_HEADER, &event_id, log_event)
417        .await
418        .map_err(channel_log_error)?;
419    let receipt = receipt_value(
420        &resolved.topic,
421        outcome.event_id,
422        &outcome.event,
423        outcome.inserted,
424    )?;
425    // CH-06 (#1877): emit the transcript event whether the append was fresh
426    // or idempotent — both outcomes are first-class observability signals.
427    emit_channel_emit_transcript(&record, &resolved, outcome.inserted, emit_span_id);
428    // CH-07 (#1878): persist the durable emit receipt on the
429    // `lifecycle.channel.audit` topic so the replay oracle has the full
430    // emit chain (payload hash, signed timestamp, scope correlation)
431    // independently of the lossy transcript summary topic.
432    record_channel_emit_receipt(&record, &resolved, outcome.inserted, emit_span_id).await;
433    // CH-02 (#1872): fan out the emit to channel-source triggers. Only fresh
434    // appends fan out; idempotent duplicates short-circuit because the producer
435    // already saw the original delivery.
436    if outcome.inserted {
437        let payload_json = outcome
438            .event
439            .payload
440            .get("payload")
441            .cloned()
442            .unwrap_or(serde_json::Value::Null);
443        let context_for_fanout = ChannelContext::current(ctx);
444        let fanout_payload = ChannelEventPayload {
445            id: event_id.clone(),
446            name: parse_name(&name)
447                .map(|parsed| parsed.name)
448                .unwrap_or_else(|_| resolved.resolved_name.clone()),
449            name_resolved: resolved.resolved_name.clone(),
450            scope: resolved.scope.as_str().to_string(),
451            scope_id: resolved.scope_id.clone(),
452            payload: payload_json,
453            emitted_by: emitted_by.clone(),
454            tenant_id: context_for_fanout.tenant_id_for_receipt(&resolved),
455            session_id: context_for_fanout.session_id_for_receipt(&resolved),
456            pipeline_id: context_for_fanout.pipeline_id_for_receipt(&resolved),
457        };
458        dispatch_channel_emit_to_triggers(ctx, &resolved, fanout_payload, emit_link).await?;
459    }
460    emit_span.end();
461    Ok(crate::stdlib::json_to_vm_value(&receipt))
462}
463
464pub(crate) async fn channel_events_from_vm(
465    ctx: Option<&crate::vm::AsyncBuiltinCtx>,
466    args: Vec<VmValue>,
467) -> Result<VmValue, VmError> {
468    let name = required_string(args.first(), "channel_events", "name")?;
469    let options = parse_options(args.get(1), "channel_events")?;
470    let context = ChannelContext::current(ctx);
471    let resolved = resolve_channel(&name, &options, &context)?;
472    let events = log_for_scope(resolved.scope)
473        .read_range(
474            &resolved.topic,
475            options.from_cursor,
476            options.limit.unwrap_or(usize::MAX),
477        )
478        .await
479        .map_err(channel_log_error)?;
480    let values = events
481        .into_iter()
482        .map(|(event_id, event)| event_value(&resolved.topic, event_id, event))
483        .collect::<Result<Vec<_>, _>>()?;
484    Ok(crate::stdlib::json_to_vm_value(&serde_json::Value::Array(
485        values,
486    )))
487}
488
489impl ChannelContext {
490    fn current(ctx: Option<&crate::vm::AsyncBuiltinCtx>) -> Self {
491        let mut context = Self::default();
492        if let Some(vm) = ctx.map(crate::vm::AsyncBuiltinCtx::child_vm) {
493            context.task_id = Some(vm.runtime_context.task_id.clone());
494            context.root_task_id = Some(vm.runtime_context.root_task_id.clone());
495            context.scope_id = vm.runtime_context.scope_id.clone();
496            if let VmValue::Dict(values) = crate::runtime_context::runtime_context_value(&vm) {
497                context.workflow_id = dict_string(&values, "workflow_id");
498                context.run_id = dict_string(&values, "run_id");
499                context.worker_id = dict_string(&values, "worker_id");
500                context.agent_session_id = dict_string(&values, "agent_session_id");
501                context.root_agent_session_id = dict_string(&values, "root_agent_session_id");
502                context.tenant_id = dict_string(&values, "tenant_id");
503            }
504        }
505        context.agent_session_id = context
506            .agent_session_id
507            .or_else(crate::agent_sessions::current_session_id);
508        context
509    }
510
511    fn session_id(&self, options: &ChannelOptions) -> Result<String, ChannelError> {
512        // CH-03 (#1874): an explicit `options.session_id` that disagrees with the
513        // active session is HARN-CHN-004 ambiguity, not a silent override. The
514        // resolver MUST be deterministic for a given runtime context.
515        if let Some(requested) = options.session_id.as_deref() {
516            if let Some(active) = self.agent_session_id.as_deref() {
517                if active != requested {
518                    return Err(ChannelError::scope_ambiguous(format!(
519                        "session scope ambiguous: options.session_id '{requested}' \
520                         conflicts with active session '{active}'"
521                    )));
522                }
523            }
524        }
525        Ok(options
526            .session_id
527            .clone()
528            .or_else(|| self.agent_session_id.clone())
529            .or_else(|| self.root_agent_session_id.clone())
530            .or_else(|| self.scope_id.clone())
531            .or_else(|| self.root_task_id.clone())
532            .unwrap_or_else(|| "session".to_string()))
533    }
534
535    fn pipeline_id(&self, options: &ChannelOptions) -> Result<String, ChannelError> {
536        // CH-03 (#1874): explicit `options.pipeline_id` that conflicts with the
537        // active workflow/run is HARN-CHN-004. Two pipelines cannot share a
538        // resolved channel without an explicit disambiguation.
539        let active = self.workflow_id.clone().or_else(|| self.run_id.clone());
540        if let (Some(requested), Some(active)) = (options.pipeline_id.as_deref(), active.as_deref())
541        {
542            if requested != active {
543                return Err(ChannelError::scope_ambiguous(format!(
544                    "pipeline scope ambiguous: options.pipeline_id '{requested}' \
545                     conflicts with active pipeline '{active}'"
546                )));
547            }
548        }
549        options
550            .pipeline_id
551            .clone()
552            .or(active)
553            .ok_or_else(ChannelError::missing_pipeline)
554    }
555
556    fn tenant_id(
557        &self,
558        options: &ChannelOptions,
559        requested: Option<&str>,
560    ) -> Result<String, ChannelError> {
561        let current = self.tenant_id.as_deref();
562        let requested = requested
563            .map(ToOwned::to_owned)
564            .or_else(|| options.tenant_id.clone());
565        if let (Some(current), Some(requested)) = (current, requested.as_deref()) {
566            if current != requested {
567                return Err(ChannelError::cross_tenant(format!(
568                    "cross-tenant channel emit requires a grant: current tenant '{current}', requested tenant '{requested}'"
569                )));
570            }
571        }
572        Ok(requested
573            .or_else(|| self.tenant_id.clone())
574            .unwrap_or_else(|| "default".to_string()))
575    }
576
577    fn pipeline_id_for_receipt(&self, resolved: &ResolvedChannel) -> Option<String> {
578        match resolved.scope {
579            ChannelScope::Pipeline => Some(resolved.scope_id.clone()),
580            _ => self.workflow_id.clone().or_else(|| self.run_id.clone()),
581        }
582    }
583
584    fn session_id_for_receipt(&self, resolved: &ResolvedChannel) -> Option<String> {
585        match resolved.scope {
586            ChannelScope::Session => Some(resolved.scope_id.clone()),
587            _ => self
588                .agent_session_id
589                .clone()
590                .or_else(|| self.root_agent_session_id.clone()),
591        }
592    }
593
594    fn tenant_id_for_receipt(&self, resolved: &ResolvedChannel) -> Option<String> {
595        match resolved.scope {
596            ChannelScope::Tenant => Some(resolved.scope_id.clone()),
597            _ => self.tenant_id.clone(),
598        }
599    }
600}
601
602fn resolve_channel(
603    raw_name: &str,
604    options: &ChannelOptions,
605    context: &ChannelContext,
606) -> Result<ResolvedChannel, ChannelError> {
607    let parsed = parse_name(raw_name)?;
608    if let Some(option_scope) = options.scope {
609        if let Some(prefix_scope) = parsed.scope {
610            if prefix_scope != option_scope {
611                return Err(ChannelError::malformed(format!(
612                    "HARN-CHN-003 channel scope prefix '{}' conflicts with options.scope '{}'",
613                    prefix_scope.as_str(),
614                    option_scope.as_str()
615                )));
616            }
617        }
618    }
619
620    let scope = parsed
621        .scope
622        .or(options.scope)
623        .unwrap_or(ChannelScope::Tenant);
624    if scope == ChannelScope::Org {
625        return Err(ChannelError::cross_tenant(
626            "org-scoped channels are disabled until org grants are available",
627        ));
628    }
629
630    validate_channel_name(&parsed.name)?;
631    let scope_id = match scope {
632        ChannelScope::Session => match parsed.scope_id.clone() {
633            Some(id) => id,
634            None => context.session_id(options)?,
635        },
636        ChannelScope::Pipeline => context.pipeline_id(options)?,
637        ChannelScope::Tenant => context.tenant_id(options, parsed.scope_id.as_deref())?,
638        ChannelScope::Org => unreachable!("org scope returned above"),
639    };
640    validate_scope_id(scope, &scope_id)?;
641    let resolved_name = format!("{}:{}:{}", scope.as_str(), scope_id, parsed.name);
642    let topic = Topic::new(format!(
643        "channels.{}.{}.{}",
644        scope.as_str(),
645        sanitize_topic_component(&scope_id),
646        sanitize_topic_component(&parsed.name)
647    ))
648    .map_err(|error| ChannelError::malformed(format!("HARN-CHN-003 {error}")))?;
649    Ok(ResolvedChannel {
650        scope,
651        scope_id,
652        resolved_name,
653        topic,
654        retention: retention_for_scope(scope),
655    })
656}
657
658#[derive(Clone, Debug)]
659struct ParsedName {
660    scope: Option<ChannelScope>,
661    scope_id: Option<String>,
662    name: String,
663}
664
665fn parse_name(raw_name: &str) -> Result<ParsedName, ChannelError> {
666    let raw_name = raw_name.trim();
667    if raw_name.is_empty() {
668        return Err(ChannelError::malformed(
669            "HARN-CHN-003 channel name cannot be empty",
670        ));
671    }
672    let Some((prefix, rest)) = raw_name.split_once(':') else {
673        return Ok(ParsedName {
674            scope: None,
675            scope_id: None,
676            name: raw_name.to_string(),
677        });
678    };
679    let scope = ChannelScope::parse(prefix)?;
680    match scope {
681        ChannelScope::Session | ChannelScope::Pipeline => {
682            if rest.is_empty() || rest.contains(':') {
683                return Err(ChannelError::malformed(format!(
684                    "HARN-CHN-003 malformed {} channel name '{raw_name}'",
685                    scope.as_str()
686                )));
687            }
688            Ok(ParsedName {
689                scope: Some(scope),
690                scope_id: None,
691                name: rest.to_string(),
692            })
693        }
694        ChannelScope::Tenant => {
695            if rest.is_empty() {
696                return Err(ChannelError::malformed(
697                    "HARN-CHN-003 tenant channel name cannot be empty",
698                ));
699            }
700            let (scope_id, name) = match rest.split_once(':') {
701                Some((tenant_id, name)) if !tenant_id.is_empty() && !name.is_empty() => {
702                    (Some(tenant_id.to_string()), name.to_string())
703                }
704                Some(_) => {
705                    return Err(ChannelError::malformed(format!(
706                        "HARN-CHN-003 malformed tenant channel name '{raw_name}'"
707                    )))
708                }
709                None => (None, rest.to_string()),
710            };
711            Ok(ParsedName {
712                scope: Some(scope),
713                scope_id,
714                name,
715            })
716        }
717        ChannelScope::Org => {
718            let Some((org_id, name)) = rest.split_once(':') else {
719                return Err(ChannelError::malformed(format!(
720                    "HARN-CHN-003 org channel names must be org:<org_id>:<name>, got '{raw_name}'"
721                )));
722            };
723            if org_id.is_empty() || name.is_empty() {
724                return Err(ChannelError::malformed(format!(
725                    "HARN-CHN-003 malformed org channel name '{raw_name}'"
726                )));
727            }
728            Ok(ParsedName {
729                scope: Some(scope),
730                scope_id: Some(org_id.to_string()),
731                name: name.to_string(),
732            })
733        }
734    }
735}
736
737fn validate_channel_name(name: &str) -> Result<(), ChannelError> {
738    if name.trim().is_empty()
739        || name.contains(':')
740        || name.chars().any(|ch| ch.is_control() || ch.is_whitespace())
741    {
742        return Err(ChannelError::malformed(format!(
743            "HARN-CHN-003 malformed channel name '{name}'"
744        )));
745    }
746    Ok(())
747}
748
749fn validate_scope_id(scope: ChannelScope, scope_id: &str) -> Result<(), ChannelError> {
750    if scope_id.trim().is_empty()
751        || scope_id
752            .chars()
753            .any(|ch| ch.is_control() || ch.is_whitespace() || ch == ':')
754    {
755        return Err(ChannelError::malformed(format!(
756            "HARN-CHN-003 malformed {} scope id '{scope_id}'",
757            scope.as_str()
758        )));
759    }
760    Ok(())
761}
762
763fn log_for_scope(scope: ChannelScope) -> Arc<AnyEventLog> {
764    match scope {
765        ChannelScope::Session => {
766            let slot = SESSION_CHANNEL_LOG.get_or_init(|| Mutex::new(None));
767            let mut guard = slot.lock().expect("channel session log poisoned");
768            guard
769                .get_or_insert_with(|| {
770                    Arc::new(AnyEventLog::Memory(crate::event_log::MemoryEventLog::new(
771                        CHANNEL_QUEUE_DEPTH,
772                    )))
773                })
774                .clone()
775        }
776        ChannelScope::Pipeline | ChannelScope::Tenant => active_event_log()
777            .unwrap_or_else(|| install_memory_for_current_thread(CHANNEL_QUEUE_DEPTH)),
778        ChannelScope::Org => unreachable!("org-scoped channel log is disabled"),
779    }
780}
781
782fn signed_timestamp(
783    resolved: &ResolvedChannel,
784    event_id: &str,
785    emitted_by: &str,
786) -> SignedTimestamp {
787    let at = crate::clock_mock::now_utc();
788    let at_ms = harn_clock::offset_datetime_to_ms(at);
789    let at_text = at.format(&Rfc3339).unwrap_or_else(|_| at.to_string());
790    let material = format!(
791        "harn.channel.timestamp.v1\nat_ms={at_ms}\nid={event_id}\nname={}\nscope={}\nscope_id={}\nemitted_by={emitted_by}\n",
792        resolved.resolved_name,
793        resolved.scope.as_str(),
794        resolved.scope_id
795    );
796    let signature = hex::encode(crate::connectors::hmac::hmac_sha256(
797        signing_salt(),
798        material.as_bytes(),
799    ));
800    SignedTimestamp {
801        at_ms,
802        at: at_text,
803        algorithm: "hmac-sha256".to_string(),
804        key_id: "local-session".to_string(),
805        signature: format!("sha256:{signature}"),
806    }
807}
808
809fn signing_salt() -> &'static [u8] {
810    SIGNING_SALT
811        .get_or_init(|| {
812            format!(
813                "harn-channel-signing-salt:{}:{}",
814                std::process::id(),
815                uuid::Uuid::now_v7()
816            )
817            .into_bytes()
818        })
819        .as_slice()
820}
821
822/// CH-07 (#1878): signed timestamp for a channel match. Mirrors the emit
823/// timestamp algorithm in `signed_timestamp` so the replay oracle can
824/// verify match timestamps with the same `signing_salt()` key material.
825/// Including `event_id` + `trigger_id` in the signing material binds the
826/// timestamp to this specific (emit, binding) pair so a replayed match
827/// can't be re-stamped onto a different binding.
828fn signed_match_timestamp(
829    resolved: &ResolvedChannel,
830    event_id: &str,
831    trigger_id: &str,
832) -> SignedTimestamp {
833    let at = crate::clock_mock::now_utc();
834    let at_ms = harn_clock::offset_datetime_to_ms(at);
835    let at_text = at.format(&Rfc3339).unwrap_or_else(|_| at.to_string());
836    let material = format!(
837        "harn.channel.match_timestamp.v1\nat_ms={at_ms}\nevent_id={event_id}\ntrigger_id={trigger_id}\nname={}\nscope={}\nscope_id={}\n",
838        resolved.resolved_name,
839        resolved.scope.as_str(),
840        resolved.scope_id
841    );
842    let signature = hex::encode(crate::connectors::hmac::hmac_sha256(
843        signing_salt(),
844        material.as_bytes(),
845    ));
846    SignedTimestamp {
847        at_ms,
848        at: at_text,
849        algorithm: "hmac-sha256".to_string(),
850        key_id: "local-session".to_string(),
851        signature: format!("sha256:{signature}"),
852    }
853}
854
855/// CH-07 (#1878): SHA-256 of the canonical JSON encoding of a channel
856/// emit payload. `serde_json::to_string` sorts object keys
857/// deterministically when the value originates from `serde_json::Value`
858/// (BTreeMap-backed `Map` with `preserve_order` disabled — the default
859/// for this crate). Used by the replay oracle to detect producer-side
860/// drift (`HARN-REP-CHN-002`).
861pub fn channel_payload_hash(payload: &serde_json::Value) -> String {
862    let canonical = canonical_json_string(payload);
863    let digest = Sha256::digest(canonical.as_bytes());
864    format!("sha256:{}", hex::encode(digest))
865}
866
867/// CH-07 (#1878): canonicalize a JSON value to a deterministic string so
868/// the SHA-256 hash on `ChannelEmitReceipt.payload_hash` is stable
869/// across reruns. Recursively sorts object keys; arrays preserve their
870/// (semantically meaningful) order. Mirrors the canonicalization rule
871/// `replay_oracle::canonicalize_run` relies on for cross-run diffs.
872fn canonical_json_string(value: &serde_json::Value) -> String {
873    match value {
874        serde_json::Value::Object(map) => {
875            let mut sorted: std::collections::BTreeMap<&String, &serde_json::Value> =
876                std::collections::BTreeMap::new();
877            for (key, value) in map {
878                sorted.insert(key, value);
879            }
880            let parts: Vec<String> = sorted
881                .iter()
882                .map(|(key, value)| {
883                    format!(
884                        "{}:{}",
885                        serde_json::to_string(key).unwrap_or_else(|_| (*key).clone()),
886                        canonical_json_string(value)
887                    )
888                })
889                .collect();
890            format!("{{{}}}", parts.join(","))
891        }
892        serde_json::Value::Array(items) => {
893            let parts: Vec<String> = items.iter().map(canonical_json_string).collect();
894            format!("[{}]", parts.join(","))
895        }
896        other => serde_json::to_string(other).unwrap_or_else(|_| "null".to_string()),
897    }
898}
899
900/// CH-07 (#1878): append a channel audit receipt to the durable
901/// `lifecycle.channel.audit` topic. Mirrors `emit_pool_*_receipt`
902/// (PL-06 / #1891). The audit log uses `active_event_log()` so receipts
903/// inherit the pipeline's durability (in-memory in tests; durable
904/// SQLite/etc. in production). Best-effort: receipt append errors do not
905/// fail the emit/match — the emit's user-visible receipt is the source
906/// of truth for caller behavior. Audit consumers learn about gaps via
907/// the canonical event-log read API.
908async fn append_channel_audit_event(
909    kind: &'static str,
910    schema: &'static str,
911    payload: serde_json::Value,
912) {
913    let topic = match Topic::new(CHANNEL_AUDIT_TOPIC) {
914        Ok(topic) => topic,
915        Err(_) => return,
916    };
917    let log = active_event_log()
918        .unwrap_or_else(|| install_memory_for_current_thread(CHANNEL_QUEUE_DEPTH));
919    let mut headers = BTreeMap::new();
920    headers.insert("schema".to_string(), schema.to_string());
921    let _ = log
922        .append(&topic, LogEvent::new(kind, payload).with_headers(headers))
923        .await;
924}
925
926/// CH-11 (#1911): emit a guardrail-blocked / -warned audit entry to
927/// the durable channel-audit topic AND to the in-process lifecycle
928/// audit log. The lifecycle entry is what `pipeline_lifecycle_audit_log_take()`
929/// surfaces to test fixtures; the event-log entry is what production
930/// audit consumers tail. Both echo the verbatim payload (callers
931/// concerned about PII should layer `redact::*` on top of the
932/// guardrail). Best-effort; audit write errors do not propagate.
933async fn record_guardrail_audit(
934    kind: &'static str,
935    resolved: &ResolvedChannel,
936    event_id: &str,
937    emitted_by: &str,
938    payload: &serde_json::Value,
939    fired: &[crate::channel_guardrails::FiredGuardrail],
940) {
941    let fired_json: Vec<serde_json::Value> = fired
942        .iter()
943        .map(|entry| {
944            serde_json::json!({
945                "id": entry.id,
946                "kind": entry.kind,
947                "verdict_label": entry.verdict_label,
948                "reason": entry.reason,
949            })
950        })
951        .collect();
952    let audit_payload = serde_json::json!({
953        "event_id": event_id,
954        "name_resolved": resolved.resolved_name,
955        "scope": resolved.scope.as_str(),
956        "scope_id": resolved.scope_id,
957        "emitted_by": emitted_by,
958        "payload_hash": channel_payload_hash(payload),
959        "payload": payload,
960        "fired": fired_json,
961    });
962    append_channel_audit_event(kind, CHANNEL_GUARDRAIL_AUDIT_SCHEMA, audit_payload.clone()).await;
963    // Mirror to the lifecycle audit log so tests using
964    // `pipeline_lifecycle_audit_log_take()` see the entry without
965    // having to scan the event log directly.
966    crate::orchestration::record_lifecycle_audit(kind, audit_payload);
967}
968
969/// CH-11 (#1911): record a warning audit for every Warn verdict that
970/// fired. A no-op when there were no Warn-level fires (the common
971/// case). Called BEFORE the durable append so the warning order
972/// matches the dispatch order.
973async fn record_guardrail_warnings(
974    resolved: &ResolvedChannel,
975    event_id: &str,
976    emitted_by: &str,
977    payload: &serde_json::Value,
978    fired: &[crate::channel_guardrails::FiredGuardrail],
979) {
980    if fired.is_empty() {
981        return;
982    }
983    record_guardrail_audit(
984        CHANNEL_GUARDRAIL_WARNING_KIND,
985        resolved,
986        event_id,
987        emitted_by,
988        payload,
989        fired,
990    )
991    .await;
992}
993
994/// CH-11 (#1911): record a `channel_guardrail_blocked` audit and
995/// return the synthetic "blocked" receipt so the caller can
996/// distinguish a guardrail block from a successful append or an
997/// idempotent dedupe. No durable journal append happens for a blocked
998/// emit; the audit log entry IS the durable artifact.
999async fn handle_blocked_emit(
1000    raw_name: &str,
1001    resolved: &ResolvedChannel,
1002    event_id: &str,
1003    emitted_by: &str,
1004    emitted_at: &SignedTimestamp,
1005    payload: &serde_json::Value,
1006    decision: &crate::channel_guardrails::GuardrailDecision,
1007) -> Result<VmValue, VmError> {
1008    record_guardrail_audit(
1009        CHANNEL_GUARDRAIL_BLOCKED_KIND,
1010        resolved,
1011        event_id,
1012        emitted_by,
1013        payload,
1014        decision.fired.as_slice(),
1015    )
1016    .await;
1017    let block_reason = decision
1018        .fired
1019        .iter()
1020        .rev()
1021        .find_map(|f| {
1022            if f.verdict_label == CHANNEL_GUARDRAIL_BLOCKED_KIND
1023                || f.verdict_label.contains("block")
1024            {
1025                Some(f.reason.clone())
1026            } else {
1027                None
1028            }
1029        })
1030        .unwrap_or_else(|| "guardrail blocked".to_string());
1031    let fired_json: Vec<serde_json::Value> = decision
1032        .fired
1033        .iter()
1034        .map(|entry| {
1035            serde_json::json!({
1036                "id": entry.id,
1037                "kind": entry.kind,
1038                "verdict_label": entry.verdict_label,
1039                "reason": entry.reason,
1040            })
1041        })
1042        .collect();
1043    let receipt = serde_json::json!({
1044        "event_id": event_id,
1045        "cursor": serde_json::Value::Null,
1046        "id": event_id,
1047        "name": raw_name,
1048        "name_resolved": resolved.resolved_name,
1049        "scope": resolved.scope.as_str(),
1050        "scope_id": resolved.scope_id,
1051        "emitted_at": emitted_at,
1052        "emitted_by": emitted_by,
1053        "retention": resolved.retention,
1054        "topic": resolved.topic.as_str(),
1055        "inserted": false,
1056        "duplicate": false,
1057        "blocked": true,
1058        "block_reason": block_reason,
1059        "guardrail_fired": fired_json,
1060    });
1061    Ok(crate::stdlib::json_to_vm_value(&receipt))
1062}
1063
1064/// CH-07 (#1878): build + persist the emit receipt on the audit topic.
1065/// Called from `emit_channel_from_vm` immediately after the durable
1066/// journal append succeeds (whether the append was fresh or
1067/// idempotent-suppressed — both outcomes are first-class audit signals).
1068async fn record_channel_emit_receipt(
1069    record: &StoredChannelEvent,
1070    resolved: &ResolvedChannel,
1071    inserted: bool,
1072    span_id: u64,
1073) {
1074    let receipt = ChannelEmitReceipt {
1075        event_id: record.id.clone(),
1076        name_resolved: resolved.resolved_name.clone(),
1077        scope: resolved.scope.as_str().to_string(),
1078        scope_id: resolved.scope_id.clone(),
1079        payload_hash: channel_payload_hash(&record.payload),
1080        payload: record.payload.clone(),
1081        emitted_at: record.emitted_at.clone(),
1082        emitted_by: record.emitted_by.clone(),
1083        pipeline_id: record.pipeline_id.clone(),
1084        session_id: record.session_id.clone(),
1085        tenant_id: record.tenant_id.clone(),
1086        topic: resolved.topic.as_str().to_string(),
1087        inserted,
1088        span_id: if span_id == 0 { None } else { Some(span_id) },
1089    };
1090    let payload = match serde_json::to_value(&receipt) {
1091        Ok(value) => value,
1092        Err(_) => return,
1093    };
1094    append_channel_audit_event(
1095        CHANNEL_EMIT_RECEIPT_KIND,
1096        CHANNEL_EMIT_RECEIPT_SCHEMA,
1097        payload,
1098    )
1099    .await;
1100}
1101
1102/// CH-07 (#1878): build + persist the match receipt on the audit topic.
1103/// Called from `fire_channel_match` after the dispatcher returns, so
1104/// `handler_result` reflects the recorded outcome (succeeded / failed /
1105/// dlq / cancelled / ...) and replay tooling can verify the same handler
1106/// shape fires on every replay.
1107#[allow(clippy::too_many_arguments)]
1108async fn record_channel_match_receipt(
1109    trigger_id: &str,
1110    binding_key: &str,
1111    handler_kind: &str,
1112    resolved: &ResolvedChannel,
1113    event_id: &str,
1114    matched_in_session_id: Option<&str>,
1115    batch: Option<ChannelMatchBatchInfo>,
1116    span_id: u64,
1117    dispatch_outcome: &Result<crate::triggers::DispatchOutcome, crate::triggers::DispatchError>,
1118) {
1119    let receipt = ChannelMatchReceipt {
1120        event_id: event_id.to_string(),
1121        trigger_id: trigger_id.to_string(),
1122        binding_key: binding_key.to_string(),
1123        name_resolved: resolved.resolved_name.clone(),
1124        scope: resolved.scope.as_str().to_string(),
1125        scope_id: resolved.scope_id.clone(),
1126        matched_at: signed_match_timestamp(resolved, event_id, trigger_id),
1127        matched_in_session_id: matched_in_session_id.map(|s| s.to_string()),
1128        batch,
1129        handler_kind: handler_kind.to_string(),
1130        handler_result: ChannelMatchResultSummary::from_dispatch(dispatch_outcome),
1131        span_id: if span_id == 0 { None } else { Some(span_id) },
1132    };
1133    let payload = match serde_json::to_value(&receipt) {
1134        Ok(value) => value,
1135        Err(_) => return,
1136    };
1137    append_channel_audit_event(
1138        CHANNEL_MATCH_RECEIPT_KIND,
1139        CHANNEL_MATCH_RECEIPT_SCHEMA,
1140        payload,
1141    )
1142    .await;
1143}
1144
1145/// CH-07 (#1878): extract `ChannelMatchBatchInfo` from the transcript
1146/// `batch_summary` JSON the dispatcher already builds for batched
1147/// triggers. Returns `None` for non-batched dispatch.
1148fn batch_info_from_summary(
1149    batch_summary: Option<&serde_json::Value>,
1150) -> Option<ChannelMatchBatchInfo> {
1151    let summary = batch_summary?.as_object()?;
1152    let count = summary
1153        .get("count")
1154        .and_then(|v| v.as_u64())
1155        .map(|n| n as usize)?;
1156    let constituent_event_ids = summary
1157        .get("constituent_event_ids")
1158        .and_then(|v| v.as_array())
1159        .map(|arr| {
1160            arr.iter()
1161                .filter_map(|v| v.as_str().map(|s| s.to_string()))
1162                .collect()
1163        })
1164        .unwrap_or_default();
1165    Some(ChannelMatchBatchInfo {
1166        count,
1167        constituent_event_ids,
1168    })
1169}
1170
1171fn emitted_by(context: &ChannelContext) -> String {
1172    context
1173        .worker_id
1174        .clone()
1175        .or_else(|| context.agent_session_id.clone())
1176        .or_else(|| context.task_id.clone())
1177        .unwrap_or_else(|| "harn".to_string())
1178}
1179
1180fn retention_for_scope(scope: ChannelScope) -> &'static str {
1181    match scope {
1182        ChannelScope::Session => "in_process_session",
1183        ChannelScope::Pipeline => "pipeline_event_log",
1184        ChannelScope::Tenant => "tenant_event_log",
1185        ChannelScope::Org => "org_event_log",
1186    }
1187}
1188
1189fn receipt_value(
1190    topic: &Topic,
1191    event_id: EventId,
1192    event: &LogEvent,
1193    inserted: bool,
1194) -> Result<serde_json::Value, VmError> {
1195    let record = stored_record(event)?;
1196    Ok(serde_json::json!({
1197        "event_id": event_id,
1198        "cursor": event_id,
1199        "id": record.id,
1200        "name": record.name,
1201        "name_resolved": record.name,
1202        "scope": record.scope,
1203        "scope_id": record.scope_id,
1204        "payload": record.payload,
1205        "emitted_at": record.emitted_at,
1206        "emitted_by": record.emitted_by,
1207        "pipeline_id": record.pipeline_id,
1208        "session_id": record.session_id,
1209        "tenant_id": record.tenant_id,
1210        "retention": record.retention,
1211        "ttl_ms": record.ttl_ms,
1212        "topic": topic.as_str(),
1213        "inserted": inserted,
1214        "duplicate": !inserted,
1215    }))
1216}
1217
1218fn event_value(
1219    topic: &Topic,
1220    event_id: EventId,
1221    event: LogEvent,
1222) -> Result<serde_json::Value, VmError> {
1223    let record = stored_record(&event)?;
1224    Ok(serde_json::json!({
1225        "event_id": event_id,
1226        "cursor": event_id,
1227        "topic": topic.as_str(),
1228        "kind": event.kind,
1229        "headers": event.headers,
1230        "occurred_at_ms": event.occurred_at_ms,
1231        "id": record.id,
1232        "name": record.name,
1233        "name_resolved": record.name,
1234        "scope": record.scope,
1235        "scope_id": record.scope_id,
1236        "payload": record.payload,
1237        "emitted_at": record.emitted_at,
1238        "emitted_by": record.emitted_by,
1239        "pipeline_id": record.pipeline_id,
1240        "session_id": record.session_id,
1241        "tenant_id": record.tenant_id,
1242        "retention": record.retention,
1243        "ttl_ms": record.ttl_ms,
1244    }))
1245}
1246
1247fn stored_record(event: &LogEvent) -> Result<StoredChannelEvent, VmError> {
1248    serde_json::from_value(event.payload.clone()).map_err(|error| {
1249        VmError::Runtime(format!(
1250            "channel event store contained malformed channel payload: {error}"
1251        ))
1252    })
1253}
1254
1255fn parse_options(value: Option<&VmValue>, builtin: &str) -> Result<ChannelOptions, VmError> {
1256    let Some(value) = value else {
1257        return Ok(ChannelOptions::default());
1258    };
1259    match value {
1260        VmValue::Nil => Ok(ChannelOptions::default()),
1261        VmValue::Dict(options) => Ok(ChannelOptions {
1262            scope: option_string(options, "scope", builtin)?
1263                .map(|scope| ChannelScope::parse(&scope))
1264                .transpose()
1265                .map_err(VmError::from)?,
1266            id: option_string(options, "id", builtin)?,
1267            tenant_id: option_string(options, "tenant_id", builtin)?,
1268            session_id: option_string(options, "session_id", builtin)?,
1269            pipeline_id: option_string(options, "pipeline_id", builtin)?,
1270            from_cursor: option_non_negative_int(options, "from_cursor", builtin)?
1271                .or(option_non_negative_int(options, "cursor", builtin)?)
1272                .map(|value| value as EventId),
1273            limit: option_non_negative_int(options, "limit", builtin)?.map(|value| value as usize),
1274            ttl_ms: option_duration_ms(options, "ttl", builtin)?,
1275        }),
1276        other => Err(VmError::TypeError(format!(
1277            "{builtin}: options must be a dict or nil, got {}",
1278            other.type_name()
1279        ))),
1280    }
1281}
1282
1283fn required_string(value: Option<&VmValue>, builtin: &str, name: &str) -> Result<String, VmError> {
1284    match value {
1285        Some(VmValue::String(value)) => Ok(value.to_string()),
1286        Some(other) => Err(VmError::TypeError(format!(
1287            "{builtin}: {name} must be a string, got {}",
1288            other.type_name()
1289        ))),
1290        None => Err(VmError::TypeError(format!("{builtin}: missing {name}"))),
1291    }
1292}
1293
1294fn option_string(
1295    options: &BTreeMap<String, VmValue>,
1296    key: &str,
1297    builtin: &str,
1298) -> Result<Option<String>, VmError> {
1299    match options.get(key) {
1300        None | Some(VmValue::Nil) => Ok(None),
1301        Some(VmValue::String(value)) if !value.trim().is_empty() => Ok(Some(value.to_string())),
1302        Some(VmValue::String(_)) => Err(VmError::TypeError(format!(
1303            "{builtin}: options.{key} cannot be empty"
1304        ))),
1305        Some(other) => Err(VmError::TypeError(format!(
1306            "{builtin}: options.{key} must be a string or nil, got {}",
1307            other.type_name()
1308        ))),
1309    }
1310}
1311
1312fn option_non_negative_int(
1313    options: &BTreeMap<String, VmValue>,
1314    key: &str,
1315    builtin: &str,
1316) -> Result<Option<u64>, VmError> {
1317    match options.get(key) {
1318        None | Some(VmValue::Nil) => Ok(None),
1319        Some(VmValue::Int(value)) if *value >= 0 => Ok(Some(*value as u64)),
1320        Some(other) => Err(VmError::TypeError(format!(
1321            "{builtin}: options.{key} must be a non-negative int or nil, got {}",
1322            other.type_name()
1323        ))),
1324    }
1325}
1326
1327fn option_duration_ms(
1328    options: &BTreeMap<String, VmValue>,
1329    key: &str,
1330    builtin: &str,
1331) -> Result<Option<i64>, VmError> {
1332    match options.get(key) {
1333        None | Some(VmValue::Nil) => Ok(None),
1334        Some(VmValue::Duration(value)) if *value >= 0 => Ok(Some(*value)),
1335        Some(VmValue::Int(value)) if *value >= 0 => Ok(Some(*value)),
1336        Some(other) => Err(VmError::TypeError(format!(
1337            "{builtin}: options.{key} must be a non-negative duration, int, or nil, got {}",
1338            other.type_name()
1339        ))),
1340    }
1341}
1342
1343fn dict_string(values: &BTreeMap<String, VmValue>, key: &str) -> Option<String> {
1344    match values.get(key) {
1345        Some(VmValue::String(value)) if !value.is_empty() => Some(value.to_string()),
1346        _ => None,
1347    }
1348}
1349
1350fn channel_log_error(error: crate::event_log::LogError) -> VmError {
1351    VmError::Runtime(format!("channel event log: {error}"))
1352}
1353
1354/// CH-06 (#1877): RAII guard around channel emit/match tracing spans.
1355///
1356/// Mirrors `PoolSpanGuard` (PL-06 / #1891): opens both a thread-local
1357/// Harn span (visible to `trace_spans()`) and an OTel `tracing::Span`
1358/// (visible to the exporter), wires OTel span links via
1359/// `crate::observability::otel::set_span_link`, and closes them both on
1360/// `end()` / `Drop`. Disabled-tracing path is a no-op because
1361/// `crate::tracing::span_start_*` returns id 0 and short-circuits.
1362struct ChannelSpanGuard {
1363    span_id: u64,
1364    otel_span: tracing::Span,
1365}
1366
1367impl ChannelSpanGuard {
1368    fn start(
1369        kind: crate::tracing::SpanKind,
1370        name: String,
1371        links: Vec<crate::tracing::SpanLink>,
1372    ) -> Self {
1373        Self::start_with_parenting(kind, name, links, true)
1374    }
1375
1376    fn start_detached(
1377        kind: crate::tracing::SpanKind,
1378        name: String,
1379        links: Vec<crate::tracing::SpanLink>,
1380    ) -> Self {
1381        Self::start_with_parenting(kind, name, links, false)
1382    }
1383
1384    fn start_with_parenting(
1385        kind: crate::tracing::SpanKind,
1386        name: String,
1387        links: Vec<crate::tracing::SpanLink>,
1388        inherit_parent: bool,
1389    ) -> Self {
1390        let span_id = if inherit_parent {
1391            crate::tracing::span_start_with_links(kind, name.clone(), links.clone())
1392        } else {
1393            crate::tracing::span_start_detached_with_links(kind, name.clone(), links.clone())
1394        };
1395        let otel_span = tracing::info_span!(
1396            target: "harn.vm.channel",
1397            "harn.channel",
1398            harn.kind = kind.as_str(),
1399            harn.name = %name,
1400        );
1401        for link in links {
1402            let trace_id = crate::TraceId(link.trace_id);
1403            let mut attributes: std::collections::HashMap<String, String> =
1404                link.attributes.into_iter().collect();
1405            attributes
1406                .entry("harn.link.kind".to_string())
1407                .or_insert_with(|| "channel_emit".to_string());
1408            let _ = crate::observability::otel::set_span_link(
1409                &otel_span,
1410                &trace_id,
1411                &link.span_id,
1412                Some(attributes),
1413            );
1414        }
1415        Self { span_id, otel_span }
1416    }
1417
1418    fn link(&self) -> Option<crate::tracing::SpanLink> {
1419        crate::observability::otel::current_span_context_hex(&self.otel_span)
1420            .map(|(trace_id, span_id)| crate::tracing::SpanLink::new(trace_id, span_id))
1421            .or_else(|| crate::tracing::span_link(self.span_id))
1422    }
1423
1424    fn set_metadata(&self, key: &str, value: serde_json::Value) {
1425        crate::tracing::span_set_metadata(self.span_id, key, value);
1426    }
1427
1428    fn end(&mut self) {
1429        if self.span_id != 0 {
1430            crate::tracing::span_end(self.span_id);
1431            self.span_id = 0;
1432        }
1433    }
1434}
1435
1436impl Drop for ChannelSpanGuard {
1437    fn drop(&mut self) {
1438        self.end();
1439    }
1440}
1441
1442/// CH-06 (#1877): summarize an emit payload for the transcript event so
1443/// downstream renderers / audit feeds don't have to ingest the full
1444/// payload. Numbers/bools render verbatim; strings are truncated; dicts
1445/// and lists collapse to their top-level field count. Keeps the
1446/// transcript log compact while preserving enough context for human
1447/// inspection.
1448fn summarize_payload(payload: &serde_json::Value) -> serde_json::Value {
1449    const MAX_STRING_LEN: usize = 120;
1450    match payload {
1451        serde_json::Value::Null => serde_json::json!({"kind": "null"}),
1452        serde_json::Value::Bool(value) => serde_json::json!({"kind": "bool", "value": value}),
1453        serde_json::Value::Number(value) => serde_json::json!({"kind": "number", "value": value}),
1454        serde_json::Value::String(value) => {
1455            let truncated: String = value.chars().take(MAX_STRING_LEN).collect();
1456            let len = value.chars().count();
1457            serde_json::json!({
1458                "kind": "string",
1459                "value": truncated,
1460                "truncated": len > MAX_STRING_LEN,
1461                "length": len,
1462            })
1463        }
1464        serde_json::Value::Array(items) => {
1465            serde_json::json!({"kind": "array", "length": items.len()})
1466        }
1467        serde_json::Value::Object(map) => {
1468            let fields: Vec<&String> = map.keys().take(8).collect();
1469            serde_json::json!({
1470                "kind": "object",
1471                "field_count": map.len(),
1472                "fields": fields,
1473            })
1474        }
1475    }
1476}
1477
1478/// CH-06 (#1877): append a channel transcript lifecycle event onto the
1479/// active event log. No-op when no log is installed (e.g. unit tests
1480/// running outside a VM context) so emission stays infallible from the
1481/// caller's perspective.
1482fn emit_channel_transcript_event(kind: &'static str, payload: serde_json::Value) {
1483    let Some(log) = active_event_log() else {
1484        return;
1485    };
1486    let Ok(topic) = Topic::new(CHANNEL_TRANSCRIPT_TOPIC) else {
1487        return;
1488    };
1489    let event = LogEvent::new(kind, payload);
1490    if tokio::runtime::Handle::try_current().is_ok() {
1491        if let Ok(join) = std::thread::Builder::new()
1492            .name("harn-channel-transcript".to_string())
1493            .spawn(move || {
1494                let _ = futures::executor::block_on(log.append(&topic, event));
1495            })
1496        {
1497            let _ = join.join();
1498        }
1499    } else {
1500        let _ = futures::executor::block_on(log.append(&topic, event));
1501    }
1502}
1503
1504/// CH-06 (#1877): emit the `transcript.channel.emit` lifecycle event the
1505/// moment the durable append succeeds (whether the append was fresh or
1506/// idempotent). Carries the emit span id when tracing is on so the
1507/// transcript log can be stitched against the OTel trace.
1508fn emit_channel_emit_transcript(
1509    record: &StoredChannelEvent,
1510    resolved: &ResolvedChannel,
1511    inserted: bool,
1512    span_id: u64,
1513) {
1514    let payload = serde_json::json!({
1515        "event_id": record.id,
1516        "name": record.name,
1517        "name_resolved": resolved.resolved_name,
1518        "scope": record.scope,
1519        "scope_id": record.scope_id,
1520        "payload_summary": summarize_payload(&record.payload),
1521        "emitted_at": record.emitted_at,
1522        "emitted_at_ms": record.emitted_at.at_ms,
1523        "emitted_by": record.emitted_by,
1524        "session_id": record.session_id,
1525        "pipeline_id": record.pipeline_id,
1526        "tenant_id": record.tenant_id,
1527        "inserted": inserted,
1528        "duplicate": !inserted,
1529        "span_id": if span_id == 0 { serde_json::Value::Null } else { serde_json::json!(span_id) },
1530    });
1531    emit_channel_transcript_event(CHANNEL_EMIT_TRANSCRIPT_KIND, payload);
1532}
1533
1534/// CH-06 (#1877): emit the `transcript.channel.match` lifecycle event
1535/// just before the dispatcher invokes the handler. Carries the match
1536/// span id and, for batched triggers, the constituent event ids so the
1537/// transcript can render the full batch context inline.
1538#[allow(clippy::too_many_arguments)]
1539fn emit_channel_match_transcript(
1540    trigger_id: &str,
1541    handler_kind: &str,
1542    resolved: &ResolvedChannel,
1543    event_id: &str,
1544    matched_at_ms: i64,
1545    matched_in_session_id: Option<&str>,
1546    span_id: u64,
1547    batch: Option<serde_json::Value>,
1548) {
1549    let mut payload = serde_json::json!({
1550        "event_id": event_id,
1551        "name_resolved": resolved.resolved_name,
1552        "scope": resolved.scope.as_str(),
1553        "scope_id": resolved.scope_id,
1554        "trigger_id": trigger_id,
1555        "handler_kind": handler_kind,
1556        "matched_at_ms": matched_at_ms,
1557        "matched_in_session_id": matched_in_session_id,
1558        "span_id": if span_id == 0 { serde_json::Value::Null } else { serde_json::json!(span_id) },
1559    });
1560    if let Some(batch) = batch {
1561        if let Some(map) = payload.as_object_mut() {
1562            map.insert("batch".to_string(), batch);
1563        }
1564    }
1565    emit_channel_transcript_event(CHANNEL_MATCH_TRANSCRIPT_KIND, payload);
1566}
1567
1568/// CH-06 (#1877): collect the originating-emit span links stashed on a
1569/// batch of `TriggerEvent`s. The non-batched dispatch path uses the
1570/// single-event variant directly; the aggregated/batched path threads
1571/// the per-event headers through the buffer and rebuilds the link list
1572/// here so the resulting `ChannelMatch` span multi-links to every
1573/// constituent emit.
1574fn emit_links_from_event(event: &TriggerEvent) -> Vec<crate::tracing::SpanLink> {
1575    let mut links = Vec::new();
1576    if let (Some(trace_id), Some(span_id)) = (
1577        event.headers.get(EMIT_TRACE_ID_HEADER),
1578        event.headers.get(EMIT_SPAN_ID_HEADER),
1579    ) {
1580        links.push(
1581            crate::tracing::SpanLink::new(trace_id.clone(), span_id.clone()).with_attributes(
1582                BTreeMap::from([("harn.link.kind".to_string(), "channel_emit".to_string())]),
1583            ),
1584        );
1585    }
1586    links
1587}
1588
1589fn emit_links_from_batch(events: &[TriggerEvent]) -> Vec<crate::tracing::SpanLink> {
1590    let mut links = Vec::new();
1591    for event in events {
1592        links.extend(emit_links_from_event(event));
1593    }
1594    links
1595}
1596
1597fn batch_summary_for_transcript(events: &[TriggerEvent]) -> serde_json::Value {
1598    let constituent_ids: Vec<String> = events.iter().map(|event| event.id.0.clone()).collect();
1599    serde_json::json!({
1600        "count": events.len(),
1601        "constituent_event_ids": constituent_ids,
1602    })
1603}
1604
/// Parsed channel-source trigger selector.
///
/// Trigger DSL strings look like `channel:<scope>:<scope-id>:<name>`, with
/// shorthand forms for tenant-default and session/pipeline scopes. The
/// `scope_id_pattern` is:
/// - `ScopeIdPattern::Current` when the selector string supplies no scope id
///   (e.g. `channel:foo`, resolved against the trigger's tenant);
/// - `ScopeIdPattern::Wildcard` for an explicit `*` (`channel:tenant:*:foo`);
/// - `ScopeIdPattern::Exact` for any other explicit scope id.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ChannelSelector {
    // Scope the selector targets.
    scope: ChannelScope,
    // How an emitted scope id is compared against this selector.
    scope_id_pattern: ScopeIdPattern,
    // Exact channel name. When built by `ChannelSelector::parse` it has
    // passed `validate_selector_name`: non-empty, no `:`, whitespace, or
    // control characters.
    name: String,
}
1618
/// How a `ChannelSelector` matches the scope id carried by an emit.
#[derive(Clone, Debug, PartialEq, Eq)]
enum ScopeIdPattern {
    /// Match the current tenant/session/pipeline of the trigger registry
    /// (no explicit scope id supplied in the selector string).
    Current,
    /// Explicit scope id from the selector string. `ChannelSelector::parse`
    /// only produces this for `tenant:` and `org:` selectors.
    Exact(String),
    /// Wildcard, e.g. `tenant:*:foo` — match any scope id within the
    /// trigger's entitled boundary (today: the current tenant only).
    Wildcard,
}
1630
1631impl ChannelSelector {
1632    /// Parse a `channel:...` trigger source string.
1633    ///
1634    /// Accepted shapes:
1635    /// - `channel:<name>` — tenant scope (current tenant), exact `<name>`
1636    /// - `channel:session:<name>` — session scope, exact `<name>`
1637    /// - `channel:pipeline:<name>` — pipeline scope, exact `<name>`
1638    /// - `channel:tenant:<tenant-id>:<name>` — explicit tenant
1639    /// - `channel:tenant:*:<name>` — tenant wildcard (within entitlement)
1640    /// - `channel:org:<org-id>:<name>` — explicit org (currently disabled)
1641    pub fn parse(input: &str) -> Result<Self, String> {
1642        let input = input.trim();
1643        let rest = input
1644            .strip_prefix("channel:")
1645            .ok_or_else(|| format!("channel selector must start with `channel:`, got `{input}`"))?;
1646        if rest.is_empty() {
1647            return Err("channel selector cannot be empty after `channel:` prefix".to_string());
1648        }
1649
1650        let (head, tail_opt) = match rest.split_once(':') {
1651            Some((head, tail)) => (head, Some(tail)),
1652            None => (rest, None),
1653        };
1654        let parsed_scope = ChannelScope::parse(head).ok();
1655        match (parsed_scope, tail_opt) {
1656            // `channel:<name>` — tenant default.
1657            (None, _) => {
1658                let name = rest.to_string();
1659                validate_selector_name(&name)?;
1660                Ok(Self {
1661                    scope: ChannelScope::Tenant,
1662                    scope_id_pattern: ScopeIdPattern::Current,
1663                    name,
1664                })
1665            }
1666            (Some(scope @ (ChannelScope::Session | ChannelScope::Pipeline)), Some(name))
1667                if !name.is_empty() =>
1668            {
1669                if name.contains(':') {
1670                    return Err(format!(
1671                        "channel selector `{input}`: {} scope expects `<name>` with no extra colons",
1672                        scope.as_str()
1673                    ));
1674                }
1675                validate_selector_name(name)?;
1676                Ok(Self {
1677                    scope,
1678                    scope_id_pattern: ScopeIdPattern::Current,
1679                    name: name.to_string(),
1680                })
1681            }
1682            (Some(scope @ (ChannelScope::Tenant | ChannelScope::Org)), Some(tail))
1683                if !tail.is_empty() =>
1684            {
1685                let Some((scope_id, name)) = tail.split_once(':') else {
1686                    // `channel:tenant:foo` — treat as `<name>` in tenant default.
1687                    if matches!(scope, ChannelScope::Tenant) {
1688                        validate_selector_name(tail)?;
1689                        return Ok(Self {
1690                            scope,
1691                            scope_id_pattern: ScopeIdPattern::Current,
1692                            name: tail.to_string(),
1693                        });
1694                    }
1695                    return Err(format!(
1696                        "channel selector `{input}`: org scope requires `<org-id>:<name>`"
1697                    ));
1698                };
1699                if scope_id.is_empty() || name.is_empty() {
1700                    return Err(format!(
1701                        "channel selector `{input}`: scope id and name must be non-empty"
1702                    ));
1703                }
1704                validate_selector_name(name)?;
1705                let pattern = if scope_id == "*" {
1706                    ScopeIdPattern::Wildcard
1707                } else {
1708                    ScopeIdPattern::Exact(scope_id.to_string())
1709                };
1710                Ok(Self {
1711                    scope,
1712                    scope_id_pattern: pattern,
1713                    name: name.to_string(),
1714                })
1715            }
1716            (Some(scope), _) => Err(format!(
1717                "channel selector `{input}`: {} scope requires `<name>` segment",
1718                scope.as_str()
1719            )),
1720        }
1721    }
1722
1723    pub fn scope(&self) -> &'static str {
1724        self.scope.as_str()
1725    }
1726
1727    pub fn name(&self) -> &str {
1728        &self.name
1729    }
1730
1731    /// Returns true if the supplied emit (scope, scope_id, name) matches this
1732    /// selector. `current_tenant` lets the matcher resolve the implicit
1733    /// "current tenant" boundary used by both `Current` and `Wildcard` modes.
1734    pub fn matches(&self, scope: &str, scope_id: &str, name: &str, current_tenant: &str) -> bool {
1735        if self.scope.as_str() != scope || self.name != name {
1736            return false;
1737        }
1738        match &self.scope_id_pattern {
1739            ScopeIdPattern::Current => match self.scope {
1740                ChannelScope::Tenant => scope_id == current_tenant,
1741                ChannelScope::Session | ChannelScope::Pipeline => {
1742                    // For session/pipeline, "current" means trigger and emit
1743                    // share a runtime context. In v1 (in-process registry)
1744                    // this is implicit: both producer and consumer run in the
1745                    // same VM, so any scope_id within this scope type matches.
1746                    true
1747                }
1748                ChannelScope::Org => false,
1749            },
1750            ScopeIdPattern::Exact(value) => scope_id == value,
1751            ScopeIdPattern::Wildcard => match self.scope {
1752                ChannelScope::Tenant => true,
1753                // Session/pipeline wildcards aren't entitled yet; org wildcards disabled.
1754                _ => false,
1755            },
1756        }
1757    }
1758}
1759
/// Check that a selector's channel name is well-formed.
///
/// A valid name is not blank and contains no `:` (the selector segment
/// separator), no whitespace, and no control characters.
fn validate_selector_name(name: &str) -> Result<(), String> {
    let is_forbidden = |ch: char| ch == ':' || ch.is_whitespace() || ch.is_control();
    let blank = name.trim().is_empty();
    if !blank && !name.chars().any(is_forbidden) {
        Ok(())
    } else {
        Err(format!("channel selector name `{name}` is malformed"))
    }
}
1769
1770/// Dispatch a freshly emitted channel event to any registered triggers whose
1771/// `channel:` source selector matches the (scope, scope_id, name) tuple.
1772///
1773/// This is the consumer side of the `emit_channel` ↔ trigger plumbing
1774/// (CH-02 / #1872). Errors from individual handlers do not abort the
1775/// emit; they surface in the dispatcher's DLQ + retry pipeline.
1776///
1777/// CH-04 (#1875): bindings with `batch { count, window, key, expire_action }`
1778/// route events through an aggregation buffer instead of dispatching one
1779/// event at a time. When the buffer hits `count`, the dispatcher fires
1780/// the handler with a batched event (`event.batch` populated). When the
1781/// `window` elapses with fewer than `count` events, the dispatcher
1782/// either fires a partial batch (default) or discards the buffer.
1783async fn dispatch_channel_emit_to_triggers(
1784    ctx: Option<&crate::vm::AsyncBuiltinCtx>,
1785    resolved: &ResolvedChannel,
1786    payload: ChannelEventPayload,
1787    emit_link: Option<crate::tracing::SpanLink>,
1788) -> Result<(), VmError> {
1789    // Snapshot matching bindings outside of any async work so the registry
1790    // borrow is short-lived.
1791    let bindings = crate::triggers::registry::channel_bindings_matching(
1792        resolved.scope.as_str(),
1793        &resolved.scope_id,
1794        &payload.name,
1795    );
1796
1797    // CH-04 (#1875): flush any aggregation buffers whose window has
1798    // elapsed BEFORE this emit so old batches go out in order. The
1799    // implicit sweep runs even for emits that don't match any binding so
1800    // a stale buffer can't outlive the trigger lifecycle. Tests can also
1801    // call `flush_trigger_aggregations()` directly for deterministic
1802    // window-expire coverage.
1803    flush_expired_aggregations_inner(ctx).await;
1804
1805    if bindings.is_empty() {
1806        return Ok(());
1807    }
1808    let Some(base_vm) = ctx.map(crate::vm::AsyncBuiltinCtx::child_vm) else {
1809        // No host VM (e.g. raw test path); nothing to dispatch.
1810        return Ok(());
1811    };
1812    let log = active_event_log()
1813        .unwrap_or_else(|| install_memory_for_current_thread(CHANNEL_QUEUE_DEPTH));
1814    let dispatcher = crate::triggers::Dispatcher::with_event_log(base_vm, log);
1815    for binding in bindings {
1816        // Filter (#1872 acceptance): JSON-path-equality on payload.
1817        // Applied BEFORE aggregation so the buffer only collects events
1818        // the handler actually cares about.
1819        if let Some(filter_str) = binding.filter.as_ref() {
1820            if !channel_filter_matches(filter_str, &payload.payload) {
1821                continue;
1822            }
1823        }
1824        let event = build_channel_trigger_event(&payload, emit_link.as_ref());
1825
1826        // CH-04 (#1875): aggregation path. When the binding declared
1827        // `batch`, accumulate into the per-(binding, partition_key)
1828        // buffer; only dispatch once the threshold is reached.
1829        if let Some(aggregation_config) = binding.aggregation.as_ref() {
1830            let partition_key = crate::triggers::aggregation::partition_key_for_event(
1831                aggregation_config,
1832                &payload.payload,
1833            );
1834            let binding_key = binding.binding_key();
1835            let outcome = crate::triggers::aggregation::accumulate(
1836                &binding_key,
1837                aggregation_config,
1838                partition_key.as_deref(),
1839                event,
1840            );
1841            if let crate::triggers::aggregation::AccumulateOutcome::Ready(events) = outcome {
1842                // CH-06 (#1877): the ChannelMatch span for a batched
1843                // trigger multi-links to ALL constituent ChannelEmit
1844                // spans so the trace tree shows the aggregation fan-in.
1845                let links = emit_links_from_batch(&events);
1846                let batch_summary = batch_summary_for_transcript(&events);
1847                let batched = match crate::triggers::dispatcher::build_batched_event_public(events)
1848                {
1849                    Ok(batched) => batched,
1850                    Err(error) => {
1851                        return Err(VmError::Runtime(format!(
1852                            "emit_channel aggregation batch: {error}"
1853                        )));
1854                    }
1855                };
1856                fire_channel_match(
1857                    &dispatcher,
1858                    binding.clone(),
1859                    batched,
1860                    resolved,
1861                    links,
1862                    Some(batch_summary),
1863                )
1864                .await;
1865            }
1866            continue;
1867        }
1868
1869        let links = emit_links_from_event(&event);
1870        fire_channel_match(&dispatcher, binding.clone(), event, resolved, links, None).await;
1871    }
1872    Ok(())
1873}
1874
/// CH-06 (#1877): run one trigger match end to end.
///
/// The steps run in this order, and the order matters:
/// 1. Open a detached `ChannelMatch` span linked to the originating
///    ChannelEmit span(s) in `links` (several links for batched triggers),
///    then tag it with event/trigger/handler/channel metadata plus the
///    `batch` summary when present.
/// 2. Emit the `transcript.channel.match` lifecycle event.
/// 3. Dispatch the handler through `dispatcher`.
/// 4. (CH-07 / #1878) Record the match receipt, including the dispatch
///    outcome, keyed by the emit's event id.
/// 5. End the span.
///
/// The dispatch result is not returned to the caller. It is only handed to
/// `record_channel_match_receipt`.
async fn fire_channel_match(
    dispatcher: &crate::triggers::Dispatcher,
    binding: std::sync::Arc<crate::triggers::registry::TriggerBinding>,
    event: TriggerEvent,
    resolved: &ResolvedChannel,
    links: Vec<crate::tracing::SpanLink>,
    batch_summary: Option<serde_json::Value>,
) {
    let trigger_id = binding.id.as_str().to_string();
    let handler_kind = binding.handler.kind().to_string();
    // CH-07 (#1878): use the channel emit's id (carried as the trigger
    // event's `dedupe_key`) so the match receipt links back to the
    // original `ChannelEmitReceipt.event_id`. `TriggerEvent::new` mints
    // a fresh `trigger_evt_*` id for its own `event.id` field that does
    // not correlate to the emit chain.
    let event_id = if event.dedupe_key.is_empty() {
        event.id.0.clone()
    } else {
        event.dedupe_key.clone()
    };
    let mut match_span = ChannelSpanGuard::start_detached(
        crate::tracing::SpanKind::ChannelMatch,
        format!("channel.match {}", resolved.resolved_name),
        links,
    );
    match_span.set_metadata("event_id", serde_json::json!(event_id));
    match_span.set_metadata("trigger_id", serde_json::json!(trigger_id));
    match_span.set_metadata("handler_kind", serde_json::json!(handler_kind));
    match_span.set_metadata("name_resolved", serde_json::json!(resolved.resolved_name));
    if let Some(summary) = batch_summary.as_ref() {
        match_span.set_metadata("batch", summary.clone());
    }
    // 0 stands for "no current span" and is rendered as null downstream.
    // NOTE(review): this is read right after a *detached* span start —
    // confirm `current_span_id()` reports the match span rather than the
    // enclosing one.
    let span_id = crate::tracing::current_span_id().unwrap_or(0);
    let matched_at_ms = harn_clock::offset_datetime_to_ms(crate::clock_mock::now_utc());
    // Prefer the active agent session; fall back to the event's tenant id.
    let matched_in_session_id = crate::agent_sessions::current_session_id()
        .or_else(|| event.tenant_id.as_ref().map(|t| t.0.clone()));
    emit_channel_match_transcript(
        &trigger_id,
        &handler_kind,
        resolved,
        &event_id,
        matched_at_ms,
        matched_in_session_id.as_deref(),
        span_id,
        batch_summary.clone(),
    );
    // CH-07 (#1878): capture the dispatch outcome BEFORE writing the
    // match receipt so the receipt records the recorded handler result
    // (succeeded/failed/dlq/...) — replay tooling treats the receipt as
    // the cached match: on replay the dispatcher looks up the receipt
    // by `event_id` instead of re-evaluating the filter spec.
    let dispatch_outcome = dispatcher.dispatch(&binding, event).await;
    let binding_key = binding.binding_key();
    let batch_info = batch_info_from_summary(batch_summary.as_ref());
    record_channel_match_receipt(
        &trigger_id,
        &binding_key,
        &handler_kind,
        resolved,
        &event_id,
        matched_in_session_id.as_deref(),
        batch_info,
        span_id,
        &dispatch_outcome,
    )
    .await;
    // Pre-CH-07 the dispatch error was discarded with `let _ = ...`. The
    // CH-07 match receipt now records the failure with
    // `dispatch_failed: true` so audit consumers don't lose the signal —
    // we keep the same fire-and-forget callsite semantics here.
    drop(dispatch_outcome);
    match_span.end();
}
1952
1953/// Drain all expired aggregation buffers and dispatch them. Exposed as
1954/// the `flush_trigger_aggregations()` builtin so Harn scripts (and the
1955/// production runtime) can deterministically advance window-expire
1956/// processing.
1957pub(crate) async fn flush_expired_aggregations_inner(ctx: Option<&crate::vm::AsyncBuiltinCtx>) {
1958    let expirations = crate::triggers::aggregation::drain_expired_aggregations();
1959    if expirations.is_empty() {
1960        return;
1961    }
1962    let Some(base_vm) = ctx.map(crate::vm::AsyncBuiltinCtx::child_vm) else {
1963        return;
1964    };
1965    let log = active_event_log()
1966        .unwrap_or_else(|| install_memory_for_current_thread(CHANNEL_QUEUE_DEPTH));
1967    let dispatcher = crate::triggers::Dispatcher::with_event_log(base_vm, log);
1968    for expired in expirations {
1969        if matches!(
1970            expired.action,
1971            crate::triggers::aggregation::ExpireAction::Discard
1972        ) {
1973            continue;
1974        }
1975        // Resolve the binding by parsing the binding_key (id@vN). Skip
1976        // if the binding has been terminated since the buffer was opened.
1977        let Some((trigger_id, version_str)) = expired.binding_key.rsplit_once("@v") else {
1978            continue;
1979        };
1980        let Ok(version) = version_str.parse::<u32>() else {
1981            continue;
1982        };
1983        let Ok(binding) =
1984            crate::triggers::registry::resolve_live_trigger_binding(trigger_id, Some(version))
1985        else {
1986            continue;
1987        };
1988        // CH-06 (#1877): rebuild the channel-resolved metadata from the
1989        // first buffered event so the ChannelMatch span carries the
1990        // right scope/name even for window-expire flushes.
1991        let resolved_for_match = resolved_from_first_event(&expired.events);
1992        let links = emit_links_from_batch(&expired.events);
1993        let batch_summary = batch_summary_for_transcript(&expired.events);
1994        let batched = match crate::triggers::dispatcher::build_batched_event_public(expired.events)
1995        {
1996            Ok(batched) => batched,
1997            Err(_) => continue,
1998        };
1999        match resolved_for_match {
2000            Some(resolved) => {
2001                fire_channel_match(
2002                    &dispatcher,
2003                    binding,
2004                    batched,
2005                    &resolved,
2006                    links,
2007                    Some(batch_summary),
2008                )
2009                .await;
2010            }
2011            None => {
2012                let _ = dispatcher.dispatch(&binding, batched).await;
2013            }
2014        }
2015    }
2016}
2017
2018/// CH-06 (#1877): synthesize a `ResolvedChannel` from the first buffered
2019/// trigger event when a window-expire flush dispatches without going
2020/// back through `resolve_channel`. Returns `None` if the event payload
2021/// isn't a known channel payload (defensive — should not happen in
2022/// practice since the dispatcher only buffers channel events).
2023fn resolved_from_first_event(events: &[TriggerEvent]) -> Option<ResolvedChannel> {
2024    let first = events.first()?;
2025    let ProviderPayload::Known(KnownProviderPayload::Channel(payload)) = &first.provider_payload
2026    else {
2027        return None;
2028    };
2029    let scope = ChannelScope::parse(&payload.scope).ok()?;
2030    let topic = Topic::new(format!(
2031        "channels.{}.{}.{}",
2032        payload.scope,
2033        sanitize_topic_component(&payload.scope_id),
2034        sanitize_topic_component(&payload.name),
2035    ))
2036    .ok()?;
2037    Some(ResolvedChannel {
2038        scope,
2039        scope_id: payload.scope_id.clone(),
2040        resolved_name: payload.name_resolved.clone(),
2041        topic,
2042        retention: retention_for_scope(scope),
2043    })
2044}
2045
2046fn build_channel_trigger_event(
2047    payload: &ChannelEventPayload,
2048    emit_link: Option<&crate::tracing::SpanLink>,
2049) -> TriggerEvent {
2050    let mut event = TriggerEvent::new(
2051        ProviderId::from("channel"),
2052        "channel.emit",
2053        None,
2054        payload.id.clone(),
2055        payload.tenant_id.clone().map(TenantId::new),
2056        BTreeMap::new(),
2057        ProviderPayload::Known(KnownProviderPayload::Channel(payload.clone())),
2058        SignatureStatus::Unsigned,
2059    );
2060    event.headers.insert(
2061        "harn_channel_name".to_string(),
2062        payload.name_resolved.clone(),
2063    );
2064    event
2065        .headers
2066        .insert("harn_channel_scope".to_string(), payload.scope.clone());
2067    event.headers.insert(
2068        "harn_channel_scope_id".to_string(),
2069        payload.scope_id.clone(),
2070    );
2071    // CH-06 (#1877): stash the ChannelEmit span coordinates on the trigger
2072    // event so the downstream ChannelMatch span can link back via
2073    // `set_span_link` — even after travelling through an aggregation
2074    // buffer or being serialized into a batched envelope.
2075    if let Some(link) = emit_link {
2076        event
2077            .headers
2078            .insert(EMIT_TRACE_ID_HEADER.to_string(), link.trace_id.clone());
2079        event
2080            .headers
2081            .insert(EMIT_SPAN_ID_HEADER.to_string(), link.span_id.clone());
2082    }
2083    event
2084}
2085
2086/// Evaluate the trigger filter spec (CH-02 / #1872) against the channel payload.
2087///
2088/// Supported syntax v1: JSON dict (`{"repo": "harn"}`) — each key is a
2089/// dot-path into the payload that must equality-match the value. Missing
2090/// path = no match. Non-dict filter strings are treated as no-op (return
2091/// true) so we don't regress pre-existing trigger `filter:` semantics that
2092/// reuse this field for other purposes.
2093fn channel_filter_matches(filter_raw: &str, payload: &serde_json::Value) -> bool {
2094    let trimmed = filter_raw.trim();
2095    if trimmed.is_empty() {
2096        return true;
2097    }
2098    let parsed: serde_json::Value = match serde_json::from_str(trimmed) {
2099        Ok(value) => value,
2100        Err(_) => return true,
2101    };
2102    let Some(map) = parsed.as_object() else {
2103        return true;
2104    };
2105    map.iter()
2106        .all(|(key, expected)| match payload_path(payload, key) {
2107            Some(actual) => actual == expected,
2108            None => false,
2109        })
2110}
2111
2112fn payload_path<'a>(value: &'a serde_json::Value, path: &str) -> Option<&'a serde_json::Value> {
2113    let mut current = value;
2114    for segment in path.split('.') {
2115        if segment.is_empty() {
2116            return None;
2117        }
2118        current = match current {
2119            serde_json::Value::Object(map) => map.get(segment)?,
2120            _ => return None,
2121        };
2122    }
2123    Some(current)
2124}
2125
2126#[cfg(test)]
2127mod tests;