1use std::collections::BTreeMap;
2use std::sync::{Arc, Mutex, OnceLock};
3
4use serde::{Deserialize, Serialize};
5use sha2::{Digest, Sha256};
6use time::format_description::well_known::Rfc3339;
7
8use crate::event_log::{
9 active_event_log, install_memory_for_current_thread, sanitize_topic_component, AnyEventLog,
10 EventId, EventLog, LogEvent, Topic,
11};
12use crate::llm::vm_value_to_json;
13use crate::runtime_limits::RuntimeLimits;
14use crate::triggers::event::{ChannelEventPayload, KnownProviderPayload};
15use crate::triggers::{ProviderId, ProviderPayload, SignatureStatus, TenantId, TriggerEvent};
16use crate::value::{VmError, VmValue};
17
18const CHANNEL_QUEUE_DEPTH: usize = RuntimeLimits::DEFAULT.default_event_log_queue_depth;
19const CHANNEL_EVENT_KIND: &str = "channel.emit";
20const IDEMPOTENCY_HEADER: &str = "harn.channel.id";
21const NAME_HEADER: &str = "harn.channel.name";
22const SCOPE_HEADER: &str = "harn.channel.scope";
23const SCOPE_ID_HEADER: &str = "harn.channel.scope_id";
24const EMITTED_BY_HEADER: &str = "harn.channel.emitted_by";
25
26pub(crate) const CHANNEL_TRANSCRIPT_TOPIC: &str = "transcript.channel.lifecycle";
30pub(crate) const CHANNEL_EMIT_TRANSCRIPT_KIND: &str = "transcript.channel.emit";
31pub(crate) const CHANNEL_MATCH_TRANSCRIPT_KIND: &str = "transcript.channel.match";
32
33pub const CHANNEL_AUDIT_TOPIC: &str = "lifecycle.channel.audit";
40pub(crate) const CHANNEL_EMIT_RECEIPT_KIND: &str = "channel_emit_receipt";
41pub(crate) const CHANNEL_MATCH_RECEIPT_KIND: &str = "channel_match_receipt";
42const CHANNEL_EMIT_RECEIPT_SCHEMA: &str = "harn.channel_emit_receipt.v1";
45const CHANNEL_MATCH_RECEIPT_SCHEMA: &str = "harn.channel_match_receipt.v1";
46
47pub(crate) const CHANNEL_GUARDRAIL_BLOCKED_KIND: &str = "channel_guardrail_blocked";
52pub(crate) const CHANNEL_GUARDRAIL_WARNING_KIND: &str = "channel_guardrail_warning";
53const CHANNEL_GUARDRAIL_AUDIT_SCHEMA: &str = "harn.channel_guardrail_audit.v1";
54
55const EMIT_TRACE_ID_HEADER: &str = "harn.channel.emit_trace_id";
60const EMIT_SPAN_ID_HEADER: &str = "harn.channel.emit_span_id";
61
62static SESSION_CHANNEL_LOG: OnceLock<Mutex<Option<Arc<AnyEventLog>>>> = OnceLock::new();
63static SIGNING_SALT: OnceLock<Vec<u8>> = OnceLock::new();
64
65#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
66#[serde(rename_all = "snake_case")]
67enum ChannelScope {
68 Session,
69 Pipeline,
70 Tenant,
71 Org,
72}
73
74impl ChannelScope {
75 fn parse(value: &str) -> Result<Self, ChannelError> {
76 match value.trim() {
77 "session" => Ok(Self::Session),
78 "pipeline" => Ok(Self::Pipeline),
79 "tenant" => Ok(Self::Tenant),
80 "org" => Ok(Self::Org),
81 other => Err(ChannelError::malformed(format!(
82 "HARN-CHN-003 malformed channel scope '{other}'"
83 ))),
84 }
85 }
86
87 fn as_str(self) -> &'static str {
88 match self {
89 Self::Session => "session",
90 Self::Pipeline => "pipeline",
91 Self::Tenant => "tenant",
92 Self::Org => "org",
93 }
94 }
95}
96
97#[derive(Clone, Debug, Default)]
98struct ChannelContext {
99 task_id: Option<String>,
100 root_task_id: Option<String>,
101 scope_id: Option<String>,
102 workflow_id: Option<String>,
103 run_id: Option<String>,
104 worker_id: Option<String>,
105 agent_session_id: Option<String>,
106 root_agent_session_id: Option<String>,
107 tenant_id: Option<String>,
108}
109
110#[derive(Clone, Debug, Default)]
111struct ChannelOptions {
112 scope: Option<ChannelScope>,
113 id: Option<String>,
114 tenant_id: Option<String>,
115 session_id: Option<String>,
116 pipeline_id: Option<String>,
117 from_cursor: Option<EventId>,
118 limit: Option<usize>,
119 ttl_ms: Option<i64>,
120}
121
122#[derive(Clone, Debug)]
123struct ResolvedChannel {
124 scope: ChannelScope,
125 scope_id: String,
126 resolved_name: String,
127 topic: Topic,
128 retention: &'static str,
129}
130
131#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
132pub struct SignedTimestamp {
133 pub at_ms: i64,
134 pub at: String,
135 pub algorithm: String,
136 pub key_id: String,
137 pub signature: String,
138}
139
140#[derive(Clone, Debug, Serialize, Deserialize)]
141struct StoredChannelEvent {
142 id: String,
143 name: String,
144 payload: serde_json::Value,
145 emitted_at: SignedTimestamp,
146 emitted_by: String,
147 scope: String,
148 scope_id: String,
149 #[serde(skip_serializing_if = "Option::is_none")]
150 pipeline_id: Option<String>,
151 #[serde(skip_serializing_if = "Option::is_none")]
152 session_id: Option<String>,
153 #[serde(skip_serializing_if = "Option::is_none")]
154 tenant_id: Option<String>,
155 retention: String,
156 #[serde(skip_serializing_if = "Option::is_none")]
157 ttl_ms: Option<i64>,
158}
159
160#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
171pub struct ChannelEmitReceipt {
172 pub event_id: String,
173 pub name_resolved: String,
174 pub scope: String,
175 pub scope_id: String,
176 pub payload_hash: String,
177 pub payload: serde_json::Value,
178 pub emitted_at: SignedTimestamp,
179 pub emitted_by: String,
180 #[serde(skip_serializing_if = "Option::is_none")]
181 pub pipeline_id: Option<String>,
182 #[serde(skip_serializing_if = "Option::is_none")]
183 pub session_id: Option<String>,
184 #[serde(skip_serializing_if = "Option::is_none")]
185 pub tenant_id: Option<String>,
186 pub topic: String,
187 pub inserted: bool,
188 #[serde(skip_serializing_if = "Option::is_none")]
189 pub span_id: Option<u64>,
190}
191
192#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
202pub struct ChannelMatchReceipt {
203 pub event_id: String,
204 pub trigger_id: String,
205 pub binding_key: String,
206 pub name_resolved: String,
207 pub scope: String,
208 pub scope_id: String,
209 pub matched_at: SignedTimestamp,
210 #[serde(skip_serializing_if = "Option::is_none")]
211 pub matched_in_session_id: Option<String>,
212 #[serde(skip_serializing_if = "Option::is_none")]
213 pub batch: Option<ChannelMatchBatchInfo>,
214 pub handler_kind: String,
215 pub handler_result: ChannelMatchResultSummary,
216 #[serde(skip_serializing_if = "Option::is_none")]
217 pub span_id: Option<u64>,
218}
219
220#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
224pub struct ChannelMatchBatchInfo {
225 pub count: usize,
226 pub constituent_event_ids: Vec<String>,
227}
228
229#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
234pub struct ChannelMatchResultSummary {
235 pub status: String,
236 pub attempt_count: u32,
237 #[serde(skip_serializing_if = "Option::is_none")]
238 pub error: Option<String>,
239 #[serde(skip_serializing_if = "Option::is_none")]
240 pub dispatch_failed: Option<bool>,
241}
242
243impl ChannelMatchResultSummary {
244 fn from_dispatch(
245 outcome: &Result<crate::triggers::DispatchOutcome, crate::triggers::DispatchError>,
246 ) -> Self {
247 match outcome {
248 Ok(outcome) => Self {
249 status: outcome.status.as_str().to_string(),
250 attempt_count: outcome.attempt_count,
251 error: outcome.error.clone(),
252 dispatch_failed: None,
253 },
254 Err(error) => Self {
255 status: "dispatch_error".to_string(),
256 attempt_count: 0,
257 error: Some(error.to_string()),
258 dispatch_failed: Some(true),
259 },
260 }
261 }
262}
263
264#[derive(Debug)]
265struct ChannelError(String);
266
267impl ChannelError {
268 fn missing_pipeline() -> Self {
269 Self("HARN-CHN-001 missing pipeline context for pipeline-scoped channel".to_string())
270 }
271
272 fn cross_tenant(message: impl Into<String>) -> Self {
273 Self(format!("HARN-CHN-002 {}", message.into()))
274 }
275
276 fn malformed(message: impl Into<String>) -> Self {
277 Self(message.into())
278 }
279
280 fn scope_ambiguous(message: impl Into<String>) -> Self {
281 Self(format!("HARN-CHN-004 {}", message.into()))
282 }
283}
284
285impl From<ChannelError> for VmError {
286 fn from(error: ChannelError) -> Self {
287 VmError::Runtime(error.0)
288 }
289}
290
291pub fn reset_channel_state() {
292 if let Some(slot) = SESSION_CHANNEL_LOG.get() {
293 *slot.lock().expect("channel session log poisoned") = None;
294 }
295 crate::channel_guardrails::clear();
299}
300
301pub(crate) async fn emit_channel_from_vm(args: Vec<VmValue>) -> Result<VmValue, VmError> {
302 let name = required_string(args.first(), "emit_channel", "name")?;
303 let payload = vm_value_to_json(
304 args.get(1)
305 .ok_or_else(|| VmError::TypeError("emit_channel: missing payload".to_string()))?,
306 );
307 let options = parse_options(args.get(2), "emit_channel")?;
308 let context = ChannelContext::current();
309 let resolved = resolve_channel(&name, &options, &context)?;
310 let event_id = options
311 .id
312 .clone()
313 .unwrap_or_else(|| format!("channel_evt_{}", uuid::Uuid::now_v7()));
314 let emitted_by = emitted_by(&context);
315 let emitted_at = signed_timestamp(&resolved, &event_id, &emitted_by);
316 let occurred_at_ms = emitted_at.at_ms;
317
318 let guardrail_context = serde_json::json!({
325 "name": name,
326 "name_resolved": resolved.resolved_name,
327 "scope": resolved.scope.as_str(),
328 "scope_id": resolved.scope_id,
329 "event_id": event_id,
330 "emitted_by": emitted_by,
331 });
332 let decision =
333 crate::channel_guardrails::evaluate(&payload, &guardrail_context, &resolved.resolved_name)
334 .await?;
335 if matches!(
336 decision.verdict,
337 crate::channel_guardrails::Verdict::Block { .. }
338 ) {
339 return handle_blocked_emit(
340 &name,
341 &resolved,
342 &event_id,
343 &emitted_by,
344 &emitted_at,
345 &payload,
346 &decision,
347 )
348 .await;
349 }
350 record_guardrail_warnings(
351 &resolved,
352 &event_id,
353 &emitted_by,
354 &payload,
355 decision.fired.as_slice(),
356 )
357 .await;
358 let record = StoredChannelEvent {
359 id: event_id.clone(),
360 name: resolved.resolved_name.clone(),
361 payload,
362 emitted_at,
363 emitted_by: emitted_by.clone(),
364 scope: resolved.scope.as_str().to_string(),
365 scope_id: resolved.scope_id.clone(),
366 pipeline_id: context.pipeline_id_for_receipt(&resolved),
367 session_id: context.session_id_for_receipt(&resolved),
368 tenant_id: context.tenant_id_for_receipt(&resolved),
369 retention: resolved.retention.to_string(),
370 ttl_ms: options.ttl_ms,
371 };
372
373 let mut emit_span = ChannelSpanGuard::start(
378 crate::tracing::SpanKind::ChannelEmit,
379 format!("channel.emit {}", resolved.resolved_name),
380 Vec::new(),
381 );
382 emit_span.set_metadata("event_id", serde_json::json!(record.id));
383 emit_span.set_metadata("scope", serde_json::json!(resolved.scope.as_str()));
384 emit_span.set_metadata("scope_id", serde_json::json!(resolved.scope_id));
385 emit_span.set_metadata("name_resolved", serde_json::json!(resolved.resolved_name));
386 emit_span.set_metadata("payload_summary", summarize_payload(&record.payload));
387 let emit_span_id = crate::tracing::current_span_id().unwrap_or(0);
388 let emit_link = emit_span.link();
389
390 let mut headers = BTreeMap::new();
391 headers.insert(IDEMPOTENCY_HEADER.to_string(), event_id.clone());
392 headers.insert(NAME_HEADER.to_string(), resolved.resolved_name.clone());
393 headers.insert(
394 SCOPE_HEADER.to_string(),
395 resolved.scope.as_str().to_string(),
396 );
397 headers.insert(SCOPE_ID_HEADER.to_string(), resolved.scope_id.clone());
398 headers.insert(EMITTED_BY_HEADER.to_string(), emitted_by.clone());
399
400 let log = log_for_scope(resolved.scope);
401 let mut log_event = LogEvent::new(
402 CHANNEL_EVENT_KIND,
403 serde_json::to_value(&record)
404 .map_err(|error| VmError::Runtime(format!("emit_channel: encode event: {error}")))?,
405 )
406 .with_headers(headers);
407 log_event.occurred_at_ms = occurred_at_ms;
408 let outcome = log
409 .append_idempotent_by_header(&resolved.topic, IDEMPOTENCY_HEADER, &event_id, log_event)
410 .await
411 .map_err(channel_log_error)?;
412 let receipt = receipt_value(
413 &resolved.topic,
414 outcome.event_id,
415 &outcome.event,
416 outcome.inserted,
417 )?;
418 emit_channel_emit_transcript(&record, &resolved, outcome.inserted, emit_span_id);
421 record_channel_emit_receipt(&record, &resolved, outcome.inserted, emit_span_id).await;
426 if outcome.inserted {
430 let payload_json = outcome
431 .event
432 .payload
433 .get("payload")
434 .cloned()
435 .unwrap_or(serde_json::Value::Null);
436 let context_for_fanout = ChannelContext::current();
437 let fanout_payload = ChannelEventPayload {
438 id: event_id.clone(),
439 name: parse_name(&name)
440 .map(|parsed| parsed.name)
441 .unwrap_or_else(|_| resolved.resolved_name.clone()),
442 name_resolved: resolved.resolved_name.clone(),
443 scope: resolved.scope.as_str().to_string(),
444 scope_id: resolved.scope_id.clone(),
445 payload: payload_json,
446 emitted_by: emitted_by.clone(),
447 tenant_id: context_for_fanout.tenant_id_for_receipt(&resolved),
448 session_id: context_for_fanout.session_id_for_receipt(&resolved),
449 pipeline_id: context_for_fanout.pipeline_id_for_receipt(&resolved),
450 };
451 dispatch_channel_emit_to_triggers(&resolved, fanout_payload, emit_link).await?;
452 }
453 emit_span.end();
454 Ok(crate::stdlib::json_to_vm_value(&receipt))
455}
456
457pub(crate) async fn channel_events_from_vm(args: Vec<VmValue>) -> Result<VmValue, VmError> {
458 let name = required_string(args.first(), "channel_events", "name")?;
459 let options = parse_options(args.get(1), "channel_events")?;
460 let context = ChannelContext::current();
461 let resolved = resolve_channel(&name, &options, &context)?;
462 let events = log_for_scope(resolved.scope)
463 .read_range(
464 &resolved.topic,
465 options.from_cursor,
466 options.limit.unwrap_or(usize::MAX),
467 )
468 .await
469 .map_err(channel_log_error)?;
470 let values = events
471 .into_iter()
472 .map(|(event_id, event)| event_value(&resolved.topic, event_id, event))
473 .collect::<Result<Vec<_>, _>>()?;
474 Ok(crate::stdlib::json_to_vm_value(&serde_json::Value::Array(
475 values,
476 )))
477}
478
479impl ChannelContext {
480 fn current() -> Self {
481 let mut context = Self::default();
482 if let Some(vm) = crate::vm::clone_async_builtin_child_vm() {
483 context.task_id = Some(vm.runtime_context.task_id.clone());
484 context.root_task_id = Some(vm.runtime_context.root_task_id.clone());
485 context.scope_id = vm.runtime_context.scope_id.clone();
486 if let VmValue::Dict(values) = crate::runtime_context::runtime_context_value(&vm) {
487 context.workflow_id = dict_string(&values, "workflow_id");
488 context.run_id = dict_string(&values, "run_id");
489 context.worker_id = dict_string(&values, "worker_id");
490 context.agent_session_id = dict_string(&values, "agent_session_id");
491 context.root_agent_session_id = dict_string(&values, "root_agent_session_id");
492 context.tenant_id = dict_string(&values, "tenant_id");
493 }
494 }
495 context.agent_session_id = context
496 .agent_session_id
497 .or_else(crate::agent_sessions::current_session_id);
498 context
499 }
500
501 fn session_id(&self, options: &ChannelOptions) -> Result<String, ChannelError> {
502 if let Some(requested) = options.session_id.as_deref() {
506 if let Some(active) = self.agent_session_id.as_deref() {
507 if active != requested {
508 return Err(ChannelError::scope_ambiguous(format!(
509 "session scope ambiguous: options.session_id '{requested}' \
510 conflicts with active session '{active}'"
511 )));
512 }
513 }
514 }
515 Ok(options
516 .session_id
517 .clone()
518 .or_else(|| self.agent_session_id.clone())
519 .or_else(|| self.root_agent_session_id.clone())
520 .or_else(|| self.scope_id.clone())
521 .or_else(|| self.root_task_id.clone())
522 .unwrap_or_else(|| "session".to_string()))
523 }
524
525 fn pipeline_id(&self, options: &ChannelOptions) -> Result<String, ChannelError> {
526 let active = self.workflow_id.clone().or_else(|| self.run_id.clone());
530 if let (Some(requested), Some(active)) = (options.pipeline_id.as_deref(), active.as_deref())
531 {
532 if requested != active {
533 return Err(ChannelError::scope_ambiguous(format!(
534 "pipeline scope ambiguous: options.pipeline_id '{requested}' \
535 conflicts with active pipeline '{active}'"
536 )));
537 }
538 }
539 options
540 .pipeline_id
541 .clone()
542 .or(active)
543 .ok_or_else(ChannelError::missing_pipeline)
544 }
545
546 fn tenant_id(
547 &self,
548 options: &ChannelOptions,
549 requested: Option<&str>,
550 ) -> Result<String, ChannelError> {
551 let current = self.tenant_id.as_deref();
552 let requested = requested
553 .map(ToOwned::to_owned)
554 .or_else(|| options.tenant_id.clone());
555 if let (Some(current), Some(requested)) = (current, requested.as_deref()) {
556 if current != requested {
557 return Err(ChannelError::cross_tenant(format!(
558 "cross-tenant channel emit requires a grant: current tenant '{current}', requested tenant '{requested}'"
559 )));
560 }
561 }
562 Ok(requested
563 .or_else(|| self.tenant_id.clone())
564 .unwrap_or_else(|| "default".to_string()))
565 }
566
567 fn pipeline_id_for_receipt(&self, resolved: &ResolvedChannel) -> Option<String> {
568 match resolved.scope {
569 ChannelScope::Pipeline => Some(resolved.scope_id.clone()),
570 _ => self.workflow_id.clone().or_else(|| self.run_id.clone()),
571 }
572 }
573
574 fn session_id_for_receipt(&self, resolved: &ResolvedChannel) -> Option<String> {
575 match resolved.scope {
576 ChannelScope::Session => Some(resolved.scope_id.clone()),
577 _ => self
578 .agent_session_id
579 .clone()
580 .or_else(|| self.root_agent_session_id.clone()),
581 }
582 }
583
584 fn tenant_id_for_receipt(&self, resolved: &ResolvedChannel) -> Option<String> {
585 match resolved.scope {
586 ChannelScope::Tenant => Some(resolved.scope_id.clone()),
587 _ => self.tenant_id.clone(),
588 }
589 }
590}
591
592fn resolve_channel(
593 raw_name: &str,
594 options: &ChannelOptions,
595 context: &ChannelContext,
596) -> Result<ResolvedChannel, ChannelError> {
597 let parsed = parse_name(raw_name)?;
598 if let Some(option_scope) = options.scope {
599 if let Some(prefix_scope) = parsed.scope {
600 if prefix_scope != option_scope {
601 return Err(ChannelError::malformed(format!(
602 "HARN-CHN-003 channel scope prefix '{}' conflicts with options.scope '{}'",
603 prefix_scope.as_str(),
604 option_scope.as_str()
605 )));
606 }
607 }
608 }
609
610 let scope = parsed
611 .scope
612 .or(options.scope)
613 .unwrap_or(ChannelScope::Tenant);
614 if scope == ChannelScope::Org {
615 return Err(ChannelError::cross_tenant(
616 "org-scoped channels are disabled until org grants are available",
617 ));
618 }
619
620 validate_channel_name(&parsed.name)?;
621 let scope_id = match scope {
622 ChannelScope::Session => match parsed.scope_id.clone() {
623 Some(id) => id,
624 None => context.session_id(options)?,
625 },
626 ChannelScope::Pipeline => context.pipeline_id(options)?,
627 ChannelScope::Tenant => context.tenant_id(options, parsed.scope_id.as_deref())?,
628 ChannelScope::Org => unreachable!("org scope returned above"),
629 };
630 validate_scope_id(scope, &scope_id)?;
631 let resolved_name = format!("{}:{}:{}", scope.as_str(), scope_id, parsed.name);
632 let topic = Topic::new(format!(
633 "channels.{}.{}.{}",
634 scope.as_str(),
635 sanitize_topic_component(&scope_id),
636 sanitize_topic_component(&parsed.name)
637 ))
638 .map_err(|error| ChannelError::malformed(format!("HARN-CHN-003 {error}")))?;
639 Ok(ResolvedChannel {
640 scope,
641 scope_id,
642 resolved_name,
643 topic,
644 retention: retention_for_scope(scope),
645 })
646}
647
648#[derive(Clone, Debug)]
649struct ParsedName {
650 scope: Option<ChannelScope>,
651 scope_id: Option<String>,
652 name: String,
653}
654
655fn parse_name(raw_name: &str) -> Result<ParsedName, ChannelError> {
656 let raw_name = raw_name.trim();
657 if raw_name.is_empty() {
658 return Err(ChannelError::malformed(
659 "HARN-CHN-003 channel name cannot be empty",
660 ));
661 }
662 let Some((prefix, rest)) = raw_name.split_once(':') else {
663 return Ok(ParsedName {
664 scope: None,
665 scope_id: None,
666 name: raw_name.to_string(),
667 });
668 };
669 let scope = ChannelScope::parse(prefix)?;
670 match scope {
671 ChannelScope::Session | ChannelScope::Pipeline => {
672 if rest.is_empty() || rest.contains(':') {
673 return Err(ChannelError::malformed(format!(
674 "HARN-CHN-003 malformed {} channel name '{raw_name}'",
675 scope.as_str()
676 )));
677 }
678 Ok(ParsedName {
679 scope: Some(scope),
680 scope_id: None,
681 name: rest.to_string(),
682 })
683 }
684 ChannelScope::Tenant => {
685 if rest.is_empty() {
686 return Err(ChannelError::malformed(
687 "HARN-CHN-003 tenant channel name cannot be empty",
688 ));
689 }
690 let (scope_id, name) = match rest.split_once(':') {
691 Some((tenant_id, name)) if !tenant_id.is_empty() && !name.is_empty() => {
692 (Some(tenant_id.to_string()), name.to_string())
693 }
694 Some(_) => {
695 return Err(ChannelError::malformed(format!(
696 "HARN-CHN-003 malformed tenant channel name '{raw_name}'"
697 )))
698 }
699 None => (None, rest.to_string()),
700 };
701 Ok(ParsedName {
702 scope: Some(scope),
703 scope_id,
704 name,
705 })
706 }
707 ChannelScope::Org => {
708 let Some((org_id, name)) = rest.split_once(':') else {
709 return Err(ChannelError::malformed(format!(
710 "HARN-CHN-003 org channel names must be org:<org_id>:<name>, got '{raw_name}'"
711 )));
712 };
713 if org_id.is_empty() || name.is_empty() {
714 return Err(ChannelError::malformed(format!(
715 "HARN-CHN-003 malformed org channel name '{raw_name}'"
716 )));
717 }
718 Ok(ParsedName {
719 scope: Some(scope),
720 scope_id: Some(org_id.to_string()),
721 name: name.to_string(),
722 })
723 }
724 }
725}
726
727fn validate_channel_name(name: &str) -> Result<(), ChannelError> {
728 if name.trim().is_empty()
729 || name.contains(':')
730 || name.chars().any(|ch| ch.is_control() || ch.is_whitespace())
731 {
732 return Err(ChannelError::malformed(format!(
733 "HARN-CHN-003 malformed channel name '{name}'"
734 )));
735 }
736 Ok(())
737}
738
739fn validate_scope_id(scope: ChannelScope, scope_id: &str) -> Result<(), ChannelError> {
740 if scope_id.trim().is_empty()
741 || scope_id
742 .chars()
743 .any(|ch| ch.is_control() || ch.is_whitespace() || ch == ':')
744 {
745 return Err(ChannelError::malformed(format!(
746 "HARN-CHN-003 malformed {} scope id '{scope_id}'",
747 scope.as_str()
748 )));
749 }
750 Ok(())
751}
752
753fn log_for_scope(scope: ChannelScope) -> Arc<AnyEventLog> {
754 match scope {
755 ChannelScope::Session => {
756 let slot = SESSION_CHANNEL_LOG.get_or_init(|| Mutex::new(None));
757 let mut guard = slot.lock().expect("channel session log poisoned");
758 guard
759 .get_or_insert_with(|| {
760 Arc::new(AnyEventLog::Memory(crate::event_log::MemoryEventLog::new(
761 CHANNEL_QUEUE_DEPTH,
762 )))
763 })
764 .clone()
765 }
766 ChannelScope::Pipeline | ChannelScope::Tenant => active_event_log()
767 .unwrap_or_else(|| install_memory_for_current_thread(CHANNEL_QUEUE_DEPTH)),
768 ChannelScope::Org => unreachable!("org-scoped channel log is disabled"),
769 }
770}
771
772fn signed_timestamp(
773 resolved: &ResolvedChannel,
774 event_id: &str,
775 emitted_by: &str,
776) -> SignedTimestamp {
777 let at = crate::clock_mock::now_utc();
778 let at_ms = (at.unix_timestamp_nanos() / 1_000_000) as i64;
779 let at_text = at.format(&Rfc3339).unwrap_or_else(|_| at.to_string());
780 let material = format!(
781 "harn.channel.timestamp.v1\nat_ms={at_ms}\nid={event_id}\nname={}\nscope={}\nscope_id={}\nemitted_by={emitted_by}\n",
782 resolved.resolved_name,
783 resolved.scope.as_str(),
784 resolved.scope_id
785 );
786 let signature = hex::encode(crate::connectors::hmac::hmac_sha256(
787 signing_salt(),
788 material.as_bytes(),
789 ));
790 SignedTimestamp {
791 at_ms,
792 at: at_text,
793 algorithm: "hmac-sha256".to_string(),
794 key_id: "local-session".to_string(),
795 signature: format!("sha256:{signature}"),
796 }
797}
798
799fn signing_salt() -> &'static [u8] {
800 SIGNING_SALT
801 .get_or_init(|| {
802 format!(
803 "harn-channel-signing-salt:{}:{}",
804 std::process::id(),
805 uuid::Uuid::now_v7()
806 )
807 .into_bytes()
808 })
809 .as_slice()
810}
811
812fn signed_match_timestamp(
819 resolved: &ResolvedChannel,
820 event_id: &str,
821 trigger_id: &str,
822) -> SignedTimestamp {
823 let at = crate::clock_mock::now_utc();
824 let at_ms = (at.unix_timestamp_nanos() / 1_000_000) as i64;
825 let at_text = at.format(&Rfc3339).unwrap_or_else(|_| at.to_string());
826 let material = format!(
827 "harn.channel.match_timestamp.v1\nat_ms={at_ms}\nevent_id={event_id}\ntrigger_id={trigger_id}\nname={}\nscope={}\nscope_id={}\n",
828 resolved.resolved_name,
829 resolved.scope.as_str(),
830 resolved.scope_id
831 );
832 let signature = hex::encode(crate::connectors::hmac::hmac_sha256(
833 signing_salt(),
834 material.as_bytes(),
835 ));
836 SignedTimestamp {
837 at_ms,
838 at: at_text,
839 algorithm: "hmac-sha256".to_string(),
840 key_id: "local-session".to_string(),
841 signature: format!("sha256:{signature}"),
842 }
843}
844
845pub fn channel_payload_hash(payload: &serde_json::Value) -> String {
852 let canonical = canonical_json_string(payload);
853 let digest = Sha256::digest(canonical.as_bytes());
854 format!("sha256:{}", hex::encode(digest))
855}
856
857fn canonical_json_string(value: &serde_json::Value) -> String {
863 match value {
864 serde_json::Value::Object(map) => {
865 let mut sorted: std::collections::BTreeMap<&String, &serde_json::Value> =
866 std::collections::BTreeMap::new();
867 for (key, value) in map {
868 sorted.insert(key, value);
869 }
870 let parts: Vec<String> = sorted
871 .iter()
872 .map(|(key, value)| {
873 format!(
874 "{}:{}",
875 serde_json::to_string(key).unwrap_or_else(|_| key.to_string()),
876 canonical_json_string(value)
877 )
878 })
879 .collect();
880 format!("{{{}}}", parts.join(","))
881 }
882 serde_json::Value::Array(items) => {
883 let parts: Vec<String> = items.iter().map(canonical_json_string).collect();
884 format!("[{}]", parts.join(","))
885 }
886 other => serde_json::to_string(other).unwrap_or_else(|_| "null".to_string()),
887 }
888}
889
890async fn append_channel_audit_event(
899 kind: &'static str,
900 schema: &'static str,
901 payload: serde_json::Value,
902) {
903 let topic = match Topic::new(CHANNEL_AUDIT_TOPIC) {
904 Ok(topic) => topic,
905 Err(_) => return,
906 };
907 let log = active_event_log()
908 .unwrap_or_else(|| install_memory_for_current_thread(CHANNEL_QUEUE_DEPTH));
909 let mut headers = BTreeMap::new();
910 headers.insert("schema".to_string(), schema.to_string());
911 let _ = log
912 .append(&topic, LogEvent::new(kind, payload).with_headers(headers))
913 .await;
914}
915
916async fn record_guardrail_audit(
924 kind: &'static str,
925 resolved: &ResolvedChannel,
926 event_id: &str,
927 emitted_by: &str,
928 payload: &serde_json::Value,
929 fired: &[crate::channel_guardrails::FiredGuardrail],
930) {
931 let fired_json: Vec<serde_json::Value> = fired
932 .iter()
933 .map(|entry| {
934 serde_json::json!({
935 "id": entry.id,
936 "kind": entry.kind,
937 "verdict_label": entry.verdict_label,
938 "reason": entry.reason,
939 })
940 })
941 .collect();
942 let audit_payload = serde_json::json!({
943 "event_id": event_id,
944 "name_resolved": resolved.resolved_name,
945 "scope": resolved.scope.as_str(),
946 "scope_id": resolved.scope_id,
947 "emitted_by": emitted_by,
948 "payload_hash": channel_payload_hash(payload),
949 "payload": payload,
950 "fired": fired_json,
951 });
952 append_channel_audit_event(kind, CHANNEL_GUARDRAIL_AUDIT_SCHEMA, audit_payload.clone()).await;
953 crate::orchestration::record_lifecycle_audit(kind, audit_payload);
957}
958
959async fn record_guardrail_warnings(
964 resolved: &ResolvedChannel,
965 event_id: &str,
966 emitted_by: &str,
967 payload: &serde_json::Value,
968 fired: &[crate::channel_guardrails::FiredGuardrail],
969) {
970 if fired.is_empty() {
971 return;
972 }
973 record_guardrail_audit(
974 CHANNEL_GUARDRAIL_WARNING_KIND,
975 resolved,
976 event_id,
977 emitted_by,
978 payload,
979 fired,
980 )
981 .await;
982}
983
984async fn handle_blocked_emit(
990 raw_name: &str,
991 resolved: &ResolvedChannel,
992 event_id: &str,
993 emitted_by: &str,
994 emitted_at: &SignedTimestamp,
995 payload: &serde_json::Value,
996 decision: &crate::channel_guardrails::GuardrailDecision,
997) -> Result<VmValue, VmError> {
998 record_guardrail_audit(
999 CHANNEL_GUARDRAIL_BLOCKED_KIND,
1000 resolved,
1001 event_id,
1002 emitted_by,
1003 payload,
1004 decision.fired.as_slice(),
1005 )
1006 .await;
1007 let block_reason = decision
1008 .fired
1009 .iter()
1010 .rev()
1011 .find_map(|f| {
1012 if f.verdict_label == CHANNEL_GUARDRAIL_BLOCKED_KIND
1013 || f.verdict_label.contains("block")
1014 {
1015 Some(f.reason.clone())
1016 } else {
1017 None
1018 }
1019 })
1020 .unwrap_or_else(|| "guardrail blocked".to_string());
1021 let fired_json: Vec<serde_json::Value> = decision
1022 .fired
1023 .iter()
1024 .map(|entry| {
1025 serde_json::json!({
1026 "id": entry.id,
1027 "kind": entry.kind,
1028 "verdict_label": entry.verdict_label,
1029 "reason": entry.reason,
1030 })
1031 })
1032 .collect();
1033 let receipt = serde_json::json!({
1034 "event_id": event_id,
1035 "cursor": serde_json::Value::Null,
1036 "id": event_id,
1037 "name": raw_name,
1038 "name_resolved": resolved.resolved_name,
1039 "scope": resolved.scope.as_str(),
1040 "scope_id": resolved.scope_id,
1041 "emitted_at": emitted_at,
1042 "emitted_by": emitted_by,
1043 "retention": resolved.retention,
1044 "topic": resolved.topic.as_str(),
1045 "inserted": false,
1046 "duplicate": false,
1047 "blocked": true,
1048 "block_reason": block_reason,
1049 "guardrail_fired": fired_json,
1050 });
1051 Ok(crate::stdlib::json_to_vm_value(&receipt))
1052}
1053
1054async fn record_channel_emit_receipt(
1059 record: &StoredChannelEvent,
1060 resolved: &ResolvedChannel,
1061 inserted: bool,
1062 span_id: u64,
1063) {
1064 let receipt = ChannelEmitReceipt {
1065 event_id: record.id.clone(),
1066 name_resolved: resolved.resolved_name.clone(),
1067 scope: resolved.scope.as_str().to_string(),
1068 scope_id: resolved.scope_id.clone(),
1069 payload_hash: channel_payload_hash(&record.payload),
1070 payload: record.payload.clone(),
1071 emitted_at: record.emitted_at.clone(),
1072 emitted_by: record.emitted_by.clone(),
1073 pipeline_id: record.pipeline_id.clone(),
1074 session_id: record.session_id.clone(),
1075 tenant_id: record.tenant_id.clone(),
1076 topic: resolved.topic.as_str().to_string(),
1077 inserted,
1078 span_id: if span_id == 0 { None } else { Some(span_id) },
1079 };
1080 let payload = match serde_json::to_value(&receipt) {
1081 Ok(value) => value,
1082 Err(_) => return,
1083 };
1084 append_channel_audit_event(
1085 CHANNEL_EMIT_RECEIPT_KIND,
1086 CHANNEL_EMIT_RECEIPT_SCHEMA,
1087 payload,
1088 )
1089 .await;
1090}
1091
1092#[allow(clippy::too_many_arguments)]
1098async fn record_channel_match_receipt(
1099 trigger_id: &str,
1100 binding_key: &str,
1101 handler_kind: &str,
1102 resolved: &ResolvedChannel,
1103 event_id: &str,
1104 matched_in_session_id: Option<&str>,
1105 batch: Option<ChannelMatchBatchInfo>,
1106 span_id: u64,
1107 dispatch_outcome: &Result<crate::triggers::DispatchOutcome, crate::triggers::DispatchError>,
1108) {
1109 let receipt = ChannelMatchReceipt {
1110 event_id: event_id.to_string(),
1111 trigger_id: trigger_id.to_string(),
1112 binding_key: binding_key.to_string(),
1113 name_resolved: resolved.resolved_name.clone(),
1114 scope: resolved.scope.as_str().to_string(),
1115 scope_id: resolved.scope_id.clone(),
1116 matched_at: signed_match_timestamp(resolved, event_id, trigger_id),
1117 matched_in_session_id: matched_in_session_id.map(|s| s.to_string()),
1118 batch,
1119 handler_kind: handler_kind.to_string(),
1120 handler_result: ChannelMatchResultSummary::from_dispatch(dispatch_outcome),
1121 span_id: if span_id == 0 { None } else { Some(span_id) },
1122 };
1123 let payload = match serde_json::to_value(&receipt) {
1124 Ok(value) => value,
1125 Err(_) => return,
1126 };
1127 append_channel_audit_event(
1128 CHANNEL_MATCH_RECEIPT_KIND,
1129 CHANNEL_MATCH_RECEIPT_SCHEMA,
1130 payload,
1131 )
1132 .await;
1133}
1134
1135fn batch_info_from_summary(
1139 batch_summary: Option<&serde_json::Value>,
1140) -> Option<ChannelMatchBatchInfo> {
1141 let summary = batch_summary?.as_object()?;
1142 let count = summary
1143 .get("count")
1144 .and_then(|v| v.as_u64())
1145 .map(|n| n as usize)?;
1146 let constituent_event_ids = summary
1147 .get("constituent_event_ids")
1148 .and_then(|v| v.as_array())
1149 .map(|arr| {
1150 arr.iter()
1151 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1152 .collect()
1153 })
1154 .unwrap_or_default();
1155 Some(ChannelMatchBatchInfo {
1156 count,
1157 constituent_event_ids,
1158 })
1159}
1160
1161fn emitted_by(context: &ChannelContext) -> String {
1162 context
1163 .worker_id
1164 .clone()
1165 .or_else(|| context.agent_session_id.clone())
1166 .or_else(|| context.task_id.clone())
1167 .unwrap_or_else(|| "harn".to_string())
1168}
1169
1170fn retention_for_scope(scope: ChannelScope) -> &'static str {
1171 match scope {
1172 ChannelScope::Session => "in_process_session",
1173 ChannelScope::Pipeline => "pipeline_event_log",
1174 ChannelScope::Tenant => "tenant_event_log",
1175 ChannelScope::Org => "org_event_log",
1176 }
1177}
1178
1179fn receipt_value(
1180 topic: &Topic,
1181 event_id: EventId,
1182 event: &LogEvent,
1183 inserted: bool,
1184) -> Result<serde_json::Value, VmError> {
1185 let record = stored_record(event)?;
1186 Ok(serde_json::json!({
1187 "event_id": event_id,
1188 "cursor": event_id,
1189 "id": record.id,
1190 "name": record.name,
1191 "name_resolved": record.name,
1192 "scope": record.scope,
1193 "scope_id": record.scope_id,
1194 "payload": record.payload,
1195 "emitted_at": record.emitted_at,
1196 "emitted_by": record.emitted_by,
1197 "pipeline_id": record.pipeline_id,
1198 "session_id": record.session_id,
1199 "tenant_id": record.tenant_id,
1200 "retention": record.retention,
1201 "ttl_ms": record.ttl_ms,
1202 "topic": topic.as_str(),
1203 "inserted": inserted,
1204 "duplicate": !inserted,
1205 }))
1206}
1207
1208fn event_value(
1209 topic: &Topic,
1210 event_id: EventId,
1211 event: LogEvent,
1212) -> Result<serde_json::Value, VmError> {
1213 let record = stored_record(&event)?;
1214 Ok(serde_json::json!({
1215 "event_id": event_id,
1216 "cursor": event_id,
1217 "topic": topic.as_str(),
1218 "kind": event.kind,
1219 "headers": event.headers,
1220 "occurred_at_ms": event.occurred_at_ms,
1221 "id": record.id,
1222 "name": record.name,
1223 "name_resolved": record.name,
1224 "scope": record.scope,
1225 "scope_id": record.scope_id,
1226 "payload": record.payload,
1227 "emitted_at": record.emitted_at,
1228 "emitted_by": record.emitted_by,
1229 "pipeline_id": record.pipeline_id,
1230 "session_id": record.session_id,
1231 "tenant_id": record.tenant_id,
1232 "retention": record.retention,
1233 "ttl_ms": record.ttl_ms,
1234 }))
1235}
1236
1237fn stored_record(event: &LogEvent) -> Result<StoredChannelEvent, VmError> {
1238 serde_json::from_value(event.payload.clone()).map_err(|error| {
1239 VmError::Runtime(format!(
1240 "channel event store contained malformed channel payload: {error}"
1241 ))
1242 })
1243}
1244
1245fn parse_options(value: Option<&VmValue>, builtin: &str) -> Result<ChannelOptions, VmError> {
1246 let Some(value) = value else {
1247 return Ok(ChannelOptions::default());
1248 };
1249 match value {
1250 VmValue::Nil => Ok(ChannelOptions::default()),
1251 VmValue::Dict(options) => Ok(ChannelOptions {
1252 scope: option_string(options, "scope", builtin)?
1253 .map(|scope| ChannelScope::parse(&scope))
1254 .transpose()
1255 .map_err(VmError::from)?,
1256 id: option_string(options, "id", builtin)?,
1257 tenant_id: option_string(options, "tenant_id", builtin)?,
1258 session_id: option_string(options, "session_id", builtin)?,
1259 pipeline_id: option_string(options, "pipeline_id", builtin)?,
1260 from_cursor: option_non_negative_int(options, "from_cursor", builtin)?
1261 .or(option_non_negative_int(options, "cursor", builtin)?)
1262 .map(|value| value as EventId),
1263 limit: option_non_negative_int(options, "limit", builtin)?.map(|value| value as usize),
1264 ttl_ms: option_duration_ms(options, "ttl", builtin)?,
1265 }),
1266 other => Err(VmError::TypeError(format!(
1267 "{builtin}: options must be a dict or nil, got {}",
1268 other.type_name()
1269 ))),
1270 }
1271}
1272
1273fn required_string(value: Option<&VmValue>, builtin: &str, name: &str) -> Result<String, VmError> {
1274 match value {
1275 Some(VmValue::String(value)) => Ok(value.to_string()),
1276 Some(other) => Err(VmError::TypeError(format!(
1277 "{builtin}: {name} must be a string, got {}",
1278 other.type_name()
1279 ))),
1280 None => Err(VmError::TypeError(format!("{builtin}: missing {name}"))),
1281 }
1282}
1283
1284fn option_string(
1285 options: &BTreeMap<String, VmValue>,
1286 key: &str,
1287 builtin: &str,
1288) -> Result<Option<String>, VmError> {
1289 match options.get(key) {
1290 None | Some(VmValue::Nil) => Ok(None),
1291 Some(VmValue::String(value)) if !value.trim().is_empty() => Ok(Some(value.to_string())),
1292 Some(VmValue::String(_)) => Err(VmError::TypeError(format!(
1293 "{builtin}: options.{key} cannot be empty"
1294 ))),
1295 Some(other) => Err(VmError::TypeError(format!(
1296 "{builtin}: options.{key} must be a string or nil, got {}",
1297 other.type_name()
1298 ))),
1299 }
1300}
1301
1302fn option_non_negative_int(
1303 options: &BTreeMap<String, VmValue>,
1304 key: &str,
1305 builtin: &str,
1306) -> Result<Option<u64>, VmError> {
1307 match options.get(key) {
1308 None | Some(VmValue::Nil) => Ok(None),
1309 Some(VmValue::Int(value)) if *value >= 0 => Ok(Some(*value as u64)),
1310 Some(other) => Err(VmError::TypeError(format!(
1311 "{builtin}: options.{key} must be a non-negative int or nil, got {}",
1312 other.type_name()
1313 ))),
1314 }
1315}
1316
1317fn option_duration_ms(
1318 options: &BTreeMap<String, VmValue>,
1319 key: &str,
1320 builtin: &str,
1321) -> Result<Option<i64>, VmError> {
1322 match options.get(key) {
1323 None | Some(VmValue::Nil) => Ok(None),
1324 Some(VmValue::Duration(value)) if *value >= 0 => Ok(Some(*value)),
1325 Some(VmValue::Int(value)) if *value >= 0 => Ok(Some(*value)),
1326 Some(other) => Err(VmError::TypeError(format!(
1327 "{builtin}: options.{key} must be a non-negative duration, int, or nil, got {}",
1328 other.type_name()
1329 ))),
1330 }
1331}
1332
1333fn dict_string(values: &BTreeMap<String, VmValue>, key: &str) -> Option<String> {
1334 match values.get(key) {
1335 Some(VmValue::String(value)) if !value.is_empty() => Some(value.to_string()),
1336 _ => None,
1337 }
1338}
1339
1340fn channel_log_error(error: crate::event_log::LogError) -> VmError {
1341 VmError::Runtime(format!("channel event log: {error}"))
1342}
1343
1344struct ChannelSpanGuard {
1353 span_id: u64,
1354 otel_span: tracing::Span,
1355}
1356
1357impl ChannelSpanGuard {
1358 fn start(
1359 kind: crate::tracing::SpanKind,
1360 name: String,
1361 links: Vec<crate::tracing::SpanLink>,
1362 ) -> Self {
1363 Self::start_with_parenting(kind, name, links, true)
1364 }
1365
1366 fn start_detached(
1367 kind: crate::tracing::SpanKind,
1368 name: String,
1369 links: Vec<crate::tracing::SpanLink>,
1370 ) -> Self {
1371 Self::start_with_parenting(kind, name, links, false)
1372 }
1373
1374 fn start_with_parenting(
1375 kind: crate::tracing::SpanKind,
1376 name: String,
1377 links: Vec<crate::tracing::SpanLink>,
1378 inherit_parent: bool,
1379 ) -> Self {
1380 let span_id = if inherit_parent {
1381 crate::tracing::span_start_with_links(kind, name.clone(), links.clone())
1382 } else {
1383 crate::tracing::span_start_detached_with_links(kind, name.clone(), links.clone())
1384 };
1385 let otel_span = tracing::info_span!(
1386 target: "harn.vm.channel",
1387 "harn.channel",
1388 harn.kind = kind.as_str(),
1389 harn.name = %name,
1390 );
1391 for link in links {
1392 let trace_id = crate::TraceId(link.trace_id);
1393 let mut attributes: std::collections::HashMap<String, String> =
1394 link.attributes.into_iter().collect();
1395 attributes
1396 .entry("harn.link.kind".to_string())
1397 .or_insert_with(|| "channel_emit".to_string());
1398 let _ = crate::observability::otel::set_span_link(
1399 &otel_span,
1400 &trace_id,
1401 &link.span_id,
1402 Some(attributes),
1403 );
1404 }
1405 Self { span_id, otel_span }
1406 }
1407
1408 fn link(&self) -> Option<crate::tracing::SpanLink> {
1409 crate::observability::otel::current_span_context_hex(&self.otel_span)
1410 .map(|(trace_id, span_id)| crate::tracing::SpanLink::new(trace_id, span_id))
1411 .or_else(|| crate::tracing::span_link(self.span_id))
1412 }
1413
1414 fn set_metadata(&self, key: &str, value: serde_json::Value) {
1415 crate::tracing::span_set_metadata(self.span_id, key, value);
1416 }
1417
1418 fn end(&mut self) {
1419 if self.span_id != 0 {
1420 crate::tracing::span_end(self.span_id);
1421 self.span_id = 0;
1422 }
1423 }
1424}
1425
1426impl Drop for ChannelSpanGuard {
1427 fn drop(&mut self) {
1428 self.end();
1429 }
1430}
1431
1432fn summarize_payload(payload: &serde_json::Value) -> serde_json::Value {
1439 const MAX_STRING_LEN: usize = 120;
1440 match payload {
1441 serde_json::Value::Null => serde_json::json!({"kind": "null"}),
1442 serde_json::Value::Bool(value) => serde_json::json!({"kind": "bool", "value": value}),
1443 serde_json::Value::Number(value) => serde_json::json!({"kind": "number", "value": value}),
1444 serde_json::Value::String(value) => {
1445 let truncated: String = value.chars().take(MAX_STRING_LEN).collect();
1446 let len = value.chars().count();
1447 serde_json::json!({
1448 "kind": "string",
1449 "value": truncated,
1450 "truncated": len > MAX_STRING_LEN,
1451 "length": len,
1452 })
1453 }
1454 serde_json::Value::Array(items) => {
1455 serde_json::json!({"kind": "array", "length": items.len()})
1456 }
1457 serde_json::Value::Object(map) => {
1458 let fields: Vec<&String> = map.keys().take(8).collect();
1459 serde_json::json!({
1460 "kind": "object",
1461 "field_count": map.len(),
1462 "fields": fields,
1463 })
1464 }
1465 }
1466}
1467
1468fn emit_channel_transcript_event(kind: &'static str, payload: serde_json::Value) {
1473 let Some(log) = active_event_log() else {
1474 return;
1475 };
1476 let Ok(topic) = Topic::new(CHANNEL_TRANSCRIPT_TOPIC) else {
1477 return;
1478 };
1479 let event = LogEvent::new(kind, payload);
1480 if tokio::runtime::Handle::try_current().is_ok() {
1481 if let Ok(join) = std::thread::Builder::new()
1482 .name("harn-channel-transcript".to_string())
1483 .spawn(move || {
1484 let _ = futures::executor::block_on(log.append(&topic, event));
1485 })
1486 {
1487 let _ = join.join();
1488 }
1489 } else {
1490 let _ = futures::executor::block_on(log.append(&topic, event));
1491 }
1492}
1493
1494fn emit_channel_emit_transcript(
1499 record: &StoredChannelEvent,
1500 resolved: &ResolvedChannel,
1501 inserted: bool,
1502 span_id: u64,
1503) {
1504 let payload = serde_json::json!({
1505 "event_id": record.id,
1506 "name": record.name,
1507 "name_resolved": resolved.resolved_name,
1508 "scope": record.scope,
1509 "scope_id": record.scope_id,
1510 "payload_summary": summarize_payload(&record.payload),
1511 "emitted_at": record.emitted_at,
1512 "emitted_at_ms": record.emitted_at.at_ms,
1513 "emitted_by": record.emitted_by,
1514 "session_id": record.session_id,
1515 "pipeline_id": record.pipeline_id,
1516 "tenant_id": record.tenant_id,
1517 "inserted": inserted,
1518 "duplicate": !inserted,
1519 "span_id": if span_id == 0 { serde_json::Value::Null } else { serde_json::json!(span_id) },
1520 });
1521 emit_channel_transcript_event(CHANNEL_EMIT_TRANSCRIPT_KIND, payload);
1522}
1523
1524#[allow(clippy::too_many_arguments)]
1529fn emit_channel_match_transcript(
1530 trigger_id: &str,
1531 handler_kind: &str,
1532 resolved: &ResolvedChannel,
1533 event_id: &str,
1534 matched_at_ms: i64,
1535 matched_in_session_id: Option<&str>,
1536 span_id: u64,
1537 batch: Option<serde_json::Value>,
1538) {
1539 let mut payload = serde_json::json!({
1540 "event_id": event_id,
1541 "name_resolved": resolved.resolved_name,
1542 "scope": resolved.scope.as_str(),
1543 "scope_id": resolved.scope_id,
1544 "trigger_id": trigger_id,
1545 "handler_kind": handler_kind,
1546 "matched_at_ms": matched_at_ms,
1547 "matched_in_session_id": matched_in_session_id,
1548 "span_id": if span_id == 0 { serde_json::Value::Null } else { serde_json::json!(span_id) },
1549 });
1550 if let Some(batch) = batch {
1551 if let Some(map) = payload.as_object_mut() {
1552 map.insert("batch".to_string(), batch);
1553 }
1554 }
1555 emit_channel_transcript_event(CHANNEL_MATCH_TRANSCRIPT_KIND, payload);
1556}
1557
1558fn emit_links_from_event(event: &TriggerEvent) -> Vec<crate::tracing::SpanLink> {
1565 let mut links = Vec::new();
1566 if let (Some(trace_id), Some(span_id)) = (
1567 event.headers.get(EMIT_TRACE_ID_HEADER),
1568 event.headers.get(EMIT_SPAN_ID_HEADER),
1569 ) {
1570 links.push(
1571 crate::tracing::SpanLink::new(trace_id.clone(), span_id.clone()).with_attributes(
1572 BTreeMap::from([("harn.link.kind".to_string(), "channel_emit".to_string())]),
1573 ),
1574 );
1575 }
1576 links
1577}
1578
1579fn emit_links_from_batch(events: &[TriggerEvent]) -> Vec<crate::tracing::SpanLink> {
1580 let mut links = Vec::new();
1581 for event in events {
1582 links.extend(emit_links_from_event(event));
1583 }
1584 links
1585}
1586
1587fn batch_summary_for_transcript(events: &[TriggerEvent]) -> serde_json::Value {
1588 let constituent_ids: Vec<String> = events.iter().map(|event| event.id.0.clone()).collect();
1589 serde_json::json!({
1590 "count": events.len(),
1591 "constituent_event_ids": constituent_ids,
1592 })
1593}
1594
1595#[derive(Clone, Debug, PartialEq, Eq)]
1603pub struct ChannelSelector {
1604 scope: ChannelScope,
1605 scope_id_pattern: ScopeIdPattern,
1606 name: String,
1607}
1608
1609#[derive(Clone, Debug, PartialEq, Eq)]
1610enum ScopeIdPattern {
1611 Current,
1614 Exact(String),
1616 Wildcard,
1619}
1620
1621impl ChannelSelector {
1622 pub fn parse(input: &str) -> Result<Self, String> {
1632 let input = input.trim();
1633 let rest = input
1634 .strip_prefix("channel:")
1635 .ok_or_else(|| format!("channel selector must start with `channel:`, got `{input}`"))?;
1636 if rest.is_empty() {
1637 return Err("channel selector cannot be empty after `channel:` prefix".to_string());
1638 }
1639
1640 let (head, tail_opt) = match rest.split_once(':') {
1641 Some((head, tail)) => (head, Some(tail)),
1642 None => (rest, None),
1643 };
1644 let parsed_scope = ChannelScope::parse(head).ok();
1645 match (parsed_scope, tail_opt) {
1646 (None, _) => {
1648 let name = rest.to_string();
1649 validate_selector_name(&name)?;
1650 Ok(Self {
1651 scope: ChannelScope::Tenant,
1652 scope_id_pattern: ScopeIdPattern::Current,
1653 name,
1654 })
1655 }
1656 (Some(scope @ (ChannelScope::Session | ChannelScope::Pipeline)), Some(name))
1657 if !name.is_empty() =>
1658 {
1659 if name.contains(':') {
1660 return Err(format!(
1661 "channel selector `{input}`: {} scope expects `<name>` with no extra colons",
1662 scope.as_str()
1663 ));
1664 }
1665 validate_selector_name(name)?;
1666 Ok(Self {
1667 scope,
1668 scope_id_pattern: ScopeIdPattern::Current,
1669 name: name.to_string(),
1670 })
1671 }
1672 (Some(scope @ (ChannelScope::Tenant | ChannelScope::Org)), Some(tail))
1673 if !tail.is_empty() =>
1674 {
1675 let Some((scope_id, name)) = tail.split_once(':') else {
1676 if matches!(scope, ChannelScope::Tenant) {
1678 validate_selector_name(tail)?;
1679 return Ok(Self {
1680 scope,
1681 scope_id_pattern: ScopeIdPattern::Current,
1682 name: tail.to_string(),
1683 });
1684 }
1685 return Err(format!(
1686 "channel selector `{input}`: org scope requires `<org-id>:<name>`"
1687 ));
1688 };
1689 if scope_id.is_empty() || name.is_empty() {
1690 return Err(format!(
1691 "channel selector `{input}`: scope id and name must be non-empty"
1692 ));
1693 }
1694 validate_selector_name(name)?;
1695 let pattern = if scope_id == "*" {
1696 ScopeIdPattern::Wildcard
1697 } else {
1698 ScopeIdPattern::Exact(scope_id.to_string())
1699 };
1700 Ok(Self {
1701 scope,
1702 scope_id_pattern: pattern,
1703 name: name.to_string(),
1704 })
1705 }
1706 (Some(scope), _) => Err(format!(
1707 "channel selector `{input}`: {} scope requires `<name>` segment",
1708 scope.as_str()
1709 )),
1710 }
1711 }
1712
1713 pub fn scope(&self) -> &'static str {
1714 self.scope.as_str()
1715 }
1716
1717 pub fn name(&self) -> &str {
1718 &self.name
1719 }
1720
1721 pub fn matches(&self, scope: &str, scope_id: &str, name: &str, current_tenant: &str) -> bool {
1725 if self.scope.as_str() != scope || self.name != name {
1726 return false;
1727 }
1728 match &self.scope_id_pattern {
1729 ScopeIdPattern::Current => match self.scope {
1730 ChannelScope::Tenant => scope_id == current_tenant,
1731 ChannelScope::Session | ChannelScope::Pipeline => {
1732 true
1737 }
1738 ChannelScope::Org => false,
1739 },
1740 ScopeIdPattern::Exact(value) => scope_id == value,
1741 ScopeIdPattern::Wildcard => match self.scope {
1742 ChannelScope::Tenant => true,
1743 _ => false,
1745 },
1746 }
1747 }
1748}
1749
1750fn validate_selector_name(name: &str) -> Result<(), String> {
1751 if name.trim().is_empty()
1752 || name.contains(':')
1753 || name.chars().any(|ch| ch.is_control() || ch.is_whitespace())
1754 {
1755 return Err(format!("channel selector name `{name}` is malformed"));
1756 }
1757 Ok(())
1758}
1759
1760async fn dispatch_channel_emit_to_triggers(
1774 resolved: &ResolvedChannel,
1775 payload: ChannelEventPayload,
1776 emit_link: Option<crate::tracing::SpanLink>,
1777) -> Result<(), VmError> {
1778 let bindings = crate::triggers::registry::channel_bindings_matching(
1781 resolved.scope.as_str(),
1782 &resolved.scope_id,
1783 &payload.name,
1784 );
1785
1786 flush_expired_aggregations_inner().await;
1793
1794 if bindings.is_empty() {
1795 return Ok(());
1796 }
1797 let Some(base_vm) = crate::vm::clone_async_builtin_child_vm() else {
1798 return Ok(());
1800 };
1801 let log = active_event_log()
1802 .unwrap_or_else(|| install_memory_for_current_thread(CHANNEL_QUEUE_DEPTH));
1803 let dispatcher = crate::triggers::Dispatcher::with_event_log(base_vm, log);
1804 for binding in bindings {
1805 if let Some(filter_str) = binding.filter.as_ref() {
1809 if !channel_filter_matches(filter_str, &payload.payload) {
1810 continue;
1811 }
1812 }
1813 let event = build_channel_trigger_event(&payload, emit_link.as_ref());
1814
1815 if let Some(aggregation_config) = binding.aggregation.as_ref() {
1819 let partition_key = crate::triggers::aggregation::partition_key_for_event(
1820 aggregation_config,
1821 &payload.payload,
1822 );
1823 let binding_key = binding.binding_key();
1824 let outcome = crate::triggers::aggregation::accumulate(
1825 &binding_key,
1826 aggregation_config,
1827 partition_key.as_deref(),
1828 event,
1829 );
1830 if let crate::triggers::aggregation::AccumulateOutcome::Ready(events) = outcome {
1831 let links = emit_links_from_batch(&events);
1835 let batch_summary = batch_summary_for_transcript(&events);
1836 let batched = match crate::triggers::dispatcher::build_batched_event_public(events)
1837 {
1838 Ok(batched) => batched,
1839 Err(error) => {
1840 return Err(VmError::Runtime(format!(
1841 "emit_channel aggregation batch: {error}"
1842 )));
1843 }
1844 };
1845 fire_channel_match(
1846 &dispatcher,
1847 binding.clone(),
1848 batched,
1849 resolved,
1850 links,
1851 Some(batch_summary),
1852 )
1853 .await;
1854 }
1855 continue;
1856 }
1857
1858 let links = emit_links_from_event(&event);
1859 fire_channel_match(&dispatcher, binding.clone(), event, resolved, links, None).await;
1860 }
1861 Ok(())
1862}
1863
1864async fn fire_channel_match(
1869 dispatcher: &crate::triggers::Dispatcher,
1870 binding: std::sync::Arc<crate::triggers::registry::TriggerBinding>,
1871 event: TriggerEvent,
1872 resolved: &ResolvedChannel,
1873 links: Vec<crate::tracing::SpanLink>,
1874 batch_summary: Option<serde_json::Value>,
1875) {
1876 let trigger_id = binding.id.as_str().to_string();
1877 let handler_kind = binding.handler.kind().to_string();
1878 let event_id = if event.dedupe_key.is_empty() {
1884 event.id.0.clone()
1885 } else {
1886 event.dedupe_key.clone()
1887 };
1888 let mut match_span = ChannelSpanGuard::start_detached(
1889 crate::tracing::SpanKind::ChannelMatch,
1890 format!("channel.match {}", resolved.resolved_name),
1891 links,
1892 );
1893 match_span.set_metadata("event_id", serde_json::json!(event_id));
1894 match_span.set_metadata("trigger_id", serde_json::json!(trigger_id));
1895 match_span.set_metadata("handler_kind", serde_json::json!(handler_kind));
1896 match_span.set_metadata("name_resolved", serde_json::json!(resolved.resolved_name));
1897 if let Some(summary) = batch_summary.as_ref() {
1898 match_span.set_metadata("batch", summary.clone());
1899 }
1900 let span_id = crate::tracing::current_span_id().unwrap_or(0);
1901 let matched_at_ms = crate::clock_mock::now_utc().unix_timestamp_nanos() as i64 / 1_000_000;
1902 let matched_in_session_id = crate::agent_sessions::current_session_id()
1903 .or_else(|| event.tenant_id.as_ref().map(|t| t.0.clone()));
1904 emit_channel_match_transcript(
1905 &trigger_id,
1906 &handler_kind,
1907 resolved,
1908 &event_id,
1909 matched_at_ms,
1910 matched_in_session_id.as_deref(),
1911 span_id,
1912 batch_summary.clone(),
1913 );
1914 let dispatch_outcome = dispatcher.dispatch(&binding, event).await;
1920 let binding_key = binding.binding_key();
1921 let batch_info = batch_info_from_summary(batch_summary.as_ref());
1922 record_channel_match_receipt(
1923 &trigger_id,
1924 &binding_key,
1925 &handler_kind,
1926 resolved,
1927 &event_id,
1928 matched_in_session_id.as_deref(),
1929 batch_info,
1930 span_id,
1931 &dispatch_outcome,
1932 )
1933 .await;
1934 drop(dispatch_outcome);
1939 match_span.end();
1940}
1941
1942pub(crate) async fn flush_expired_aggregations_inner() {
1947 let expirations = crate::triggers::aggregation::drain_expired_aggregations();
1948 if expirations.is_empty() {
1949 return;
1950 }
1951 let Some(base_vm) = crate::vm::clone_async_builtin_child_vm() else {
1952 return;
1953 };
1954 let log = active_event_log()
1955 .unwrap_or_else(|| install_memory_for_current_thread(CHANNEL_QUEUE_DEPTH));
1956 let dispatcher = crate::triggers::Dispatcher::with_event_log(base_vm, log);
1957 for expired in expirations {
1958 if matches!(
1959 expired.action,
1960 crate::triggers::aggregation::ExpireAction::Discard
1961 ) {
1962 continue;
1963 }
1964 let Some((trigger_id, version_str)) = expired.binding_key.rsplit_once("@v") else {
1967 continue;
1968 };
1969 let Ok(version) = version_str.parse::<u32>() else {
1970 continue;
1971 };
1972 let Ok(binding) =
1973 crate::triggers::registry::resolve_live_trigger_binding(trigger_id, Some(version))
1974 else {
1975 continue;
1976 };
1977 let resolved_for_match = resolved_from_first_event(&expired.events);
1981 let links = emit_links_from_batch(&expired.events);
1982 let batch_summary = batch_summary_for_transcript(&expired.events);
1983 let batched = match crate::triggers::dispatcher::build_batched_event_public(expired.events)
1984 {
1985 Ok(batched) => batched,
1986 Err(_) => continue,
1987 };
1988 match resolved_for_match {
1989 Some(resolved) => {
1990 fire_channel_match(
1991 &dispatcher,
1992 binding,
1993 batched,
1994 &resolved,
1995 links,
1996 Some(batch_summary),
1997 )
1998 .await;
1999 }
2000 None => {
2001 let _ = dispatcher.dispatch(&binding, batched).await;
2002 }
2003 }
2004 }
2005}
2006
2007fn resolved_from_first_event(events: &[TriggerEvent]) -> Option<ResolvedChannel> {
2013 let first = events.first()?;
2014 let ProviderPayload::Known(KnownProviderPayload::Channel(payload)) = &first.provider_payload
2015 else {
2016 return None;
2017 };
2018 let scope = ChannelScope::parse(&payload.scope).ok()?;
2019 let topic = Topic::new(format!(
2020 "channels.{}.{}.{}",
2021 payload.scope,
2022 sanitize_topic_component(&payload.scope_id),
2023 sanitize_topic_component(&payload.name),
2024 ))
2025 .ok()?;
2026 Some(ResolvedChannel {
2027 scope,
2028 scope_id: payload.scope_id.clone(),
2029 resolved_name: payload.name_resolved.clone(),
2030 topic,
2031 retention: retention_for_scope(scope),
2032 })
2033}
2034
2035fn build_channel_trigger_event(
2036 payload: &ChannelEventPayload,
2037 emit_link: Option<&crate::tracing::SpanLink>,
2038) -> TriggerEvent {
2039 let mut event = TriggerEvent::new(
2040 ProviderId::from("channel"),
2041 "channel.emit",
2042 None,
2043 payload.id.clone(),
2044 payload.tenant_id.clone().map(TenantId::new),
2045 BTreeMap::new(),
2046 ProviderPayload::Known(KnownProviderPayload::Channel(payload.clone())),
2047 SignatureStatus::Unsigned,
2048 );
2049 event.headers.insert(
2050 "harn_channel_name".to_string(),
2051 payload.name_resolved.clone(),
2052 );
2053 event
2054 .headers
2055 .insert("harn_channel_scope".to_string(), payload.scope.clone());
2056 event.headers.insert(
2057 "harn_channel_scope_id".to_string(),
2058 payload.scope_id.clone(),
2059 );
2060 if let Some(link) = emit_link {
2065 event
2066 .headers
2067 .insert(EMIT_TRACE_ID_HEADER.to_string(), link.trace_id.clone());
2068 event
2069 .headers
2070 .insert(EMIT_SPAN_ID_HEADER.to_string(), link.span_id.clone());
2071 }
2072 event
2073}
2074
2075fn channel_filter_matches(filter_raw: &str, payload: &serde_json::Value) -> bool {
2083 let trimmed = filter_raw.trim();
2084 if trimmed.is_empty() {
2085 return true;
2086 }
2087 let parsed: serde_json::Value = match serde_json::from_str(trimmed) {
2088 Ok(value) => value,
2089 Err(_) => return true,
2090 };
2091 let Some(map) = parsed.as_object() else {
2092 return true;
2093 };
2094 map.iter()
2095 .all(|(key, expected)| match payload_path(payload, key) {
2096 Some(actual) => actual == expected,
2097 None => false,
2098 })
2099}
2100
2101fn payload_path<'a>(value: &'a serde_json::Value, path: &str) -> Option<&'a serde_json::Value> {
2102 let mut current = value;
2103 for segment in path.split('.') {
2104 if segment.is_empty() {
2105 return None;
2106 }
2107 current = match current {
2108 serde_json::Value::Object(map) => map.get(segment)?,
2109 _ => return None,
2110 };
2111 }
2112 Some(current)
2113}
2114
2115#[cfg(test)]
2116mod tests {
2117 use super::*;
2118
2119 fn context() -> ChannelContext {
2120 ChannelContext {
2121 task_id: Some("task".to_string()),
2122 root_task_id: Some("root".to_string()),
2123 ..ChannelContext::default()
2124 }
2125 }
2126
2127 #[test]
2128 fn resolves_bare_name_to_default_tenant() {
2129 let resolved =
2130 resolve_channel("pr.merged", &ChannelOptions::default(), &context()).unwrap();
2131 assert_eq!(resolved.scope, ChannelScope::Tenant);
2132 assert_eq!(resolved.resolved_name, "tenant:default:pr.merged");
2133 assert_eq!(resolved.topic.as_str(), "channels.tenant.default.pr.merged");
2134 }
2135
2136 #[test]
2137 fn resolves_session_prefix_from_context() {
2138 let resolved =
2139 resolve_channel("session:agent.done", &ChannelOptions::default(), &context()).unwrap();
2140 assert_eq!(resolved.scope, ChannelScope::Session);
2141 assert_eq!(resolved.resolved_name, "session:root:agent.done");
2142 }
2143
2144 #[test]
2145 fn missing_pipeline_context_reports_channel_error() {
2146 let err = resolve_channel(
2147 "pipeline:stage.done",
2148 &ChannelOptions::default(),
2149 &context(),
2150 )
2151 .unwrap_err();
2152 assert!(err.0.contains("HARN-CHN-001"));
2153 }
2154
2155 #[test]
2156 fn org_scope_is_disabled() {
2157 let err = resolve_channel(
2158 "org:burin-labs:pr.merged",
2159 &ChannelOptions::default(),
2160 &context(),
2161 )
2162 .unwrap_err();
2163 assert!(err.0.contains("HARN-CHN-002"));
2164 }
2165
2166 #[test]
2167 fn explicit_session_id_matching_context_resolves() {
2168 let ctx = ChannelContext {
2169 agent_session_id: Some("sess-A".to_string()),
2170 ..context()
2171 };
2172 let options = ChannelOptions {
2173 session_id: Some("sess-A".to_string()),
2174 ..ChannelOptions::default()
2175 };
2176 let resolved = resolve_channel("session:agent.done", &options, &ctx).unwrap();
2177 assert_eq!(resolved.scope, ChannelScope::Session);
2178 assert_eq!(resolved.resolved_name, "session:sess-A:agent.done");
2179 }
2180
2181 #[test]
2182 fn explicit_session_id_conflict_reports_ambiguity() {
2183 let ctx = ChannelContext {
2184 agent_session_id: Some("sess-A".to_string()),
2185 ..context()
2186 };
2187 let options = ChannelOptions {
2188 session_id: Some("sess-B".to_string()),
2189 ..ChannelOptions::default()
2190 };
2191 let err = resolve_channel("session:agent.done", &options, &ctx).unwrap_err();
2192 assert!(
2193 err.0.contains("HARN-CHN-004"),
2194 "expected HARN-CHN-004, got: {}",
2195 err.0
2196 );
2197 }
2198
2199 #[test]
2200 fn explicit_pipeline_id_conflict_reports_ambiguity() {
2201 let ctx = ChannelContext {
2202 workflow_id: Some("pipe-A".to_string()),
2203 ..context()
2204 };
2205 let options = ChannelOptions {
2206 pipeline_id: Some("pipe-B".to_string()),
2207 ..ChannelOptions::default()
2208 };
2209 let err = resolve_channel("pipeline:stage.done", &options, &ctx).unwrap_err();
2210 assert!(
2211 err.0.contains("HARN-CHN-004"),
2212 "expected HARN-CHN-004, got: {}",
2213 err.0
2214 );
2215 }
2216
2217 #[test]
2218 fn explicit_tenant_mismatch_reports_cross_tenant() {
2219 let ctx = ChannelContext {
2220 tenant_id: Some("tenant-A".to_string()),
2221 ..context()
2222 };
2223 let options = ChannelOptions {
2224 tenant_id: Some("tenant-B".to_string()),
2225 ..ChannelOptions::default()
2226 };
2227 let err = resolve_channel("pr.merged", &options, &ctx).unwrap_err();
2228 assert!(
2229 err.0.contains("HARN-CHN-002"),
2230 "expected HARN-CHN-002, got: {}",
2231 err.0
2232 );
2233 }
2234
2235 #[test]
2236 fn explicit_tenant_in_name_matching_context_resolves() {
2237 let ctx = ChannelContext {
2238 tenant_id: Some("tenant-A".to_string()),
2239 ..context()
2240 };
2241 let resolved = resolve_channel(
2242 "tenant:tenant-A:pr.merged",
2243 &ChannelOptions::default(),
2244 &ctx,
2245 )
2246 .unwrap();
2247 assert_eq!(resolved.scope, ChannelScope::Tenant);
2248 assert_eq!(resolved.resolved_name, "tenant:tenant-A:pr.merged");
2249 }
2250
2251 #[test]
2252 fn cross_tenant_via_name_prefix_is_rejected() {
2253 let ctx = ChannelContext {
2254 tenant_id: Some("tenant-A".to_string()),
2255 ..context()
2256 };
2257 let err = resolve_channel(
2258 "tenant:tenant-B:pr.merged",
2259 &ChannelOptions::default(),
2260 &ctx,
2261 )
2262 .unwrap_err();
2263 assert!(err.0.contains("HARN-CHN-002"));
2264 }
2265
2266 #[test]
2267 fn channel_selector_parses_tenant_default_shorthand() {
2268 let selector = ChannelSelector::parse("channel:pr.merged").expect("parses");
2269 assert_eq!(selector.scope(), "tenant");
2270 assert_eq!(selector.name(), "pr.merged");
2271 assert!(selector.matches("tenant", "default", "pr.merged", "default"));
2272 assert!(!selector.matches("tenant", "default", "other.event", "default"));
2273 assert!(!selector.matches("session", "default", "pr.merged", "default"));
2274 }
2275
2276 #[test]
2277 fn channel_selector_parses_session_scope() {
2278 let selector = ChannelSelector::parse("channel:session:my-event").expect("parses");
2279 assert_eq!(selector.scope(), "session");
2280 assert_eq!(selector.name(), "my-event");
2281 assert!(selector.matches("session", "any-session-id", "my-event", "any-session-id"));
2282 assert!(!selector.matches("tenant", "any", "my-event", "any"));
2283 }
2284
2285 #[test]
2286 fn channel_selector_parses_explicit_tenant() {
2287 let selector =
2288 ChannelSelector::parse("channel:tenant:burin-labs:pr.merged").expect("parses");
2289 assert!(selector.matches("tenant", "burin-labs", "pr.merged", "default"));
2290 assert!(!selector.matches("tenant", "other-tenant", "pr.merged", "default"));
2291 }
2292
2293 #[test]
2294 fn channel_selector_parses_tenant_wildcard() {
2295 let selector = ChannelSelector::parse("channel:tenant:*:pr.merged").expect("parses");
2296 assert!(selector.matches("tenant", "burin-labs", "pr.merged", "default"));
2297 assert!(selector.matches("tenant", "other-tenant", "pr.merged", "default"));
2298 assert!(!selector.matches("tenant", "any", "different-event", "default"));
2299 assert!(!selector.matches("session", "any", "pr.merged", "default"));
2300 }
2301
2302 #[test]
2303 fn channel_selector_rejects_malformed_inputs() {
2304 assert!(ChannelSelector::parse("not-a-channel").is_err());
2305 assert!(ChannelSelector::parse("channel:").is_err());
2306 assert!(ChannelSelector::parse("channel:session:").is_err());
2307 assert!(ChannelSelector::parse("channel:session:has:extra:colons").is_err());
2308 assert!(ChannelSelector::parse("channel:org:no-name").is_err());
2309 assert!(ChannelSelector::parse("channel:tenant::missing-id").is_err());
2310 assert!(ChannelSelector::parse("channel:tenant:foo:").is_err());
2311 }
2312
2313 #[test]
2314 fn channel_filter_matches_equality_paths() {
2315 let payload = serde_json::json!({"repo": "harn", "nested": {"k": "v"}});
2316 assert!(channel_filter_matches("{\"repo\": \"harn\"}", &payload));
2317 assert!(!channel_filter_matches("{\"repo\": \"other\"}", &payload));
2318 assert!(channel_filter_matches("{\"nested.k\": \"v\"}", &payload));
2319 assert!(!channel_filter_matches("{\"nested.k\": \"x\"}", &payload));
2320 assert!(!channel_filter_matches("{\"missing\": \"x\"}", &payload));
2322 assert!(channel_filter_matches("", &payload));
2324 assert!(channel_filter_matches("just-a-string", &payload));
2326 }
2327
2328 #[test]
2335 fn channel_payload_hash_is_deterministic_across_key_order() {
2336 let a = serde_json::json!({"a": 1, "b": 2, "nested": {"x": 10, "y": 20}});
2337 let b = serde_json::json!({"nested": {"y": 20, "x": 10}, "b": 2, "a": 1});
2338 assert_eq!(channel_payload_hash(&a), channel_payload_hash(&b));
2339 }
2340
2341 #[test]
2342 fn channel_payload_hash_changes_with_value_drift() {
2343 let baseline = serde_json::json!({"repo": "harn", "attempt": 1});
2344 let drifted = serde_json::json!({"repo": "harn", "attempt": 2});
2345 assert_ne!(
2346 channel_payload_hash(&baseline),
2347 channel_payload_hash(&drifted)
2348 );
2349 }
2350
2351 #[test]
2352 fn channel_payload_hash_is_sha256_prefixed_hex() {
2353 let value = serde_json::json!({"k": "v"});
2354 let hash = channel_payload_hash(&value);
2355 assert!(hash.starts_with("sha256:"));
2356 assert_eq!(hash.len(), "sha256:".len() + 64);
2357 }
2358}