1use std::collections::BTreeMap;
2use std::sync::{Arc, Mutex, OnceLock};
3
4use serde::{Deserialize, Serialize};
5use sha2::{Digest, Sha256};
6use time::format_description::well_known::Rfc3339;
7
8use crate::event_log::{
9 active_event_log, install_memory_for_current_thread, sanitize_topic_component, AnyEventLog,
10 EventId, EventLog, LogEvent, Topic,
11};
12use crate::llm::vm_value_to_json;
13use crate::runtime_limits::RuntimeLimits;
14use crate::triggers::event::{ChannelEventPayload, KnownProviderPayload};
15use crate::triggers::{ProviderId, ProviderPayload, SignatureStatus, TenantId, TriggerEvent};
16use crate::value::{VmError, VmValue};
17
/// Queue depth handed to the in-memory event logs this module creates,
/// taken from the default runtime limits.
const CHANNEL_QUEUE_DEPTH: usize = RuntimeLimits::DEFAULT.default_event_log_queue_depth;
/// Log-event kind used for every event appended by `emit_channel`.
const CHANNEL_EVENT_KIND: &str = "channel.emit";
/// Header carrying the channel event id; also the key used for the
/// idempotent append.
const IDEMPOTENCY_HEADER: &str = "harn.channel.id";
/// Header carrying the resolved channel name.
const NAME_HEADER: &str = "harn.channel.name";
/// Header carrying the channel scope (`session`, `pipeline`, ...).
const SCOPE_HEADER: &str = "harn.channel.scope";
/// Header carrying the scope id the channel was resolved against.
const SCOPE_ID_HEADER: &str = "harn.channel.scope_id";
/// Header carrying the emitter identity (see `emitted_by`).
const EMITTED_BY_HEADER: &str = "harn.channel.emitted_by";

// NOTE(review): the three transcript constants below are not referenced in the
// part of the file visible here; presumably they are used by the transcript
// helpers (e.g. `emit_channel_emit_transcript`) — confirm.
/// Topic name for channel lifecycle transcript entries.
pub(crate) const CHANNEL_TRANSCRIPT_TOPIC: &str = "transcript.channel.lifecycle";
/// Transcript kind for a channel emit.
pub(crate) const CHANNEL_EMIT_TRANSCRIPT_KIND: &str = "transcript.channel.emit";
/// Transcript kind for a channel match.
pub(crate) const CHANNEL_MATCH_TRANSCRIPT_KIND: &str = "transcript.channel.match";

/// Event-log topic that receives channel audit events (emit/match receipts
/// and guardrail audits).
pub const CHANNEL_AUDIT_TOPIC: &str = "lifecycle.channel.audit";
/// Audit event kind for an emit receipt.
pub(crate) const CHANNEL_EMIT_RECEIPT_KIND: &str = "channel_emit_receipt";
/// Audit event kind for a match receipt.
pub(crate) const CHANNEL_MATCH_RECEIPT_KIND: &str = "channel_match_receipt";
/// Value of the `schema` header on emit-receipt audit events.
const CHANNEL_EMIT_RECEIPT_SCHEMA: &str = "harn.channel_emit_receipt.v1";
/// Value of the `schema` header on match-receipt audit events.
const CHANNEL_MATCH_RECEIPT_SCHEMA: &str = "harn.channel_match_receipt.v1";

/// Audit event kind recorded when a guardrail blocks an emit.
pub(crate) const CHANNEL_GUARDRAIL_BLOCKED_KIND: &str = "channel_guardrail_blocked";
/// Audit event kind recorded when guardrails fired but the emit went ahead.
pub(crate) const CHANNEL_GUARDRAIL_WARNING_KIND: &str = "channel_guardrail_warning";
/// Value of the `schema` header on guardrail audit events.
const CHANNEL_GUARDRAIL_AUDIT_SCHEMA: &str = "harn.channel_guardrail_audit.v1";

// NOTE(review): the two trace headers below are not referenced in the part of
// the file visible here; presumably they carry the emit span's trace/span ids
// to trigger dispatch — confirm.
/// Header name for the emitting span's trace id.
const EMIT_TRACE_ID_HEADER: &str = "harn.channel.emit_trace_id";
/// Header name for the emitting span's span id.
const EMIT_SPAN_ID_HEADER: &str = "harn.channel.emit_span_id";
61
/// In-memory log backing session-scoped channels. Created on first use by
/// `log_for_scope` and cleared (set back to `None`) by `reset_channel_state`.
static SESSION_CHANNEL_LOG: OnceLock<Mutex<Option<Arc<AnyEventLog>>>> = OnceLock::new();
/// HMAC key material for signed timestamps. Generated once on first use by
/// `signing_salt`; it is not cleared by `reset_channel_state`.
static SIGNING_SALT: OnceLock<Vec<u8>> = OnceLock::new();
64
65#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
66#[serde(rename_all = "snake_case")]
67enum ChannelScope {
68 Session,
69 Pipeline,
70 Tenant,
71 Org,
72}
73
74impl ChannelScope {
75 fn parse(value: &str) -> Result<Self, ChannelError> {
76 match value.trim() {
77 "session" => Ok(Self::Session),
78 "pipeline" => Ok(Self::Pipeline),
79 "tenant" => Ok(Self::Tenant),
80 "org" => Ok(Self::Org),
81 other => Err(ChannelError::malformed(format!(
82 "HARN-CHN-003 malformed channel scope '{other}'"
83 ))),
84 }
85 }
86
87 fn as_str(self) -> &'static str {
88 match self {
89 Self::Session => "session",
90 Self::Pipeline => "pipeline",
91 Self::Tenant => "tenant",
92 Self::Org => "org",
93 }
94 }
95}
96
/// Snapshot of the runtime identifiers relevant to channel resolution,
/// captured by `ChannelContext::current`. Every field is `None` when no VM
/// context is available.
#[derive(Clone, Debug, Default)]
struct ChannelContext {
    /// `runtime_context.task_id` of the child VM.
    task_id: Option<String>,
    /// `runtime_context.root_task_id` of the child VM.
    root_task_id: Option<String>,
    /// `runtime_context.scope_id` of the child VM.
    scope_id: Option<String>,
    /// `workflow_id` entry of the runtime-context dict.
    workflow_id: Option<String>,
    /// `run_id` entry of the runtime-context dict.
    run_id: Option<String>,
    /// `worker_id` entry of the runtime-context dict.
    worker_id: Option<String>,
    /// `agent_session_id` entry of the runtime-context dict, falling back to
    /// `crate::agent_sessions::current_session_id()`.
    agent_session_id: Option<String>,
    /// `root_agent_session_id` entry of the runtime-context dict.
    root_agent_session_id: Option<String>,
    /// `tenant_id` entry of the runtime-context dict.
    tenant_id: Option<String>,
}
109
/// Caller-supplied options for the channel builtins. Produced by
/// `parse_options`, which is not visible in this part of the file.
#[derive(Clone, Debug, Default)]
struct ChannelOptions {
    /// Explicit scope; must agree with any scope prefix in the channel name.
    scope: Option<ChannelScope>,
    /// Explicit event id; when absent `emit_channel` generates one.
    id: Option<String>,
    /// Requested tenant id for tenant-scoped channels.
    tenant_id: Option<String>,
    /// Requested session id for session-scoped channels.
    session_id: Option<String>,
    /// Requested pipeline id for pipeline-scoped channels.
    pipeline_id: Option<String>,
    /// Cursor passed to `read_range` by `channel_events`.
    from_cursor: Option<EventId>,
    /// Maximum number of events `channel_events` reads (unbounded when absent).
    limit: Option<usize>,
    /// Copied verbatim into the stored event's `ttl_ms`.
    ttl_ms: Option<i64>,
}
121
/// Fully resolved channel identity produced by `resolve_channel`.
#[derive(Clone, Debug)]
struct ResolvedChannel {
    /// Effective scope (name prefix, then options, then the tenant default).
    scope: ChannelScope,
    /// Validated id of the session / pipeline / tenant the channel lives in.
    scope_id: String,
    /// `"<scope>:<scope_id>:<name>"`.
    resolved_name: String,
    /// Event-log topic `channels.<scope>.<scope_id>.<name>`, with the last two
    /// components passed through `sanitize_topic_component`.
    topic: Topic,
    /// Retention label from `retention_for_scope`.
    retention: &'static str,
}
130
/// A timestamp together with an HMAC over the event identity it belongs to.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct SignedTimestamp {
    /// Result of `harn_clock::offset_datetime_to_ms` for the instant
    /// (presumably milliseconds since the Unix epoch — confirm).
    pub at_ms: i64,
    /// The same instant formatted as RFC 3339 text.
    pub at: String,
    /// Signature algorithm; the constructors in this file set `"hmac-sha256"`.
    pub algorithm: String,
    /// Key identifier; the constructors in this file set `"local-session"`.
    pub key_id: String,
    /// `"sha256:"` followed by the hex-encoded HMAC.
    pub signature: String,
}
139
/// The record serialized as the payload of a `channel.emit` log event.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct StoredChannelEvent {
    /// Channel event id (also the idempotency key).
    id: String,
    /// Resolved channel name (`<scope>:<scope_id>:<name>`).
    name: String,
    /// User payload converted to JSON.
    payload: serde_json::Value,
    /// Signed emit timestamp.
    emitted_at: SignedTimestamp,
    /// Emitter identity (see `emitted_by`).
    emitted_by: String,
    /// Scope name as text.
    scope: String,
    /// Scope id the channel resolved to.
    scope_id: String,
    /// Pipeline id recorded for the emit; omitted from JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pipeline_id: Option<String>,
    /// Session id recorded for the emit; omitted from JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    session_id: Option<String>,
    /// Tenant id recorded for the emit; omitted from JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    tenant_id: Option<String>,
    /// Retention label for the channel's scope.
    retention: String,
    /// Optional TTL copied from the emit options; omitted from JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    ttl_ms: Option<i64>,
}
159
/// Audit receipt for one `emit_channel` call; appended to
/// `CHANNEL_AUDIT_TOPIC` by `record_channel_emit_receipt`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ChannelEmitReceipt {
    /// Channel event id.
    pub event_id: String,
    /// Resolved channel name (`<scope>:<scope_id>:<name>`).
    pub name_resolved: String,
    /// Scope name as text.
    pub scope: String,
    /// Scope id the channel resolved to.
    pub scope_id: String,
    /// `channel_payload_hash` of `payload`.
    pub payload_hash: String,
    /// The emitted payload.
    pub payload: serde_json::Value,
    /// Signed emit timestamp.
    pub emitted_at: SignedTimestamp,
    /// Emitter identity.
    pub emitted_by: String,
    /// Pipeline id recorded for the emit; omitted from JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pipeline_id: Option<String>,
    /// Session id recorded for the emit; omitted from JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    /// Tenant id recorded for the emit; omitted from JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tenant_id: Option<String>,
    /// Event-log topic the event was appended to.
    pub topic: String,
    /// `inserted` flag reported by the idempotent append.
    pub inserted: bool,
    /// Tracing span id of the emit; `None` when the recorded id was 0.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub span_id: Option<u64>,
}
191
/// Audit receipt for one trigger match of a channel event; appended to
/// `CHANNEL_AUDIT_TOPIC` by `record_channel_match_receipt`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ChannelMatchReceipt {
    /// Id of the channel event that matched.
    pub event_id: String,
    /// Id of the trigger that matched.
    pub trigger_id: String,
    /// Binding key of the matching trigger binding.
    pub binding_key: String,
    /// Resolved channel name (`<scope>:<scope_id>:<name>`).
    pub name_resolved: String,
    /// Scope name as text.
    pub scope: String,
    /// Scope id the channel resolved to.
    pub scope_id: String,
    /// Signed timestamp taken when the receipt is built.
    pub matched_at: SignedTimestamp,
    /// Session the match happened in, if known; omitted from JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub matched_in_session_id: Option<String>,
    /// Batch details when the match covered several events.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub batch: Option<ChannelMatchBatchInfo>,
    /// Kind of handler the trigger dispatches to.
    pub handler_kind: String,
    /// Summary of the dispatch result.
    pub handler_result: ChannelMatchResultSummary,
    /// Tracing span id of the match; `None` when the recorded id was 0.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub span_id: Option<u64>,
}
219
/// Batch details attached to a match receipt; parsed from a JSON summary by
/// `batch_info_from_summary`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ChannelMatchBatchInfo {
    /// The summary's `count` value.
    pub count: usize,
    /// String entries of the summary's `constituent_event_ids` array.
    pub constituent_event_ids: Vec<String>,
}
228
229#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
234pub struct ChannelMatchResultSummary {
235 pub status: String,
236 pub attempt_count: u32,
237 #[serde(skip_serializing_if = "Option::is_none")]
238 pub error: Option<String>,
239 #[serde(skip_serializing_if = "Option::is_none")]
240 pub dispatch_failed: Option<bool>,
241}
242
243impl ChannelMatchResultSummary {
244 fn from_dispatch(
245 outcome: &Result<crate::triggers::DispatchOutcome, crate::triggers::DispatchError>,
246 ) -> Self {
247 match outcome {
248 Ok(outcome) => Self {
249 status: outcome.status.as_str().to_string(),
250 attempt_count: outcome.attempt_count,
251 error: outcome.error.clone(),
252 dispatch_failed: None,
253 },
254 Err(error) => Self {
255 status: "dispatch_error".to_string(),
256 attempt_count: 0,
257 error: Some(error.to_string()),
258 dispatch_failed: Some(true),
259 },
260 }
261 }
262}
263
/// Error raised while resolving or validating a channel. Wraps the complete
/// message text, including its `HARN-CHN-xxx` code.
#[derive(Debug)]
struct ChannelError(String);

impl ChannelError {
    /// `HARN-CHN-001`: a pipeline-scoped channel was used without a pipeline.
    fn missing_pipeline() -> Self {
        Self(String::from(
            "HARN-CHN-001 missing pipeline context for pipeline-scoped channel",
        ))
    }

    /// `HARN-CHN-002`: the request crosses a tenant boundary.
    fn cross_tenant(message: impl Into<String>) -> Self {
        Self::with_code("HARN-CHN-002", message)
    }

    /// Malformed input. The caller supplies the full message, code included.
    fn malformed(message: impl Into<String>) -> Self {
        let text: String = message.into();
        Self(text)
    }

    /// `HARN-CHN-004`: the requested scope conflicts with the active one.
    fn scope_ambiguous(message: impl Into<String>) -> Self {
        Self::with_code("HARN-CHN-004", message)
    }

    /// Prefixes `message` with `code` and a single space.
    fn with_code(code: &str, message: impl Into<String>) -> Self {
        let detail: String = message.into();
        Self(format!("{code} {detail}"))
    }
}
284
285impl From<ChannelError> for VmError {
286 fn from(error: ChannelError) -> Self {
287 VmError::Runtime(error.0)
288 }
289}
290
291pub fn reset_channel_state() {
292 if let Some(slot) = SESSION_CHANNEL_LOG.get() {
293 *slot.lock().expect("channel session log poisoned") = None;
294 }
295 crate::channel_guardrails::clear();
299}
300
301pub(crate) async fn emit_channel_from_vm(
302 ctx: Option<&crate::vm::AsyncBuiltinCtx>,
303 args: Vec<VmValue>,
304) -> Result<VmValue, VmError> {
305 let name = required_string(args.first(), "emit_channel", "name")?;
306 let payload = vm_value_to_json(
307 args.get(1)
308 .ok_or_else(|| VmError::TypeError("emit_channel: missing payload".to_string()))?,
309 );
310 let options = parse_options(args.get(2), "emit_channel")?;
311 let context = ChannelContext::current(ctx);
312 let resolved = resolve_channel(&name, &options, &context)?;
313 let event_id = options
314 .id
315 .clone()
316 .unwrap_or_else(|| format!("channel_evt_{}", uuid::Uuid::now_v7()));
317 let emitted_by = emitted_by(&context);
318 let emitted_at = signed_timestamp(&resolved, &event_id, &emitted_by);
319 let occurred_at_ms = emitted_at.at_ms;
320
321 let guardrail_context = serde_json::json!({
328 "name": name,
329 "name_resolved": resolved.resolved_name,
330 "scope": resolved.scope.as_str(),
331 "scope_id": resolved.scope_id,
332 "event_id": event_id,
333 "emitted_by": emitted_by,
334 });
335 let decision = crate::channel_guardrails::evaluate(
336 ctx,
337 &payload,
338 &guardrail_context,
339 &resolved.resolved_name,
340 )
341 .await?;
342 if matches!(
343 decision.verdict,
344 crate::channel_guardrails::Verdict::Block { .. }
345 ) {
346 return handle_blocked_emit(
347 &name,
348 &resolved,
349 &event_id,
350 &emitted_by,
351 &emitted_at,
352 &payload,
353 &decision,
354 )
355 .await;
356 }
357 record_guardrail_warnings(
358 &resolved,
359 &event_id,
360 &emitted_by,
361 &payload,
362 decision.fired.as_slice(),
363 )
364 .await;
365 let record = StoredChannelEvent {
366 id: event_id.clone(),
367 name: resolved.resolved_name.clone(),
368 payload,
369 emitted_at,
370 emitted_by: emitted_by.clone(),
371 scope: resolved.scope.as_str().to_string(),
372 scope_id: resolved.scope_id.clone(),
373 pipeline_id: context.pipeline_id_for_receipt(&resolved),
374 session_id: context.session_id_for_receipt(&resolved),
375 tenant_id: context.tenant_id_for_receipt(&resolved),
376 retention: resolved.retention.to_string(),
377 ttl_ms: options.ttl_ms,
378 };
379
380 let mut emit_span = ChannelSpanGuard::start(
385 crate::tracing::SpanKind::ChannelEmit,
386 format!("channel.emit {}", resolved.resolved_name),
387 Vec::new(),
388 );
389 emit_span.set_metadata("event_id", serde_json::json!(record.id));
390 emit_span.set_metadata("scope", serde_json::json!(resolved.scope.as_str()));
391 emit_span.set_metadata("scope_id", serde_json::json!(resolved.scope_id));
392 emit_span.set_metadata("name_resolved", serde_json::json!(resolved.resolved_name));
393 emit_span.set_metadata("payload_summary", summarize_payload(&record.payload));
394 let emit_span_id = crate::tracing::current_span_id().unwrap_or(0);
395 let emit_link = emit_span.link();
396
397 let mut headers = BTreeMap::new();
398 headers.insert(IDEMPOTENCY_HEADER.to_string(), event_id.clone());
399 headers.insert(NAME_HEADER.to_string(), resolved.resolved_name.clone());
400 headers.insert(
401 SCOPE_HEADER.to_string(),
402 resolved.scope.as_str().to_string(),
403 );
404 headers.insert(SCOPE_ID_HEADER.to_string(), resolved.scope_id.clone());
405 headers.insert(EMITTED_BY_HEADER.to_string(), emitted_by.clone());
406
407 let log = log_for_scope(resolved.scope);
408 let mut log_event = LogEvent::new(
409 CHANNEL_EVENT_KIND,
410 serde_json::to_value(&record)
411 .map_err(|error| VmError::Runtime(format!("emit_channel: encode event: {error}")))?,
412 )
413 .with_headers(headers);
414 log_event.occurred_at_ms = occurred_at_ms;
415 let outcome = log
416 .append_idempotent_by_header(&resolved.topic, IDEMPOTENCY_HEADER, &event_id, log_event)
417 .await
418 .map_err(channel_log_error)?;
419 let receipt = receipt_value(
420 &resolved.topic,
421 outcome.event_id,
422 &outcome.event,
423 outcome.inserted,
424 )?;
425 emit_channel_emit_transcript(&record, &resolved, outcome.inserted, emit_span_id);
428 record_channel_emit_receipt(&record, &resolved, outcome.inserted, emit_span_id).await;
433 if outcome.inserted {
437 let payload_json = outcome
438 .event
439 .payload
440 .get("payload")
441 .cloned()
442 .unwrap_or(serde_json::Value::Null);
443 let context_for_fanout = ChannelContext::current(ctx);
444 let fanout_payload = ChannelEventPayload {
445 id: event_id.clone(),
446 name: parse_name(&name)
447 .map(|parsed| parsed.name)
448 .unwrap_or_else(|_| resolved.resolved_name.clone()),
449 name_resolved: resolved.resolved_name.clone(),
450 scope: resolved.scope.as_str().to_string(),
451 scope_id: resolved.scope_id.clone(),
452 payload: payload_json,
453 emitted_by: emitted_by.clone(),
454 tenant_id: context_for_fanout.tenant_id_for_receipt(&resolved),
455 session_id: context_for_fanout.session_id_for_receipt(&resolved),
456 pipeline_id: context_for_fanout.pipeline_id_for_receipt(&resolved),
457 };
458 dispatch_channel_emit_to_triggers(ctx, &resolved, fanout_payload, emit_link).await?;
459 }
460 emit_span.end();
461 Ok(crate::stdlib::json_to_vm_value(&receipt))
462}
463
464pub(crate) async fn channel_events_from_vm(
465 ctx: Option<&crate::vm::AsyncBuiltinCtx>,
466 args: Vec<VmValue>,
467) -> Result<VmValue, VmError> {
468 let name = required_string(args.first(), "channel_events", "name")?;
469 let options = parse_options(args.get(1), "channel_events")?;
470 let context = ChannelContext::current(ctx);
471 let resolved = resolve_channel(&name, &options, &context)?;
472 let events = log_for_scope(resolved.scope)
473 .read_range(
474 &resolved.topic,
475 options.from_cursor,
476 options.limit.unwrap_or(usize::MAX),
477 )
478 .await
479 .map_err(channel_log_error)?;
480 let values = events
481 .into_iter()
482 .map(|(event_id, event)| event_value(&resolved.topic, event_id, event))
483 .collect::<Result<Vec<_>, _>>()?;
484 Ok(crate::stdlib::json_to_vm_value(&serde_json::Value::Array(
485 values,
486 )))
487}
488
489impl ChannelContext {
490 fn current(ctx: Option<&crate::vm::AsyncBuiltinCtx>) -> Self {
491 let mut context = Self::default();
492 if let Some(vm) = ctx.map(crate::vm::AsyncBuiltinCtx::child_vm) {
493 context.task_id = Some(vm.runtime_context.task_id.clone());
494 context.root_task_id = Some(vm.runtime_context.root_task_id.clone());
495 context.scope_id = vm.runtime_context.scope_id.clone();
496 if let VmValue::Dict(values) = crate::runtime_context::runtime_context_value(&vm) {
497 context.workflow_id = dict_string(&values, "workflow_id");
498 context.run_id = dict_string(&values, "run_id");
499 context.worker_id = dict_string(&values, "worker_id");
500 context.agent_session_id = dict_string(&values, "agent_session_id");
501 context.root_agent_session_id = dict_string(&values, "root_agent_session_id");
502 context.tenant_id = dict_string(&values, "tenant_id");
503 }
504 }
505 context.agent_session_id = context
506 .agent_session_id
507 .or_else(crate::agent_sessions::current_session_id);
508 context
509 }
510
511 fn session_id(&self, options: &ChannelOptions) -> Result<String, ChannelError> {
512 if let Some(requested) = options.session_id.as_deref() {
516 if let Some(active) = self.agent_session_id.as_deref() {
517 if active != requested {
518 return Err(ChannelError::scope_ambiguous(format!(
519 "session scope ambiguous: options.session_id '{requested}' \
520 conflicts with active session '{active}'"
521 )));
522 }
523 }
524 }
525 Ok(options
526 .session_id
527 .clone()
528 .or_else(|| self.agent_session_id.clone())
529 .or_else(|| self.root_agent_session_id.clone())
530 .or_else(|| self.scope_id.clone())
531 .or_else(|| self.root_task_id.clone())
532 .unwrap_or_else(|| "session".to_string()))
533 }
534
535 fn pipeline_id(&self, options: &ChannelOptions) -> Result<String, ChannelError> {
536 let active = self.workflow_id.clone().or_else(|| self.run_id.clone());
540 if let (Some(requested), Some(active)) = (options.pipeline_id.as_deref(), active.as_deref())
541 {
542 if requested != active {
543 return Err(ChannelError::scope_ambiguous(format!(
544 "pipeline scope ambiguous: options.pipeline_id '{requested}' \
545 conflicts with active pipeline '{active}'"
546 )));
547 }
548 }
549 options
550 .pipeline_id
551 .clone()
552 .or(active)
553 .ok_or_else(ChannelError::missing_pipeline)
554 }
555
556 fn tenant_id(
557 &self,
558 options: &ChannelOptions,
559 requested: Option<&str>,
560 ) -> Result<String, ChannelError> {
561 let current = self.tenant_id.as_deref();
562 let requested = requested
563 .map(ToOwned::to_owned)
564 .or_else(|| options.tenant_id.clone());
565 if let (Some(current), Some(requested)) = (current, requested.as_deref()) {
566 if current != requested {
567 return Err(ChannelError::cross_tenant(format!(
568 "cross-tenant channel emit requires a grant: current tenant '{current}', requested tenant '{requested}'"
569 )));
570 }
571 }
572 Ok(requested
573 .or_else(|| self.tenant_id.clone())
574 .unwrap_or_else(|| "default".to_string()))
575 }
576
577 fn pipeline_id_for_receipt(&self, resolved: &ResolvedChannel) -> Option<String> {
578 match resolved.scope {
579 ChannelScope::Pipeline => Some(resolved.scope_id.clone()),
580 _ => self.workflow_id.clone().or_else(|| self.run_id.clone()),
581 }
582 }
583
584 fn session_id_for_receipt(&self, resolved: &ResolvedChannel) -> Option<String> {
585 match resolved.scope {
586 ChannelScope::Session => Some(resolved.scope_id.clone()),
587 _ => self
588 .agent_session_id
589 .clone()
590 .or_else(|| self.root_agent_session_id.clone()),
591 }
592 }
593
594 fn tenant_id_for_receipt(&self, resolved: &ResolvedChannel) -> Option<String> {
595 match resolved.scope {
596 ChannelScope::Tenant => Some(resolved.scope_id.clone()),
597 _ => self.tenant_id.clone(),
598 }
599 }
600}
601
602fn resolve_channel(
603 raw_name: &str,
604 options: &ChannelOptions,
605 context: &ChannelContext,
606) -> Result<ResolvedChannel, ChannelError> {
607 let parsed = parse_name(raw_name)?;
608 if let Some(option_scope) = options.scope {
609 if let Some(prefix_scope) = parsed.scope {
610 if prefix_scope != option_scope {
611 return Err(ChannelError::malformed(format!(
612 "HARN-CHN-003 channel scope prefix '{}' conflicts with options.scope '{}'",
613 prefix_scope.as_str(),
614 option_scope.as_str()
615 )));
616 }
617 }
618 }
619
620 let scope = parsed
621 .scope
622 .or(options.scope)
623 .unwrap_or(ChannelScope::Tenant);
624 if scope == ChannelScope::Org {
625 return Err(ChannelError::cross_tenant(
626 "org-scoped channels are disabled until org grants are available",
627 ));
628 }
629
630 validate_channel_name(&parsed.name)?;
631 let scope_id = match scope {
632 ChannelScope::Session => match parsed.scope_id.clone() {
633 Some(id) => id,
634 None => context.session_id(options)?,
635 },
636 ChannelScope::Pipeline => context.pipeline_id(options)?,
637 ChannelScope::Tenant => context.tenant_id(options, parsed.scope_id.as_deref())?,
638 ChannelScope::Org => unreachable!("org scope returned above"),
639 };
640 validate_scope_id(scope, &scope_id)?;
641 let resolved_name = format!("{}:{}:{}", scope.as_str(), scope_id, parsed.name);
642 let topic = Topic::new(format!(
643 "channels.{}.{}.{}",
644 scope.as_str(),
645 sanitize_topic_component(&scope_id),
646 sanitize_topic_component(&parsed.name)
647 ))
648 .map_err(|error| ChannelError::malformed(format!("HARN-CHN-003 {error}")))?;
649 Ok(ResolvedChannel {
650 scope,
651 scope_id,
652 resolved_name,
653 topic,
654 retention: retention_for_scope(scope),
655 })
656}
657
/// Result of splitting a raw channel name in `parse_name`.
#[derive(Clone, Debug)]
struct ParsedName {
    /// Scope taken from the `<scope>:` prefix; `None` for unprefixed names.
    scope: Option<ChannelScope>,
    /// Id embedded in the name (`tenant:<id>:<name>` / `org:<id>:<name>`).
    scope_id: Option<String>,
    /// The remaining channel name.
    name: String,
}
664
665fn parse_name(raw_name: &str) -> Result<ParsedName, ChannelError> {
666 let raw_name = raw_name.trim();
667 if raw_name.is_empty() {
668 return Err(ChannelError::malformed(
669 "HARN-CHN-003 channel name cannot be empty",
670 ));
671 }
672 let Some((prefix, rest)) = raw_name.split_once(':') else {
673 return Ok(ParsedName {
674 scope: None,
675 scope_id: None,
676 name: raw_name.to_string(),
677 });
678 };
679 let scope = ChannelScope::parse(prefix)?;
680 match scope {
681 ChannelScope::Session | ChannelScope::Pipeline => {
682 if rest.is_empty() || rest.contains(':') {
683 return Err(ChannelError::malformed(format!(
684 "HARN-CHN-003 malformed {} channel name '{raw_name}'",
685 scope.as_str()
686 )));
687 }
688 Ok(ParsedName {
689 scope: Some(scope),
690 scope_id: None,
691 name: rest.to_string(),
692 })
693 }
694 ChannelScope::Tenant => {
695 if rest.is_empty() {
696 return Err(ChannelError::malformed(
697 "HARN-CHN-003 tenant channel name cannot be empty",
698 ));
699 }
700 let (scope_id, name) = match rest.split_once(':') {
701 Some((tenant_id, name)) if !tenant_id.is_empty() && !name.is_empty() => {
702 (Some(tenant_id.to_string()), name.to_string())
703 }
704 Some(_) => {
705 return Err(ChannelError::malformed(format!(
706 "HARN-CHN-003 malformed tenant channel name '{raw_name}'"
707 )))
708 }
709 None => (None, rest.to_string()),
710 };
711 Ok(ParsedName {
712 scope: Some(scope),
713 scope_id,
714 name,
715 })
716 }
717 ChannelScope::Org => {
718 let Some((org_id, name)) = rest.split_once(':') else {
719 return Err(ChannelError::malformed(format!(
720 "HARN-CHN-003 org channel names must be org:<org_id>:<name>, got '{raw_name}'"
721 )));
722 };
723 if org_id.is_empty() || name.is_empty() {
724 return Err(ChannelError::malformed(format!(
725 "HARN-CHN-003 malformed org channel name '{raw_name}'"
726 )));
727 }
728 Ok(ParsedName {
729 scope: Some(scope),
730 scope_id: Some(org_id.to_string()),
731 name: name.to_string(),
732 })
733 }
734 }
735}
736
737fn validate_channel_name(name: &str) -> Result<(), ChannelError> {
738 if name.trim().is_empty()
739 || name.contains(':')
740 || name.chars().any(|ch| ch.is_control() || ch.is_whitespace())
741 {
742 return Err(ChannelError::malformed(format!(
743 "HARN-CHN-003 malformed channel name '{name}'"
744 )));
745 }
746 Ok(())
747}
748
749fn validate_scope_id(scope: ChannelScope, scope_id: &str) -> Result<(), ChannelError> {
750 if scope_id.trim().is_empty()
751 || scope_id
752 .chars()
753 .any(|ch| ch.is_control() || ch.is_whitespace() || ch == ':')
754 {
755 return Err(ChannelError::malformed(format!(
756 "HARN-CHN-003 malformed {} scope id '{scope_id}'",
757 scope.as_str()
758 )));
759 }
760 Ok(())
761}
762
763fn log_for_scope(scope: ChannelScope) -> Arc<AnyEventLog> {
764 match scope {
765 ChannelScope::Session => {
766 let slot = SESSION_CHANNEL_LOG.get_or_init(|| Mutex::new(None));
767 let mut guard = slot.lock().expect("channel session log poisoned");
768 guard
769 .get_or_insert_with(|| {
770 Arc::new(AnyEventLog::Memory(crate::event_log::MemoryEventLog::new(
771 CHANNEL_QUEUE_DEPTH,
772 )))
773 })
774 .clone()
775 }
776 ChannelScope::Pipeline | ChannelScope::Tenant => active_event_log()
777 .unwrap_or_else(|| install_memory_for_current_thread(CHANNEL_QUEUE_DEPTH)),
778 ChannelScope::Org => unreachable!("org-scoped channel log is disabled"),
779 }
780}
781
782fn signed_timestamp(
783 resolved: &ResolvedChannel,
784 event_id: &str,
785 emitted_by: &str,
786) -> SignedTimestamp {
787 let at = crate::clock_mock::now_utc();
788 let at_ms = harn_clock::offset_datetime_to_ms(at);
789 let at_text = at.format(&Rfc3339).unwrap_or_else(|_| at.to_string());
790 let material = format!(
791 "harn.channel.timestamp.v1\nat_ms={at_ms}\nid={event_id}\nname={}\nscope={}\nscope_id={}\nemitted_by={emitted_by}\n",
792 resolved.resolved_name,
793 resolved.scope.as_str(),
794 resolved.scope_id
795 );
796 let signature = hex::encode(crate::connectors::hmac::hmac_sha256(
797 signing_salt(),
798 material.as_bytes(),
799 ));
800 SignedTimestamp {
801 at_ms,
802 at: at_text,
803 algorithm: "hmac-sha256".to_string(),
804 key_id: "local-session".to_string(),
805 signature: format!("sha256:{signature}"),
806 }
807}
808
809fn signing_salt() -> &'static [u8] {
810 SIGNING_SALT
811 .get_or_init(|| {
812 format!(
813 "harn-channel-signing-salt:{}:{}",
814 std::process::id(),
815 uuid::Uuid::now_v7()
816 )
817 .into_bytes()
818 })
819 .as_slice()
820}
821
822fn signed_match_timestamp(
829 resolved: &ResolvedChannel,
830 event_id: &str,
831 trigger_id: &str,
832) -> SignedTimestamp {
833 let at = crate::clock_mock::now_utc();
834 let at_ms = harn_clock::offset_datetime_to_ms(at);
835 let at_text = at.format(&Rfc3339).unwrap_or_else(|_| at.to_string());
836 let material = format!(
837 "harn.channel.match_timestamp.v1\nat_ms={at_ms}\nevent_id={event_id}\ntrigger_id={trigger_id}\nname={}\nscope={}\nscope_id={}\n",
838 resolved.resolved_name,
839 resolved.scope.as_str(),
840 resolved.scope_id
841 );
842 let signature = hex::encode(crate::connectors::hmac::hmac_sha256(
843 signing_salt(),
844 material.as_bytes(),
845 ));
846 SignedTimestamp {
847 at_ms,
848 at: at_text,
849 algorithm: "hmac-sha256".to_string(),
850 key_id: "local-session".to_string(),
851 signature: format!("sha256:{signature}"),
852 }
853}
854
855pub fn channel_payload_hash(payload: &serde_json::Value) -> String {
862 let canonical = canonical_json_string(payload);
863 let digest = Sha256::digest(canonical.as_bytes());
864 format!("sha256:{}", hex::encode(digest))
865}
866
867fn canonical_json_string(value: &serde_json::Value) -> String {
873 match value {
874 serde_json::Value::Object(map) => {
875 let mut sorted: std::collections::BTreeMap<&String, &serde_json::Value> =
876 std::collections::BTreeMap::new();
877 for (key, value) in map {
878 sorted.insert(key, value);
879 }
880 let parts: Vec<String> = sorted
881 .iter()
882 .map(|(key, value)| {
883 format!(
884 "{}:{}",
885 serde_json::to_string(key).unwrap_or_else(|_| (*key).clone()),
886 canonical_json_string(value)
887 )
888 })
889 .collect();
890 format!("{{{}}}", parts.join(","))
891 }
892 serde_json::Value::Array(items) => {
893 let parts: Vec<String> = items.iter().map(canonical_json_string).collect();
894 format!("[{}]", parts.join(","))
895 }
896 other => serde_json::to_string(other).unwrap_or_else(|_| "null".to_string()),
897 }
898}
899
900async fn append_channel_audit_event(
909 kind: &'static str,
910 schema: &'static str,
911 payload: serde_json::Value,
912) {
913 let topic = match Topic::new(CHANNEL_AUDIT_TOPIC) {
914 Ok(topic) => topic,
915 Err(_) => return,
916 };
917 let log = active_event_log()
918 .unwrap_or_else(|| install_memory_for_current_thread(CHANNEL_QUEUE_DEPTH));
919 let mut headers = BTreeMap::new();
920 headers.insert("schema".to_string(), schema.to_string());
921 let _ = log
922 .append(&topic, LogEvent::new(kind, payload).with_headers(headers))
923 .await;
924}
925
926async fn record_guardrail_audit(
934 kind: &'static str,
935 resolved: &ResolvedChannel,
936 event_id: &str,
937 emitted_by: &str,
938 payload: &serde_json::Value,
939 fired: &[crate::channel_guardrails::FiredGuardrail],
940) {
941 let fired_json: Vec<serde_json::Value> = fired
942 .iter()
943 .map(|entry| {
944 serde_json::json!({
945 "id": entry.id,
946 "kind": entry.kind,
947 "verdict_label": entry.verdict_label,
948 "reason": entry.reason,
949 })
950 })
951 .collect();
952 let audit_payload = serde_json::json!({
953 "event_id": event_id,
954 "name_resolved": resolved.resolved_name,
955 "scope": resolved.scope.as_str(),
956 "scope_id": resolved.scope_id,
957 "emitted_by": emitted_by,
958 "payload_hash": channel_payload_hash(payload),
959 "payload": payload,
960 "fired": fired_json,
961 });
962 append_channel_audit_event(kind, CHANNEL_GUARDRAIL_AUDIT_SCHEMA, audit_payload.clone()).await;
963 crate::orchestration::record_lifecycle_audit(kind, audit_payload);
967}
968
969async fn record_guardrail_warnings(
974 resolved: &ResolvedChannel,
975 event_id: &str,
976 emitted_by: &str,
977 payload: &serde_json::Value,
978 fired: &[crate::channel_guardrails::FiredGuardrail],
979) {
980 if fired.is_empty() {
981 return;
982 }
983 record_guardrail_audit(
984 CHANNEL_GUARDRAIL_WARNING_KIND,
985 resolved,
986 event_id,
987 emitted_by,
988 payload,
989 fired,
990 )
991 .await;
992}
993
994async fn handle_blocked_emit(
1000 raw_name: &str,
1001 resolved: &ResolvedChannel,
1002 event_id: &str,
1003 emitted_by: &str,
1004 emitted_at: &SignedTimestamp,
1005 payload: &serde_json::Value,
1006 decision: &crate::channel_guardrails::GuardrailDecision,
1007) -> Result<VmValue, VmError> {
1008 record_guardrail_audit(
1009 CHANNEL_GUARDRAIL_BLOCKED_KIND,
1010 resolved,
1011 event_id,
1012 emitted_by,
1013 payload,
1014 decision.fired.as_slice(),
1015 )
1016 .await;
1017 let block_reason = decision
1018 .fired
1019 .iter()
1020 .rev()
1021 .find_map(|f| {
1022 if f.verdict_label == CHANNEL_GUARDRAIL_BLOCKED_KIND
1023 || f.verdict_label.contains("block")
1024 {
1025 Some(f.reason.clone())
1026 } else {
1027 None
1028 }
1029 })
1030 .unwrap_or_else(|| "guardrail blocked".to_string());
1031 let fired_json: Vec<serde_json::Value> = decision
1032 .fired
1033 .iter()
1034 .map(|entry| {
1035 serde_json::json!({
1036 "id": entry.id,
1037 "kind": entry.kind,
1038 "verdict_label": entry.verdict_label,
1039 "reason": entry.reason,
1040 })
1041 })
1042 .collect();
1043 let receipt = serde_json::json!({
1044 "event_id": event_id,
1045 "cursor": serde_json::Value::Null,
1046 "id": event_id,
1047 "name": raw_name,
1048 "name_resolved": resolved.resolved_name,
1049 "scope": resolved.scope.as_str(),
1050 "scope_id": resolved.scope_id,
1051 "emitted_at": emitted_at,
1052 "emitted_by": emitted_by,
1053 "retention": resolved.retention,
1054 "topic": resolved.topic.as_str(),
1055 "inserted": false,
1056 "duplicate": false,
1057 "blocked": true,
1058 "block_reason": block_reason,
1059 "guardrail_fired": fired_json,
1060 });
1061 Ok(crate::stdlib::json_to_vm_value(&receipt))
1062}
1063
1064async fn record_channel_emit_receipt(
1069 record: &StoredChannelEvent,
1070 resolved: &ResolvedChannel,
1071 inserted: bool,
1072 span_id: u64,
1073) {
1074 let receipt = ChannelEmitReceipt {
1075 event_id: record.id.clone(),
1076 name_resolved: resolved.resolved_name.clone(),
1077 scope: resolved.scope.as_str().to_string(),
1078 scope_id: resolved.scope_id.clone(),
1079 payload_hash: channel_payload_hash(&record.payload),
1080 payload: record.payload.clone(),
1081 emitted_at: record.emitted_at.clone(),
1082 emitted_by: record.emitted_by.clone(),
1083 pipeline_id: record.pipeline_id.clone(),
1084 session_id: record.session_id.clone(),
1085 tenant_id: record.tenant_id.clone(),
1086 topic: resolved.topic.as_str().to_string(),
1087 inserted,
1088 span_id: if span_id == 0 { None } else { Some(span_id) },
1089 };
1090 let payload = match serde_json::to_value(&receipt) {
1091 Ok(value) => value,
1092 Err(_) => return,
1093 };
1094 append_channel_audit_event(
1095 CHANNEL_EMIT_RECEIPT_KIND,
1096 CHANNEL_EMIT_RECEIPT_SCHEMA,
1097 payload,
1098 )
1099 .await;
1100}
1101
1102#[allow(clippy::too_many_arguments)]
1108async fn record_channel_match_receipt(
1109 trigger_id: &str,
1110 binding_key: &str,
1111 handler_kind: &str,
1112 resolved: &ResolvedChannel,
1113 event_id: &str,
1114 matched_in_session_id: Option<&str>,
1115 batch: Option<ChannelMatchBatchInfo>,
1116 span_id: u64,
1117 dispatch_outcome: &Result<crate::triggers::DispatchOutcome, crate::triggers::DispatchError>,
1118) {
1119 let receipt = ChannelMatchReceipt {
1120 event_id: event_id.to_string(),
1121 trigger_id: trigger_id.to_string(),
1122 binding_key: binding_key.to_string(),
1123 name_resolved: resolved.resolved_name.clone(),
1124 scope: resolved.scope.as_str().to_string(),
1125 scope_id: resolved.scope_id.clone(),
1126 matched_at: signed_match_timestamp(resolved, event_id, trigger_id),
1127 matched_in_session_id: matched_in_session_id.map(|s| s.to_string()),
1128 batch,
1129 handler_kind: handler_kind.to_string(),
1130 handler_result: ChannelMatchResultSummary::from_dispatch(dispatch_outcome),
1131 span_id: if span_id == 0 { None } else { Some(span_id) },
1132 };
1133 let payload = match serde_json::to_value(&receipt) {
1134 Ok(value) => value,
1135 Err(_) => return,
1136 };
1137 append_channel_audit_event(
1138 CHANNEL_MATCH_RECEIPT_KIND,
1139 CHANNEL_MATCH_RECEIPT_SCHEMA,
1140 payload,
1141 )
1142 .await;
1143}
1144
1145fn batch_info_from_summary(
1149 batch_summary: Option<&serde_json::Value>,
1150) -> Option<ChannelMatchBatchInfo> {
1151 let summary = batch_summary?.as_object()?;
1152 let count = summary
1153 .get("count")
1154 .and_then(|v| v.as_u64())
1155 .map(|n| n as usize)?;
1156 let constituent_event_ids = summary
1157 .get("constituent_event_ids")
1158 .and_then(|v| v.as_array())
1159 .map(|arr| {
1160 arr.iter()
1161 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1162 .collect()
1163 })
1164 .unwrap_or_default();
1165 Some(ChannelMatchBatchInfo {
1166 count,
1167 constituent_event_ids,
1168 })
1169}
1170
1171fn emitted_by(context: &ChannelContext) -> String {
1172 context
1173 .worker_id
1174 .clone()
1175 .or_else(|| context.agent_session_id.clone())
1176 .or_else(|| context.task_id.clone())
1177 .unwrap_or_else(|| "harn".to_string())
1178}
1179
1180fn retention_for_scope(scope: ChannelScope) -> &'static str {
1181 match scope {
1182 ChannelScope::Session => "in_process_session",
1183 ChannelScope::Pipeline => "pipeline_event_log",
1184 ChannelScope::Tenant => "tenant_event_log",
1185 ChannelScope::Org => "org_event_log",
1186 }
1187}
1188
1189fn receipt_value(
1190 topic: &Topic,
1191 event_id: EventId,
1192 event: &LogEvent,
1193 inserted: bool,
1194) -> Result<serde_json::Value, VmError> {
1195 let record = stored_record(event)?;
1196 Ok(serde_json::json!({
1197 "event_id": event_id,
1198 "cursor": event_id,
1199 "id": record.id,
1200 "name": record.name,
1201 "name_resolved": record.name,
1202 "scope": record.scope,
1203 "scope_id": record.scope_id,
1204 "payload": record.payload,
1205 "emitted_at": record.emitted_at,
1206 "emitted_by": record.emitted_by,
1207 "pipeline_id": record.pipeline_id,
1208 "session_id": record.session_id,
1209 "tenant_id": record.tenant_id,
1210 "retention": record.retention,
1211 "ttl_ms": record.ttl_ms,
1212 "topic": topic.as_str(),
1213 "inserted": inserted,
1214 "duplicate": !inserted,
1215 }))
1216}
1217
1218fn event_value(
1219 topic: &Topic,
1220 event_id: EventId,
1221 event: LogEvent,
1222) -> Result<serde_json::Value, VmError> {
1223 let record = stored_record(&event)?;
1224 Ok(serde_json::json!({
1225 "event_id": event_id,
1226 "cursor": event_id,
1227 "topic": topic.as_str(),
1228 "kind": event.kind,
1229 "headers": event.headers,
1230 "occurred_at_ms": event.occurred_at_ms,
1231 "id": record.id,
1232 "name": record.name,
1233 "name_resolved": record.name,
1234 "scope": record.scope,
1235 "scope_id": record.scope_id,
1236 "payload": record.payload,
1237 "emitted_at": record.emitted_at,
1238 "emitted_by": record.emitted_by,
1239 "pipeline_id": record.pipeline_id,
1240 "session_id": record.session_id,
1241 "tenant_id": record.tenant_id,
1242 "retention": record.retention,
1243 "ttl_ms": record.ttl_ms,
1244 }))
1245}
1246
1247fn stored_record(event: &LogEvent) -> Result<StoredChannelEvent, VmError> {
1248 serde_json::from_value(event.payload.clone()).map_err(|error| {
1249 VmError::Runtime(format!(
1250 "channel event store contained malformed channel payload: {error}"
1251 ))
1252 })
1253}
1254
1255fn parse_options(value: Option<&VmValue>, builtin: &str) -> Result<ChannelOptions, VmError> {
1256 let Some(value) = value else {
1257 return Ok(ChannelOptions::default());
1258 };
1259 match value {
1260 VmValue::Nil => Ok(ChannelOptions::default()),
1261 VmValue::Dict(options) => Ok(ChannelOptions {
1262 scope: option_string(options, "scope", builtin)?
1263 .map(|scope| ChannelScope::parse(&scope))
1264 .transpose()
1265 .map_err(VmError::from)?,
1266 id: option_string(options, "id", builtin)?,
1267 tenant_id: option_string(options, "tenant_id", builtin)?,
1268 session_id: option_string(options, "session_id", builtin)?,
1269 pipeline_id: option_string(options, "pipeline_id", builtin)?,
1270 from_cursor: option_non_negative_int(options, "from_cursor", builtin)?
1271 .or(option_non_negative_int(options, "cursor", builtin)?)
1272 .map(|value| value as EventId),
1273 limit: option_non_negative_int(options, "limit", builtin)?.map(|value| value as usize),
1274 ttl_ms: option_duration_ms(options, "ttl", builtin)?,
1275 }),
1276 other => Err(VmError::TypeError(format!(
1277 "{builtin}: options must be a dict or nil, got {}",
1278 other.type_name()
1279 ))),
1280 }
1281}
1282
1283fn required_string(value: Option<&VmValue>, builtin: &str, name: &str) -> Result<String, VmError> {
1284 match value {
1285 Some(VmValue::String(value)) => Ok(value.to_string()),
1286 Some(other) => Err(VmError::TypeError(format!(
1287 "{builtin}: {name} must be a string, got {}",
1288 other.type_name()
1289 ))),
1290 None => Err(VmError::TypeError(format!("{builtin}: missing {name}"))),
1291 }
1292}
1293
1294fn option_string(
1295 options: &BTreeMap<String, VmValue>,
1296 key: &str,
1297 builtin: &str,
1298) -> Result<Option<String>, VmError> {
1299 match options.get(key) {
1300 None | Some(VmValue::Nil) => Ok(None),
1301 Some(VmValue::String(value)) if !value.trim().is_empty() => Ok(Some(value.to_string())),
1302 Some(VmValue::String(_)) => Err(VmError::TypeError(format!(
1303 "{builtin}: options.{key} cannot be empty"
1304 ))),
1305 Some(other) => Err(VmError::TypeError(format!(
1306 "{builtin}: options.{key} must be a string or nil, got {}",
1307 other.type_name()
1308 ))),
1309 }
1310}
1311
1312fn option_non_negative_int(
1313 options: &BTreeMap<String, VmValue>,
1314 key: &str,
1315 builtin: &str,
1316) -> Result<Option<u64>, VmError> {
1317 match options.get(key) {
1318 None | Some(VmValue::Nil) => Ok(None),
1319 Some(VmValue::Int(value)) if *value >= 0 => Ok(Some(*value as u64)),
1320 Some(other) => Err(VmError::TypeError(format!(
1321 "{builtin}: options.{key} must be a non-negative int or nil, got {}",
1322 other.type_name()
1323 ))),
1324 }
1325}
1326
1327fn option_duration_ms(
1328 options: &BTreeMap<String, VmValue>,
1329 key: &str,
1330 builtin: &str,
1331) -> Result<Option<i64>, VmError> {
1332 match options.get(key) {
1333 None | Some(VmValue::Nil) => Ok(None),
1334 Some(VmValue::Duration(value)) if *value >= 0 => Ok(Some(*value)),
1335 Some(VmValue::Int(value)) if *value >= 0 => Ok(Some(*value)),
1336 Some(other) => Err(VmError::TypeError(format!(
1337 "{builtin}: options.{key} must be a non-negative duration, int, or nil, got {}",
1338 other.type_name()
1339 ))),
1340 }
1341}
1342
1343fn dict_string(values: &BTreeMap<String, VmValue>, key: &str) -> Option<String> {
1344 match values.get(key) {
1345 Some(VmValue::String(value)) if !value.is_empty() => Some(value.to_string()),
1346 _ => None,
1347 }
1348}
1349
1350fn channel_log_error(error: crate::event_log::LogError) -> VmError {
1351 VmError::Runtime(format!("channel event log: {error}"))
1352}
1353
1354struct ChannelSpanGuard {
1363 span_id: u64,
1364 otel_span: tracing::Span,
1365}
1366
1367impl ChannelSpanGuard {
1368 fn start(
1369 kind: crate::tracing::SpanKind,
1370 name: String,
1371 links: Vec<crate::tracing::SpanLink>,
1372 ) -> Self {
1373 Self::start_with_parenting(kind, name, links, true)
1374 }
1375
1376 fn start_detached(
1377 kind: crate::tracing::SpanKind,
1378 name: String,
1379 links: Vec<crate::tracing::SpanLink>,
1380 ) -> Self {
1381 Self::start_with_parenting(kind, name, links, false)
1382 }
1383
1384 fn start_with_parenting(
1385 kind: crate::tracing::SpanKind,
1386 name: String,
1387 links: Vec<crate::tracing::SpanLink>,
1388 inherit_parent: bool,
1389 ) -> Self {
1390 let span_id = if inherit_parent {
1391 crate::tracing::span_start_with_links(kind, name.clone(), links.clone())
1392 } else {
1393 crate::tracing::span_start_detached_with_links(kind, name.clone(), links.clone())
1394 };
1395 let otel_span = tracing::info_span!(
1396 target: "harn.vm.channel",
1397 "harn.channel",
1398 harn.kind = kind.as_str(),
1399 harn.name = %name,
1400 );
1401 for link in links {
1402 let trace_id = crate::TraceId(link.trace_id);
1403 let mut attributes: std::collections::HashMap<String, String> =
1404 link.attributes.into_iter().collect();
1405 attributes
1406 .entry("harn.link.kind".to_string())
1407 .or_insert_with(|| "channel_emit".to_string());
1408 let _ = crate::observability::otel::set_span_link(
1409 &otel_span,
1410 &trace_id,
1411 &link.span_id,
1412 Some(attributes),
1413 );
1414 }
1415 Self { span_id, otel_span }
1416 }
1417
1418 fn link(&self) -> Option<crate::tracing::SpanLink> {
1419 crate::observability::otel::current_span_context_hex(&self.otel_span)
1420 .map(|(trace_id, span_id)| crate::tracing::SpanLink::new(trace_id, span_id))
1421 .or_else(|| crate::tracing::span_link(self.span_id))
1422 }
1423
1424 fn set_metadata(&self, key: &str, value: serde_json::Value) {
1425 crate::tracing::span_set_metadata(self.span_id, key, value);
1426 }
1427
1428 fn end(&mut self) {
1429 if self.span_id != 0 {
1430 crate::tracing::span_end(self.span_id);
1431 self.span_id = 0;
1432 }
1433 }
1434}
1435
1436impl Drop for ChannelSpanGuard {
1437 fn drop(&mut self) {
1438 self.end();
1439 }
1440}
1441
1442fn summarize_payload(payload: &serde_json::Value) -> serde_json::Value {
1449 const MAX_STRING_LEN: usize = 120;
1450 match payload {
1451 serde_json::Value::Null => serde_json::json!({"kind": "null"}),
1452 serde_json::Value::Bool(value) => serde_json::json!({"kind": "bool", "value": value}),
1453 serde_json::Value::Number(value) => serde_json::json!({"kind": "number", "value": value}),
1454 serde_json::Value::String(value) => {
1455 let truncated: String = value.chars().take(MAX_STRING_LEN).collect();
1456 let len = value.chars().count();
1457 serde_json::json!({
1458 "kind": "string",
1459 "value": truncated,
1460 "truncated": len > MAX_STRING_LEN,
1461 "length": len,
1462 })
1463 }
1464 serde_json::Value::Array(items) => {
1465 serde_json::json!({"kind": "array", "length": items.len()})
1466 }
1467 serde_json::Value::Object(map) => {
1468 let fields: Vec<&String> = map.keys().take(8).collect();
1469 serde_json::json!({
1470 "kind": "object",
1471 "field_count": map.len(),
1472 "fields": fields,
1473 })
1474 }
1475 }
1476}
1477
1478fn emit_channel_transcript_event(kind: &'static str, payload: serde_json::Value) {
1483 let Some(log) = active_event_log() else {
1484 return;
1485 };
1486 let Ok(topic) = Topic::new(CHANNEL_TRANSCRIPT_TOPIC) else {
1487 return;
1488 };
1489 let event = LogEvent::new(kind, payload);
1490 if tokio::runtime::Handle::try_current().is_ok() {
1491 if let Ok(join) = std::thread::Builder::new()
1492 .name("harn-channel-transcript".to_string())
1493 .spawn(move || {
1494 let _ = futures::executor::block_on(log.append(&topic, event));
1495 })
1496 {
1497 let _ = join.join();
1498 }
1499 } else {
1500 let _ = futures::executor::block_on(log.append(&topic, event));
1501 }
1502}
1503
1504fn emit_channel_emit_transcript(
1509 record: &StoredChannelEvent,
1510 resolved: &ResolvedChannel,
1511 inserted: bool,
1512 span_id: u64,
1513) {
1514 let payload = serde_json::json!({
1515 "event_id": record.id,
1516 "name": record.name,
1517 "name_resolved": resolved.resolved_name,
1518 "scope": record.scope,
1519 "scope_id": record.scope_id,
1520 "payload_summary": summarize_payload(&record.payload),
1521 "emitted_at": record.emitted_at,
1522 "emitted_at_ms": record.emitted_at.at_ms,
1523 "emitted_by": record.emitted_by,
1524 "session_id": record.session_id,
1525 "pipeline_id": record.pipeline_id,
1526 "tenant_id": record.tenant_id,
1527 "inserted": inserted,
1528 "duplicate": !inserted,
1529 "span_id": if span_id == 0 { serde_json::Value::Null } else { serde_json::json!(span_id) },
1530 });
1531 emit_channel_transcript_event(CHANNEL_EMIT_TRANSCRIPT_KIND, payload);
1532}
1533
1534#[allow(clippy::too_many_arguments)]
1539fn emit_channel_match_transcript(
1540 trigger_id: &str,
1541 handler_kind: &str,
1542 resolved: &ResolvedChannel,
1543 event_id: &str,
1544 matched_at_ms: i64,
1545 matched_in_session_id: Option<&str>,
1546 span_id: u64,
1547 batch: Option<serde_json::Value>,
1548) {
1549 let mut payload = serde_json::json!({
1550 "event_id": event_id,
1551 "name_resolved": resolved.resolved_name,
1552 "scope": resolved.scope.as_str(),
1553 "scope_id": resolved.scope_id,
1554 "trigger_id": trigger_id,
1555 "handler_kind": handler_kind,
1556 "matched_at_ms": matched_at_ms,
1557 "matched_in_session_id": matched_in_session_id,
1558 "span_id": if span_id == 0 { serde_json::Value::Null } else { serde_json::json!(span_id) },
1559 });
1560 if let Some(batch) = batch {
1561 if let Some(map) = payload.as_object_mut() {
1562 map.insert("batch".to_string(), batch);
1563 }
1564 }
1565 emit_channel_transcript_event(CHANNEL_MATCH_TRANSCRIPT_KIND, payload);
1566}
1567
1568fn emit_links_from_event(event: &TriggerEvent) -> Vec<crate::tracing::SpanLink> {
1575 let mut links = Vec::new();
1576 if let (Some(trace_id), Some(span_id)) = (
1577 event.headers.get(EMIT_TRACE_ID_HEADER),
1578 event.headers.get(EMIT_SPAN_ID_HEADER),
1579 ) {
1580 links.push(
1581 crate::tracing::SpanLink::new(trace_id.clone(), span_id.clone()).with_attributes(
1582 BTreeMap::from([("harn.link.kind".to_string(), "channel_emit".to_string())]),
1583 ),
1584 );
1585 }
1586 links
1587}
1588
1589fn emit_links_from_batch(events: &[TriggerEvent]) -> Vec<crate::tracing::SpanLink> {
1590 let mut links = Vec::new();
1591 for event in events {
1592 links.extend(emit_links_from_event(event));
1593 }
1594 links
1595}
1596
1597fn batch_summary_for_transcript(events: &[TriggerEvent]) -> serde_json::Value {
1598 let constituent_ids: Vec<String> = events.iter().map(|event| event.id.0.clone()).collect();
1599 serde_json::json!({
1600 "count": events.len(),
1601 "constituent_event_ids": constituent_ids,
1602 })
1603}
1604
1605#[derive(Clone, Debug, PartialEq, Eq)]
1613pub struct ChannelSelector {
1614 scope: ChannelScope,
1615 scope_id_pattern: ScopeIdPattern,
1616 name: String,
1617}
1618
1619#[derive(Clone, Debug, PartialEq, Eq)]
1620enum ScopeIdPattern {
1621 Current,
1624 Exact(String),
1626 Wildcard,
1629}
1630
1631impl ChannelSelector {
1632 pub fn parse(input: &str) -> Result<Self, String> {
1642 let input = input.trim();
1643 let rest = input
1644 .strip_prefix("channel:")
1645 .ok_or_else(|| format!("channel selector must start with `channel:`, got `{input}`"))?;
1646 if rest.is_empty() {
1647 return Err("channel selector cannot be empty after `channel:` prefix".to_string());
1648 }
1649
1650 let (head, tail_opt) = match rest.split_once(':') {
1651 Some((head, tail)) => (head, Some(tail)),
1652 None => (rest, None),
1653 };
1654 let parsed_scope = ChannelScope::parse(head).ok();
1655 match (parsed_scope, tail_opt) {
1656 (None, _) => {
1658 let name = rest.to_string();
1659 validate_selector_name(&name)?;
1660 Ok(Self {
1661 scope: ChannelScope::Tenant,
1662 scope_id_pattern: ScopeIdPattern::Current,
1663 name,
1664 })
1665 }
1666 (Some(scope @ (ChannelScope::Session | ChannelScope::Pipeline)), Some(name))
1667 if !name.is_empty() =>
1668 {
1669 if name.contains(':') {
1670 return Err(format!(
1671 "channel selector `{input}`: {} scope expects `<name>` with no extra colons",
1672 scope.as_str()
1673 ));
1674 }
1675 validate_selector_name(name)?;
1676 Ok(Self {
1677 scope,
1678 scope_id_pattern: ScopeIdPattern::Current,
1679 name: name.to_string(),
1680 })
1681 }
1682 (Some(scope @ (ChannelScope::Tenant | ChannelScope::Org)), Some(tail))
1683 if !tail.is_empty() =>
1684 {
1685 let Some((scope_id, name)) = tail.split_once(':') else {
1686 if matches!(scope, ChannelScope::Tenant) {
1688 validate_selector_name(tail)?;
1689 return Ok(Self {
1690 scope,
1691 scope_id_pattern: ScopeIdPattern::Current,
1692 name: tail.to_string(),
1693 });
1694 }
1695 return Err(format!(
1696 "channel selector `{input}`: org scope requires `<org-id>:<name>`"
1697 ));
1698 };
1699 if scope_id.is_empty() || name.is_empty() {
1700 return Err(format!(
1701 "channel selector `{input}`: scope id and name must be non-empty"
1702 ));
1703 }
1704 validate_selector_name(name)?;
1705 let pattern = if scope_id == "*" {
1706 ScopeIdPattern::Wildcard
1707 } else {
1708 ScopeIdPattern::Exact(scope_id.to_string())
1709 };
1710 Ok(Self {
1711 scope,
1712 scope_id_pattern: pattern,
1713 name: name.to_string(),
1714 })
1715 }
1716 (Some(scope), _) => Err(format!(
1717 "channel selector `{input}`: {} scope requires `<name>` segment",
1718 scope.as_str()
1719 )),
1720 }
1721 }
1722
1723 pub fn scope(&self) -> &'static str {
1724 self.scope.as_str()
1725 }
1726
1727 pub fn name(&self) -> &str {
1728 &self.name
1729 }
1730
1731 pub fn matches(&self, scope: &str, scope_id: &str, name: &str, current_tenant: &str) -> bool {
1735 if self.scope.as_str() != scope || self.name != name {
1736 return false;
1737 }
1738 match &self.scope_id_pattern {
1739 ScopeIdPattern::Current => match self.scope {
1740 ChannelScope::Tenant => scope_id == current_tenant,
1741 ChannelScope::Session | ChannelScope::Pipeline => {
1742 true
1747 }
1748 ChannelScope::Org => false,
1749 },
1750 ScopeIdPattern::Exact(value) => scope_id == value,
1751 ScopeIdPattern::Wildcard => match self.scope {
1752 ChannelScope::Tenant => true,
1753 _ => false,
1755 },
1756 }
1757 }
1758}
1759
/// Checks that a selector's channel name is well formed.
///
/// # Errors
///
/// Returns a message naming the offending value when the name is empty or
/// blank, contains a colon, or contains any control or whitespace character.
fn validate_selector_name(name: &str) -> Result<(), String> {
    let is_blank = name.trim().is_empty();
    let has_colon = name.contains(':');
    let has_forbidden_char = name
        .chars()
        .any(|ch| ch.is_control() || ch.is_whitespace());
    match is_blank || has_colon || has_forbidden_char {
        true => Err(format!("channel selector name `{name}` is malformed")),
        false => Ok(()),
    }
}
1769
1770async fn dispatch_channel_emit_to_triggers(
1784 ctx: Option<&crate::vm::AsyncBuiltinCtx>,
1785 resolved: &ResolvedChannel,
1786 payload: ChannelEventPayload,
1787 emit_link: Option<crate::tracing::SpanLink>,
1788) -> Result<(), VmError> {
1789 let bindings = crate::triggers::registry::channel_bindings_matching(
1792 resolved.scope.as_str(),
1793 &resolved.scope_id,
1794 &payload.name,
1795 );
1796
1797 flush_expired_aggregations_inner(ctx).await;
1804
1805 if bindings.is_empty() {
1806 return Ok(());
1807 }
1808 let Some(base_vm) = ctx.map(crate::vm::AsyncBuiltinCtx::child_vm) else {
1809 return Ok(());
1811 };
1812 let log = active_event_log()
1813 .unwrap_or_else(|| install_memory_for_current_thread(CHANNEL_QUEUE_DEPTH));
1814 let dispatcher = crate::triggers::Dispatcher::with_event_log(base_vm, log);
1815 for binding in bindings {
1816 if let Some(filter_str) = binding.filter.as_ref() {
1820 if !channel_filter_matches(filter_str, &payload.payload) {
1821 continue;
1822 }
1823 }
1824 let event = build_channel_trigger_event(&payload, emit_link.as_ref());
1825
1826 if let Some(aggregation_config) = binding.aggregation.as_ref() {
1830 let partition_key = crate::triggers::aggregation::partition_key_for_event(
1831 aggregation_config,
1832 &payload.payload,
1833 );
1834 let binding_key = binding.binding_key();
1835 let outcome = crate::triggers::aggregation::accumulate(
1836 &binding_key,
1837 aggregation_config,
1838 partition_key.as_deref(),
1839 event,
1840 );
1841 if let crate::triggers::aggregation::AccumulateOutcome::Ready(events) = outcome {
1842 let links = emit_links_from_batch(&events);
1846 let batch_summary = batch_summary_for_transcript(&events);
1847 let batched = match crate::triggers::dispatcher::build_batched_event_public(events)
1848 {
1849 Ok(batched) => batched,
1850 Err(error) => {
1851 return Err(VmError::Runtime(format!(
1852 "emit_channel aggregation batch: {error}"
1853 )));
1854 }
1855 };
1856 fire_channel_match(
1857 &dispatcher,
1858 binding.clone(),
1859 batched,
1860 resolved,
1861 links,
1862 Some(batch_summary),
1863 )
1864 .await;
1865 }
1866 continue;
1867 }
1868
1869 let links = emit_links_from_event(&event);
1870 fire_channel_match(&dispatcher, binding.clone(), event, resolved, links, None).await;
1871 }
1872 Ok(())
1873}
1874
1875async fn fire_channel_match(
1880 dispatcher: &crate::triggers::Dispatcher,
1881 binding: std::sync::Arc<crate::triggers::registry::TriggerBinding>,
1882 event: TriggerEvent,
1883 resolved: &ResolvedChannel,
1884 links: Vec<crate::tracing::SpanLink>,
1885 batch_summary: Option<serde_json::Value>,
1886) {
1887 let trigger_id = binding.id.as_str().to_string();
1888 let handler_kind = binding.handler.kind().to_string();
1889 let event_id = if event.dedupe_key.is_empty() {
1895 event.id.0.clone()
1896 } else {
1897 event.dedupe_key.clone()
1898 };
1899 let mut match_span = ChannelSpanGuard::start_detached(
1900 crate::tracing::SpanKind::ChannelMatch,
1901 format!("channel.match {}", resolved.resolved_name),
1902 links,
1903 );
1904 match_span.set_metadata("event_id", serde_json::json!(event_id));
1905 match_span.set_metadata("trigger_id", serde_json::json!(trigger_id));
1906 match_span.set_metadata("handler_kind", serde_json::json!(handler_kind));
1907 match_span.set_metadata("name_resolved", serde_json::json!(resolved.resolved_name));
1908 if let Some(summary) = batch_summary.as_ref() {
1909 match_span.set_metadata("batch", summary.clone());
1910 }
1911 let span_id = crate::tracing::current_span_id().unwrap_or(0);
1912 let matched_at_ms = harn_clock::offset_datetime_to_ms(crate::clock_mock::now_utc());
1913 let matched_in_session_id = crate::agent_sessions::current_session_id()
1914 .or_else(|| event.tenant_id.as_ref().map(|t| t.0.clone()));
1915 emit_channel_match_transcript(
1916 &trigger_id,
1917 &handler_kind,
1918 resolved,
1919 &event_id,
1920 matched_at_ms,
1921 matched_in_session_id.as_deref(),
1922 span_id,
1923 batch_summary.clone(),
1924 );
1925 let dispatch_outcome = dispatcher.dispatch(&binding, event).await;
1931 let binding_key = binding.binding_key();
1932 let batch_info = batch_info_from_summary(batch_summary.as_ref());
1933 record_channel_match_receipt(
1934 &trigger_id,
1935 &binding_key,
1936 &handler_kind,
1937 resolved,
1938 &event_id,
1939 matched_in_session_id.as_deref(),
1940 batch_info,
1941 span_id,
1942 &dispatch_outcome,
1943 )
1944 .await;
1945 drop(dispatch_outcome);
1950 match_span.end();
1951}
1952
1953pub(crate) async fn flush_expired_aggregations_inner(ctx: Option<&crate::vm::AsyncBuiltinCtx>) {
1958 let expirations = crate::triggers::aggregation::drain_expired_aggregations();
1959 if expirations.is_empty() {
1960 return;
1961 }
1962 let Some(base_vm) = ctx.map(crate::vm::AsyncBuiltinCtx::child_vm) else {
1963 return;
1964 };
1965 let log = active_event_log()
1966 .unwrap_or_else(|| install_memory_for_current_thread(CHANNEL_QUEUE_DEPTH));
1967 let dispatcher = crate::triggers::Dispatcher::with_event_log(base_vm, log);
1968 for expired in expirations {
1969 if matches!(
1970 expired.action,
1971 crate::triggers::aggregation::ExpireAction::Discard
1972 ) {
1973 continue;
1974 }
1975 let Some((trigger_id, version_str)) = expired.binding_key.rsplit_once("@v") else {
1978 continue;
1979 };
1980 let Ok(version) = version_str.parse::<u32>() else {
1981 continue;
1982 };
1983 let Ok(binding) =
1984 crate::triggers::registry::resolve_live_trigger_binding(trigger_id, Some(version))
1985 else {
1986 continue;
1987 };
1988 let resolved_for_match = resolved_from_first_event(&expired.events);
1992 let links = emit_links_from_batch(&expired.events);
1993 let batch_summary = batch_summary_for_transcript(&expired.events);
1994 let batched = match crate::triggers::dispatcher::build_batched_event_public(expired.events)
1995 {
1996 Ok(batched) => batched,
1997 Err(_) => continue,
1998 };
1999 match resolved_for_match {
2000 Some(resolved) => {
2001 fire_channel_match(
2002 &dispatcher,
2003 binding,
2004 batched,
2005 &resolved,
2006 links,
2007 Some(batch_summary),
2008 )
2009 .await;
2010 }
2011 None => {
2012 let _ = dispatcher.dispatch(&binding, batched).await;
2013 }
2014 }
2015 }
2016}
2017
2018fn resolved_from_first_event(events: &[TriggerEvent]) -> Option<ResolvedChannel> {
2024 let first = events.first()?;
2025 let ProviderPayload::Known(KnownProviderPayload::Channel(payload)) = &first.provider_payload
2026 else {
2027 return None;
2028 };
2029 let scope = ChannelScope::parse(&payload.scope).ok()?;
2030 let topic = Topic::new(format!(
2031 "channels.{}.{}.{}",
2032 payload.scope,
2033 sanitize_topic_component(&payload.scope_id),
2034 sanitize_topic_component(&payload.name),
2035 ))
2036 .ok()?;
2037 Some(ResolvedChannel {
2038 scope,
2039 scope_id: payload.scope_id.clone(),
2040 resolved_name: payload.name_resolved.clone(),
2041 topic,
2042 retention: retention_for_scope(scope),
2043 })
2044}
2045
2046fn build_channel_trigger_event(
2047 payload: &ChannelEventPayload,
2048 emit_link: Option<&crate::tracing::SpanLink>,
2049) -> TriggerEvent {
2050 let mut event = TriggerEvent::new(
2051 ProviderId::from("channel"),
2052 "channel.emit",
2053 None,
2054 payload.id.clone(),
2055 payload.tenant_id.clone().map(TenantId::new),
2056 BTreeMap::new(),
2057 ProviderPayload::Known(KnownProviderPayload::Channel(payload.clone())),
2058 SignatureStatus::Unsigned,
2059 );
2060 event.headers.insert(
2061 "harn_channel_name".to_string(),
2062 payload.name_resolved.clone(),
2063 );
2064 event
2065 .headers
2066 .insert("harn_channel_scope".to_string(), payload.scope.clone());
2067 event.headers.insert(
2068 "harn_channel_scope_id".to_string(),
2069 payload.scope_id.clone(),
2070 );
2071 if let Some(link) = emit_link {
2076 event
2077 .headers
2078 .insert(EMIT_TRACE_ID_HEADER.to_string(), link.trace_id.clone());
2079 event
2080 .headers
2081 .insert(EMIT_SPAN_ID_HEADER.to_string(), link.span_id.clone());
2082 }
2083 event
2084}
2085
2086fn channel_filter_matches(filter_raw: &str, payload: &serde_json::Value) -> bool {
2094 let trimmed = filter_raw.trim();
2095 if trimmed.is_empty() {
2096 return true;
2097 }
2098 let parsed: serde_json::Value = match serde_json::from_str(trimmed) {
2099 Ok(value) => value,
2100 Err(_) => return true,
2101 };
2102 let Some(map) = parsed.as_object() else {
2103 return true;
2104 };
2105 map.iter()
2106 .all(|(key, expected)| match payload_path(payload, key) {
2107 Some(actual) => actual == expected,
2108 None => false,
2109 })
2110}
2111
2112fn payload_path<'a>(value: &'a serde_json::Value, path: &str) -> Option<&'a serde_json::Value> {
2113 let mut current = value;
2114 for segment in path.split('.') {
2115 if segment.is_empty() {
2116 return None;
2117 }
2118 current = match current {
2119 serde_json::Value::Object(map) => map.get(segment)?,
2120 _ => return None,
2121 };
2122 }
2123 Some(current)
2124}
2125
2126#[cfg(test)]
2127mod tests;